diff --git a/jetson/ros2_ws/src/saltybot_hand_tracking/package.xml b/jetson/ros2_ws/src/saltybot_hand_tracking/package.xml new file mode 100644 index 0000000..bae13ce --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_hand_tracking/package.xml @@ -0,0 +1,29 @@ + + + + saltybot_hand_tracking + 0.1.0 + MediaPipe-based hand tracking and robot-command gesture recognition (Issue #342). + SaltyLab + MIT + + ament_python + + rclpy + sensor_msgs + std_msgs + saltybot_hand_tracking_msgs + + python3-numpy + python3-opencv + + + ament_copyright + ament_flake8 + ament_pep257 + python3-pytest + + + ament_python + + diff --git a/jetson/ros2_ws/src/saltybot_hand_tracking/resource/saltybot_hand_tracking b/jetson/ros2_ws/src/saltybot_hand_tracking/resource/saltybot_hand_tracking new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_hand_tracking/saltybot_hand_tracking/__init__.py b/jetson/ros2_ws/src/saltybot_hand_tracking/saltybot_hand_tracking/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_hand_tracking/saltybot_hand_tracking/_hand_gestures.py b/jetson/ros2_ws/src/saltybot_hand_tracking/saltybot_hand_tracking/_hand_gestures.py new file mode 100644 index 0000000..1152a52 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_hand_tracking/saltybot_hand_tracking/_hand_gestures.py @@ -0,0 +1,332 @@ +""" +_hand_gestures.py — Robot-command gesture classification from MediaPipe +hand landmarks. No ROS2 / MediaPipe / OpenCV dependencies. + +Gesture vocabulary +------------------ + "stop" — open palm (4+ fingers extended) → pause/stop robot + "point" — index extended, others curled → direction command + "disarm" — fist (all fingers + thumb curled) → disarm/emergency-off + "confirm" — thumbs-up → confirm action + "follow_me" — victory/peace sign (index+middle up) → follow mode + "greeting" — lateral wrist oscillation (wave) → greeting response + "none" — no recognised gesture + +Coordinate convention +--------------------- +MediaPipe Hands landmark coordinates are image-normalised: + x: 0.0 = left edge, 1.0 = right edge + y: 0.0 = top edge, 1.0 = bottom edge (y increases downward) + z: depth relative to wrist; negative = toward camera + +Landmark indices (MediaPipe Hands topology) +------------------------------------------- + 0 WRIST + 1 THUMB_CMC 2 THUMB_MCP 3 THUMB_IP 4 THUMB_TIP + 5 INDEX_MCP 6 INDEX_PIP 7 INDEX_DIP 8 INDEX_TIP + 9 MIDDLE_MCP 10 MIDDLE_PIP 11 MIDDLE_DIP 12 MIDDLE_TIP + 13 RING_MCP 14 RING_PIP 15 RING_DIP 16 RING_TIP + 17 PINKY_MCP 18 PINKY_PIP 19 PINKY_DIP 20 PINKY_TIP + +Public API +---------- + Landmark dataclass(x, y, z) + HandGestureResult NamedTuple + WaveDetector sliding-window wrist-oscillation detector + classify_hand(landmarks, is_right, wave_det) → HandGestureResult +""" + +from __future__ import annotations + +import math +from collections import deque +from dataclasses import dataclass +from typing import Deque, List, NamedTuple, Optional, Tuple + + +# ── Landmark type ────────────────────────────────────────────────────────────── + +@dataclass(frozen=True) +class Landmark: + x: float + y: float + z: float = 0.0 + + +# ── Result type ──────────────────────────────────────────────────────────────── + +class HandGestureResult(NamedTuple): + gesture: str # "stop"|"point"|"disarm"|"confirm"|"follow_me"|"greeting"|"none" + confidence: float # 0.0–1.0 + direction: str # non-empty only when gesture == "point" + wrist_x: float # normalised wrist position (image coords) + wrist_y: float + + +# ── Landmark index constants ─────────────────────────────────────────────────── + +_WRIST = 0 +_THUMB_CMC = 1; _THUMB_MCP = 2; _THUMB_IP = 3; _THUMB_TIP = 4 +_INDEX_MCP = 5; _INDEX_PIP = 6; _INDEX_DIP = 7; _INDEX_TIP = 8 +_MIDDLE_MCP = 9; _MIDDLE_PIP = 10; _MIDDLE_DIP = 11; _MIDDLE_TIP = 12 +_RING_MCP = 13; _RING_PIP = 14; _RING_DIP = 15; _RING_TIP = 16 +_PINKY_MCP = 17; _PINKY_PIP = 18; _PINKY_DIP = 19; _PINKY_TIP = 20 + +_NONE = HandGestureResult("none", 0.0, "", 0.0, 0.0) + + +# ── Low-level geometry helpers ───────────────────────────────────────────────── + +def _finger_up(lm: List[Landmark], tip: int, pip: int) -> bool: + """True if the finger tip is above (smaller y) its PIP joint.""" + return lm[tip].y < lm[pip].y + + +def _finger_ext_score(lm: List[Landmark], tip: int, pip: int, mcp: int) -> float: + """Extension score in [0, 1]: how far the tip is above the MCP knuckle.""" + spread = lm[mcp].y - lm[tip].y # positive = tip above mcp + palm_h = abs(lm[_WRIST].y - lm[_MIDDLE_MCP].y) or 0.01 + return max(0.0, min(1.0, spread / palm_h)) + + +def _count_fingers_up(lm: List[Landmark]) -> int: + """Count how many of index/middle/ring/pinky are extended.""" + pairs = [ + (_INDEX_TIP, _INDEX_PIP), (_MIDDLE_TIP, _MIDDLE_PIP), + (_RING_TIP, _RING_PIP), (_PINKY_TIP, _PINKY_PIP), + ] + return sum(_finger_up(lm, t, p) for t, p in pairs) + + +def _four_fingers_curled(lm: List[Landmark]) -> bool: + """True when index, middle, ring, and pinky are all curled (not extended).""" + pairs = [ + (_INDEX_TIP, _INDEX_PIP), (_MIDDLE_TIP, _MIDDLE_PIP), + (_RING_TIP, _RING_PIP), (_PINKY_TIP, _PINKY_PIP), + ] + return not any(_finger_up(lm, t, p) for t, p in pairs) + + +def _thumb_curled(lm: List[Landmark]) -> bool: + """True when the thumb tip is below (same or lower y than) the thumb MCP.""" + return lm[_THUMB_TIP].y >= lm[_THUMB_MCP].y + + +def _thumb_extended_up(lm: List[Landmark]) -> bool: + """True when the thumb tip is clearly above the thumb CMC base.""" + return lm[_THUMB_TIP].y < lm[_THUMB_CMC].y - 0.02 + + +def _palm_center(lm: List[Landmark]) -> Tuple[float, float]: + """(x, y) centroid of the four MCP knuckles.""" + xs = [lm[i].x for i in (_INDEX_MCP, _MIDDLE_MCP, _RING_MCP, _PINKY_MCP)] + ys = [lm[i].y for i in (_INDEX_MCP, _MIDDLE_MCP, _RING_MCP, _PINKY_MCP)] + return sum(xs) / 4, sum(ys) / 4 + + +# ── Pointing direction ───────────────────────────────────────────────────────── + +def _point_direction(lm: List[Landmark]) -> str: + """8-compass pointing direction from index MCP → TIP vector.""" + dx = lm[_INDEX_TIP].x - lm[_INDEX_MCP].x + dy = lm[_INDEX_TIP].y - lm[_INDEX_MCP].y # +y = downward in image + angle = math.degrees(math.atan2(-dy, dx)) # flip y so up = +90° + if -22.5 <= angle < 22.5: return "right" + elif 22.5 <= angle < 67.5: return "upper_right" + elif 67.5 <= angle < 112.5: return "up" + elif 112.5 <= angle < 157.5: return "upper_left" + elif angle >= 157.5 or angle < -157.5: return "left" + elif -157.5 <= angle < -112.5: return "lower_left" + elif -112.5 <= angle < -67.5: return "down" + else: return "lower_right" + + +# ── Static gesture classifiers ───────────────────────────────────────────────── + +def _classify_stop(lm: List[Landmark]) -> Optional[HandGestureResult]: + """Open palm: 4 or more fingers extended upward.""" + n = _count_fingers_up(lm) + if n < 4: + return None + scores = [ + _finger_ext_score(lm, _INDEX_TIP, _INDEX_PIP, _INDEX_MCP), + _finger_ext_score(lm, _MIDDLE_TIP, _MIDDLE_PIP, _MIDDLE_MCP), + _finger_ext_score(lm, _RING_TIP, _RING_PIP, _RING_MCP), + _finger_ext_score(lm, _PINKY_TIP, _PINKY_PIP, _PINKY_MCP), + ] + conf = round(0.60 + 0.35 * (sum(scores) / len(scores)), 3) + return HandGestureResult("stop", conf, "", lm[_WRIST].x, lm[_WRIST].y) + + +def _classify_point(lm: List[Landmark]) -> Optional[HandGestureResult]: + """Index extended upward; middle/ring/pinky curled.""" + if not _finger_up(lm, _INDEX_TIP, _INDEX_PIP): + return None + others_up = sum([ + _finger_up(lm, _MIDDLE_TIP, _MIDDLE_PIP), + _finger_up(lm, _RING_TIP, _RING_PIP), + _finger_up(lm, _PINKY_TIP, _PINKY_PIP), + ]) + if others_up >= 1: + return None + ext = _finger_ext_score(lm, _INDEX_TIP, _INDEX_PIP, _INDEX_MCP) + conf = round(0.65 + 0.30 * ext, 3) + direction = _point_direction(lm) + return HandGestureResult("point", conf, direction, lm[_WRIST].x, lm[_WRIST].y) + + +def _classify_disarm(lm: List[Landmark]) -> Optional[HandGestureResult]: + """Fist: all four fingers curled AND thumb tucked (tip at or below MCP).""" + if not _four_fingers_curled(lm): + return None + if not _thumb_curled(lm): + return None + # Extra confidence: fingertips close to palm = deep fist + palm_h = abs(lm[_WRIST].y - lm[_MIDDLE_MCP].y) or 0.01 + curl_depth = sum( + max(0.0, lm[t].y - lm[p].y) + for t, p in ( + (_INDEX_TIP, _INDEX_MCP), (_MIDDLE_TIP, _MIDDLE_MCP), + (_RING_TIP, _RING_MCP), (_PINKY_TIP, _PINKY_MCP), + ) + ) / 4 + conf = round(min(0.95, 0.60 + 0.35 * min(1.0, curl_depth / (palm_h * 0.5))), 3) + return HandGestureResult("disarm", conf, "", lm[_WRIST].x, lm[_WRIST].y) + + +def _classify_confirm(lm: List[Landmark]) -> Optional[HandGestureResult]: + """Thumbs-up: thumb extended upward, four fingers curled.""" + if not _thumb_extended_up(lm): + return None + if not _four_fingers_curled(lm): + return None + palm_h = abs(lm[_WRIST].y - lm[_MIDDLE_MCP].y) or 0.01 + gap = lm[_THUMB_CMC].y - lm[_THUMB_TIP].y + conf = round(min(0.95, 0.60 + 0.35 * min(1.0, gap / palm_h)), 3) + return HandGestureResult("confirm", conf, "", lm[_WRIST].x, lm[_WRIST].y) + + +def _classify_follow_me(lm: List[Landmark]) -> Optional[HandGestureResult]: + """Peace/victory sign: index and middle extended; ring and pinky curled.""" + if not _finger_up(lm, _INDEX_TIP, _INDEX_PIP): + return None + if not _finger_up(lm, _MIDDLE_TIP, _MIDDLE_PIP): + return None + if _finger_up(lm, _RING_TIP, _RING_PIP): + return None + if _finger_up(lm, _PINKY_TIP, _PINKY_PIP): + return None + idx_ext = _finger_ext_score(lm, _INDEX_TIP, _INDEX_PIP, _INDEX_MCP) + mid_ext = _finger_ext_score(lm, _MIDDLE_TIP, _MIDDLE_PIP, _MIDDLE_MCP) + conf = round(0.60 + 0.35 * ((idx_ext + mid_ext) / 2), 3) + return HandGestureResult("follow_me", conf, "", lm[_WRIST].x, lm[_WRIST].y) + + +# ── Priority order (first match wins) ───────────────────────────────────────── + +_CLASSIFIERS = [ + _classify_stop, + _classify_confirm, # before point — thumbs-up would also pass point partially + _classify_follow_me, # before point — index+middle would partially match + _classify_point, + _classify_disarm, +] + + +# ── Wave (temporal) detector ─────────────────────────────────────────────────── + +class WaveDetector: + """Sliding-window wave gesture detector. + + Tracks the wrist X-coordinate over time and fires when there are at least + `min_reversals` direction reversals with peak-to-peak amplitude ≥ + `min_amplitude` (normalised image coords). + + Args: + history_len : number of frames to keep (default 24 ≈ 0.8 s at 30 fps) + min_reversals : direction reversals required to trigger (default 2) + min_amplitude : peak-to-peak x excursion threshold (default 0.08) + """ + + def __init__( + self, + history_len: int = 24, + min_reversals: int = 2, + min_amplitude: float = 0.08, + ) -> None: + self._history: Deque[float] = deque(maxlen=history_len) + self._min_reversals = min_reversals + self._min_amplitude = min_amplitude + + def push(self, wrist_x: float) -> Tuple[bool, float]: + """Add a wrist-X sample. Returns (is_waving, confidence).""" + self._history.append(wrist_x) + if len(self._history) < 6: + return False, 0.0 + return self._detect() + + def reset(self) -> None: + self._history.clear() + + def _detect(self) -> Tuple[bool, float]: + samples = list(self._history) + mean_x = sum(samples) / len(samples) + centered = [x - mean_x for x in samples] + amplitude = max(centered) - min(centered) + if amplitude < self._min_amplitude: + return False, 0.0 + + reversals = sum( + 1 for i in range(1, len(centered)) + if centered[i - 1] * centered[i] < 0 + ) + if reversals < self._min_reversals: + return False, 0.0 + + amp_score = min(1.0, amplitude / 0.30) + rev_score = min(1.0, reversals / 6.0) + conf = round(0.5 * amp_score + 0.5 * rev_score, 3) + return True, conf + + +# ── Public API ───────────────────────────────────────────────────────────────── + +def classify_hand( + landmarks: List[Landmark], + is_right: bool = True, + wave_det: Optional[WaveDetector] = None, +) -> HandGestureResult: + """Classify one hand's 21 MediaPipe landmarks into a robot-command gesture. + + Parameters + ---------- + landmarks : 21 Landmark objects (MediaPipe normalised coords). + is_right : True for right hand (affects thumb direction checks). + wave_det : Optional WaveDetector for temporal wave tracking. + Wave is evaluated before static classifiers. + + Returns + ------- + HandGestureResult — gesture, confidence, direction, wrist_x, wrist_y. + """ + if len(landmarks) < 21: + return _NONE + + # Wave (temporal) — highest priority + if wave_det is not None: + is_waving, wconf = wave_det.push(landmarks[_WRIST].x) + if is_waving: + return HandGestureResult( + "greeting", wconf, "", + landmarks[_WRIST].x, landmarks[_WRIST].y, + ) + + # Static classifiers in priority order + for clf in _CLASSIFIERS: + result = clf(landmarks) + if result is not None: + return result + + return HandGestureResult( + "none", 0.0, "", landmarks[_WRIST].x, landmarks[_WRIST].y + ) diff --git a/jetson/ros2_ws/src/saltybot_hand_tracking/saltybot_hand_tracking/hand_tracking_node.py b/jetson/ros2_ws/src/saltybot_hand_tracking/saltybot_hand_tracking/hand_tracking_node.py new file mode 100644 index 0000000..b505106 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_hand_tracking/saltybot_hand_tracking/hand_tracking_node.py @@ -0,0 +1,305 @@ +""" +hand_tracking_node.py — MediaPipe Hands inference node (Issue #342). + +Subscribes +---------- + /camera/color/image_raw (sensor_msgs/Image) + +Publishes +--------- + /saltybot/hands (saltybot_hand_tracking_msgs/HandLandmarksArray) + /saltybot/hand_gesture (std_msgs/String) + +Parameters +---------- + max_hands int 2 Maximum hands detected per frame + model_complexity int 0 MediaPipe model: 0=lite, 1=full (0 for 20+ FPS) + min_detection_conf float 0.60 MediaPipe detection confidence threshold + min_tracking_conf float 0.50 MediaPipe tracking confidence threshold + gesture_min_conf float 0.60 Minimum gesture confidence to publish on hand_gesture + wave_history_len int 24 Frames kept in WaveDetector history + wave_min_reversals int 2 Oscillation reversals needed for wave + wave_min_amplitude float 0.08 Peak-to-peak wrist-x amplitude for wave + +Performance note +---------------- + model_complexity=0 (lite) on Orin Nano Super (1024-core Ampere, 67 TOPS) + targets >20 FPS at 640×480. Drop to 480×360 in the camera launch if needed. +""" + +from __future__ import annotations + +import threading +import time +from typing import Dict, List, Optional, Tuple + +import numpy as np + +import rclpy +from rclpy.node import Node +from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy + +from sensor_msgs.msg import Image +from std_msgs.msg import String + +from saltybot_hand_tracking_msgs.msg import HandLandmarks, HandLandmarksArray + +from ._hand_gestures import Landmark, WaveDetector, classify_hand + +# Optional runtime imports — guarded so unit tests don't need them +try: + import cv2 + _HAS_CV = True +except ImportError: + _HAS_CV = False + +try: + import mediapipe as mp + _HAS_MP = True +except ImportError: + _HAS_MP = False + + +# ── MediaPipe wrapper ────────────────────────────────────────────────────────── + +class _MPHands: + """Thin wrapper around mediapipe.solutions.hands with lazy init.""" + + def __init__(self, max_hands: int, complexity: int, + det_conf: float, trk_conf: float) -> None: + self._max_hands = max_hands + self._complexity = complexity + self._det_conf = det_conf + self._trk_conf = trk_conf + self._hands = None + + def init(self) -> None: + if not _HAS_MP: + return + self._hands = mp.solutions.hands.Hands( + static_image_mode=False, + max_num_hands=self._max_hands, + min_detection_confidence=self._det_conf, + min_tracking_confidence=self._trk_conf, + model_complexity=self._complexity, + ) + + def process(self, bgr: np.ndarray): + """Process a BGR image; returns mediapipe Hands results or None.""" + if self._hands is None or not _HAS_MP or not _HAS_CV: + return None + rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB) + rgb.flags.writeable = False + try: + return self._hands.process(rgb) + except Exception: + return None + + def close(self) -> None: + if self._hands is not None: + self._hands.close() + + +# ── ROS2 Node ───────────────────────────────────────────────────────────────── + +class HandTrackingNode(Node): + def __init__(self) -> None: + super().__init__('hand_tracking_node') + + # ── Parameters ────────────────────────────────────────────────────── + self.declare_parameter('max_hands', 2) + self.declare_parameter('model_complexity', 0) + self.declare_parameter('min_detection_conf', 0.60) + self.declare_parameter('min_tracking_conf', 0.50) + self.declare_parameter('gesture_min_conf', 0.60) + self.declare_parameter('wave_history_len', 24) + self.declare_parameter('wave_min_reversals', 2) + self.declare_parameter('wave_min_amplitude', 0.08) + + p = self.get_parameter + self._gesture_min_conf: float = p('gesture_min_conf').value + + # ── QoS ───────────────────────────────────────────────────────────── + img_qos = QoSProfile( + reliability=QoSReliabilityPolicy.BEST_EFFORT, + history=QoSHistoryPolicy.KEEP_LAST, + depth=1, + ) + + # ── Publishers ─────────────────────────────────────────────────────── + self._hands_pub = self.create_publisher( + HandLandmarksArray, '/saltybot/hands', 10 + ) + self._gesture_pub = self.create_publisher( + String, '/saltybot/hand_gesture', 10 + ) + + # ── Subscriber ─────────────────────────────────────────────────────── + self._sub = self.create_subscription( + Image, '/camera/color/image_raw', + self._image_cb, img_qos, + ) + + # ── Per-hand WaveDetectors keyed by hand index ──────────────────── + self._wave_dets: Dict[int, WaveDetector] = {} + wave_hist = p('wave_history_len').value + wave_rev = p('wave_min_reversals').value + wave_amp = p('wave_min_amplitude').value + self._wave_kwargs = dict( + history_len=wave_hist, + min_reversals=wave_rev, + min_amplitude=wave_amp, + ) + + # ── MediaPipe (background init) ─────────────────────────────────── + self._mp = _MPHands( + max_hands = p('max_hands').value, + complexity = p('model_complexity').value, + det_conf = p('min_detection_conf').value, + trk_conf = p('min_tracking_conf').value, + ) + self._mp_ready = threading.Event() + threading.Thread(target=self._init_mp, daemon=True).start() + + self.get_logger().info( + f'hand_tracking_node ready — ' + f'max_hands={p("max_hands").value}, ' + f'complexity={p("model_complexity").value}' + ) + + # ── Init ────────────────────────────────────────────────────────────────── + + def _init_mp(self) -> None: + if not _HAS_MP: + self.get_logger().warn( + 'mediapipe not installed — no hand tracking. ' + 'Install: pip install mediapipe' + ) + return + t0 = time.time() + self._mp.init() + self._mp_ready.set() + self.get_logger().info( + f'MediaPipe Hands ready ({time.time() - t0:.1f}s)' + ) + + # ── Image callback ──────────────────────────────────────────────────────── + + def _image_cb(self, msg: Image) -> None: + if not self._mp_ready.is_set(): + return + + bgr = self._ros_to_bgr(msg) + if bgr is None: + return + + results = self._mp.process(bgr) + + arr = HandLandmarksArray() + arr.header = msg.header + arr.hand_count = 0 + best_gesture = "" + best_conf = 0.0 + + if results and results.multi_hand_landmarks: + for hand_idx, (hand_lm, hand_info) in enumerate( + zip(results.multi_hand_landmarks, results.multi_handedness) + ): + cls = hand_info.classification[0] + is_right = cls.label == "Right" + hs_score = float(cls.score) + + lm = [Landmark(p.x, p.y, p.z) for p in hand_lm.landmark] + + wave_det = self._wave_dets.get(hand_idx) + if wave_det is None: + wave_det = WaveDetector(**self._wave_kwargs) + self._wave_dets[hand_idx] = wave_det + + gesture_result = classify_hand(lm, is_right=is_right, wave_det=wave_det) + + # Build HandLandmarks message + hl = HandLandmarks() + hl.header = msg.header + hl.is_right_hand = is_right + hl.handedness_score = hs_score + hl.landmark_xyz = self._pack_landmarks(lm) + hl.gesture = gesture_result.gesture + hl.point_direction = gesture_result.direction + hl.gesture_confidence = float(gesture_result.confidence) + hl.wrist_x = float(lm[0].x) + hl.wrist_y = float(lm[0].y) + + arr.hands.append(hl) + arr.hand_count += 1 + + if gesture_result.confidence > best_conf: + best_conf = gesture_result.confidence + best_gesture = gesture_result.gesture + + self._hands_pub.publish(arr) + + # Publish hand_gesture String only when confident enough + if best_gesture and best_gesture != "none" \ + and best_conf >= self._gesture_min_conf: + gs = String() + gs.data = best_gesture + self._gesture_pub.publish(gs) + + # ── Helpers ─────────────────────────────────────────────────────────────── + + @staticmethod + def _pack_landmarks(lm: List[Landmark]) -> List[float]: + """Pack 21 Landmark objects into a flat [x0,y0,z0, ..., x20,y20,z20] list.""" + out: List[float] = [] + for l in lm: + out.extend([l.x, l.y, l.z]) + return out + + @staticmethod + def _ros_to_bgr(msg: Image) -> Optional[np.ndarray]: + """Convert sensor_msgs/Image to uint8 BGR numpy array.""" + enc = msg.encoding.lower() + data = np.frombuffer(msg.data, dtype=np.uint8) + if enc == 'bgr8': + try: + return data.reshape((msg.height, msg.width, 3)) + except ValueError: + return None + if enc == 'rgb8': + try: + img = data.reshape((msg.height, msg.width, 3)) + return cv2.cvtColor(img, cv2.COLOR_RGB2BGR) if _HAS_CV else None + except ValueError: + return None + if enc == 'mono8': + try: + img = data.reshape((msg.height, msg.width)) + return cv2.cvtColor(img, cv2.COLOR_GRAY2BGR) if _HAS_CV else None + except ValueError: + return None + return None + + # ── Cleanup ─────────────────────────────────────────────────────────────── + + def destroy_node(self) -> None: + self._mp.close() + super().destroy_node() + + +# ── Entry point ─────────────────────────────────────────────────────────────── + +def main(args=None) -> None: + rclpy.init(args=args) + node = HandTrackingNode() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.try_shutdown() + + +if __name__ == '__main__': + main() diff --git a/jetson/ros2_ws/src/saltybot_hand_tracking/setup.cfg b/jetson/ros2_ws/src/saltybot_hand_tracking/setup.cfg new file mode 100644 index 0000000..170e6c5 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_hand_tracking/setup.cfg @@ -0,0 +1,4 @@ +[develop] +script_dir=$base/lib/saltybot_hand_tracking +[install] +install_scripts=$base/lib/saltybot_hand_tracking diff --git a/jetson/ros2_ws/src/saltybot_hand_tracking/setup.py b/jetson/ros2_ws/src/saltybot_hand_tracking/setup.py new file mode 100644 index 0000000..3e27177 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_hand_tracking/setup.py @@ -0,0 +1,33 @@ +from setuptools import setup +import os +from glob import glob + +package_name = 'saltybot_hand_tracking' + +setup( + name=package_name, + version='0.1.0', + packages=[package_name], + data_files=[ + ('share/ament_index/resource_index/packages', + ['resource/' + package_name]), + ('share/' + package_name, ['package.xml']), + (os.path.join('share', package_name, 'launch'), + glob('launch/*.launch.py')), + (os.path.join('share', package_name, 'config'), + glob('config/*.yaml')), + ], + install_requires=['setuptools'], + zip_safe=True, + maintainer='sl-perception', + maintainer_email='sl-perception@saltylab.local', + description='MediaPipe hand tracking node for SaltyBot (Issue #342)', + license='MIT', + tests_require=['pytest'], + entry_points={ + 'console_scripts': [ + # MediaPipe Hands inference + gesture classification (Issue #342) + 'hand_tracking = saltybot_hand_tracking.hand_tracking_node:main', + ], + }, +) diff --git a/jetson/ros2_ws/src/saltybot_hand_tracking/test/test_hand_gestures.py b/jetson/ros2_ws/src/saltybot_hand_tracking/test/test_hand_gestures.py new file mode 100644 index 0000000..a9a2494 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_hand_tracking/test/test_hand_gestures.py @@ -0,0 +1,407 @@ +""" +test_hand_gestures.py — pytest tests for _hand_gestures.py (no ROS2 required). + +Tests cover: + - Landmark dataclass + - HandGestureResult NamedTuple fields + - WaveDetector — no wave / wave trigger / reset + - _finger_up helpers + - classify_hand: + stop (open palm) + point (index up, others curled) + direction + disarm (fist) + confirm (thumbs-up) + follow_me (peace/victory) + greeting (wave via WaveDetector) + none (neutral / ambiguous pose) + - classify_hand priority ordering + - classify_hand: fewer than 21 landmarks → "none" +""" + +from __future__ import annotations + +import math +from typing import List + +import pytest + +from saltybot_hand_tracking._hand_gestures import ( + Landmark, + HandGestureResult, + WaveDetector, + classify_hand, + _finger_up, + _count_fingers_up, + _four_fingers_curled, + _thumb_curled, + _thumb_extended_up, + _point_direction, +) + + +# ── Landmark factory helpers ────────────────────────────────────────────────── + +def _lm(x: float, y: float, z: float = 0.0) -> Landmark: + return Landmark(x, y, z) + + +def _flat_hand(n: int = 21) -> List[Landmark]: + """Return n landmarks all at (0.5, 0.5).""" + return [_lm(0.5, 0.5) for _ in range(n)] + + +def _make_hand( + wrist_y: float = 0.8, + # Index finger + idx_mcp_y: float = 0.6, idx_pip_y: float = 0.5, + idx_tip_y: float = 0.3, idx_mcp_x: float = 0.4, idx_tip_x: float = 0.4, + # Middle finger + mid_mcp_y: float = 0.6, mid_pip_y: float = 0.5, mid_tip_y: float = 0.4, + # Ring finger + rng_mcp_y: float = 0.6, rng_pip_y: float = 0.5, rng_tip_y: float = 0.55, + # Pinky + pnk_mcp_y: float = 0.6, pnk_pip_y: float = 0.5, pnk_tip_y: float = 0.55, + # Thumb + thm_cmc_y: float = 0.7, thm_mcp_y: float = 0.65, thm_tip_y: float = 0.55, +) -> List[Landmark]: + """ + Build a 21-landmark array for testing. + + Layout (MediaPipe Hands indices): + 0 WRIST + 1 THUMB_CMC 2 THUMB_MCP 3 THUMB_IP 4 THUMB_TIP + 5 INDEX_MCP 6 INDEX_PIP 7 INDEX_DIP 8 INDEX_TIP + 9 MIDDLE_MCP 10 MIDDLE_PIP 11 MIDDLE_DIP 12 MIDDLE_TIP + 13 RING_MCP 14 RING_PIP 15 RING_DIP 16 RING_TIP + 17 PINKY_MCP 18 PINKY_PIP 19 PINKY_DIP 20 PINKY_TIP + """ + lm = [_lm(0.5, 0.5)] * 21 + # WRIST + lm[0] = _lm(0.5, wrist_y) + # THUMB + lm[1] = _lm(0.35, thm_cmc_y) # CMC + lm[2] = _lm(0.33, thm_mcp_y) # MCP + lm[3] = _lm(0.31, (thm_mcp_y + thm_tip_y) / 2) # IP + lm[4] = _lm(0.30, thm_tip_y) # TIP + # INDEX + lm[5] = _lm(idx_mcp_x, idx_mcp_y) # MCP + lm[6] = _lm(idx_mcp_x, idx_pip_y) # PIP + lm[7] = _lm(idx_mcp_x, (idx_pip_y + idx_tip_y) / 2) # DIP + lm[8] = _lm(idx_tip_x, idx_tip_y) # TIP + # MIDDLE + lm[9] = _lm(0.5, mid_mcp_y) # MCP + lm[10] = _lm(0.5, mid_pip_y) # PIP + lm[11] = _lm(0.5, (mid_pip_y + mid_tip_y) / 2) + lm[12] = _lm(0.5, mid_tip_y) # TIP + # RING + lm[13] = _lm(0.6, rng_mcp_y) # MCP + lm[14] = _lm(0.6, rng_pip_y) # PIP + lm[15] = _lm(0.6, (rng_pip_y + rng_tip_y) / 2) + lm[16] = _lm(0.6, rng_tip_y) # TIP + # PINKY + lm[17] = _lm(0.65, pnk_mcp_y) # MCP + lm[18] = _lm(0.65, pnk_pip_y) # PIP + lm[19] = _lm(0.65, (pnk_pip_y + pnk_tip_y) / 2) + lm[20] = _lm(0.65, pnk_tip_y) # TIP + return lm + + +# ── Prebuilt canonical poses ────────────────────────────────────────────────── + +def _open_palm() -> List[Landmark]: + """All 4 fingers extended (tips clearly above PIPs), thumb neutral.""" + return _make_hand( + idx_mcp_y=0.60, idx_pip_y=0.50, idx_tip_y=0.25, + mid_mcp_y=0.60, mid_pip_y=0.50, mid_tip_y=0.25, + rng_mcp_y=0.60, rng_pip_y=0.50, rng_tip_y=0.25, + pnk_mcp_y=0.60, pnk_pip_y=0.50, pnk_tip_y=0.25, + ) + + +def _point_up() -> List[Landmark]: + """Index extended, middle/ring/pinky curled.""" + return _make_hand( + idx_mcp_y=0.60, idx_pip_y=0.50, idx_tip_y=0.25, + mid_mcp_y=0.60, mid_pip_y=0.55, mid_tip_y=0.62, # curled + rng_mcp_y=0.60, rng_pip_y=0.55, rng_tip_y=0.62, + pnk_mcp_y=0.60, pnk_pip_y=0.55, pnk_tip_y=0.62, + ) + + +def _fist() -> List[Landmark]: + """All fingers curled, thumb tip at/below thumb MCP.""" + return _make_hand( + idx_mcp_y=0.60, idx_pip_y=0.58, idx_tip_y=0.65, # tip below pip + mid_mcp_y=0.60, mid_pip_y=0.58, mid_tip_y=0.65, + rng_mcp_y=0.60, rng_pip_y=0.58, rng_tip_y=0.65, + pnk_mcp_y=0.60, pnk_pip_y=0.58, pnk_tip_y=0.65, + thm_cmc_y=0.70, thm_mcp_y=0.65, thm_tip_y=0.68, # tip >= mcp → curled + ) + + +def _thumbs_up() -> List[Landmark]: + """Thumb tip clearly above CMC, four fingers curled.""" + return _make_hand( + thm_cmc_y=0.70, thm_mcp_y=0.65, thm_tip_y=0.30, # tip well above CMC + idx_mcp_y=0.60, idx_pip_y=0.58, idx_tip_y=0.65, + mid_mcp_y=0.60, mid_pip_y=0.58, mid_tip_y=0.65, + rng_mcp_y=0.60, rng_pip_y=0.58, rng_tip_y=0.65, + pnk_mcp_y=0.60, pnk_pip_y=0.58, pnk_tip_y=0.65, + ) + + +def _peace() -> List[Landmark]: + """Index + middle extended, ring + pinky curled.""" + return _make_hand( + idx_mcp_y=0.60, idx_pip_y=0.50, idx_tip_y=0.25, + mid_mcp_y=0.60, mid_pip_y=0.50, mid_tip_y=0.25, + rng_mcp_y=0.60, rng_pip_y=0.58, rng_tip_y=0.65, # curled + pnk_mcp_y=0.60, pnk_pip_y=0.58, pnk_tip_y=0.65, + ) + + +# ── Landmark dataclass ──────────────────────────────────────────────────────── + +class TestLandmark: + def test_fields(self): + lm = Landmark(0.1, 0.2, 0.3) + assert lm.x == pytest.approx(0.1) + assert lm.y == pytest.approx(0.2) + assert lm.z == pytest.approx(0.3) + + def test_default_z(self): + lm = Landmark(0.5, 0.5) + assert lm.z == 0.0 + + def test_frozen(self): + lm = Landmark(0.5, 0.5) + with pytest.raises(Exception): + lm.x = 0.9 # type: ignore[misc] + + +# ── HandGestureResult ───────────────────────────────────────────────────────── + +class TestHandGestureResult: + def test_fields(self): + r = HandGestureResult("stop", 0.85, "", 0.5, 0.6) + assert r.gesture == "stop" + assert r.confidence == pytest.approx(0.85) + assert r.direction == "" + assert r.wrist_x == pytest.approx(0.5) + assert r.wrist_y == pytest.approx(0.6) + + +# ── Low-level helpers ───────────────────────────────────────────────────────── + +class TestFingerHelpers: + def _two_lm(self, tip_y: float, pip_y: float) -> List[Landmark]: + """Build a minimal 21-lm list where positions 0 and 1 are tip and pip.""" + lm = [_lm(0.5, 0.5)] * 21 + lm[0] = _lm(0.5, tip_y) + lm[1] = _lm(0.5, pip_y) + return lm + + def test_finger_up_true(self): + """Tip above PIP (smaller y) → True.""" + lm = self._two_lm(tip_y=0.3, pip_y=0.6) + assert _finger_up(lm, 0, 1) is True + + def test_finger_up_false(self): + """Tip below PIP → False.""" + lm = self._two_lm(tip_y=0.7, pip_y=0.4) + assert _finger_up(lm, 0, 1) is False + + def test_count_fingers_up_open_palm(self): + lm = _open_palm() + assert _count_fingers_up(lm) == 4 + + def test_count_fingers_up_fist(self): + lm = _fist() + assert _count_fingers_up(lm) == 0 + + def test_four_fingers_curled_fist(self): + lm = _fist() + assert _four_fingers_curled(lm) is True + + def test_four_fingers_curled_open_palm_false(self): + lm = _open_palm() + assert _four_fingers_curled(lm) is False + + def test_thumb_curled_fist(self): + lm = _fist() + assert _thumb_curled(lm) is True + + def test_thumb_extended_up_thumbs_up(self): + lm = _thumbs_up() + assert _thumb_extended_up(lm) is True + + def test_thumb_not_extended_fist(self): + lm = _fist() + assert _thumb_extended_up(lm) is False + + +# ── Point direction ─────────────────────────────────────────────────────────── + +class TestPointDirection: + def _hand_with_index_vec(self, dx: float, dy: float) -> List[Landmark]: + """Build hand where index MCP→TIP vector is (dx, dy).""" + lm = _make_hand() + lm[5] = _lm(0.5, 0.6) # INDEX_MCP + lm[8] = _lm(0.5 + dx, 0.6 + dy) # INDEX_TIP + return lm + + def test_pointing_up(self): + # dy negative (tip above MCP) → up + lm = self._hand_with_index_vec(0.0, -0.2) + assert _point_direction(lm) == "up" + + def test_pointing_right(self): + lm = self._hand_with_index_vec(0.2, 0.0) + assert _point_direction(lm) == "right" + + def test_pointing_left(self): + lm = self._hand_with_index_vec(-0.2, 0.0) + assert _point_direction(lm) == "left" + + def test_pointing_upper_right(self): + lm = self._hand_with_index_vec(0.15, -0.15) + assert _point_direction(lm) == "upper_right" + + +# ── WaveDetector ────────────────────────────────────────────────────────────── + +class TestWaveDetector: + def test_too_few_samples_no_wave(self): + wd = WaveDetector() + for x in [0.3, 0.7, 0.3]: + is_w, conf = wd.push(x) + assert is_w is False + assert conf == 0.0 + + def test_wave_detected_after_oscillation(self): + """Feed a sinusoidal wrist_x — should trigger wave.""" + wd = WaveDetector(history_len=20, min_reversals=2, min_amplitude=0.08) + is_waving = False + for i in range(20): + x = 0.5 + 0.20 * math.sin(i * math.pi / 3) + is_waving, conf = wd.push(x) + assert is_waving is True + assert conf > 0.0 + + def test_no_wave_small_amplitude(self): + """Very small oscillation should not trigger.""" + wd = WaveDetector(min_amplitude=0.10) + for i in range(24): + x = 0.5 + 0.01 * math.sin(i * math.pi / 3) + wd.push(x) + is_w, _ = wd.push(0.5) + assert is_w is False + + def test_reset_clears_history(self): + wd = WaveDetector() + for i in range(24): + wd.push(0.5 + 0.2 * math.sin(i * math.pi / 3)) + wd.reset() + is_w, _ = wd.push(0.5) + assert is_w is False + + +# ── classify_hand ───────────────────────────────────────────────────────────── + +class TestClassifyHand: + def test_too_few_landmarks(self): + r = classify_hand([_lm(0.5, 0.5)] * 10) + assert r.gesture == "none" + + def test_stop_open_palm(self): + lm = _open_palm() + r = classify_hand(lm) + assert r.gesture == "stop" + assert r.confidence >= 0.60 + + def test_point_up_gesture(self): + lm = _point_up() + r = classify_hand(lm) + assert r.gesture == "point" + assert r.confidence >= 0.60 + + def test_point_direction_populated(self): + lm = _point_up() + r = classify_hand(lm) + assert r.direction != "" + + def test_disarm_fist(self): + lm = _fist() + r = classify_hand(lm) + assert r.gesture == "disarm" + assert r.confidence >= 0.60 + + def test_confirm_thumbs_up(self): + lm = _thumbs_up() + r = classify_hand(lm) + assert r.gesture == "confirm" + assert r.confidence >= 0.60 + + def test_follow_me_peace(self): + lm = _peace() + r = classify_hand(lm) + assert r.gesture == "follow_me" + assert r.confidence >= 0.60 + + def test_greeting_wave(self): + """Wave via WaveDetector should produce greeting gesture.""" + wd = WaveDetector(history_len=20, min_reversals=2, min_amplitude=0.08) + lm = _open_palm() + # Simulate 20 frames with oscillating wrist_x via different landmark sets + r = HandGestureResult("none", 0.0, "", 0.5, 0.5) + for i in range(20): + # Rebuild landmark set with moving wrist x + moving = list(lm) + wx = 0.5 + 0.20 * math.sin(i * math.pi / 3) + # WRIST is index 0 — move it + moving[0] = Landmark(wx, moving[0].y) + r = classify_hand(moving, is_right=True, wave_det=wd) + assert r.gesture == "greeting" + + def test_flat_hand_no_crash(self): + """A flat hand (all landmarks at 0.5, 0.5) has ambiguous geometry — + verify it returns a valid gesture string without crashing.""" + _valid = {"stop", "point", "disarm", "confirm", "follow_me", "greeting", "none"} + r = classify_hand(_flat_hand()) + assert r.gesture in _valid + + def test_wrist_position_in_result(self): + lm = _open_palm() + lm[0] = Landmark(0.3, 0.7) + r = classify_hand(lm) + assert r.wrist_x == pytest.approx(0.3) + assert r.wrist_y == pytest.approx(0.7) + + def test_confirm_before_stop(self): + """Thumbs-up should be classified as 'confirm', not 'stop'.""" + lm = _thumbs_up() + r = classify_hand(lm) + assert r.gesture == "confirm" + + def test_follow_me_before_point(self): + """Peace sign (2 fingers) should NOT be classified as 'point'.""" + lm = _peace() + r = classify_hand(lm) + assert r.gesture == "follow_me" + + def test_wave_beats_static_gesture(self): + """When wave is detected it should override any static gesture.""" + wd = WaveDetector(history_len=20, min_reversals=2, min_amplitude=0.08) + # Pre-load enough waving frames + for i in range(20): + wx = 0.5 + 0.25 * math.sin(i * math.pi / 3) + lm = _open_palm() + lm[0] = Landmark(wx, lm[0].y) + r = classify_hand(lm, wave_det=wd) + # The open palm would normally be "stop" but wave has already triggered + assert r.gesture == "greeting" + + def test_result_confidence_bounded(self): + for lm_factory in [_open_palm, _point_up, _fist, _thumbs_up, _peace]: + r = classify_hand(lm_factory()) + assert 0.0 <= r.confidence <= 1.0 diff --git a/jetson/ros2_ws/src/saltybot_hand_tracking_msgs/CMakeLists.txt b/jetson/ros2_ws/src/saltybot_hand_tracking_msgs/CMakeLists.txt new file mode 100644 index 0000000..1c562ed --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_hand_tracking_msgs/CMakeLists.txt @@ -0,0 +1,16 @@ +cmake_minimum_required(VERSION 3.8) +project(saltybot_hand_tracking_msgs) + +find_package(ament_cmake REQUIRED) +find_package(rosidl_default_generators REQUIRED) +find_package(std_msgs REQUIRED) + +rosidl_generate_interfaces(${PROJECT_NAME} + # Issue #342 — hand tracking / MediaPipe pivot + "msg/HandLandmarks.msg" + "msg/HandLandmarksArray.msg" + DEPENDENCIES std_msgs +) + +ament_export_dependencies(rosidl_default_runtime) +ament_package() diff --git a/jetson/ros2_ws/src/saltybot_hand_tracking_msgs/msg/HandLandmarks.msg b/jetson/ros2_ws/src/saltybot_hand_tracking_msgs/msg/HandLandmarks.msg new file mode 100644 index 0000000..fa02fde --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_hand_tracking_msgs/msg/HandLandmarks.msg @@ -0,0 +1,29 @@ +# HandLandmarks.msg — MediaPipe Hands result for one detected hand (Issue #342) +# +# Landmark coordinates are MediaPipe-normalised: +# x, y ∈ [0.0, 1.0] — fraction of image width/height +# z — depth relative to wrist (negative = towards camera) +# +# landmark_xyz layout: [x0, y0, z0, x1, y1, z1, ..., x20, y20, z20] +# Index order follows MediaPipe Hands topology: +# 0=WRIST 1-4=THUMB(CMC,MCP,IP,TIP) 5-8=INDEX 9-12=MIDDLE +# 13-16=RING 17-20=PINKY + +std_msgs/Header header + +# Handedness +bool is_right_hand +float32 handedness_score # MediaPipe confidence for Left/Right label + +# 21 landmarks × 3 (x, y, z) = 63 values +float32[63] landmark_xyz + +# Classified robot-command gesture +# Values: "stop" | "point" | "disarm" | "confirm" | "follow_me" | "greeting" | "none" +string gesture +string point_direction # "up"|"right"|"left"|"upper_right"|"upper_left"|"lower_right"|"lower_left"|"down" +float32 gesture_confidence + +# Wrist position in normalised image coords (convenience shortcut) +float32 wrist_x +float32 wrist_y diff --git a/jetson/ros2_ws/src/saltybot_hand_tracking_msgs/msg/HandLandmarksArray.msg b/jetson/ros2_ws/src/saltybot_hand_tracking_msgs/msg/HandLandmarksArray.msg new file mode 100644 index 0000000..d0adc67 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_hand_tracking_msgs/msg/HandLandmarksArray.msg @@ -0,0 +1,5 @@ +# HandLandmarksArray.msg — All detected hands in one camera frame (Issue #342) + +std_msgs/Header header +HandLandmarks[] hands +uint32 hand_count diff --git a/jetson/ros2_ws/src/saltybot_hand_tracking_msgs/package.xml b/jetson/ros2_ws/src/saltybot_hand_tracking_msgs/package.xml new file mode 100644 index 0000000..6b6fafc --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_hand_tracking_msgs/package.xml @@ -0,0 +1,21 @@ + + + + saltybot_hand_tracking_msgs + 0.1.0 + Message types for MediaPipe hand tracking (Issue #342). + SaltyLab + MIT + + ament_cmake + rosidl_default_generators + + std_msgs + + rosidl_default_runtime + rosidl_interface_packages + + + ament_cmake + +