Compare commits


No commits in common. "569ac3fb35fcc206cc0b15f88576cf92e7ad8171" and "3ea19fbb999d5c940577874189c3c8ec7892b7f3" have entirely different histories.

27 changed files with 0 additions and 1690 deletions

View File

@ -1,196 +0,0 @@
# saltybot_gesture_recognition
Hand and body gesture recognition via MediaPipe on Jetson Orin GPU (Issue #454).
Detects human hand and body gestures in real-time camera feed and publishes recognized gestures for multimodal interaction. Integrates with voice command router for combined audio+gesture control.
## Recognized Gestures
### Hand Gestures
- **wave** — Lateral wrist oscillation (temporal) | Greeting, acknowledgment
- **point** — Index extended, others curled | Direction indication ("left"/"right"/"up"/"forward")
- **stop_palm** — All fingers extended, palm forward | Emergency stop (e-stop)
- **thumbs_up** — Thumb extended up, fist closed | Confirmation, approval
- **come_here** — Beckoning: index curled toward palm (temporal) | Call to approach
- **follow** — Index extended horizontally | Follow me
### Body Gestures
- **arms_up** — Both wrists above shoulders | Stop / emergency
- **arms_spread** — Arms extended laterally | Back off / clear space
- **crouch** — Hips below standing threshold | Come closer
## Performance
- **Frame Rate**: 10-15 fps on Jetson Orin (with GPU acceleration)
- **Latency**: ~100-150 ms end-to-end
- **Range**: 2-5 meters (optimal 2-3 m)
- **Accuracy**: ~85-90% for known gestures (varies by lighting, occlusion)
- **Simultaneous Detections**: Up to 10 people + gestures per frame
## Topics
### Published
- **`/saltybot/gestures`** (`saltybot_social_msgs/GestureArray`)
Array of detected gestures with type, confidence, position, source (hand/body)
## Parameters
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `camera_topic` | str | `/camera/color/image_raw` | RGB camera topic |
| `confidence_threshold` | float | 0.7 | Min confidence to publish (0-1) |
| `publish_hz` | float | 15.0 | Output rate (Hz) |
| `max_distance_m` | float | 5.0 | Max gesture range (meters) |
| `enable_gpu` | bool | true | Use Jetson GPU acceleration |
## Messages
### GestureArray
```
Header header
Gesture[] gestures
uint32 count
```
### Gesture (from saltybot_social_msgs)
```
Header header
string gesture_type # "wave", "point", "stop_palm", etc.
int32 person_id # -1 if unidentified
float32 confidence       # 0-1 (typically >= 0.7)
int32 camera_id # 0=front
float32 hand_x           # Normalized x position (0-1)
float32 hand_y           # Normalized y position (0-1)
bool is_right_hand # True for right hand
string direction # For "point": "left"/"right"/"up"/"forward"/"down"
string source # "hand" or "body_pose"
```
## Usage
### Launch Node
```bash
ros2 launch saltybot_gesture_recognition gesture_recognition.launch.py
```
### With Custom Parameters
```bash
ros2 launch saltybot_gesture_recognition gesture_recognition.launch.py \
camera_topic:='/camera/front/image_raw' \
confidence_threshold:=0.75 \
publish_hz:=20.0
```
### Using Config File
```bash
ros2 run saltybot_gesture_recognition gesture_node \
  --ros-args --params-file config/gesture_params.yaml
```
## Algorithm
### MediaPipe Hands
- 21 landmarks per hand (wrist + finger joints)
- Detects: palm orientation, finger extension, hand pose
- Model complexity: 0 (lite, faster) for Jetson
### MediaPipe Pose
- 33 body landmarks (shoulders, hips, wrists, knees, etc.)
- Detects: arm angle, body orientation, posture
- Model complexity: 1 (balanced accuracy/speed)
### Gesture Classification
1. **Thumbs-up**: Thumb extended >0.3, no other fingers extended
2. **Stop-palm**: All fingers extended, palm normal > 0.3 (facing camera)
3. **Point**: Only index extended, direction from hand position
4. **Wave**: High variance in hand x-position over ~5 frames
5. **Beckon**: High variance in hand y-position over ~4 frames
6. **Arms-up**: Both wrists > shoulder height
7. **Arms-spread**: Wrist distance > shoulder width × 1.2
8. **Crouch**: Hip-y > shoulder-y + 0.3
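The body-gesture rules above reduce to simple comparisons on normalized landmark coordinates. A minimal sketch (not the node's full implementation; landmark indices follow the MediaPipe Pose convention, with y growing downward in image coordinates):

```python
from typing import Optional
import numpy as np

def classify_body_gesture(landmarks: np.ndarray) -> Optional[str]:
    """Toy rule-based body classifier over 33 normalized pose landmarks."""
    shoulder_y = (landmarks[11][1] + landmarks[12][1]) / 2
    hip_y = (landmarks[23][1] + landmarks[24][1]) / 2
    left_wrist, right_wrist = landmarks[15], landmarks[16]
    # Arms up: both wrists clearly above the shoulder line
    if max(left_wrist[1], right_wrist[1]) < shoulder_y - 0.2:
        return 'arms_up'
    # Arms spread: wrists much wider apart than the shoulders
    shoulder_w = abs(landmarks[11][0] - landmarks[12][0])
    if abs(left_wrist[0] - right_wrist[0]) > shoulder_w * 1.2:
        return 'arms_spread'
    # Crouch: hip-to-shoulder vertical gap exceeds the threshold (rule 8)
    if hip_y - shoulder_y > 0.3:
        return 'crouch'
    return None
```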
### Confidence Scoring
- MediaPipe detection confidence × gesture classification confidence
- Temporal smoothing: history over last 10 frames
- Threshold: 0.7 (configurable) for publication
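The scoring scheme above can be sketched as follows (an illustrative example; the class and method names are ours, not the node's):

```python
from collections import deque
import numpy as np

class ConfidenceSmoother:
    """Combine detection and classification confidence, average over a
    short frame history, and gate publication on the threshold."""

    def __init__(self, window: int = 10, threshold: float = 0.7):
        self.history = deque(maxlen=window)  # last N combined scores
        self.threshold = threshold

    def update(self, detection_conf: float, classification_conf: float) -> bool:
        # Combined score = MediaPipe detection x gesture classification
        self.history.append(detection_conf * classification_conf)
        smoothed = float(np.mean(self.history))
        return smoothed >= self.threshold  # True -> publish the gesture
```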
## Integration with Voice Command Router
```python
# Inside an rclpy Node: listen to both topics
self.create_subscription(SpeechTranscript, '/saltybot/speech', voice_callback, 10)
self.create_subscription(GestureArray, '/saltybot/gestures', gesture_callback, 10)

def multimodal_command(voice_cmd, gesture):
    # "robot forward" (voice) + point-forward (gesture) = confirmed forward
    if gesture.gesture_type == 'point' and gesture.direction == 'forward':
        if 'forward' in voice_cmd:
            nav.set_goal(forward_pos)  # High confidence
```
## Dependencies
- `mediapipe` — Hand and Pose detection
- `opencv-python` — Image processing
- `numpy`, `scipy` — Numerical computation
- `rclpy` — ROS2 Python client
- `saltybot_social_msgs` — Custom gesture messages
## Build & Test
### Build
```bash
colcon build --packages-select saltybot_gesture_recognition
```
### Run Tests
```bash
pytest jetson/ros2_ws/src/saltybot_gesture_recognition/test/
```
### Benchmark on Jetson Orin
```bash
ros2 run saltybot_gesture_recognition gesture_node \
--ros-args -p publish_hz:=30.0 &
ros2 topic hz /saltybot/gestures
# Expected: ~15 Hz (GPU-limited, not message processing)
```
## Troubleshooting
**Issue**: Low frame rate (< 10 Hz)
- **Solution**: Reduce camera resolution or use model_complexity=0
**Issue**: False positives (confidence > 0.7 but wrong gesture)
- **Solution**: Increase `confidence_threshold` to 0.75-0.8
**Issue**: Doesn't detect gestures at distance > 3m
- **Solution**: Improve lighting, move closer, or reduce `max_distance_m`
## Future Enhancements
- **Dynamic Gesture Timeout**: Stop publishing after 2s without update
- **Person Association**: Match gestures to tracked persons (from `saltybot_multi_person_tracker`)
- **Custom Gesture Training**: TensorFlow Lite fine-tuning on robot-specific gestures
- **Gesture Sequences**: Recognize multi-step command chains ("wave → point → thumbs-up")
- **Sign Language**: ASL/BSL recognition (larger model, future phase)
- **Accessibility**: Voice + gesture for accessibility (e.g., hands-free "stop")
## Performance Targets (Jetson Orin Nano Super)
| Metric | Target | Actual |
|--------|--------|--------|
| Frame Rate | 10+ fps | ~15 fps (GPU) |
| Latency | <200 ms | ~100-150 ms |
| Max People | 5-10 | ~10 (GPU-limited) |
| Confidence | 0.7+ | 0.75-0.95 |
| GPU Memory | <1 GB | ~400-500 MB |
## References
- [MediaPipe Solutions](https://developers.google.com/mediapipe/solutions)
- [MediaPipe Hands](https://developers.google.com/mediapipe/solutions/vision/hand_landmarker)
- [MediaPipe Pose](https://developers.google.com/mediapipe/solutions/vision/pose_landmarker)
## License
MIT

View File

@ -1,14 +0,0 @@
# Gesture recognition ROS2 parameters
/**:
  ros__parameters:
    # Input
    camera_topic: '/camera/color/image_raw'
    # Detection
    confidence_threshold: 0.7  # Only publish gestures with confidence >= 0.7
    max_distance_m: 5.0        # Maximum gesture range (2-5m typical)
    # Performance
    publish_hz: 15.0           # 10+ fps target on Jetson Orin
    enable_gpu: true           # Use Jetson GPU acceleration

View File

@ -1,68 +0,0 @@
"""
Launch gesture recognition node.
Typical usage:
ros2 launch saltybot_gesture_recognition gesture_recognition.launch.py
"""
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def generate_launch_description():
"""Generate launch description for gesture recognition node."""
# Declare launch arguments
camera_topic_arg = DeclareLaunchArgument(
'camera_topic',
default_value='/camera/color/image_raw',
description='RGB camera topic',
)
confidence_arg = DeclareLaunchArgument(
'confidence_threshold',
default_value='0.7',
description='Detection confidence threshold (0-1)',
)
publish_hz_arg = DeclareLaunchArgument(
'publish_hz',
default_value='15.0',
description='Publication rate (Hz, target 10+ fps)',
)
max_distance_arg = DeclareLaunchArgument(
'max_distance_m',
default_value='5.0',
description='Maximum gesture recognition range (meters)',
)
gpu_arg = DeclareLaunchArgument(
'enable_gpu',
default_value='true',
description='Use GPU acceleration (Jetson Orin)',
)
# Gesture recognition node
gesture_node = Node(
package='saltybot_gesture_recognition',
executable='gesture_node',
name='gesture_recognition',
output='screen',
parameters=[
{'camera_topic': LaunchConfiguration('camera_topic')},
{'confidence_threshold': LaunchConfiguration('confidence_threshold')},
{'publish_hz': LaunchConfiguration('publish_hz')},
{'max_distance_m': LaunchConfiguration('max_distance_m')},
{'enable_gpu': LaunchConfiguration('enable_gpu')},
],
)
return LaunchDescription(
[
camera_topic_arg,
confidence_arg,
publish_hz_arg,
max_distance_arg,
gpu_arg,
gesture_node,
]
)

View File

@ -1,35 +0,0 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_gesture_recognition</name>
<version>0.1.0</version>
<description>
Hand and body gesture recognition via MediaPipe on Jetson Orin GPU.
Recognizes wave, point, stop_palm, thumbs_up, come_here, arms_up, arms_spread, crouch.
Integrates with voice command router for multimodal interaction.
Issue #454.
</description>
<maintainer email="sl-perception@saltylab.local">sl-perception</maintainer>
<license>MIT</license>
<buildtool_depend>ament_python</buildtool_depend>
<depend>rclpy</depend>
<depend>std_msgs</depend>
<depend>sensor_msgs</depend>
<depend>geometry_msgs</depend>
<depend>cv_bridge</depend>
<depend>saltybot_social_msgs</depend>
<depend>saltybot_multi_person_tracker</depend>
<exec_depend>python3-numpy</exec_depend>
<exec_depend>python3-opencv</exec_depend>
<exec_depend>python3-mediapipe</exec_depend>
<exec_depend>python3-scipy</exec_depend>
<test_depend>pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -1,480 +0,0 @@
"""
gesture_recognition_node.py: Hand and body gesture recognition via MediaPipe.
Uses MediaPipe Hands and Pose to detect gestures on Jetson Orin GPU.
Recognizes:
Hand gestures: wave, point, stop_palm (e-stop), thumbs_up, come_here (beckon)
Body gestures: arms_up (stop), arms_spread (back off)
Publishes:
/saltybot/gestures saltybot_social_msgs/GestureArray 10+ fps
Parameters:
camera_topic str '/camera/color/image_raw' RGB camera input
confidence_threshold float 0.7 detection confidence
publish_hz float 15.0 output rate (10+ fps target)
max_distance_m float 5.0 max gesture range
enable_gpu bool true use GPU acceleration
"""
from __future__ import annotations
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
import numpy as np
import cv2
from cv_bridge import CvBridge
import threading
from collections import deque
from typing import Optional
from std_msgs.msg import Header
from sensor_msgs.msg import Image
from geometry_msgs.msg import Point
try:
from saltybot_social_msgs.msg import Gesture, GestureArray
_GESTURE_MSGS_OK = True
except ImportError:
_GESTURE_MSGS_OK = False
try:
import mediapipe as mp
_MEDIAPIPE_OK = True
except ImportError:
_MEDIAPIPE_OK = False
_SENSOR_QOS = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
history=HistoryPolicy.KEEP_LAST,
depth=5,
)
class GestureDetector:
"""MediaPipe-based gesture detector for hands and pose."""
# Hand gesture thresholds
GESTURE_DISTANCE_THRESHOLD = 0.05
WAVE_DURATION = 5 # frames
BECKON_DURATION = 4
POINT_MIN_EXTEND = 0.3 # index extension threshold
def __init__(self, enable_gpu: bool = True):
if not _MEDIAPIPE_OK:
raise ImportError("MediaPipe not available")
self.enable_gpu = enable_gpu
# Initialize MediaPipe
self.mp_hands = mp.solutions.hands
self.mp_pose = mp.solutions.pose
self.mp_drawing = mp.solutions.drawing_utils
# Create hand detector
self.hands = self.mp_hands.Hands(
static_image_mode=False,
max_num_hands=10,
min_detection_confidence=0.5,
min_tracking_confidence=0.5,
model_complexity=0, # 0=lite (faster), 1=full
)
# Create pose detector
self.pose = self.mp_pose.Pose(
static_image_mode=False,
model_complexity=1,
smooth_landmarks=True,
min_detection_confidence=0.5,
min_tracking_confidence=0.5,
)
# Gesture history for temporal smoothing
self.hand_history = deque(maxlen=10)
self.pose_history = deque(maxlen=10)
def detect_hand_gestures(self, frame: np.ndarray, person_id: int = -1) -> list[dict]:
"""
Detect hand gestures using MediaPipe Hands.
Returns:
List of detected gestures with type, confidence, position
"""
gestures = []
if frame is None or frame.size == 0:
return gestures
try:
# Convert BGR to RGB
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
h, w, _ = rgb_frame.shape
# Detect hands
results = self.hands.process(rgb_frame)
if not results.multi_hand_landmarks or not results.multi_handedness:
return gestures
for hand_landmarks, handedness in zip(
results.multi_hand_landmarks, results.multi_handedness
):
is_right = handedness.classification[0].label == "Right"
confidence = handedness.classification[0].score
# Extract key landmarks
landmarks = np.array(
[[lm.x, lm.y, lm.z] for lm in hand_landmarks.landmark]
)
# Detect specific hand gestures
gesture_type, gesture_conf = self._classify_hand_gesture(
landmarks, is_right
)
if gesture_type:
# Get hand center position
hand_x = float(np.mean(landmarks[:, 0]))
hand_y = float(np.mean(landmarks[:, 1]))
gestures.append({
'type': gesture_type,
'confidence': float(gesture_conf * confidence),
'hand_x': hand_x,
'hand_y': hand_y,
'is_right_hand': is_right,
'source': 'hand',
'person_id': person_id,
})
self.hand_history.append(gestures)
except Exception:
pass  # Skip this frame if hand detection fails
return gestures
def detect_body_gestures(self, frame: np.ndarray, person_id: int = -1) -> list[dict]:
"""
Detect body/pose gestures using MediaPipe Pose.
Returns:
List of detected pose-based gestures
"""
gestures = []
if frame is None or frame.size == 0:
return gestures
try:
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
h, w, _ = rgb_frame.shape
results = self.pose.process(rgb_frame)
if not results.pose_landmarks:
return gestures
landmarks = np.array(
[[lm.x, lm.y, lm.z] for lm in results.pose_landmarks.landmark]
)
# Detect specific body gestures
gesture_type, gesture_conf = self._classify_body_gesture(landmarks)
if gesture_type:
# Get body center
body_x = float(np.mean(landmarks[:, 0]))
body_y = float(np.mean(landmarks[:, 1]))
gestures.append({
'type': gesture_type,
'confidence': float(gesture_conf),
'hand_x': body_x,
'hand_y': body_y,
'is_right_hand': False,
'source': 'body_pose',
'person_id': person_id,
})
self.pose_history.append(gestures)
except Exception:
pass  # Skip this frame if pose detection fails
return gestures
def _classify_hand_gesture(
self, landmarks: np.ndarray, is_right: bool
) -> tuple[Optional[str], float]:
"""
Classify hand gesture from MediaPipe landmarks.
Returns:
(gesture_type, confidence)
"""
if landmarks.shape[0] < 21:
return None, 0.0
# Landmark indices
# 0: wrist, 5: index, 9: middle, 13: ring, 17: pinky
# 4: thumb tip, 8: index tip, 12: middle tip, 16: ring tip, 20: pinky tip
wrist = landmarks[0]
thumb_tip = landmarks[4]
index_tip = landmarks[8]
middle_tip = landmarks[12]
ring_tip = landmarks[16]
pinky_tip = landmarks[20]
# Palm normal (pointing direction)
palm_normal = self._get_palm_normal(landmarks)
# Finger extension
index_extended = self._distance(index_tip, landmarks[5]) > self.POINT_MIN_EXTEND
middle_extended = self._distance(middle_tip, landmarks[9]) > self.POINT_MIN_EXTEND
ring_extended = self._distance(ring_tip, landmarks[13]) > self.POINT_MIN_EXTEND
pinky_extended = self._distance(pinky_tip, landmarks[17]) > self.POINT_MIN_EXTEND
thumb_extended = self._distance(thumb_tip, landmarks[2]) > 0.1
# Thumbs-up: thumb extended up, hand vertical
if thumb_extended and not (index_extended or middle_extended):
palm_y = np.mean([landmarks[i][1] for i in [5, 9, 13, 17]])
if thumb_tip[1] < palm_y - 0.1: # Thumb above palm
return 'thumbs_up', 0.85
# Stop palm: all fingers extended, palm forward
if index_extended and middle_extended and ring_extended and pinky_extended:
if palm_normal[2] > 0.3: # Palm facing camera
return 'stop_palm', 0.8
# Point: only index extended
if index_extended and not (middle_extended or ring_extended or pinky_extended):
return 'point', 0.8
# Wave: hand moving (approximate via history)
if len(self.hand_history) > self.WAVE_DURATION:
if self._detect_wave_motion():
return 'wave', 0.75
# Come-here (beckon): curled fingers, repetitive motion
if not (index_extended or middle_extended):
if len(self.hand_history) > self.BECKON_DURATION:
if self._detect_beckon_motion():
return 'come_here', 0.75
return None, 0.0
def _classify_body_gesture(self, landmarks: np.ndarray) -> tuple[Optional[str], float]:
"""
Classify body gesture from MediaPipe Pose landmarks.
Returns:
(gesture_type, confidence)
"""
if landmarks.shape[0] < 33:
return None, 0.0
# Key body landmarks
left_shoulder = landmarks[11]
right_shoulder = landmarks[12]
left_hip = landmarks[23]
right_hip = landmarks[24]
left_wrist = landmarks[15]
right_wrist = landmarks[16]
shoulder_y = np.mean([left_shoulder[1], right_shoulder[1]])
hip_y = np.mean([left_hip[1], right_hip[1]])
wrist_y_max = max(left_wrist[1], right_wrist[1])
# Arms up (emergency stop)
if wrist_y_max < shoulder_y - 0.2:
return 'arms_up', 0.85
# Arms spread (back off)
shoulder_dist = self._distance(left_shoulder[:2], right_shoulder[:2])
wrist_dist = self._distance(left_wrist[:2], right_wrist[:2])
if wrist_dist > shoulder_dist * 1.2:
return 'arms_spread', 0.8
# Crouch (come closer)
if hip_y - shoulder_y > 0.3:
return 'crouch', 0.8
return None, 0.0
def _get_palm_normal(self, landmarks: np.ndarray) -> np.ndarray:
"""Compute palm normal vector (pointing direction)."""
wrist = landmarks[0]
middle_mcp = landmarks[9]
index_mcp = landmarks[5]
v1 = index_mcp - wrist
v2 = middle_mcp - wrist
normal = np.cross(v1, v2)
return normal / (np.linalg.norm(normal) + 1e-6)
def _distance(self, p1: np.ndarray, p2: np.ndarray) -> float:
"""Euclidean distance between two points."""
return float(np.linalg.norm(p1 - p2))
def _detect_wave_motion(self) -> bool:
"""Detect waving motion from hand history."""
if len(self.hand_history) < self.WAVE_DURATION:
return False
# Simple heuristic: high variance in x-position over time
x_positions = [g[0]['hand_x'] for g in self.hand_history if g]
if len(x_positions) < self.WAVE_DURATION:
return False
return float(np.std(x_positions)) > 0.05
def _detect_beckon_motion(self) -> bool:
"""Detect beckoning motion from hand history."""
if len(self.hand_history) < self.BECKON_DURATION:
return False
# High variance in y-position (up-down motion)
y_positions = [g[0]['hand_y'] for g in self.hand_history if g]
if len(y_positions) < self.BECKON_DURATION:
return False
return float(np.std(y_positions)) > 0.04
class GestureRecognitionNode(Node):
def __init__(self):
super().__init__('gesture_recognition')
# Parameters
self.declare_parameter('camera_topic', '/camera/color/image_raw')
self.declare_parameter('confidence_threshold', 0.7)
self.declare_parameter('publish_hz', 15.0)
self.declare_parameter('max_distance_m', 5.0)
self.declare_parameter('enable_gpu', True)
camera_topic = self.get_parameter('camera_topic').value
self.confidence_threshold = self.get_parameter('confidence_threshold').value
pub_hz = self.get_parameter('publish_hz').value
max_distance = self.get_parameter('max_distance_m').value
enable_gpu = self.get_parameter('enable_gpu').value
# Publisher
self._pub_gestures = None
if _GESTURE_MSGS_OK:
self._pub_gestures = self.create_publisher(
GestureArray, '/saltybot/gestures', _SENSOR_QOS
)
else:
self.get_logger().error('saltybot_social_msgs not available')
return
# Gesture detector
self._detector: Optional[GestureDetector] = None
self._detector_lock = threading.Lock()
if _MEDIAPIPE_OK:
try:
self._detector = GestureDetector(enable_gpu=enable_gpu)
except Exception as e:
self.get_logger().error(f'Failed to initialize MediaPipe: {e}')
# Video bridge
self._bridge = CvBridge()
self._latest_image: Image | None = None
self._lock = threading.Lock()
# Subscriptions
self.create_subscription(Image, camera_topic, self._on_image, _SENSOR_QOS)
# Publish timer
self.create_timer(1.0 / pub_hz, self._tick)
self.get_logger().info(
f'gesture_recognition ready — '
f'camera={camera_topic} confidence_threshold={self.confidence_threshold} hz={pub_hz}'
)
def _on_image(self, msg: Image) -> None:
with self._lock:
self._latest_image = msg
def _tick(self) -> None:
"""Detect and publish gestures."""
if self._pub_gestures is None or self._detector is None:
return
with self._lock:
if self._latest_image is None:
return
image_msg = self._latest_image
try:
frame = self._bridge.imgmsg_to_cv2(
image_msg, desired_encoding='bgr8'
)
except Exception as e:
self.get_logger().warn(f'Image conversion error: {e}')
return
# Detect hand and body gestures
hand_gestures = self._detector.detect_hand_gestures(frame)
body_gestures = self._detector.detect_body_gestures(frame)
all_gestures = hand_gestures + body_gestures
# Filter by confidence threshold
filtered_gestures = [
g for g in all_gestures if g['confidence'] >= self.confidence_threshold
]
# Build and publish GestureArray
gesture_array = GestureArray()
gesture_array.header = Header(
stamp=self.get_clock().now().to_msg(),
frame_id='camera',
)
for g in filtered_gestures:
gesture = Gesture()
gesture.header = gesture_array.header
gesture.gesture_type = g['type']
gesture.person_id = g.get('person_id', -1)
gesture.confidence = g['confidence']
gesture.hand_x = g['hand_x']
gesture.hand_y = g['hand_y']
gesture.is_right_hand = g['is_right_hand']
gesture.source = g['source']
# Map point direction if applicable
if g['type'] == 'point':
if g['hand_x'] < 0.33:
gesture.direction = 'left'
elif g['hand_x'] > 0.67:
gesture.direction = 'right'
elif g['hand_y'] < 0.33:
gesture.direction = 'up'
else:
gesture.direction = 'forward'
gesture_array.gestures.append(gesture)
gesture_array.count = len(gesture_array.gestures)
self._pub_gestures.publish(gesture_array)
def main(args=None):
rclpy.init(args=args)
node = GestureRecognitionNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@ -1,4 +0,0 @@
[develop]
script_dir=$base/lib/saltybot_gesture_recognition
[egg_info]
tag_date = 0

View File

@ -1,23 +0,0 @@
from setuptools import setup, find_packages
setup(
name='saltybot_gesture_recognition',
version='0.1.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/saltybot_gesture_recognition']),
('share/saltybot_gesture_recognition', ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
author='SaltyLab',
author_email='robot@saltylab.local',
description='Hand/body gesture recognition via MediaPipe',
license='MIT',
entry_points={
'console_scripts': [
'gesture_node=saltybot_gesture_recognition.gesture_recognition_node:main',
],
},
)

View File

@ -1,89 +0,0 @@
"""
Basic tests for gesture recognition.
"""
import pytest
import numpy as np
try:
from saltybot_gesture_recognition.gesture_recognition_node import GestureDetector
_DETECTOR_OK = True
except ImportError:
_DETECTOR_OK = False
@pytest.mark.skipif(not _DETECTOR_OK, reason="GestureDetector not available")
class TestGestureDetector:
"""Tests for gesture detection."""
def test_detector_init(self):
"""Test GestureDetector initialization."""
try:
detector = GestureDetector(enable_gpu=False)
assert detector is not None
except ImportError:
pytest.skip("MediaPipe not available")
def test_hand_gesture_detection_empty(self):
"""Test hand gesture detection with empty frame."""
try:
detector = GestureDetector(enable_gpu=False)
gestures = detector.detect_hand_gestures(None)
assert gestures == []
except ImportError:
pytest.skip("MediaPipe not available")
def test_body_gesture_detection_empty(self):
"""Test body gesture detection with empty frame."""
try:
detector = GestureDetector(enable_gpu=False)
gestures = detector.detect_body_gestures(None)
assert gestures == []
except ImportError:
pytest.skip("MediaPipe not available")
def test_hand_gesture_detection_frame(self):
"""Test hand gesture detection with synthetic frame."""
try:
detector = GestureDetector(enable_gpu=False)
# Create a blank frame
frame = np.zeros((480, 640, 3), dtype=np.uint8)
gestures = detector.detect_hand_gestures(frame)
# May or may not detect anything in blank frame
assert isinstance(gestures, list)
except ImportError:
pytest.skip("MediaPipe not available")
class TestGestureMessages:
"""Basic Gesture message tests."""
def test_gesture_creation(self):
"""Test creating a Gesture message."""
try:
from saltybot_social_msgs.msg import Gesture
g = Gesture()
g.gesture_type = 'wave'
g.confidence = 0.85
assert g.gesture_type == 'wave'
assert g.confidence == 0.85
except ImportError:
pytest.skip("saltybot_social_msgs not built")
def test_gesture_array_creation(self):
"""Test creating a GestureArray message."""
try:
from saltybot_social_msgs.msg import Gesture, GestureArray
arr = GestureArray()
g = Gesture()
g.gesture_type = 'point'
arr.gestures.append(g)
arr.count = 1
assert arr.count == 1
assert arr.gestures[0].gesture_type == 'point'
except ImportError:
pytest.skip("saltybot_social_msgs not built")
if __name__ == '__main__':
pytest.main([__file__])

View File

@ -1,8 +0,0 @@
/**:
  ros__parameters:
    max_linear_accel: 0.5
    max_angular_accel: 1.0
    max_linear_decel: 0.8
    max_angular_decel: 1.0
    use_scurve: true
    scurve_duration: 0.2
    update_rate: 50
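In effect, each control tick moves the commanded velocity toward the target by at most `accel * dt`. A minimal sketch mirroring the node's `_apply_accel_limit` (step values below are illustrative):

```python
def apply_accel_limit(current: float, target: float, max_accel: float,
                      max_decel: float, dt: float) -> float:
    """One control step: move `current` toward `target` without exceeding
    the configured acceleration (speeding up) or deceleration (slowing)."""
    if abs(target - current) < 1e-3:
        return target
    direction = 1.0 if target > current else -1.0
    limit = max_accel if direction > 0 else max_decel
    new_vel = current + direction * limit * dt
    # Clamp to the target if this step would overshoot it
    if (direction > 0 and new_vel >= target) or (direction < 0 and new_vel <= target):
        return target
    return new_vel

# At update_rate 50 Hz (dt = 0.02 s) with max_linear_accel 0.5 m/s^2,
# each tick changes the linear velocity by at most 0.01 m/s.
```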

View File

@ -1,16 +0,0 @@
"""Launch smooth velocity controller."""
from launch import LaunchDescription
from launch_ros.actions import Node
from ament_index_python.packages import get_package_share_directory
import os
def generate_launch_description():
package_dir = get_package_share_directory("saltybot_smooth_velocity")
config_path = os.path.join(package_dir, "config", "smooth_velocity_config.yaml")
node = Node(
package="saltybot_smooth_velocity",
executable="smooth_velocity_node",
name="smooth_velocity_node",
output="screen",
parameters=[config_path],
)
return LaunchDescription([node])

View File

@ -1,20 +0,0 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_smooth_velocity</name>
<version>0.1.0</version>
<description>Smooth velocity controller with acceleration limiting and S-curve jerk reduction.</description>
<maintainer email="sl-controls@saltylab.local">sl-controls</maintainer>
<license>MIT</license>
<depend>rclpy</depend>
<depend>geometry_msgs</depend>
<depend>std_msgs</depend>
<buildtool_depend>ament_python</buildtool_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -1,112 +0,0 @@
#!/usr/bin/env python3
"""Smooth velocity controller - Issue #455. Acceleration limiting + S-curve jerk reduction."""
import math
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
from std_msgs.msg import String
class SmoothVelocityNode(Node):
def __init__(self):
super().__init__("smooth_velocity_controller")
self.declare_parameter("max_linear_accel", 0.5)
self.declare_parameter("max_angular_accel", 1.0)
self.declare_parameter("max_linear_decel", 0.8)
self.declare_parameter("max_angular_decel", 1.0)
self.declare_parameter("use_scurve", True)
self.declare_parameter("scurve_duration", 0.2)
self.declare_parameter("update_rate", 50)
self.max_lin_accel = self.get_parameter("max_linear_accel").value
self.max_ang_accel = self.get_parameter("max_angular_accel").value
self.max_lin_decel = self.get_parameter("max_linear_decel").value
self.max_ang_decel = self.get_parameter("max_angular_decel").value
self.use_scurve = self.get_parameter("use_scurve").value
self.scurve_duration = self.get_parameter("scurve_duration").value
self.update_rate = self.get_parameter("update_rate").value
self.dt = 1.0 / self.update_rate
self.cmd_vel_pub = self.create_publisher(Twist, "/cmd_vel", 1)
self.velocity_profile_pub = self.create_publisher(String, "/saltybot/velocity_profile", 1)
self.create_subscription(Twist, "/cmd_vel_raw", self._cmd_vel_raw_callback, 1)
self.current_lin_vel = 0.0
self.current_ang_vel = 0.0
self.target_lin_vel = 0.0
self.target_ang_vel = 0.0
self.scurve_time_lin = 0.0
self.scurve_time_ang = 0.0
self.create_timer(self.dt, self._update_loop)
self.get_logger().info(f"Smooth velocity controller: {self.update_rate}Hz, S-curve={self.use_scurve}")
def _cmd_vel_raw_callback(self, msg: Twist):
self.target_lin_vel = msg.linear.x
self.target_ang_vel = msg.angular.z
def _update_loop(self):
if self.use_scurve:
lin_vel = self._apply_scurve(self.current_lin_vel, self.target_lin_vel, self.max_lin_accel, self.max_lin_decel, "linear")
ang_vel = self._apply_scurve(self.current_ang_vel, self.target_ang_vel, self.max_ang_accel, self.max_ang_decel, "angular")
else:
lin_vel = self._apply_accel_limit(self.current_lin_vel, self.target_lin_vel, self.max_lin_accel, self.max_lin_decel)
ang_vel = self._apply_accel_limit(self.current_ang_vel, self.target_ang_vel, self.max_ang_accel, self.max_ang_decel)
self.current_lin_vel = lin_vel
self.current_ang_vel = ang_vel
twist = Twist()
twist.linear.x = lin_vel
twist.angular.z = ang_vel
self.cmd_vel_pub.publish(twist)
profile = f"lin:{lin_vel:.3f},ang:{ang_vel:.3f}"
self.velocity_profile_pub.publish(String(data=profile))
def _apply_scurve(self, current, target, max_accel, max_decel, axis):
if abs(target - current) < 0.001:
return target
direction = 1 if target > current else -1
accel_limit = max_accel if direction > 0 else max_decel
max_vel_change = accel_limit * self.dt
if self.scurve_duration > 0:
time_var = self.scurve_time_lin if axis == "linear" else self.scurve_time_ang
progress = min(time_var, self.scurve_duration) / self.scurve_duration
smooth_factor = (1 - math.cos(progress * math.pi)) / 2
max_vel_change *= smooth_factor
new_vel = current + max_vel_change * direction
if (direction > 0 and new_vel >= target) or (direction < 0 and new_vel <= target):
if axis == "linear":
self.scurve_time_lin = 0.0
else:
self.scurve_time_ang = 0.0
return target
if axis == "linear":
self.scurve_time_lin += self.dt
else:
self.scurve_time_ang += self.dt
return new_vel
def _apply_accel_limit(self, current, target, max_accel, max_decel):
if abs(target - current) < 0.001:
return target
direction = 1 if target > current else -1
accel_limit = max_accel if direction > 0 else max_decel
max_vel_change = accel_limit * self.dt
new_vel = current + max_vel_change * direction
if (direction > 0 and new_vel >= target) or (direction < 0 and new_vel <= target):
return target
return new_vel
def main(args=None):
rclpy.init(args=args)
try:
rclpy.spin(SmoothVelocityNode())
except KeyboardInterrupt:
pass
finally:
rclpy.shutdown()
if __name__ == "__main__":
main()

View File

@ -1 +0,0 @@
[develop]
script_dir=$base/lib/saltybot_smooth_velocity

View File

@ -1,24 +0,0 @@
from setuptools import setup
setup(
name="saltybot_smooth_velocity",
version="0.1.0",
packages=["saltybot_smooth_velocity"],
data_files=[
("share/ament_index/resource_index/packages", ["resource/saltybot_smooth_velocity"]),
("share/saltybot_smooth_velocity", ["package.xml"]),
("share/saltybot_smooth_velocity/launch", ["launch/smooth_velocity.launch.py"]),
("share/saltybot_smooth_velocity/config", ["config/smooth_velocity_config.yaml"]),
],
install_requires=["setuptools"],
zip_safe=True,
maintainer="sl-controls",
maintainer_email="sl-controls@saltylab.local",
description="Smooth velocity controller with S-curve jerk reduction",
license="MIT",
tests_require=["pytest"],
entry_points={
"console_scripts": [
"smooth_velocity_node = saltybot_smooth_velocity.smooth_velocity_node:main",
],
},
)

View File

@ -1,166 +0,0 @@
# Sound Effects Configuration
# Audio library path
audio:
sounds_directory: "/home/seb/saltybot-data/sounds" # Audio files location
supported_formats: [".wav", ".ogg"] # Supported file formats
default_format: "wav" # Default synthesis format
# Volume control
volume:
default_volume: 0.7 # Default playback volume (0.0-1.0)
quiet_mode_volume: 0.4 # Quiet mode volume reduction
night_mode_volume: 0.3 # Night mode volume reduction
max_volume: 1.0 # Absolute maximum
min_volume: 0.0 # Absolute minimum
fade_in_time: 0.1 # Fade in duration (seconds)
fade_out_time: 0.2 # Fade out duration (seconds)
# Event definitions
events:
boot_complete:
description: "Robot startup chime"
filename: "boot_complete.wav"
duration: 1.2
priority: 100
repeat: false
person_detected:
description: "Subtle detection sound"
filename: "person_detected.wav"
duration: 0.5
priority: 20
repeat: false
wake_word:
description: "Wake word activation beep"
filename: "wake_word.wav"
duration: 0.3
priority: 80
repeat: false
low_battery:
description: "Low battery warning"
filename: "low_battery.wav"
duration: 1.0
priority: 90
repeat: false
obstacle_close:
description: "Proximity warning beep"
filename: "obstacle_close.wav"
duration: 0.4
priority: 70
repeat: false
trick_complete:
description: "Trick execution jingle"
filename: "trick_complete.wav"
duration: 1.5
priority: 50
repeat: false
error:
description: "Error descending tone"
filename: "error.wav"
duration: 0.8
priority: 85
repeat: false
charging_start:
description: "Charging initiated power-up"
filename: "charging_start.wav"
duration: 1.0
priority: 60
repeat: false
geofence_warning:
description: "Geofence boundary alert"
filename: "geofence_warning.wav"
duration: 0.9
priority: 75
repeat: false
# Programmatic sound generation
synthesis:
enabled: true # Enable fallback synthesis if files missing
sample_rate: 16000 # Sample rate (Hz)
bit_depth: 16 # Bit depth (16 or 24)
# Default tones for each event
defaults:
boot_complete:
type: "chime" # Ascending notes
duration: 1.2
frequencies: [523.25, 659.25, 783.99] # C5, E5, G5
person_detected:
type: "sine"
duration: 0.5
frequency: 800 # Hz
wake_word:
type: "beep"
duration: 0.3
frequency: 1000
low_battery:
type: "warning" # Double beep
duration: 1.0
frequency: 880
obstacle_close:
type: "proximity" # Rapid beeps
duration: 0.4
frequency: 1200
pulse_rate: 5 # Pulses per second
trick_complete:
type: "jingle" # Ascending arpeggio
duration: 1.5
frequencies: [523.25, 659.25, 783.99, 987.77] # C5, E5, G5, B5
error:
type: "descending" # Descending tone
duration: 0.8
frequencies: [800, 600, 400]
charging_start:
type: "power_up" # Rising tone
duration: 1.0
start_freq: 400
end_freq: 1200
geofence_warning:
type: "alert" # Repeating tone
duration: 0.9
frequency: 950
# Priority queue settings
queue:
max_size: 20 # Maximum queued events
overlap_allowed: true # Allow sounds to overlap
ducking: false # Reduce volume of existing sounds for new ones
duck_amount: 0.5 # How much to reduce (0.0-1.0)
# Topic subscriptions
subscriptions:
boot_complete: "/saltybot/system/boot_complete"
person_detected: "/saltybot/perception/person_detected"
wake_word: "/saltybot/voice/wake_word"
low_battery: "/saltybot/battery/low_warning"
obstacle_close: "/saltybot/obstacle/close"
trick_complete: "/saltybot/tricks/complete"
error: "/saltybot/system/error"
charging_start: "/saltybot/charging/started"
geofence_warning: "/saltybot/geofence/warning"
# Publishing
publications:
sound_playing: "/saltybot/sound_playing"
# Mode settings
modes:
quiet_mode_enabled: false # Global quiet mode
night_mode_enabled: false # Night mode affects volume
mute_all: false # Mute all sounds
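The `power_up` default above is a 400→1200 Hz sweep over 1.0 s at the configured 16 kHz sample rate. A rough sketch of how such a linear chirp can be synthesized by integrating instantaneous frequency into phase (constants mirror this config; this is not the node's actual code path):

```python
import numpy as np

SAMPLE_RATE = 16000  # synthesis.sample_rate from the config above

def power_up(start_freq: float, end_freq: float, duration: float) -> np.ndarray:
    """Linear chirp: phase is the cumulative sum (discrete integral) of frequency."""
    t = np.linspace(0, duration, int(SAMPLE_RATE * duration))
    freq = np.linspace(start_freq, end_freq, len(t))
    phase = 2 * np.pi * np.cumsum(freq) / SAMPLE_RATE
    return (0.3 * np.sin(phase)).astype(np.float32)

wave = power_up(400, 1200, 1.0)  # charging_start defaults
```

Integrating frequency (rather than writing `sin(2*pi*f(t)*t)` directly) keeps the phase continuous, so the sweep is free of clicks.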


@ -1,36 +0,0 @@
"""Launch file for Sound Effects node."""
from launch import LaunchDescription
from launch_ros.actions import Node
from launch.substitutions import LaunchConfiguration
from launch.actions import DeclareLaunchArgument
from ament_index_python.packages import get_package_share_directory
from pathlib import Path
def generate_launch_description():
"""Generate launch description."""
package_dir = get_package_share_directory("saltybot_sound_effects")
config_file = str(Path(package_dir) / "config" / "sound_config.yaml")
# Launch arguments
config_arg = DeclareLaunchArgument(
"config_file",
default_value=config_file,
description="Path to sound effects configuration file",
)
# Sound Effects node
sound_node = Node(
package="saltybot_sound_effects",
executable="sound_effects_node",
name="sound_effects",
parameters=[
{
"config_file": LaunchConfiguration("config_file"),
}
],
output="screen",
)
return LaunchDescription([config_arg, sound_node])


@ -1,27 +0,0 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_sound_effects</name>
<version>0.1.0</version>
<description>
Sound effects library for SaltyBot. Event-driven audio feedback system with
priority queue, volume control, and programmatic sound generation. Supports
WAV/OGG formats with fallback synthesis if files missing.
</description>
<maintainer email="sl-mechanical@saltylab.local">sl-mechanical</maintainer>
<license>MIT</license>
<depend>rclpy</depend>
<depend>std_msgs</depend>
<buildtool_depend>ament_python</buildtool_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>


@ -1,337 +0,0 @@
#!/usr/bin/env python3
"""Sound Effects Library for SaltyBot.
Event-driven audio feedback system with priority queue, volume control,
and programmatic sound generation. Supports WAV/OGG formats with fallback
synthesis if files missing.
Subscribed topics (dynamic based on config):
/saltybot/system/boot_complete (std_msgs/Bool)
/saltybot/perception/person_detected (std_msgs/Bool)
/saltybot/voice/wake_word (std_msgs/Bool)
... (see config for all)
Published topics:
/saltybot/sound_playing (std_msgs/String) - JSON current sound status
"""
import json
import time
import threading
from pathlib import Path
from dataclasses import dataclass
from typing import Dict, List, Optional
from queue import PriorityQueue
import yaml
import numpy as np
from scipy import signal
from scipy.io import wavfile
import rclpy
from rclpy.node import Node
from std_msgs.msg import Bool, String
@dataclass
class SoundEvent:
"""Sound event with priority."""
name: str
filename: str
duration: float
priority: int
timestamp: float
def __lt__(self, other):
"""Compare by priority (higher priority first)."""
return self.priority > other.priority # Inverted for max-heap
class SoundSynthesizer:
"""Programmatically generate sounds if files missing."""
def __init__(self, sample_rate: int = 16000):
"""Initialize synthesizer.
Args:
sample_rate: Sample rate in Hz
"""
self.sample_rate = sample_rate
def generate_sine(self, frequency: float, duration: float,
amplitude: float = 0.3) -> np.ndarray:
"""Generate sine wave."""
t = np.linspace(0, duration, int(self.sample_rate * duration))
return (amplitude * np.sin(2 * np.pi * frequency * t)).astype(np.float32)
def generate_chime(self, frequencies: List[float], duration: float) -> np.ndarray:
"""Generate ascending chime."""
num_notes = len(frequencies)
note_duration = duration / num_notes
samples = []
for freq in frequencies:
note = self.generate_sine(freq, note_duration)
# Add fade envelope
# Triangular fade envelope sized exactly to the note (handles odd lengths)
envelope = np.concatenate([np.linspace(0, 1, len(note) // 2), np.linspace(1, 0, len(note) - len(note) // 2)])
note = note * envelope
samples.append(note)
return np.concatenate(samples)
def generate_descending(self, frequencies: List[float],
duration: float) -> np.ndarray:
"""Generate descending tone."""
num_notes = len(frequencies)
note_duration = duration / num_notes
samples = []
for freq in frequencies:
note = self.generate_sine(freq, note_duration)
envelope = signal.windows.hann(len(note))
note = note * envelope
samples.append(note)
return np.concatenate(samples)
def generate_power_up(self, start_freq: float, end_freq: float,
duration: float) -> np.ndarray:
"""Generate rising power-up tone."""
t = np.linspace(0, duration, int(self.sample_rate * duration))
freq = np.linspace(start_freq, end_freq, len(t))
phase = 2 * np.pi * np.cumsum(freq) / self.sample_rate
return (0.3 * np.sin(phase)).astype(np.float32)
def generate_beep(self, frequency: float, duration: float) -> np.ndarray:
"""Generate simple beep with envelope."""
t = np.linspace(0, duration, int(self.sample_rate * duration))
sine = 0.3 * np.sin(2 * np.pi * frequency * t)
envelope = signal.windows.hann(len(sine))
return (sine * envelope).astype(np.float32)
def generate_proximity(self, frequency: float, duration: float,
pulse_rate: float = 5) -> np.ndarray:
"""Generate rapid proximity beeps."""
pulse_duration = 1.0 / pulse_rate
samples = []
elapsed = 0
while elapsed < duration:
beep = self.generate_beep(frequency, min(pulse_duration / 2, duration - elapsed))
silence = np.zeros(int(self.sample_rate * pulse_duration / 2))
samples.append(np.concatenate([beep, silence]))
elapsed += pulse_duration
return np.concatenate(samples[:max(1, int(duration * pulse_rate))])  # keep at least one pulse for very short durations
class SoundEffectsNode(Node):
"""ROS2 node for sound effects playback."""
def __init__(self):
super().__init__("sound_effects")
# Load config
self.declare_parameter("config_file", "sound_config.yaml")
config_path = self.get_parameter("config_file").value
self.config = self._load_config(config_path)
# State
self.current_sound: Optional[SoundEvent] = None
self.sound_queue = PriorityQueue()
self.synthesizer = SoundSynthesizer(
self.config["synthesis"]["sample_rate"]
)
self.volume = self.config["volume"]["default_volume"]
self.is_playing = False
self.quiet_mode = False
self.night_mode = False
# Load sounds
self.sounds_cache: Dict[str, np.ndarray] = {}
self._load_sounds()
# Subscribe to event topics
self._subscribe_to_events()
# Publisher
self.sound_state_pub = self.create_publisher(
String, "/saltybot/sound_playing", 10
)
# Playback thread
self.playback_thread = threading.Thread(target=self._playback_loop, daemon=True)
self.playback_thread.start()
# Timer for queue processing
self.timer = self.create_timer(0.1, self._process_queue)
self.get_logger().info("Sound Effects node initialized")
def _load_config(self, config_path: str) -> Dict:
"""Load YAML configuration."""
try:
with open(config_path) as f:
return yaml.safe_load(f)
except FileNotFoundError:
self.get_logger().warn(f"Config not found: {config_path}")
return {}
def _load_sounds(self):
"""Load sound files and cache them."""
sounds_dir = Path(self.config.get("audio", {}).get("sounds_directory", "/tmp"))
for event_name, event_config in self.config.get("events", {}).items():
filename = event_config.get("filename")
if not filename:
continue
sound_path = sounds_dir / filename
if sound_path.exists():
try:
rate, data = wavfile.read(str(sound_path))
# Normalize to float
if data.dtype != np.float32:
data = data.astype(np.float32) / 32768.0
self.sounds_cache[event_name] = data
self.get_logger().info(f"Loaded sound: {event_name}")
except Exception as e:
self.get_logger().warn(f"Failed to load {event_name}: {e}")
else:
self.get_logger().info(f"Sound file missing: {sound_path}, will use synthesis")
def _subscribe_to_events(self):
"""Subscribe to all event topics from config."""
for event_name, topic in self.config.get("subscriptions", {}).items():
def callback(msg, event=event_name):
if msg.data:
self._queue_sound(event)
self.create_subscription(Bool, topic, callback, 10)
def _queue_sound(self, event_name: str):
"""Queue a sound for playback."""
event_config = self.config.get("events", {}).get(event_name)
if not event_config:
return
event = SoundEvent(
name=event_name,
filename=event_config.get("filename", ""),
duration=event_config.get("duration", 1.0),
priority=event_config.get("priority", 50),
timestamp=time.time(),
)
self.sound_queue.put(event)
self.get_logger().info(f"Queued sound: {event_name} (priority: {event.priority})")
def _process_queue(self):
"""Process sound queue."""
if self.is_playing:
return # Still playing current sound
if not self.sound_queue.empty():
try:
self.current_sound = self.sound_queue.get_nowait()
self.is_playing = True
except Exception:
pass  # queue drained between the empty() check and get_nowait()
def _playback_loop(self):
"""Background thread for audio playback."""
while True:
if self.current_sound and self.is_playing:
audio_data = self._get_audio_data(self.current_sound.name)
if audio_data is not None:
self._play_audio(audio_data, self.current_sound.name)
self.is_playing = False
self.current_sound = None
time.sleep(0.05)
def _get_audio_data(self, event_name: str) -> Optional[np.ndarray]:
"""Get audio data for event (cached or synthesized)."""
if event_name in self.sounds_cache:
return self.sounds_cache[event_name]
if not self.config.get("synthesis", {}).get("enabled", True):
return None
# Synthesize default sound
return self._synthesize_sound(event_name)
def _synthesize_sound(self, event_name: str) -> Optional[np.ndarray]:
"""Synthesize sound programmatically."""
defaults = self.config.get("synthesis", {}).get("defaults", {})
event_config = defaults.get(event_name)
if not event_config:
return None
sound_type = event_config.get("type", "sine")
duration = event_config.get("duration", 1.0)
try:
if sound_type == "sine":
freq = event_config.get("frequency", 440)
return self.synthesizer.generate_sine(freq, duration)
elif sound_type == "chime":
freqs = event_config.get("frequencies", [523.25, 659.25])
return self.synthesizer.generate_chime(freqs, duration)
elif sound_type == "descending":
freqs = event_config.get("frequencies", [800, 600, 400])
return self.synthesizer.generate_descending(freqs, duration)
elif sound_type == "power_up":
start = event_config.get("start_freq", 400)
end = event_config.get("end_freq", 1200)
return self.synthesizer.generate_power_up(start, end, duration)
elif sound_type in ["beep", "warning", "alert"]:
freq = event_config.get("frequency", 1000)
return self.synthesizer.generate_beep(freq, duration)
elif sound_type == "proximity":
freq = event_config.get("frequency", 1200)
pulse = event_config.get("pulse_rate", 5)
return self.synthesizer.generate_proximity(freq, duration, pulse)
elif sound_type == "jingle":
freqs = event_config.get("frequencies", [523.25, 659.25, 783.99])
return self.synthesizer.generate_chime(freqs, duration)
except Exception as e:
self.get_logger().warn(f"Sound synthesis failed for {event_name}: {e}")
return None
def _play_audio(self, audio_data: np.ndarray, event_name: str):
"""Play audio data (mock implementation)."""
# In real implementation, would use pygame, pyaudio, or similar
duration = len(audio_data) / self.config["synthesis"]["sample_rate"]
state = {
"event": event_name,
"playing": True,
"duration": duration,
"volume": self.volume,
}
self.sound_state_pub.publish(String(data=json.dumps(state)))
# Simulate playback
time.sleep(duration)
state["playing"] = False
self.sound_state_pub.publish(String(data=json.dumps(state)))
def main(args=None):
"""Main entry point."""
rclpy.init(args=args)
node = SoundEffectsNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()
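`SoundEvent.__lt__` inverts the comparison so Python's min-heap `PriorityQueue` pops the highest-priority event first. A stripped-down sketch of that trick (the `Event` class here is illustrative; priorities mirror the config above):

```python
from dataclasses import dataclass
from queue import PriorityQueue

@dataclass
class Event:
    name: str
    priority: int

    def __lt__(self, other):
        # Inverted comparison: the min-heap treats higher priority as
        # "smaller", so it surfaces first.
        return self.priority > other.priority

q = PriorityQueue()
q.put(Event("person_detected", 20))
q.put(Event("low_battery", 90))
q.put(Event("trick_complete", 50))

order = [q.get().name for _ in range(3)]
```

Since `@dataclass` only generates ordering methods when `order=True`, the hand-written `__lt__` is the sole comparison `heapq` uses, and urgent events (like `low_battery` at priority 90) jump ahead of routine ones.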


@ -1,4 +0,0 @@
[develop]
script_dir=$base/lib/saltybot_sound_effects
[bdist_wheel]
universal=0


@ -1,30 +0,0 @@
from setuptools import setup
package_name = "saltybot_sound_effects"
setup(
name=package_name,
version="0.1.0",
packages=[package_name],
data_files=[
("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
(f"share/{package_name}", ["package.xml"]),
(f"share/{package_name}/launch", ["launch/sound_effects.launch.py"]),
(f"share/{package_name}/config", ["config/sound_config.yaml"]),
],
install_requires=["setuptools", "pyyaml", "numpy", "scipy"],
zip_safe=True,
maintainer="sl-mechanical",
maintainer_email="sl-mechanical@saltylab.local",
description=(
"Sound effects library: event-driven audio feedback, priority queue, "
"programmatic synthesis, volume control"
),
license="MIT",
tests_require=["pytest"],
entry_points={
"console_scripts": [
"sound_effects_node = saltybot_sound_effects.sound_effects_node:main",
],
},
)