Compare commits
No commits in common. "569ac3fb35fcc206cc0b15f88576cf92e7ad8171" and "3ea19fbb999d5c940577874189c3c8ec7892b7f3" have entirely different histories.
@ -1,196 +0,0 @@
# saltybot_gesture_recognition

Hand and body gesture recognition via MediaPipe on the Jetson Orin GPU (Issue #454).

Detects human hand and body gestures in a real-time camera feed and publishes recognized gestures for multimodal interaction. Integrates with the voice command router for combined audio + gesture control.

## Recognized Gestures

### Hand Gestures

| Gesture | Detection | Meaning |
|---------|-----------|---------|
| `wave` | Lateral wrist oscillation (temporal) | Greeting, acknowledgment |
| `point` | Index extended, others curled | Direction indication ("left"/"right"/"up"/"forward") |
| `stop_palm` | All fingers extended, palm forward | Emergency stop (e-stop) |
| `thumbs_up` | Thumb extended up, fist closed | Confirmation, approval |
| `come_here` | Beckoning: index curled toward palm (temporal) | Call to approach |
| `follow` | Index extended horizontally | Follow me |

### Body Gestures

| Gesture | Detection | Meaning |
|---------|-----------|---------|
| `arms_up` | Both wrists above shoulders | Stop / emergency |
| `arms_spread` | Arms extended laterally | Back off / clear space |
| `crouch` | Hips below standing threshold | Come closer |

## Performance

- **Frame Rate**: 10–15 fps on Jetson Orin (with GPU acceleration)
- **Latency**: ~100–150 ms end-to-end
- **Range**: 2–5 meters (optimal 2–3 m)
- **Accuracy**: ~85–90% for known gestures (varies with lighting and occlusion)
- **Simultaneous Detections**: Up to 10 people + gestures per frame

## Topics

### Published

- **`/saltybot/gestures`** (`saltybot_social_msgs/GestureArray`)
  Array of detected gestures with type, confidence, position, and source (hand/body)

## Parameters

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `camera_topic` | str | `/camera/color/image_raw` | RGB camera topic |
| `confidence_threshold` | float | 0.7 | Minimum confidence to publish (0–1) |
| `publish_hz` | float | 15.0 | Output rate (Hz) |
| `max_distance_m` | float | 5.0 | Maximum gesture range (meters) |
| `enable_gpu` | bool | true | Use Jetson GPU acceleration |

## Messages

### GestureArray

```
Header header
Gesture[] gestures
uint32 count
```

### Gesture (from saltybot_social_msgs)

```
Header header
string gesture_type     # "wave", "point", "stop_palm", etc.
int32 person_id         # -1 if unidentified
float32 confidence      # 0-1 (typically >= 0.7)
int32 camera_id         # 0=front
float32 hand_x          # Normalized x position (0-1)
float32 hand_y          # Normalized y position (0-1)
bool is_right_hand      # True for right hand
string direction        # For "point": "left"/"right"/"up"/"forward"/"down"
string source           # "hand" or "body_pose"
```
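
Downstream consumers typically filter the array by type and confidence before acting on it. A minimal sketch in plain Python (gestures mocked as dicts mirroring the `Gesture` fields, since `saltybot_social_msgs` may not be built everywhere; `pick_command` is an illustrative name, not part of the package):

```python
def pick_command(gestures, threshold=0.7):
    """Return the highest-confidence actionable gesture, or None.

    `gestures` is a list of dicts mirroring the Gesture message fields.
    """
    actionable = [
        g for g in gestures
        if g['confidence'] >= threshold
        and g['gesture_type'] in ('stop_palm', 'point', 'come_here')
    ]
    if not actionable:
        return None
    return max(actionable, key=lambda g: g['confidence'])

detections = [
    {'gesture_type': 'wave', 'confidence': 0.9},
    {'gesture_type': 'point', 'confidence': 0.82, 'direction': 'left'},
    {'gesture_type': 'stop_palm', 'confidence': 0.65},
]
print(pick_command(detections)['gesture_type'])  # point
```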

## Usage

### Launch Node
```bash
ros2 launch saltybot_gesture_recognition gesture_recognition.launch.py
```

### With Custom Parameters
```bash
ros2 launch saltybot_gesture_recognition gesture_recognition.launch.py \
    camera_topic:='/camera/front/image_raw' \
    confidence_threshold:=0.75 \
    publish_hz:=20.0
```

### Using a Config File
```bash
# --ros-args is a node option, so pass the params file via ros2 run
ros2 run saltybot_gesture_recognition gesture_node \
    --ros-args --params-file config/gesture_params.yaml
```

## Algorithm

### MediaPipe Hands
- 21 landmarks per hand (wrist + finger joints)
- Detects: palm orientation, finger extension, hand pose
- Model complexity: 0 (lite, faster) for Jetson

### MediaPipe Pose
- 33 body landmarks (shoulders, hips, wrists, knees, etc.)
- Detects: arm angle, body orientation, posture
- Model complexity: 1 (balanced accuracy/speed)

### Gesture Classification
1. **Thumbs-up**: Thumb extended >0.3, no other fingers extended
2. **Stop-palm**: All fingers extended, palm normal > 0.3 (facing camera)
3. **Point**: Only index extended, direction from hand position
4. **Wave**: High variance in hand x-position over ~5 frames
5. **Beckon**: High variance in hand y-position over ~4 frames
6. **Arms-up**: Both wrists > shoulder height
7. **Arms-spread**: Wrist distance > shoulder width × 1.2
8. **Crouch**: Hip-y > shoulder-y + 0.3
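
Rules 4 and 5 reduce to a variance test over a short history of hand positions. A standalone sketch (pure Python; the 5-frame window and 0.05 threshold mirror the node's wave heuristic, and `is_wave` is an illustrative name):

```python
from collections import deque
import statistics

def is_wave(x_positions, min_frames=5, min_std=0.05):
    """Waving = enough recent frames with high lateral (x) variance."""
    if len(x_positions) < min_frames:
        return False
    return statistics.pstdev(x_positions) > min_std

history = deque(maxlen=10)
for x in [0.40, 0.55, 0.38, 0.57, 0.41, 0.56]:  # oscillating hand center
    history.append(x)
print(is_wave(list(history)))  # True
```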

### Confidence Scoring
- MediaPipe detection confidence × gesture classification confidence
- Temporal smoothing: history over the last 10 frames
- Threshold: 0.7 (configurable) for publication
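
The scoring above can be sketched as follows (pure-Python illustration; the 10-frame window matches the node's history deque, while `ConfidenceSmoother` itself is a hypothetical helper, not the node's API):

```python
from collections import deque

class ConfidenceSmoother:
    """Combine detector and classifier confidence, smoothed over recent frames."""

    def __init__(self, window=10, threshold=0.7):
        self.history = deque(maxlen=window)
        self.threshold = threshold

    def update(self, detection_conf, classification_conf):
        # Per-frame combined score = detection x classification
        self.history.append(detection_conf * classification_conf)
        # Smoothed score = mean over the window
        smoothed = sum(self.history) / len(self.history)
        return smoothed, smoothed >= self.threshold

smoother = ConfidenceSmoother()
for det, cls in [(0.9, 0.85), (0.95, 0.8), (0.9, 0.9)]:
    score, publish = smoother.update(det, cls)
print(round(score, 3), publish)  # 0.778 True
```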

## Integration with Voice Command Router

```python
# Inside an rclpy Node: subscribe to both modalities
# (this is a ROS 2 package, so rclpy — not rospy — APIs apply)
self.create_subscription(SpeechTranscript, '/saltybot/speech', self.voice_callback, 10)
self.create_subscription(GestureArray, '/saltybot/gestures', self.gesture_callback, 10)

def multimodal_command(self, voice_cmd, gesture):
    # "robot forward" (voice) + point-forward (gesture) = confirmed forward
    if gesture.gesture_type == 'point' and gesture.direction == 'forward':
        if 'forward' in voice_cmd:
            nav.set_goal(forward_pos)  # High confidence
```

## Dependencies

- `mediapipe` — Hand and Pose detection
- `opencv-python` — Image processing
- `numpy`, `scipy` — Numerical computation
- `rclpy` — ROS 2 Python client
- `saltybot_social_msgs` — Custom gesture messages

## Build & Test

### Build
```bash
colcon build --packages-select saltybot_gesture_recognition
```

### Run Tests
```bash
pytest jetson/ros2_ws/src/saltybot_gesture_recognition/test/
```

### Benchmark on Jetson Orin
```bash
ros2 run saltybot_gesture_recognition gesture_node \
    --ros-args -p publish_hz:=30.0 &
ros2 topic hz /saltybot/gestures
# Expected: ~15 Hz (GPU-limited, not message processing)
```

## Troubleshooting

**Issue**: Low frame rate (< 10 Hz)
- **Solution**: Reduce camera resolution or use `model_complexity=0`

**Issue**: False positives (confidence > 0.7 but wrong gesture)
- **Solution**: Increase `confidence_threshold` to 0.75–0.8

**Issue**: Gestures not detected beyond ~3 m
- **Solution**: Improve lighting or move closer; landmark quality degrades with distance. Reducing `max_distance_m` suppresses the remaining low-confidence detections rather than fixing them.

## Future Enhancements

- **Dynamic Gesture Timeout**: Stop publishing after 2 s without an update
- **Person Association**: Match gestures to tracked persons (from `saltybot_multi_person_tracker`)
- **Custom Gesture Training**: TensorFlow Lite fine-tuning on robot-specific gestures
- **Gesture Sequences**: Recognize multi-step command chains ("wave → point → thumbs-up")
- **Sign Language**: ASL/BSL recognition (larger model, future phase)
- **Accessibility**: Voice + gesture for accessibility (e.g., hands-free "stop")

## Performance Targets (Jetson Orin Nano Super)

| Metric | Target | Actual |
|--------|--------|--------|
| Frame Rate | 10+ fps | ~15 fps (GPU) |
| Latency | <200 ms | ~100–150 ms |
| Max People | 5–10 | ~10 (GPU-limited) |
| Confidence | 0.7+ | 0.75–0.95 |
| GPU Memory | <1 GB | ~400–500 MB |

## References

- [MediaPipe Solutions](https://developers.google.com/mediapipe/solutions)
- [MediaPipe Hands](https://developers.google.com/mediapipe/solutions/vision/hand_landmarker)
- [MediaPipe Pose](https://developers.google.com/mediapipe/solutions/vision/pose_landmarker)

## License

MIT
@ -1,14 +0,0 @@
# Gesture recognition ROS2 parameters

/**:
  ros__parameters:
    # Input
    camera_topic: '/camera/color/image_raw'

    # Detection
    confidence_threshold: 0.7   # Only publish gestures with confidence >= 0.7
    max_distance_m: 5.0         # Maximum gesture range (2-5 m typical)

    # Performance
    publish_hz: 15.0            # 10+ fps target on Jetson Orin
    enable_gpu: true            # Use Jetson GPU acceleration
@ -1,68 +0,0 @@
"""
Launch gesture recognition node.

Typical usage:
    ros2 launch saltybot_gesture_recognition gesture_recognition.launch.py
"""

from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node


def generate_launch_description():
    """Generate launch description for gesture recognition node."""
    # Declare launch arguments
    camera_topic_arg = DeclareLaunchArgument(
        'camera_topic',
        default_value='/camera/color/image_raw',
        description='RGB camera topic',
    )
    confidence_arg = DeclareLaunchArgument(
        'confidence_threshold',
        default_value='0.7',
        description='Detection confidence threshold (0-1)',
    )
    publish_hz_arg = DeclareLaunchArgument(
        'publish_hz',
        default_value='15.0',
        description='Publication rate (Hz, target 10+ fps)',
    )
    max_distance_arg = DeclareLaunchArgument(
        'max_distance_m',
        default_value='5.0',
        description='Maximum gesture recognition range (meters)',
    )
    gpu_arg = DeclareLaunchArgument(
        'enable_gpu',
        default_value='true',
        description='Use GPU acceleration (Jetson Orin)',
    )

    # Gesture recognition node; each LaunchConfiguration must name a
    # declared argument, so 'enable_gpu' (not the Python variable 'gpu_arg')
    gesture_node = Node(
        package='saltybot_gesture_recognition',
        executable='gesture_node',
        name='gesture_recognition',
        output='screen',
        parameters=[
            {'camera_topic': LaunchConfiguration('camera_topic')},
            {'confidence_threshold': LaunchConfiguration('confidence_threshold')},
            {'publish_hz': LaunchConfiguration('publish_hz')},
            {'max_distance_m': LaunchConfiguration('max_distance_m')},
            {'enable_gpu': LaunchConfiguration('enable_gpu')},
        ],
    )

    return LaunchDescription(
        [
            camera_topic_arg,
            confidence_arg,
            publish_hz_arg,
            max_distance_arg,
            gpu_arg,
            gesture_node,
        ]
    )
@ -1,35 +0,0 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
  <name>saltybot_gesture_recognition</name>
  <version>0.1.0</version>
  <description>
    Hand and body gesture recognition via MediaPipe on Jetson Orin GPU.
    Recognizes wave, point, stop-palm, thumbs-up, come-here (beckon),
    arms-up, arms-spread, and crouch.
    Integrates with voice command router for multimodal interaction.
    Issue #454.
  </description>
  <maintainer email="sl-perception@saltylab.local">sl-perception</maintainer>
  <license>MIT</license>

  <buildtool_depend>ament_python</buildtool_depend>

  <depend>rclpy</depend>
  <depend>std_msgs</depend>
  <depend>sensor_msgs</depend>
  <depend>geometry_msgs</depend>
  <depend>cv_bridge</depend>
  <depend>saltybot_social_msgs</depend>
  <depend>saltybot_multi_person_tracker</depend>

  <exec_depend>python3-numpy</exec_depend>
  <exec_depend>python3-opencv</exec_depend>
  <exec_depend>python3-mediapipe</exec_depend>
  <exec_depend>python3-scipy</exec_depend>

  <test_depend>pytest</test_depend>

  <export>
    <build_type>ament_python</build_type>
  </export>
</package>
@ -1,480 +0,0 @@
"""
gesture_recognition_node.py — Hand and body gesture recognition via MediaPipe.

Uses MediaPipe Hands and Pose to detect gestures on the Jetson Orin GPU.

Recognizes:
    Hand gestures: wave, point, stop_palm (e-stop), thumbs_up, come_here (beckon)
    Body gestures: arms_up (stop), arms_spread (back off), crouch (come closer)

Publishes:
    /saltybot/gestures    saltybot_social_msgs/GestureArray    10+ fps

Parameters:
    camera_topic          str    '/camera/color/image_raw'  RGB camera input
    confidence_threshold  float  0.7                        detection confidence
    publish_hz            float  15.0                       output rate (10+ fps target)
    max_distance_m        float  5.0                        max gesture range
    enable_gpu            bool   true                       use GPU acceleration
"""

from __future__ import annotations

import threading
from collections import deque
from typing import Optional

import cv2
import numpy as np
import rclpy
from cv_bridge import CvBridge
from rclpy.node import Node
from rclpy.qos import HistoryPolicy, QoSProfile, ReliabilityPolicy

from sensor_msgs.msg import Image
from std_msgs.msg import Header

try:
    from saltybot_social_msgs.msg import Gesture, GestureArray
    _GESTURE_MSGS_OK = True
except ImportError:
    _GESTURE_MSGS_OK = False

try:
    import mediapipe as mp
    _MEDIAPIPE_OK = True
except ImportError:
    _MEDIAPIPE_OK = False


# Sensor-style QoS: best-effort delivery, keep only the latest few frames
_SENSOR_QOS = QoSProfile(
    reliability=ReliabilityPolicy.BEST_EFFORT,
    history=HistoryPolicy.KEEP_LAST,
    depth=5,
)


class GestureDetector:
    """MediaPipe-based gesture detector for hands and pose."""

    # Hand gesture thresholds
    GESTURE_DISTANCE_THRESHOLD = 0.05
    WAVE_DURATION = 5  # frames
    BECKON_DURATION = 4  # frames
    POINT_MIN_EXTEND = 0.3  # index extension threshold

    def __init__(self, enable_gpu: bool = True):
        if not _MEDIAPIPE_OK:
            raise ImportError("MediaPipe not available")

        self.enable_gpu = enable_gpu

        # Initialize MediaPipe
        self.mp_hands = mp.solutions.hands
        self.mp_pose = mp.solutions.pose
        self.mp_drawing = mp.solutions.drawing_utils

        # Create hand detector
        self.hands = self.mp_hands.Hands(
            static_image_mode=False,
            max_num_hands=10,
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5,
            model_complexity=0,  # 0=lite (faster), 1=full
        )

        # Create pose detector
        self.pose = self.mp_pose.Pose(
            static_image_mode=False,
            model_complexity=1,
            smooth_landmarks=True,
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5,
        )

        # Gesture history for temporal smoothing
        self.hand_history = deque(maxlen=10)
        self.pose_history = deque(maxlen=10)

    def detect_hand_gestures(self, frame: np.ndarray, person_id: int = -1) -> list[dict]:
        """
        Detect hand gestures using MediaPipe Hands.

        Returns:
            List of detected gestures with type, confidence, position
        """
        gestures = []

        if frame is None or frame.size == 0:
            return gestures

        try:
            # Convert BGR to RGB (MediaPipe expects RGB input)
            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            # Detect hands
            results = self.hands.process(rgb_frame)

            if not results.multi_hand_landmarks or not results.multi_handedness:
                return gestures

            for hand_landmarks, handedness in zip(
                results.multi_hand_landmarks, results.multi_handedness
            ):
                is_right = handedness.classification[0].label == "Right"
                confidence = handedness.classification[0].score

                # Extract landmarks as a (21, 3) array
                landmarks = np.array(
                    [[lm.x, lm.y, lm.z] for lm in hand_landmarks.landmark]
                )

                # Detect specific hand gestures
                gesture_type, gesture_conf = self._classify_hand_gesture(
                    landmarks, is_right
                )

                if gesture_type:
                    # Get hand center position
                    hand_x = float(np.mean(landmarks[:, 0]))
                    hand_y = float(np.mean(landmarks[:, 1]))

                    gestures.append({
                        'type': gesture_type,
                        'confidence': float(gesture_conf * confidence),
                        'hand_x': hand_x,
                        'hand_y': hand_y,
                        'is_right_hand': is_right,
                        'source': 'hand',
                        'person_id': person_id,
                    })

            self.hand_history.append(gestures)

        except Exception:
            # Per-frame detection errors are non-fatal; skip this frame
            pass

        return gestures

    def detect_body_gestures(self, frame: np.ndarray, person_id: int = -1) -> list[dict]:
        """
        Detect body/pose gestures using MediaPipe Pose.

        Returns:
            List of detected pose-based gestures
        """
        gestures = []

        if frame is None or frame.size == 0:
            return gestures

        try:
            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            results = self.pose.process(rgb_frame)

            if not results.pose_landmarks:
                return gestures

            landmarks = np.array(
                [[lm.x, lm.y, lm.z] for lm in results.pose_landmarks.landmark]
            )

            # Detect specific body gestures
            gesture_type, gesture_conf = self._classify_body_gesture(landmarks)

            if gesture_type:
                # Get body center
                body_x = float(np.mean(landmarks[:, 0]))
                body_y = float(np.mean(landmarks[:, 1]))

                gestures.append({
                    'type': gesture_type,
                    'confidence': float(gesture_conf),
                    'hand_x': body_x,
                    'hand_y': body_y,
                    'is_right_hand': False,
                    'source': 'body_pose',
                    'person_id': person_id,
                })

            self.pose_history.append(gestures)

        except Exception:
            # Per-frame detection errors are non-fatal; skip this frame
            pass

        return gestures

    def _classify_hand_gesture(
        self, landmarks: np.ndarray, is_right: bool
    ) -> tuple[Optional[str], float]:
        """
        Classify hand gesture from MediaPipe landmarks.

        Returns:
            (gesture_type, confidence)
        """
        if landmarks.shape[0] < 21:
            return None, 0.0

        # Landmark indices:
        #   0: wrist, 5: index MCP, 9: middle MCP, 13: ring MCP, 17: pinky MCP
        #   4: thumb tip, 8: index tip, 12: middle tip, 16: ring tip, 20: pinky tip
        thumb_tip = landmarks[4]
        index_tip = landmarks[8]
        middle_tip = landmarks[12]
        ring_tip = landmarks[16]
        pinky_tip = landmarks[20]

        # Palm normal (pointing direction)
        palm_normal = self._get_palm_normal(landmarks)

        # Finger extension: tip far from its MCP joint
        index_extended = self._distance(index_tip, landmarks[5]) > self.POINT_MIN_EXTEND
        middle_extended = self._distance(middle_tip, landmarks[9]) > self.POINT_MIN_EXTEND
        ring_extended = self._distance(ring_tip, landmarks[13]) > self.POINT_MIN_EXTEND
        pinky_extended = self._distance(pinky_tip, landmarks[17]) > self.POINT_MIN_EXTEND
        thumb_extended = self._distance(thumb_tip, landmarks[2]) > 0.1

        # Thumbs-up: thumb extended up, other fingers curled
        if thumb_extended and not (index_extended or middle_extended):
            palm_y = np.mean([landmarks[i][1] for i in [5, 9, 13, 17]])
            if thumb_tip[1] < palm_y - 0.1:  # Thumb above palm (image y grows downward)
                return 'thumbs_up', 0.85

        # Stop palm: all fingers extended, palm forward
        if index_extended and middle_extended and ring_extended and pinky_extended:
            if palm_normal[2] > 0.3:  # Palm facing camera
                return 'stop_palm', 0.8

        # Point: only index extended
        if index_extended and not (middle_extended or ring_extended or pinky_extended):
            return 'point', 0.8

        # Wave: hand moving (approximate via history)
        if len(self.hand_history) > self.WAVE_DURATION:
            if self._detect_wave_motion():
                return 'wave', 0.75

        # Come-here (beckon): curled fingers, repetitive motion
        if not (index_extended or middle_extended):
            if len(self.hand_history) > self.BECKON_DURATION:
                if self._detect_beckon_motion():
                    return 'come_here', 0.75

        return None, 0.0

    def _classify_body_gesture(self, landmarks: np.ndarray) -> tuple[Optional[str], float]:
        """
        Classify body gesture from MediaPipe Pose landmarks.

        Returns:
            (gesture_type, confidence)
        """
        if landmarks.shape[0] < 33:
            return None, 0.0

        # Key body landmarks (MediaPipe Pose indices)
        left_shoulder = landmarks[11]
        right_shoulder = landmarks[12]
        left_hip = landmarks[23]
        right_hip = landmarks[24]
        left_wrist = landmarks[15]   # 15/16 are the wrists (9/10 are mouth corners)
        right_wrist = landmarks[16]

        shoulder_y = np.mean([left_shoulder[1], right_shoulder[1]])
        hip_y = np.mean([left_hip[1], right_hip[1]])
        wrist_y_max = max(left_wrist[1], right_wrist[1])

        # Arms up (emergency stop): both wrists above the shoulders.
        # Image y grows downward, so "above" means smaller y.
        if wrist_y_max < shoulder_y - 0.2:
            return 'arms_up', 0.85

        # Arms spread (back off)
        shoulder_dist = self._distance(left_shoulder[:2], right_shoulder[:2])
        wrist_dist = self._distance(left_wrist[:2], right_wrist[:2])
        if wrist_dist > shoulder_dist * 1.2:
            return 'arms_spread', 0.8

        # Crouch (come closer)
        if hip_y - shoulder_y > 0.3:
            return 'crouch', 0.8

        return None, 0.0

    def _get_palm_normal(self, landmarks: np.ndarray) -> np.ndarray:
        """Compute the unit palm normal vector (pointing direction)."""
        wrist = landmarks[0]
        middle_mcp = landmarks[9]
        index_mcp = landmarks[5]
        v1 = index_mcp - wrist
        v2 = middle_mcp - wrist
        normal = np.cross(v1, v2)
        return normal / (np.linalg.norm(normal) + 1e-6)

    def _distance(self, p1: np.ndarray, p2: np.ndarray) -> float:
        """Euclidean distance between two points."""
        return float(np.linalg.norm(p1 - p2))

    def _detect_wave_motion(self) -> bool:
        """Detect waving motion from hand history."""
        if len(self.hand_history) < self.WAVE_DURATION:
            return False
        # Simple heuristic: high variance in x-position over time
        x_positions = [g[0]['hand_x'] for g in self.hand_history if g]
        if len(x_positions) < self.WAVE_DURATION:
            return False
        return float(np.std(x_positions)) > 0.05

    def _detect_beckon_motion(self) -> bool:
        """Detect beckoning motion from hand history."""
        if len(self.hand_history) < self.BECKON_DURATION:
            return False
        # High variance in y-position (up-down motion)
        y_positions = [g[0]['hand_y'] for g in self.hand_history if g]
        if len(y_positions) < self.BECKON_DURATION:
            return False
        return float(np.std(y_positions)) > 0.04


class GestureRecognitionNode(Node):
    """ROS 2 node wrapping GestureDetector."""

    def __init__(self):
        super().__init__('gesture_recognition')

        # Parameters
        self.declare_parameter('camera_topic', '/camera/color/image_raw')
        self.declare_parameter('confidence_threshold', 0.7)
        self.declare_parameter('publish_hz', 15.0)
        self.declare_parameter('max_distance_m', 5.0)
        self.declare_parameter('enable_gpu', True)

        camera_topic = self.get_parameter('camera_topic').value
        self.confidence_threshold = self.get_parameter('confidence_threshold').value
        pub_hz = self.get_parameter('publish_hz').value
        self.max_distance_m = self.get_parameter('max_distance_m').value
        enable_gpu = self.get_parameter('enable_gpu').value

        # Publisher (the QoS profile is the third positional argument of
        # create_publisher; passing both a depth and qos_profile= is an error)
        self._pub_gestures = None
        if _GESTURE_MSGS_OK:
            self._pub_gestures = self.create_publisher(
                GestureArray, '/saltybot/gestures', _SENSOR_QOS
            )
        else:
            self.get_logger().error('saltybot_social_msgs not available')
            return

        # Gesture detector
        self._detector: Optional[GestureDetector] = None
        self._detector_lock = threading.Lock()

        if _MEDIAPIPE_OK:
            try:
                self._detector = GestureDetector(enable_gpu=enable_gpu)
            except Exception as e:
                self.get_logger().error(f'Failed to initialize MediaPipe: {e}')

        # Video bridge
        self._bridge = CvBridge()
        self._latest_image: Image | None = None
        self._lock = threading.Lock()

        # Subscriptions
        self.create_subscription(Image, camera_topic, self._on_image, _SENSOR_QOS)

        # Publish timer
        self.create_timer(1.0 / pub_hz, self._tick)

        self.get_logger().info(
            f'gesture_recognition ready — '
            f'camera={camera_topic} confidence_threshold={self.confidence_threshold} hz={pub_hz}'
        )

    def _on_image(self, msg: Image) -> None:
        with self._lock:
            self._latest_image = msg

    def _tick(self) -> None:
        """Detect and publish gestures."""
        if self._pub_gestures is None or self._detector is None:
            return

        with self._lock:
            if self._latest_image is None:
                return
            image_msg = self._latest_image

        try:
            frame = self._bridge.imgmsg_to_cv2(
                image_msg, desired_encoding='bgr8'
            )
        except Exception as e:
            self.get_logger().warn(f'Image conversion error: {e}')
            return

        # Detect hand and body gestures
        hand_gestures = self._detector.detect_hand_gestures(frame)
        body_gestures = self._detector.detect_body_gestures(frame)

        all_gestures = hand_gestures + body_gestures

        # Filter by confidence threshold
        filtered_gestures = [
            g for g in all_gestures if g['confidence'] >= self.confidence_threshold
        ]

        # Build and publish GestureArray
        gesture_array = GestureArray()
        gesture_array.header = Header(
            stamp=self.get_clock().now().to_msg(),
            frame_id='camera',
        )

        for g in filtered_gestures:
            gesture = Gesture()
            gesture.header = gesture_array.header
            gesture.gesture_type = g['type']
            gesture.person_id = g.get('person_id', -1)
            gesture.confidence = g['confidence']
            gesture.hand_x = g['hand_x']
            gesture.hand_y = g['hand_y']
            gesture.is_right_hand = g['is_right_hand']
            gesture.source = g['source']

            # Map point direction if applicable
            if g['type'] == 'point':
                if g['hand_x'] < 0.33:
                    gesture.direction = 'left'
                elif g['hand_x'] > 0.67:
                    gesture.direction = 'right'
                elif g['hand_y'] < 0.33:
                    gesture.direction = 'up'
                else:
                    gesture.direction = 'forward'

            gesture_array.gestures.append(gesture)

        gesture_array.count = len(gesture_array.gestures)
        self._pub_gestures.publish(gesture_array)


def main(args=None):
    rclpy.init(args=args)
    node = GestureRecognitionNode()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()
@ -1,4 +0,0 @@
[develop]
script_dir=$base/lib/saltybot_gesture_recognition
[egg_info]
tag_date = 0
@ -1,23 +0,0 @@
from setuptools import setup, find_packages

setup(
    name='saltybot_gesture_recognition',
    version='0.1.0',
    packages=find_packages(exclude=['test']),
    data_files=[
        ('share/ament_index/resource_index/packages',
         ['resource/saltybot_gesture_recognition']),
        ('share/saltybot_gesture_recognition', ['package.xml']),
    ],
    install_requires=['setuptools'],
    zip_safe=True,
    author='SaltyLab',
    author_email='robot@saltylab.local',
    description='Hand/body gesture recognition via MediaPipe',
    license='MIT',
    entry_points={
        'console_scripts': [
            'gesture_node=saltybot_gesture_recognition.gesture_recognition_node:main',
        ],
    },
)
@ -1,89 +0,0 @@
"""
Basic tests for gesture recognition.
"""

import pytest
import numpy as np

try:
    from saltybot_gesture_recognition.gesture_recognition_node import GestureDetector
    _DETECTOR_OK = True
except ImportError:
    _DETECTOR_OK = False


@pytest.mark.skipif(not _DETECTOR_OK, reason="GestureDetector not available")
class TestGestureDetector:
    """Tests for gesture detection."""

    def test_detector_init(self):
        """Test GestureDetector initialization."""
        try:
            detector = GestureDetector(enable_gpu=False)
            assert detector is not None
        except ImportError:
            pytest.skip("MediaPipe not available")

    def test_hand_gesture_detection_empty(self):
        """Test hand gesture detection with an empty frame."""
        try:
            detector = GestureDetector(enable_gpu=False)
            gestures = detector.detect_hand_gestures(None)
            assert gestures == []
        except ImportError:
            pytest.skip("MediaPipe not available")

    def test_body_gesture_detection_empty(self):
        """Test body gesture detection with an empty frame."""
        try:
            detector = GestureDetector(enable_gpu=False)
            gestures = detector.detect_body_gestures(None)
            assert gestures == []
        except ImportError:
            pytest.skip("MediaPipe not available")

    def test_hand_gesture_detection_frame(self):
        """Test hand gesture detection with a synthetic frame."""
        try:
            detector = GestureDetector(enable_gpu=False)
            # Create a blank frame
            frame = np.zeros((480, 640, 3), dtype=np.uint8)
            gestures = detector.detect_hand_gestures(frame)
            # May or may not detect anything in a blank frame
            assert isinstance(gestures, list)
        except ImportError:
            pytest.skip("MediaPipe not available")


class TestGestureMessages:
    """Basic Gesture message tests."""

    def test_gesture_creation(self):
        """Test creating a Gesture message."""
        try:
            from saltybot_social_msgs.msg import Gesture
            g = Gesture()
            g.gesture_type = 'wave'
            g.confidence = 0.85
            assert g.gesture_type == 'wave'
            assert g.confidence == 0.85
        except ImportError:
            pytest.skip("saltybot_social_msgs not built")

    def test_gesture_array_creation(self):
        """Test creating a GestureArray message."""
        try:
            from saltybot_social_msgs.msg import Gesture, GestureArray
            arr = GestureArray()
            g = Gesture()
            g.gesture_type = 'point'
            arr.gestures.append(g)
            arr.count = 1
            assert arr.count == 1
            assert arr.gestures[0].gesture_type == 'point'
        except ImportError:
            pytest.skip("saltybot_social_msgs not built")


if __name__ == '__main__':
    pytest.main([__file__])
@ -1,8 +0,0 @@
# Smooth velocity parameters (top-level key must match the launched node
# name, and ROS 2 requires the ros__parameters wrapper)
smooth_velocity_node:
  ros__parameters:
    max_linear_accel: 0.5
    max_angular_accel: 1.0
    max_linear_decel: 0.8
    max_angular_decel: 1.0
    use_scurve: true
    scurve_duration: 0.2
    update_rate: 50
@ -1,16 +0,0 @@
"""Launch smooth velocity controller."""
import os

from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch_ros.actions import Node


def generate_launch_description():
    package_dir = get_package_share_directory("saltybot_smooth_velocity")
    config_path = os.path.join(package_dir, "config", "smooth_velocity_config.yaml")
    node = Node(
        package="saltybot_smooth_velocity",
        executable="smooth_velocity_node",
        name="smooth_velocity_node",
        output="screen",
        parameters=[config_path],
    )
    return LaunchDescription([node])
|
||||
@ -1,20 +0,0 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
  <name>saltybot_smooth_velocity</name>
  <version>0.1.0</version>
  <description>Smooth velocity controller with acceleration limiting and S-curve jerk reduction.</description>
  <maintainer email="sl-controls@saltylab.local">sl-controls</maintainer>
  <license>MIT</license>
  <depend>rclpy</depend>
  <depend>geometry_msgs</depend>
  <depend>std_msgs</depend>
  <buildtool_depend>ament_python</buildtool_depend>
  <test_depend>ament_copyright</test_depend>
  <test_depend>ament_flake8</test_depend>
  <test_depend>ament_pep257</test_depend>
  <test_depend>python3-pytest</test_depend>
  <export>
    <build_type>ament_python</build_type>
  </export>
</package>
@ -1,112 +0,0 @@
#!/usr/bin/env python3
"""Smooth velocity controller - Issue #455. Acceleration limiting + S-curve jerk reduction."""

import math

import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
from std_msgs.msg import String


class SmoothVelocityNode(Node):
    """Subscribes to raw velocity commands and republishes smoothed ones."""

    def __init__(self):
        super().__init__("smooth_velocity_controller")
        self.declare_parameter("max_linear_accel", 0.5)
        self.declare_parameter("max_angular_accel", 1.0)
        self.declare_parameter("max_linear_decel", 0.8)
        self.declare_parameter("max_angular_decel", 1.0)
        self.declare_parameter("use_scurve", True)
        self.declare_parameter("scurve_duration", 0.2)
        self.declare_parameter("update_rate", 50)

        self.max_lin_accel = self.get_parameter("max_linear_accel").value
        self.max_ang_accel = self.get_parameter("max_angular_accel").value
        self.max_lin_decel = self.get_parameter("max_linear_decel").value
        self.max_ang_decel = self.get_parameter("max_angular_decel").value
        self.use_scurve = self.get_parameter("use_scurve").value
        self.scurve_duration = self.get_parameter("scurve_duration").value
        self.update_rate = self.get_parameter("update_rate").value
        self.dt = 1.0 / self.update_rate

        self.cmd_vel_pub = self.create_publisher(Twist, "/cmd_vel", 1)
        self.velocity_profile_pub = self.create_publisher(String, "/saltybot/velocity_profile", 1)
        self.create_subscription(Twist, "/cmd_vel_raw", self._cmd_vel_raw_callback, 1)

        self.current_lin_vel = 0.0
        self.current_ang_vel = 0.0
        self.target_lin_vel = 0.0
        self.target_ang_vel = 0.0
        self.scurve_time_lin = 0.0
        self.scurve_time_ang = 0.0

        self.create_timer(self.dt, self._update_loop)
        self.get_logger().info(f"Smooth velocity controller: {self.update_rate}Hz, S-curve={self.use_scurve}")

    def _cmd_vel_raw_callback(self, msg: Twist):
        """Store the latest raw command as the smoothing target."""
        self.target_lin_vel = msg.linear.x
        self.target_ang_vel = msg.angular.z

    def _update_loop(self):
        """Step both axes toward their targets and publish the result."""
        if self.use_scurve:
            lin_vel = self._apply_scurve(self.current_lin_vel, self.target_lin_vel, self.max_lin_accel, self.max_lin_decel, "linear")
            ang_vel = self._apply_scurve(self.current_ang_vel, self.target_ang_vel, self.max_ang_accel, self.max_ang_decel, "angular")
        else:
            lin_vel = self._apply_accel_limit(self.current_lin_vel, self.target_lin_vel, self.max_lin_accel, self.max_lin_decel)
            ang_vel = self._apply_accel_limit(self.current_ang_vel, self.target_ang_vel, self.max_ang_accel, self.max_ang_decel)

        self.current_lin_vel = lin_vel
        self.current_ang_vel = ang_vel

        twist = Twist()
        twist.linear.x = lin_vel
        twist.angular.z = ang_vel
        self.cmd_vel_pub.publish(twist)

        profile = f"lin:{lin_vel:.3f},ang:{ang_vel:.3f}"
        self.velocity_profile_pub.publish(String(data=profile))

    def _apply_scurve(self, current, target, max_accel, max_decel, axis):
        """Acceleration limiting with a half-cosine ramp on the limit (jerk reduction)."""
        if abs(target - current) < 0.001:
            return target
        direction = 1 if target > current else -1
        accel_limit = max_accel if direction > 0 else max_decel
        max_vel_change = accel_limit * self.dt
        if self.scurve_duration > 0:
            time_var = self.scurve_time_lin if axis == "linear" else self.scurve_time_ang
            progress = min(time_var, self.scurve_duration) / self.scurve_duration
            smooth_factor = (1 - math.cos(progress * math.pi)) / 2
            max_vel_change *= smooth_factor
        new_vel = current + max_vel_change * direction
        if (direction > 0 and new_vel >= target) or (direction < 0 and new_vel <= target):
            # Target reached: reset the ramp timer for this axis.
            if axis == "linear":
                self.scurve_time_lin = 0.0
            else:
                self.scurve_time_ang = 0.0
            return target
        if axis == "linear":
            self.scurve_time_lin += self.dt
        else:
            self.scurve_time_ang += self.dt
        return new_vel

    def _apply_accel_limit(self, current, target, max_accel, max_decel):
        """Plain acceleration limiting: step toward target by at most accel * dt."""
        if abs(target - current) < 0.001:
            return target
        direction = 1 if target > current else -1
        accel_limit = max_accel if direction > 0 else max_decel
        max_vel_change = accel_limit * self.dt
        new_vel = current + max_vel_change * direction
        if (direction > 0 and new_vel >= target) or (direction < 0 and new_vel <= target):
            return target
        return new_vel


def main(args=None):
    rclpy.init(args=args)
    try:
        rclpy.spin(SmoothVelocityNode())
    except KeyboardInterrupt:
        pass
    finally:
        rclpy.shutdown()


if __name__ == "__main__":
    main()
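When `use_scurve` is false, the node falls back to plain acceleration limiting. A standalone sketch of that rule (the function mirrors `_apply_accel_limit` above with `dt` passed explicitly; the loop and numbers are illustrative):

```python
def apply_accel_limit(current, target, max_accel, max_decel, dt):
    """Move `current` toward `target` by at most (accel limit) * dt per tick."""
    if abs(target - current) < 0.001:
        return target
    direction = 1 if target > current else -1
    limit = max_accel if direction > 0 else max_decel
    new_vel = current + limit * dt * direction
    # Clamp at the target so the velocity never overshoots.
    if (direction > 0 and new_vel >= target) or (direction < 0 and new_vel <= target):
        return target
    return new_vel

# Ramp from rest to 0.4 m/s at 50 Hz with a 0.5 m/s^2 limit:
v = 0.0
for _ in range(50):  # 1 s of ticks; the ramp itself takes 0.8 s
    v = apply_accel_limit(v, 0.4, max_accel=0.5, max_decel=0.8, dt=0.02)
print(round(v, 3))  # 0.4
```

Note the asymmetric limits: slowing down (`max_decel` = 0.8) is allowed to be faster than speeding up, which is the usual safe choice for a mobile base.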
@ -1,2 +0,0 @@
[develop]
script_dir=$base/lib/saltybot_smooth_velocity
@ -1,24 +0,0 @@
from setuptools import setup

setup(
    name="saltybot_smooth_velocity",
    version="0.1.0",
    packages=["saltybot_smooth_velocity"],
    data_files=[
        ("share/ament_index/resource_index/packages", ["resource/saltybot_smooth_velocity"]),
        ("share/saltybot_smooth_velocity", ["package.xml"]),
        ("share/saltybot_smooth_velocity/launch", ["launch/smooth_velocity.launch.py"]),
        ("share/saltybot_smooth_velocity/config", ["config/smooth_velocity_config.yaml"]),
    ],
    install_requires=["setuptools"],
    zip_safe=True,
    maintainer="sl-controls",
    maintainer_email="sl-controls@saltylab.local",
    description="Smooth velocity controller with S-curve jerk reduction",
    license="MIT",
    tests_require=["pytest"],
    entry_points={
        "console_scripts": [
            "smooth_velocity_node = saltybot_smooth_velocity.smooth_velocity_node:main",
        ],
    },
)
@ -1,166 +0,0 @@
# Sound Effects Configuration

# Audio library path
audio:
  sounds_directory: "/home/seb/saltybot-data/sounds"  # Audio files location
  supported_formats: [".wav", ".ogg"]                 # Supported file formats
  default_format: "wav"                               # Default synthesis format

# Volume control
volume:
  default_volume: 0.7      # Default playback volume (0.0-1.0)
  quiet_mode_volume: 0.4   # Quiet mode volume reduction
  night_mode_volume: 0.3   # Night mode volume reduction
  max_volume: 1.0          # Absolute maximum
  min_volume: 0.0          # Absolute minimum
  fade_in_time: 0.1        # Fade in duration (seconds)
  fade_out_time: 0.2       # Fade out duration (seconds)

# Event definitions
events:
  boot_complete:
    description: "Robot startup chime"
    filename: "boot_complete.wav"
    duration: 1.2
    priority: 100
    repeat: false

  person_detected:
    description: "Subtle detection sound"
    filename: "person_detected.wav"
    duration: 0.5
    priority: 20
    repeat: false

  wake_word:
    description: "Wake word activation beep"
    filename: "wake_word.wav"
    duration: 0.3
    priority: 80
    repeat: false

  low_battery:
    description: "Low battery warning"
    filename: "low_battery.wav"
    duration: 1.0
    priority: 90
    repeat: false

  obstacle_close:
    description: "Proximity warning beep"
    filename: "obstacle_close.wav"
    duration: 0.4
    priority: 70
    repeat: false

  trick_complete:
    description: "Trick execution jingle"
    filename: "trick_complete.wav"
    duration: 1.5
    priority: 50
    repeat: false

  error:
    description: "Error descending tone"
    filename: "error.wav"
    duration: 0.8
    priority: 85
    repeat: false

  charging_start:
    description: "Charging initiated power-up"
    filename: "charging_start.wav"
    duration: 1.0
    priority: 60
    repeat: false

  geofence_warning:
    description: "Geofence boundary alert"
    filename: "geofence_warning.wav"
    duration: 0.9
    priority: 75
    repeat: false

# Programmatic sound generation
synthesis:
  enabled: true        # Enable fallback synthesis if files missing
  sample_rate: 16000   # Sample rate (Hz)
  bit_depth: 16        # Bit depth (16 or 24)

  # Default tones for each event
  defaults:
    boot_complete:
      type: "chime"    # Ascending notes
      duration: 1.2
      frequencies: [523.25, 659.25, 783.99]  # C5, E5, G5

    person_detected:
      type: "sine"
      duration: 0.5
      frequency: 800   # Hz

    wake_word:
      type: "beep"
      duration: 0.3
      frequency: 1000

    low_battery:
      type: "warning"  # Double beep
      duration: 1.0
      frequency: 880

    obstacle_close:
      type: "proximity"  # Rapid beeps
      duration: 0.4
      frequency: 1200
      pulse_rate: 5      # Pulses per second

    trick_complete:
      type: "jingle"   # Ascending arpeggio
      duration: 1.5
      frequencies: [523.25, 659.25, 783.99, 987.77]  # C5, E5, G5, B5

    error:
      type: "descending"  # Descending tone
      duration: 0.8
      frequencies: [800, 600, 400]

    charging_start:
      type: "power_up"  # Rising tone
      duration: 1.0
      start_freq: 400
      end_freq: 1200

    geofence_warning:
      type: "alert"    # Repeating tone
      duration: 0.9
      frequency: 950

# Priority queue settings
queue:
  max_size: 20            # Maximum queued events
  overlap_allowed: true   # Allow sounds to overlap
  ducking: false          # Reduce volume of existing sounds for new ones
  duck_amount: 0.5        # How much to reduce (0.0-1.0)

# Topic subscriptions
subscriptions:
  boot_complete: "/saltybot/system/boot_complete"
  person_detected: "/saltybot/perception/person_detected"
  wake_word: "/saltybot/voice/wake_word"
  low_battery: "/saltybot/battery/low_warning"
  obstacle_close: "/saltybot/obstacle/close"
  trick_complete: "/saltybot/tricks/complete"
  error: "/saltybot/system/error"
  charging_start: "/saltybot/charging/started"
  geofence_warning: "/saltybot/geofence/warning"

# Publishing
publications:
  sound_playing: "/saltybot/sound_playing"

# Mode settings
modes:
  quiet_mode_enabled: false   # Global quiet mode
  night_mode_enabled: false   # Night mode affects volume
  mute_all: false             # Mute all sounds
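The `volume` and `modes` sections interact at playback time. The node below only tracks the mode flags, so the combination rule here is a hypothetical sketch (the helper name and the "most restrictive mode wins" policy are assumptions, not the package's code):

```python
def effective_volume(cfg, quiet_mode=False, night_mode=False, mute_all=False):
    """Hypothetical rule: mute wins outright, otherwise the most restrictive
    active mode caps default_volume, clamped to [min_volume, max_volume]."""
    if mute_all:
        return 0.0
    vol = cfg["default_volume"]
    if quiet_mode:
        vol = min(vol, cfg["quiet_mode_volume"])
    if night_mode:
        vol = min(vol, cfg["night_mode_volume"])
    return max(cfg["min_volume"], min(cfg["max_volume"], vol))

volume_cfg = {"default_volume": 0.7, "quiet_mode_volume": 0.4,
              "night_mode_volume": 0.3, "max_volume": 1.0, "min_volume": 0.0}
print(effective_volume(volume_cfg))                       # 0.7
print(effective_volume(volume_cfg, night_mode=True))      # 0.3
print(effective_volume(volume_cfg, quiet_mode=True,
                       night_mode=True))                  # 0.3
```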
@ -1,36 +0,0 @@
"""Launch file for Sound Effects node."""

from pathlib import Path

from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node


def generate_launch_description():
    """Generate launch description."""
    package_dir = get_package_share_directory("saltybot_sound_effects")
    config_file = str(Path(package_dir) / "config" / "sound_config.yaml")

    # Launch arguments
    config_arg = DeclareLaunchArgument(
        "config_file",
        default_value=config_file,
        description="Path to sound effects configuration file",
    )

    # Sound Effects node
    sound_node = Node(
        package="saltybot_sound_effects",
        executable="sound_effects_node",
        name="sound_effects",
        parameters=[
            {
                "config_file": LaunchConfiguration("config_file"),
            }
        ],
        output="screen",
    )

    return LaunchDescription([config_arg, sound_node])
@ -1,27 +0,0 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
  <name>saltybot_sound_effects</name>
  <version>0.1.0</version>
  <description>
    Sound effects library for SaltyBot. Event-driven audio feedback system with
    priority queue, volume control, and programmatic sound generation. Supports
    WAV/OGG formats with fallback synthesis if files missing.
  </description>
  <maintainer email="sl-mechanical@saltylab.local">sl-mechanical</maintainer>
  <license>MIT</license>

  <depend>rclpy</depend>
  <depend>std_msgs</depend>

  <buildtool_depend>ament_python</buildtool_depend>

  <test_depend>ament_copyright</test_depend>
  <test_depend>ament_flake8</test_depend>
  <test_depend>ament_pep257</test_depend>
  <test_depend>python3-pytest</test_depend>

  <export>
    <build_type>ament_python</build_type>
  </export>
</package>
@ -1,337 +0,0 @@
#!/usr/bin/env python3
"""Sound Effects Library for SaltyBot.

Event-driven audio feedback system with priority queue, volume control,
and programmatic sound generation. Supports WAV/OGG formats with fallback
synthesis if files are missing.

Subscribed topics (dynamic, based on config):
    /saltybot/system/boot_complete (std_msgs/Bool)
    /saltybot/perception/person_detected (std_msgs/Bool)
    /saltybot/voice/wake_word (std_msgs/Bool)
    ... (see config for all)

Published topics:
    /saltybot/sound_playing (std_msgs/String) - JSON current sound status
"""

import json
import threading
import time
from dataclasses import dataclass
from pathlib import Path
from queue import Empty, PriorityQueue
from typing import Dict, List, Optional

import numpy as np
import rclpy
import yaml
from rclpy.node import Node
from scipy import signal
from scipy.io import wavfile
from std_msgs.msg import Bool, String


@dataclass
class SoundEvent:
    """Sound event with priority."""

    name: str
    filename: str
    duration: float
    priority: int
    timestamp: float

    def __lt__(self, other):
        """Compare by priority (higher priority first)."""
        return self.priority > other.priority  # Inverted for max-heap behavior


class SoundSynthesizer:
    """Programmatically generate sounds if files are missing."""

    def __init__(self, sample_rate: int = 16000):
        """Initialize synthesizer.

        Args:
            sample_rate: Sample rate in Hz
        """
        self.sample_rate = sample_rate

    def generate_sine(self, frequency: float, duration: float,
                      amplitude: float = 0.3) -> np.ndarray:
        """Generate sine wave."""
        t = np.linspace(0, duration, int(self.sample_rate * duration))
        return (amplitude * np.sin(2 * np.pi * frequency * t)).astype(np.float32)

    def generate_chime(self, frequencies: List[float], duration: float) -> np.ndarray:
        """Generate ascending chime."""
        note_duration = duration / len(frequencies)
        samples = []

        for freq in frequencies:
            note = self.generate_sine(freq, note_duration)
            # Triangular fade envelope; halves sized so odd-length notes still match.
            half = len(note) // 2
            envelope = np.concatenate([np.linspace(0, 1, half),
                                       np.linspace(1, 0, len(note) - half)])
            samples.append(note * envelope)

        return np.concatenate(samples)

    def generate_descending(self, frequencies: List[float],
                            duration: float) -> np.ndarray:
        """Generate descending tone."""
        note_duration = duration / len(frequencies)
        samples = []

        for freq in frequencies:
            note = self.generate_sine(freq, note_duration)
            envelope = signal.windows.hann(len(note))
            samples.append(note * envelope)

        return np.concatenate(samples)

    def generate_power_up(self, start_freq: float, end_freq: float,
                          duration: float) -> np.ndarray:
        """Generate rising power-up tone (linear frequency sweep)."""
        t = np.linspace(0, duration, int(self.sample_rate * duration))
        freq = np.linspace(start_freq, end_freq, len(t))
        phase = 2 * np.pi * np.cumsum(freq) / self.sample_rate
        return (0.3 * np.sin(phase)).astype(np.float32)

    def generate_beep(self, frequency: float, duration: float) -> np.ndarray:
        """Generate simple beep with Hann envelope."""
        t = np.linspace(0, duration, int(self.sample_rate * duration))
        sine = 0.3 * np.sin(2 * np.pi * frequency * t)
        envelope = signal.windows.hann(len(sine))
        return (sine * envelope).astype(np.float32)

    def generate_proximity(self, frequency: float, duration: float,
                           pulse_rate: float = 5) -> np.ndarray:
        """Generate rapid proximity beeps."""
        pulse_duration = 1.0 / pulse_rate
        samples = []
        elapsed = 0.0

        while elapsed < duration:
            beep = self.generate_beep(frequency, min(pulse_duration / 2, duration - elapsed))
            silence = np.zeros(int(self.sample_rate * pulse_duration / 2))
            samples.append(np.concatenate([beep, silence]))
            elapsed += pulse_duration

        return np.concatenate(samples[:int(duration * pulse_rate)])


class SoundEffectsNode(Node):
    """ROS2 node for sound effects playback."""

    def __init__(self):
        super().__init__("sound_effects")

        # Load config
        self.declare_parameter("config_file", "sound_config.yaml")
        config_path = self.get_parameter("config_file").value
        self.config = self._load_config(config_path)

        # State
        self.current_sound: Optional[SoundEvent] = None
        self.sound_queue = PriorityQueue()
        self.synthesizer = SoundSynthesizer(
            self.config["synthesis"]["sample_rate"]
        )
        self.volume = self.config["volume"]["default_volume"]
        self.is_playing = False
        self.quiet_mode = False
        self.night_mode = False

        # Load sounds
        self.sounds_cache: Dict[str, np.ndarray] = {}
        self._load_sounds()

        # Subscribe to event topics
        self._subscribe_to_events()

        # Publisher
        self.sound_state_pub = self.create_publisher(
            String, "/saltybot/sound_playing", 10
        )

        # Playback thread
        self.playback_thread = threading.Thread(target=self._playback_loop, daemon=True)
        self.playback_thread.start()

        # Timer for queue processing
        self.timer = self.create_timer(0.1, self._process_queue)

        self.get_logger().info("Sound Effects node initialized")

    def _load_config(self, config_path: str) -> Dict:
        """Load YAML configuration."""
        try:
            with open(config_path) as f:
                return yaml.safe_load(f)
        except FileNotFoundError:
            self.get_logger().warn(f"Config not found: {config_path}")
            return {}

    def _load_sounds(self):
        """Load sound files and cache them."""
        sounds_dir = Path(self.config.get("audio", {}).get("sounds_directory", "/tmp"))

        for event_name, event_config in self.config.get("events", {}).items():
            filename = event_config.get("filename")
            if not filename:
                continue

            sound_path = sounds_dir / filename
            if sound_path.exists():
                try:
                    rate, data = wavfile.read(str(sound_path))
                    # Normalize int16 samples to float
                    if data.dtype != np.float32:
                        data = data.astype(np.float32) / 32768.0
                    self.sounds_cache[event_name] = data
                    self.get_logger().info(f"Loaded sound: {event_name}")
                except Exception as e:
                    self.get_logger().warn(f"Failed to load {sound_path}: {e}")
            else:
                self.get_logger().info(f"Sound file missing: {sound_path}, will use synthesis")

    def _subscribe_to_events(self):
        """Subscribe to all event topics from config."""
        for event_name, topic in self.config.get("subscriptions", {}).items():
            # Default argument binds event_name per iteration (avoids late binding).
            def callback(msg, event=event_name):
                if msg.data:
                    self._queue_sound(event)

            self.create_subscription(Bool, topic, callback, 10)

    def _queue_sound(self, event_name: str):
        """Queue a sound for playback."""
        event_config = self.config.get("events", {}).get(event_name)
        if not event_config:
            return

        event = SoundEvent(
            name=event_name,
            filename=event_config.get("filename", ""),
            duration=event_config.get("duration", 1.0),
            priority=event_config.get("priority", 50),
            timestamp=time.time(),
        )

        self.sound_queue.put(event)
        self.get_logger().info(f"Queued sound: {event_name} (priority: {event.priority})")

    def _process_queue(self):
        """Pop the highest-priority queued sound when idle."""
        if self.is_playing:
            return  # Still playing current sound

        if not self.sound_queue.empty():
            try:
                self.current_sound = self.sound_queue.get_nowait()
                self.is_playing = True
            except Empty:
                pass

    def _playback_loop(self):
        """Background thread for audio playback."""
        while True:
            if self.current_sound and self.is_playing:
                audio_data = self._get_audio_data(self.current_sound.name)
                if audio_data is not None:
                    self._play_audio(audio_data, self.current_sound.name)

                self.is_playing = False
                self.current_sound = None

            time.sleep(0.05)

    def _get_audio_data(self, event_name: str) -> Optional[np.ndarray]:
        """Get audio data for event (cached or synthesized)."""
        if event_name in self.sounds_cache:
            return self.sounds_cache[event_name]

        if not self.config.get("synthesis", {}).get("enabled", True):
            return None

        # Synthesize default sound
        return self._synthesize_sound(event_name)

    def _synthesize_sound(self, event_name: str) -> Optional[np.ndarray]:
        """Synthesize sound programmatically."""
        defaults = self.config.get("synthesis", {}).get("defaults", {})
        event_config = defaults.get(event_name)

        if not event_config:
            return None

        sound_type = event_config.get("type", "sine")
        duration = event_config.get("duration", 1.0)

        try:
            if sound_type == "sine":
                freq = event_config.get("frequency", 440)
                return self.synthesizer.generate_sine(freq, duration)
            elif sound_type == "chime":
                freqs = event_config.get("frequencies", [523.25, 659.25])
                return self.synthesizer.generate_chime(freqs, duration)
            elif sound_type == "descending":
                freqs = event_config.get("frequencies", [800, 600, 400])
                return self.synthesizer.generate_descending(freqs, duration)
            elif sound_type == "power_up":
                start = event_config.get("start_freq", 400)
                end = event_config.get("end_freq", 1200)
                return self.synthesizer.generate_power_up(start, end, duration)
            elif sound_type in ["beep", "warning", "alert"]:
                freq = event_config.get("frequency", 1000)
                return self.synthesizer.generate_beep(freq, duration)
            elif sound_type == "proximity":
                freq = event_config.get("frequency", 1200)
                pulse = event_config.get("pulse_rate", 5)
                return self.synthesizer.generate_proximity(freq, duration, pulse)
            elif sound_type == "jingle":
                freqs = event_config.get("frequencies", [523.25, 659.25, 783.99])
                return self.synthesizer.generate_chime(freqs, duration)
        except Exception as e:
            self.get_logger().warn(f"Sound synthesis failed for {event_name}: {e}")

        return None

    def _play_audio(self, audio_data: np.ndarray, event_name: str):
        """Play audio data (mock implementation)."""
        # In a real implementation, would use pygame, pyaudio, or similar
        duration = len(audio_data) / self.config["synthesis"]["sample_rate"]

        state = {
            "event": event_name,
            "playing": True,
            "duration": duration,
            "volume": self.volume,
        }
        self.sound_state_pub.publish(String(data=json.dumps(state)))

        # Simulate playback
        time.sleep(duration)

        state["playing"] = False
        self.sound_state_pub.publish(String(data=json.dumps(state)))


def main(args=None):
    """Main entry point."""
    rclpy.init(args=args)
    node = SoundEffectsNode()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()


if __name__ == "__main__":
    main()
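The inverted `__lt__` on `SoundEvent` is what turns Python's min-heap `PriorityQueue` into a max-heap on `priority`, so higher-priority events play first. A self-contained sketch of that ordering (event names and priorities taken from the config; the `Event` class is a stripped-down stand-in):

```python
from dataclasses import dataclass
from queue import PriorityQueue

@dataclass
class Event:
    name: str
    priority: int

    def __lt__(self, other):
        # Inverted comparison: the heap's "smallest" item is the highest priority.
        return self.priority > other.priority

q = PriorityQueue()
for name, prio in [("person_detected", 20), ("boot_complete", 100), ("wake_word", 80)]:
    q.put(Event(name, prio))

print([q.get().name for _ in range(3)])
# ['boot_complete', 'wake_word', 'person_detected']
```

One caveat of this trick: events with equal priority are ordered arbitrarily rather than first-in-first-out, since `__lt__` ignores the `timestamp` field.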
@ -1,4 +0,0 @@
[develop]
script_dir=$base/lib/saltybot_sound_effects
[bdist_wheel]
universal=0
@ -1,30 +0,0 @@
from setuptools import setup

package_name = "saltybot_sound_effects"

setup(
    name=package_name,
    version="0.1.0",
    packages=[package_name],
    data_files=[
        ("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
        (f"share/{package_name}", ["package.xml"]),
        (f"share/{package_name}/launch", ["launch/sound_effects.launch.py"]),
        (f"share/{package_name}/config", ["config/sound_config.yaml"]),
    ],
    install_requires=["setuptools", "pyyaml", "numpy", "scipy"],
    zip_safe=True,
    maintainer="sl-mechanical",
    maintainer_email="sl-mechanical@saltylab.local",
    description=(
        "Sound effects library: event-driven audio feedback, priority queue, "
        "programmatic synthesis, volume control"
    ),
    license="MIT",
    tests_require=["pytest"],
    entry_points={
        "console_scripts": [
            "sound_effects_node = saltybot_sound_effects.sound_effects_node:main",
        ],
    },
)