Merge pull request 'feat(social): hand gesture pointing direction node (Issue #221)' (#226) from sl-perception/issue-221-pointing into main
Some checks failed
Some checks failed
This commit is contained in:
commit
213d7fe13d
@ -0,0 +1,180 @@
|
|||||||
|
"""
|
||||||
|
_pointing_ray.py — 3-D pointing ray computation from hand landmarks + depth (no ROS2 deps).
|
||||||
|
|
||||||
|
Given MediaPipe Hands 21-landmark output, a depth image (float32, metres),
|
||||||
|
and camera intrinsic parameters, computes:
|
||||||
|
- 3-D position of the INDEX_MCP (knuckle) — ray origin
|
||||||
|
- 3-D direction (unit vector) INDEX_MCP → INDEX_TIP
|
||||||
|
- Estimated pointing target (origin + direction * range)
|
||||||
|
|
||||||
|
Coordinate frame: camera_color_optical_frame (standard ROS optical frame,
|
||||||
|
+X right, +Y down, +Z into the scene).
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import math
|
||||||
|
from typing import List, Optional, Tuple
|
||||||
|
|
||||||
|
|
||||||
|
# MediaPipe Hands landmark indices (matches gesture_classifier.py)
_WRIST = 0       # hand root (currently unused here; kept for parity with gesture_classifier)
_INDEX_MCP = 5   # index-finger knuckle — used as the pointing-ray origin
_INDEX_TIP = 8   # index fingertip — used to form the ray direction
|
||||||
|
|
||||||
|
|
||||||
|
def unproject(
    u: float, v: float, depth_m: float,
    fx: float, fy: float, cx: float, cy: float,
) -> Tuple[float, float, float]:
    """
    Back-project pixel (u, v) at depth ``depth_m`` into the camera frame.

    Standard pinhole model: lateral offsets scale with depth over focal
    length, and Z is the depth itself. Frame is camera_color_optical_frame
    (+X right, +Y down, +Z into the scene).

    Parameters
    ----------
    u, v : pixel coordinates (float, origin top-left)
    depth_m : depth in metres
    fx, fy : focal lengths (pixels)
    cx, cy : principal point (pixels)

    Returns
    -------
    (X, Y, Z) in metres, camera_color_optical_frame
    """
    return (
        (u - cx) * depth_m / fx,
        (v - cy) * depth_m / fy,
        depth_m,
    )
|
||||||
|
|
||||||
|
|
||||||
|
def sample_depth(
    depth_img,
    u: float,
    v: float,
    window: int = 5,
    d_min: float = 0.1,
    d_max: float = 8.0,
) -> float:
    """
    Median of the valid depths in a (window × window) patch centred at (u, v).

    Values outside the open interval (d_min, d_max) are treated as invalid
    (zeros from missing depth, far-range noise). The patch is clamped to the
    image bounds, so sampling at an edge pixel is safe.

    Parameters
    ----------
    depth_img : np.ndarray (H, W) float32, depth in metres
    u, v : centre pixel (float)
    window : patch side length (pixels)
    d_min : minimum valid depth (m), exclusive
    d_max : maximum valid depth (m), exclusive

    Returns
    -------
    float : median depth, or NaN if no valid samples in the patch
    """
    import numpy as np  # local import keeps this helper free of hard top-level deps

    rows, cols = depth_img.shape[:2]
    half = window // 2
    uc = int(round(u))
    vc = int(round(v))

    # Clamp patch extents so edge/corner samples never index out of bounds.
    col_lo = max(0, uc - half)
    col_hi = min(cols, uc + half + 1)
    row_lo = max(0, vc - half)
    row_hi = min(rows, vc + half + 1)

    patch = depth_img[row_lo:row_hi, col_lo:col_hi]
    valid = patch[(patch > d_min) & (patch < d_max)]
    return float(np.median(valid)) if valid.size else float('nan')
|
||||||
|
|
||||||
|
|
||||||
|
def compute_pointing_ray(
    landmarks: List[Tuple[float, float, float]],
    depth_img,
    fx: float,
    fy: float,
    cx: float,
    cy: float,
    img_w: int,
    img_h: int,
    ref_distance: float = 2.0,
    depth_window: int = 5,
) -> Optional[dict]:
    """
    Compute a 3-D pointing ray from MediaPipe Hands landmarks and a depth image.

    The ray runs from INDEX_MCP (knuckle) toward INDEX_TIP, both unprojected
    into camera_color_optical_frame using median-sampled depth, and is
    extended by ``ref_distance`` metres to form an estimated target.

    Parameters
    ----------
    landmarks : list of 21 (nx, ny, nz) normalised MediaPipe landmarks
                (nx ∈ [0,1] left→right, ny ∈ [0,1] top→bottom)
    depth_img : np.ndarray (H, W) float32, depth in metres
    fx, fy : focal lengths (pixels)
    cx, cy : principal point (pixels)
    img_w, img_h : image dimensions (pixels)
    ref_distance : range along the ray used for the target estimate (m)
    depth_window : window size for depth median sampling (pixels)

    Returns
    -------
    dict with keys:
        origin_3d : (X, Y, Z) INDEX_MCP in camera frame (m)
        direction_3d : (dx, dy, dz) unit vector MCP → TIP
        target_3d : (X, Y, Z) estimated pointing target (m)
        range_m : distance from origin to target (m)
        mcp_uv : (u, v) pixel coords of INDEX_MCP
        tip_uv : (u, v) pixel coords of INDEX_TIP
    or None if the ray cannot be computed (both depths invalid).
    """
    # Normalised landmark coords → pixel coords for knuckle and fingertip.
    mcp_nx, mcp_ny = landmarks[_INDEX_MCP][0], landmarks[_INDEX_MCP][1]
    tip_nx, tip_ny = landmarks[_INDEX_TIP][0], landmarks[_INDEX_TIP][1]
    mcp_u = mcp_nx * img_w
    mcp_v = mcp_ny * img_h
    tip_u = tip_nx * img_w
    tip_v = tip_ny * img_h

    d_mcp = sample_depth(depth_img, mcp_u, mcp_v, depth_window)
    d_tip = sample_depth(depth_img, tip_u, tip_v, depth_window)

    mcp_ok = not math.isnan(d_mcp)
    tip_ok = not math.isnan(d_tip)

    # Bail out only when neither landmark has usable depth.
    if not mcp_ok and not tip_ok:
        return None

    # Substitute the single valid measurement for the missing one.
    if not mcp_ok:
        d_mcp = d_tip
    if not tip_ok:
        d_tip = d_mcp

    mcp_3d = unproject(mcp_u, mcp_v, d_mcp, fx, fy, cx, cy)
    tip_3d = unproject(tip_u, tip_v, d_tip, fx, fy, cx, cy)

    # Ray direction: MCP → TIP.
    vec = [tip_3d[i] - mcp_3d[i] for i in range(3)]
    length = math.sqrt(vec[0] ** 2 + vec[1] ** 2 + vec[2] ** 2)

    if length < 1e-4:
        # Degenerate: MCP and TIP collapse to the same 3-D point.
        # Fall back to the normalised image-plane offset with +Z forward.
        vec = [
            (tip_u - mcp_u) / (img_w or 1),
            (tip_v - mcp_v) / (img_h or 1),
            1.0,
        ]
        length = math.sqrt(vec[0] ** 2 + vec[1] ** 2 + vec[2] ** 2) or 1.0

    dx = vec[0] / length
    dy = vec[1] / length
    dz = vec[2] / length

    range_m = ref_distance
    target_3d = (
        mcp_3d[0] + dx * range_m,
        mcp_3d[1] + dy * range_m,
        mcp_3d[2] + dz * range_m,
    )

    return {
        'origin_3d': mcp_3d,
        'direction_3d': (dx, dy, dz),
        'target_3d': target_3d,
        'range_m': range_m,
        'mcp_uv': (mcp_u, mcp_v),
        'tip_uv': (tip_u, tip_v),
    }
|
||||||
@ -0,0 +1,264 @@
|
|||||||
|
"""
|
||||||
|
pointing_node.py — Hand gesture pointing direction (Issue #221).
|
||||||
|
|
||||||
|
Converts a 'point' gesture into a precise 3-D ray using MediaPipe Hands
|
||||||
|
re-run on the D435i colour stream and the registered depth image.
|
||||||
|
|
||||||
|
Subscribes:
|
||||||
|
/social/gestures saltybot_social_msgs/GestureArray
|
||||||
|
/camera/color/image_raw sensor_msgs/Image (D435i colour)
|
||||||
|
/camera/depth/image_rect_raw sensor_msgs/Image (D435i depth, float32 m)
|
||||||
|
/camera/color/camera_info sensor_msgs/CameraInfo
|
||||||
|
|
||||||
|
Publishes:
|
||||||
|
/saltybot/pointing_target saltybot_social_msgs/PointingTarget (5 Hz)
|
||||||
|
|
||||||
|
Algorithm
|
||||||
|
---------
|
||||||
|
1. Cache the most recent 'point' Gesture from /social/gestures.
|
||||||
|
2. Synchronise colour + depth frames (ApproximateTimeSynchronizer, 50 ms slop).
|
||||||
|
3. When a 'point' gesture was received within `gesture_timeout_s`, run
|
||||||
|
MediaPipe Hands on the colour frame.
|
||||||
|
4. Locate the hand closest to the gesture anchor (hand_x, hand_y).
|
||||||
|
5. Call compute_pointing_ray(): unproject INDEX_MCP and INDEX_TIP using depth,
|
||||||
|
form the unit direction vector, extend to `ref_distance_m`.
|
||||||
|
6. A 5 Hz timer publishes the latest PointingTarget (is_active=false if stale).
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
ref_distance_m float 2.0 Default range when depth is valid (m)
|
||||||
|
gesture_timeout_s float 1.0 Max age of a cached 'point' gesture (s)
|
||||||
|
min_confidence float 0.50 MediaPipe Hands min detection confidence
|
||||||
|
model_complexity int 0 MediaPipe model complexity (0=lite)
|
||||||
|
publish_hz float 5.0 Output publication rate (Hz)
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import math
|
||||||
|
import time
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
import rclpy
|
||||||
|
from rclpy.node import Node
|
||||||
|
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy, DurabilityPolicy
|
||||||
|
|
||||||
|
import message_filters
|
||||||
|
import numpy as np
|
||||||
|
from cv_bridge import CvBridge
|
||||||
|
|
||||||
|
from sensor_msgs.msg import Image, CameraInfo
|
||||||
|
from saltybot_social_msgs.msg import GestureArray, PointingTarget
|
||||||
|
|
||||||
|
from ._pointing_ray import compute_pointing_ray
|
||||||
|
|
||||||
|
# Optional MediaPipe — absent in CI
|
||||||
|
try:
|
||||||
|
import cv2
|
||||||
|
import mediapipe as mp
|
||||||
|
_HAS_MP = True
|
||||||
|
except ImportError:
|
||||||
|
_HAS_MP = False
|
||||||
|
|
||||||
|
|
||||||
|
# QoS for high-rate image streams: best-effort delivery with a shallow
# queue so stale frames are dropped instead of buffered.
_SENSOR_QOS = QoSProfile(
    reliability=ReliabilityPolicy.BEST_EFFORT,
    history=HistoryPolicy.KEEP_LAST,
    depth=4,
)
# QoS for camera_info: reliable + transient-local so a late-joining
# subscriber still receives the last (effectively static) intrinsics.
_LATCHED_QOS = QoSProfile(
    reliability=ReliabilityPolicy.RELIABLE,
    history=HistoryPolicy.KEEP_LAST,
    depth=1,
    durability=DurabilityPolicy.TRANSIENT_LOCAL,
)
|
||||||
|
|
||||||
|
|
||||||
|
class PointingNode(Node):
    """Convert 'point' gestures into a precise 3-D pointing ray.

    Caches the most recent 'point' gesture from /social/gestures; while that
    gesture is fresh (within `gesture_timeout_s`), re-runs MediaPipe Hands on
    synchronised colour+depth frames, computes the ray via
    compute_pointing_ray(), and publishes a PointingTarget at `publish_hz`
    (is_active=False once the gesture is stale).
    """

    def __init__(self):
        """Declare parameters, set up MediaPipe, subscriptions, publisher, timer."""
        super().__init__('pointing_node')

        self.declare_parameter('ref_distance_m', 2.0)
        self.declare_parameter('gesture_timeout_s', 1.0)
        self.declare_parameter('min_confidence', 0.50)
        self.declare_parameter('model_complexity', 0)
        self.declare_parameter('publish_hz', 5.0)

        self._ref_dist = self.get_parameter('ref_distance_m').value
        self._gesture_tmo = self.get_parameter('gesture_timeout_s').value
        self._min_conf = self.get_parameter('min_confidence').value
        self._complexity = self.get_parameter('model_complexity').value
        publish_hz = self.get_parameter('publish_hz').value

        self._bridge = CvBridge()
        self._camera_info: Optional[CameraInfo] = None  # latest intrinsics, None until received

        # Cached state
        self._last_gesture = None   # most recent Gesture with type='point'
        self._last_gesture_t = 0.0  # time.monotonic() stamp
        self._last_result: Optional[dict] = None  # last compute_pointing_ray() output

        # MediaPipe Hands (lazy init) — stays None when mediapipe is absent (CI)
        self._hands = None
        if _HAS_MP:
            self._hands = mp.solutions.hands.Hands(
                static_image_mode=False,
                max_num_hands=2,
                min_detection_confidence=self._min_conf,
                min_tracking_confidence=0.5,
                model_complexity=self._complexity,
            )

        # Subscribers
        self.create_subscription(
            GestureArray, '/social/gestures', self._on_gestures,
            QoSProfile(depth=10),
        )
        self.create_subscription(
            CameraInfo, '/camera/color/camera_info', self._on_camera_info,
            _LATCHED_QOS,
        )

        # Colour + depth must be paired: approximate sync with 50 ms slop.
        color_sub = message_filters.Subscriber(
            self, Image, '/camera/color/image_raw', qos_profile=_SENSOR_QOS)
        depth_sub = message_filters.Subscriber(
            self, Image, '/camera/depth/image_rect_raw', qos_profile=_SENSOR_QOS)
        self._sync = message_filters.ApproximateTimeSynchronizer(
            [color_sub, depth_sub], queue_size=4, slop=0.05)
        self._sync.registerCallback(self._on_frame)

        # Publisher + timer
        self._pub = self.create_publisher(PointingTarget, '/saltybot/pointing_target', 10)
        self.create_timer(1.0 / publish_hz, self._tick)

        self.get_logger().info(
            f'pointing_node ready — mediapipe={"on" if _HAS_MP else "off"} '
            f'ref={self._ref_dist}m timeout={self._gesture_tmo}s'
        )

    # ── Callbacks ─────────────────────────────────────────────────────────────

    def _on_gestures(self, msg: GestureArray) -> None:
        """Cache the first 'point' gesture in the array with a monotonic timestamp."""
        for g in msg.gestures:
            if g.gesture_type == 'point':
                self._last_gesture = g
                self._last_gesture_t = time.monotonic()
                break  # first point gesture wins

    def _on_camera_info(self, msg: CameraInfo) -> None:
        """Store the latest camera intrinsics (latched topic)."""
        self._camera_info = msg

    def _on_frame(self, color_msg: Image, depth_msg: Image) -> None:
        """Synchronised colour+depth callback: run MediaPipe and update the ray.

        Skips all work until intrinsics have arrived and while no recent
        'point' gesture is cached, keeping the idle-path cheap.
        """
        if self._camera_info is None:
            return
        age = time.monotonic() - self._last_gesture_t
        if age > self._gesture_tmo:
            return  # no recent point gesture — skip expensive MediaPipe call

        try:
            bgr = self._bridge.imgmsg_to_cv2(color_msg, 'bgr8')
            depth = self._bridge.imgmsg_to_cv2(depth_msg, 'passthrough').astype(np.float32)
        except Exception as exc:
            self.get_logger().error(f'cv_bridge: {exc}', throttle_duration_sec=5.0)
            return

        # Heuristic unit fix: values > 100 are assumed to be millimetres
        # (raw uint16 depth) — convert to metres in place.
        if depth.max() > 100.0:  # uint16 mm → float32 m
            depth /= 1000.0

        lms = self._run_mediapipe(bgr)
        if lms is None:
            return

        # Select hand closest to gesture anchor
        g = self._last_gesture
        best_lm = self._pick_hand(lms, g.hand_x, g.hand_y)
        if best_lm is None:
            return

        h, w = bgr.shape[:2]
        # Intrinsics from the 3×3 K matrix: fx, fy on the diagonal, cx, cy in the last column.
        K = np.array(self._camera_info.k).reshape(3, 3)
        fx, fy, cx, cy = K[0, 0], K[1, 1], K[0, 2], K[1, 2]

        result = compute_pointing_ray(
            landmarks = best_lm,
            depth_img = depth,
            fx=fx, fy=fy, cx=cx, cy=cy,
            img_w=w, img_h=h,
            ref_distance = self._ref_dist,
        )
        if result is not None:
            self._last_result = result

    # ── MediaPipe helpers ──────────────────────────────────────────────────────

    def _run_mediapipe(self, bgr: np.ndarray):
        """Run MediaPipe Hands on a BGR frame; returns list of landmark lists or None."""
        if self._hands is None:
            return None
        import cv2
        rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
        # Marking the buffer read-only lets MediaPipe avoid an internal copy.
        rgb.flags.writeable = False
        results = self._hands.process(rgb)
        if not results.multi_hand_landmarks:
            return None
        return [
            [(lm.x, lm.y, lm.z) for lm in hand.landmark]
            for hand in results.multi_hand_landmarks
        ]

    def _pick_hand(self, landmark_sets, anchor_x: float, anchor_y: float):
        """Return the landmark set whose INDEX_TIP is closest to (anchor_x, anchor_y)."""
        best, best_dist = None, float('inf')
        for lms in landmark_sets:
            tip = lms[8]  # INDEX_TIP = 8
            dist = math.hypot(tip[0] - anchor_x, tip[1] - anchor_y)
            if dist < best_dist:
                best, best_dist = lms, dist
        return best

    # ── 5 Hz publish tick ─────────────────────────────────────────────────────

    def _tick(self) -> None:
        """Publish the latest PointingTarget; is_active=False when the gesture is stale."""
        msg = PointingTarget()
        msg.header.stamp = self.get_clock().now().to_msg()
        msg.header.frame_id = 'camera_color_optical_frame'

        age = time.monotonic() - self._last_gesture_t
        msg.is_active = age <= self._gesture_tmo

        # Identity fields are kept even when inactive, so consumers can see
        # who pointed last.
        if self._last_gesture is not None:
            msg.person_id = self._last_gesture.person_id
            msg.confidence = self._last_gesture.confidence
            msg.coarse_direction = self._last_gesture.direction

        # Ray fields are only filled while the gesture is fresh AND a ray
        # has actually been computed; otherwise they stay at message defaults.
        if msg.is_active and self._last_result is not None:
            ox, oy, oz = self._last_result['origin_3d']
            dx, dy, dz = self._last_result['direction_3d']
            tx, ty, tz = self._last_result['target_3d']

            msg.origin.x = ox; msg.origin.y = oy; msg.origin.z = oz
            msg.direction.x = dx; msg.direction.y = dy; msg.direction.z = dz
            msg.target.x = tx; msg.target.y = ty; msg.target.z = tz
            msg.range_m = self._last_result['range_m']

        self._pub.publish(msg)

    def destroy_node(self) -> None:
        """Release MediaPipe resources before tearing the node down."""
        if self._hands:
            self._hands.close()
        super().destroy_node()
|
||||||
|
|
||||||
|
|
||||||
|
def main(args=None):
    """CLI entry point — initialise rclpy, spin the node, always clean up."""
    rclpy.init(args=args)
    pointing = PointingNode()
    try:
        rclpy.spin(pointing)
    finally:
        # Runs on normal exit and on Ctrl-C alike.
        pointing.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()
|
||||||
@ -37,6 +37,8 @@ setup(
|
|||||||
'voice_command_node = saltybot_social.voice_command_node:main',
|
'voice_command_node = saltybot_social.voice_command_node:main',
|
||||||
# Multi-camera gesture recognition (Issue #140)
|
# Multi-camera gesture recognition (Issue #140)
|
||||||
'gesture_node = saltybot_social.gesture_node:main',
|
'gesture_node = saltybot_social.gesture_node:main',
|
||||||
|
# Hand gesture pointing direction (Issue #221)
|
||||||
|
'pointing_node = saltybot_social.pointing_node:main',
|
||||||
# Facial expression recognition (Issue #161)
|
# Facial expression recognition (Issue #161)
|
||||||
'emotion_node = saltybot_social.emotion_node:main',
|
'emotion_node = saltybot_social.emotion_node:main',
|
||||||
# Robot mesh communication (Issue #171)
|
# Robot mesh communication (Issue #171)
|
||||||
|
|||||||
193
jetson/ros2_ws/src/saltybot_social/test/test_pointing.py
Normal file
193
jetson/ros2_ws/src/saltybot_social/test/test_pointing.py
Normal file
@ -0,0 +1,193 @@
|
|||||||
|
"""
|
||||||
|
test_pointing.py — Unit tests for pointing ray helpers (no ROS2 required).
|
||||||
|
|
||||||
|
Covers:
|
||||||
|
- unproject: pixel + depth → 3-D point
|
||||||
|
- sample_depth: median depth sampling with outlier rejection
|
||||||
|
- compute_pointing_ray: full 3-D ray computation
|
||||||
|
"""
|
||||||
|
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
import math
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
|
||||||
|
|
||||||
|
from saltybot_social._pointing_ray import unproject, sample_depth, compute_pointing_ray
|
||||||
|
|
||||||
|
|
||||||
|
# ── unproject ─────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestUnproject:
    """Pinhole back-projection: pixel + depth → camera-frame point."""

    _K = dict(fx=500.0, fy=500.0, cx=320.0, cy=240.0)

    def test_principal_point_gives_zero_xy(self):
        X, Y, Z = unproject(320.0, 240.0, 1.0, **self._K)
        assert X == pytest.approx(0.0)
        assert Y == pytest.approx(0.0)
        assert Z == pytest.approx(1.0)

    def test_z_equals_depth(self):
        _, _, Z = unproject(0.0, 0.0, 3.5, fx=600.0, fy=600.0, cx=320.0, cy=240.0)
        assert Z == pytest.approx(3.5)

    def test_right_of_principal_gives_positive_x(self):
        X, _, _ = unproject(420.0, 240.0, 1.0, **self._K)
        assert X > 0.0

    def test_below_principal_gives_positive_y(self):
        _, Y, _ = unproject(320.0, 340.0, 1.0, **self._K)
        assert Y > 0.0

    def test_x_scales_linearly_with_depth(self):
        x_near, _, _ = unproject(400.0, 240.0, 1.0, **self._K)
        x_far, _, _ = unproject(400.0, 240.0, 2.0, **self._K)
        assert x_far == pytest.approx(x_near * 2.0)

    def test_known_value(self):
        # (u=370, v=290, d=2.0, fx=fy=500, cx=320, cy=240)
        # X = (370-320)*2/500 = 0.2, Y = (290-240)*2/500 = 0.2, Z = 2.0
        X, Y, Z = unproject(370.0, 290.0, 2.0, **self._K)
        assert X == pytest.approx(0.2)
        assert Y == pytest.approx(0.2)
        assert Z == pytest.approx(2.0)
|
||||||
|
|
||||||
|
|
||||||
|
# ── sample_depth ──────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestSampleDepth:
    """Median-patch depth sampling with range-based outlier rejection."""

    def _uniform_depth(self, h=480, w=640, val=2.0):
        """Constant-valued float32 depth image of shape (h, w)."""
        return np.full((h, w), val, dtype=np.float32)

    def test_uniform_image_returns_value(self):
        depth = sample_depth(self._uniform_depth(val=1.5), 320.0, 240.0)
        assert depth == pytest.approx(1.5)

    def test_out_of_range_returns_nan(self):
        all_invalid = np.zeros((480, 640), dtype=np.float32)  # all zeros (< d_min=0.1)
        assert math.isnan(sample_depth(all_invalid, 320.0, 240.0))

    def test_ignores_outliers_above_d_max(self):
        img = self._uniform_depth(val=1.0)
        img[240, 320] = 100.0  # large spike at the sample centre
        depth = sample_depth(img, 320.0, 240.0, window=3, d_max=8.0)
        assert depth == pytest.approx(1.0)

    def test_edge_pixel_does_not_crash(self):
        depth = sample_depth(self._uniform_depth(val=2.0), 0.0, 0.0)
        assert depth == pytest.approx(2.0)

    def test_median_of_mixed_values(self):
        img = np.zeros((10, 10), dtype=np.float32)
        img[5, 5], img[5, 6], img[6, 5] = 1.0, 2.0, 3.0
        # Valid samples: 1.0, 2.0, 3.0 — median = 2.0
        assert sample_depth(img, 5.0, 5.0, window=3, d_min=0.0) == pytest.approx(2.0)
|
||||||
|
|
||||||
|
|
||||||
|
# ── compute_pointing_ray ──────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _make_landmarks(mcp_nx=0.5, mcp_ny=0.5, tip_nx=0.6, tip_ny=0.4):
|
||||||
|
"""21 dummy landmarks; only MCP (idx 5) and TIP (idx 8) matter."""
|
||||||
|
lms = [(0.5, 0.5, 0.0)] * 21
|
||||||
|
lms[5] = (mcp_nx, mcp_ny, 0.0) # INDEX_MCP
|
||||||
|
lms[8] = (tip_nx, tip_ny, 0.0) # INDEX_TIP
|
||||||
|
return lms
|
||||||
|
|
||||||
|
|
||||||
|
def _depth_img(val=2.0, h=480, w=640):
|
||||||
|
return np.full((h, w), val, dtype=np.float32)
|
||||||
|
|
||||||
|
|
||||||
|
# Shared pinhole intrinsics used by all compute_pointing_ray tests below.
_FX = 600.0; _FY = 600.0; _CX = 320.0; _CY = 240.0  # focal lengths / principal point (px)
_W = 640; _H = 480  # image dimensions (px)
|
||||||
|
|
||||||
|
|
||||||
|
class TestComputePointingRay:
    """End-to-end checks of compute_pointing_ray: result shape, unit
    direction, target arithmetic, depth fallback, and ref_distance use."""

    def test_returns_dict_on_valid_input(self):
        """Uniform valid depth must yield a result with all documented keys."""
        lms = _make_landmarks()
        result = compute_pointing_ray(
            lms, _depth_img(), _FX, _FY, _CX, _CY, _W, _H)
        assert result is not None
        assert 'origin_3d' in result
        assert 'direction_3d' in result
        assert 'target_3d' in result
        assert 'range_m' in result

    def test_returns_none_when_depth_all_zero(self):
        """All-zero depth is invalid everywhere → no ray can be formed."""
        lms = _make_landmarks()
        result = compute_pointing_ray(
            lms, np.zeros((_H, _W), dtype=np.float32),
            _FX, _FY, _CX, _CY, _W, _H)
        assert result is None

    def test_direction_is_unit_vector(self):
        lms = _make_landmarks(mcp_nx=0.4, mcp_ny=0.5, tip_nx=0.6, tip_ny=0.4)
        result = compute_pointing_ray(
            lms, _depth_img(), _FX, _FY, _CX, _CY, _W, _H)
        assert result is not None
        dx, dy, dz = result['direction_3d']
        norm = math.sqrt(dx*dx + dy*dy + dz*dz)
        assert norm == pytest.approx(1.0, abs=1e-5)

    def test_target_is_origin_plus_range_times_direction(self):
        """target_3d must satisfy the documented identity origin + dir * range."""
        lms = _make_landmarks()
        result = compute_pointing_ray(
            lms, _depth_img(val=2.0), _FX, _FY, _CX, _CY, _W, _H,
            ref_distance=3.0)
        ox, oy, oz = result['origin_3d']
        dx, dy, dz = result['direction_3d']
        tx, ty, tz = result['target_3d']
        r = result['range_m']
        assert tx == pytest.approx(ox + dx * r, abs=1e-5)
        assert ty == pytest.approx(oy + dy * r, abs=1e-5)
        assert tz == pytest.approx(oz + dz * r, abs=1e-5)

    def test_mcp_uv_matches_landmark_projection(self):
        """mcp_uv should be the normalised landmark scaled by image size."""
        lms = _make_landmarks(mcp_nx=0.5, mcp_ny=0.5)
        result = compute_pointing_ray(
            lms, _depth_img(), _FX, _FY, _CX, _CY, _W, _H)
        mu, mv = result['mcp_uv']
        assert mu == pytest.approx(0.5 * _W)
        assert mv == pytest.approx(0.5 * _H)

    def test_fallback_when_only_one_depth_valid(self):
        """Only the MCP pixel has valid depth; TIP is in an invalid region."""
        depth = np.zeros((_H, _W), dtype=np.float32)
        # Set valid depth only at the MCP location (a 7×7 patch covers the
        # default 5×5 sampling window with margin)
        mcp_u, mcp_v = int(0.5 * _W), int(0.5 * _H)
        for du in range(-3, 4):
            for dv in range(-3, 4):
                u_ = max(0, min(_W - 1, mcp_u + du))
                v_ = max(0, min(_H - 1, mcp_v + dv))
                depth[v_, u_] = 2.0

        lms = _make_landmarks(mcp_nx=0.5, mcp_ny=0.5, tip_nx=0.9, tip_ny=0.1)
        result = compute_pointing_ray(
            lms, depth, _FX, _FY, _CX, _CY, _W, _H)
        # Should still return a result (using MCP depth for TIP fallback)
        assert result is not None
        dx, dy, dz = result['direction_3d']
        norm = math.sqrt(dx*dx + dy*dy + dz*dz)
        assert norm == pytest.approx(1.0, abs=1e-4)

    def test_ref_distance_used_as_range(self):
        """range_m must echo the ref_distance argument."""
        lms = _make_landmarks()
        result = compute_pointing_ray(
            lms, _depth_img(), _FX, _FY, _CX, _CY, _W, _H,
            ref_distance=5.0)
        assert result['range_m'] == pytest.approx(5.0)
|
||||||
|
|
||||||
|
|
||||||
|
# Allow running this file directly without the pytest CLI.
if __name__ == '__main__':
    pytest.main([__file__, '-v'])
|
||||||
@ -44,6 +44,8 @@ rosidl_generate_interfaces(${PROJECT_NAME}
|
|||||||
# Issue #171 — robot mesh comms
|
# Issue #171 — robot mesh comms
|
||||||
"msg/MeshPeer.msg"
|
"msg/MeshPeer.msg"
|
||||||
"msg/MeshHandoff.msg"
|
"msg/MeshHandoff.msg"
|
||||||
|
# Issue #221 — hand gesture pointing direction
|
||||||
|
"msg/PointingTarget.msg"
|
||||||
DEPENDENCIES std_msgs geometry_msgs builtin_interfaces
|
DEPENDENCIES std_msgs geometry_msgs builtin_interfaces
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@ -0,0 +1,21 @@
|
|||||||
|
# PointingTarget.msg — 3-D pointing ray derived from hand landmarks + depth (Issue #221).
|
||||||
|
#
|
||||||
|
# Published on /saltybot/pointing_target at 5 Hz.
|
||||||
|
# Frame: camera_color_optical_frame (D435i colour stream).
|
||||||
|
#
|
||||||
|
# is_active is false when no 'point' gesture has been received recently.
|
||||||
|
|
||||||
|
std_msgs/Header header
|
||||||
|
|
||||||
|
int32 person_id # from Gesture.msg (-1 = unidentified)
|
||||||
|
float32 confidence # detection confidence from the source Gesture
|
||||||
|
bool is_active # true only when a recent 'point' gesture is present
|
||||||
|
string coarse_direction # Gesture.msg direction field: "left"/"right"/"up"/"forward"/"down"
|
||||||
|
|
||||||
|
# 3-D ray in camera_color_optical_frame (metres)
|
||||||
|
geometry_msgs/Point origin # INDEX_MCP (knuckle) position — ray origin
|
||||||
|
geometry_msgs/Vector3 direction # unit vector from MCP toward INDEX_TIP
|
||||||
|
|
||||||
|
# Estimated pointing target
|
||||||
|
geometry_msgs/Point target # origin + direction * range_m
|
||||||
|
float32 range_m # distance along the ray (m)
|
||||||
Loading…
x
Reference in New Issue
Block a user