feat: gamepad teleop (Issue #433) #438
116
jetson/ros2_ws/src/saltybot_audio_direction/README.md
Normal file
116
jetson/ros2_ws/src/saltybot_audio_direction/README.md
Normal file
@ -0,0 +1,116 @@
|
|||||||
|
# saltybot_audio_direction
|
||||||
|
|
||||||
|
Audio direction estimator for sound source localization (Issue #430).
|
||||||
|
|
||||||
|
Estimates bearing to speakers using **GCC-PHAT** (Generalized Cross-Correlation with Phase Transform) beamforming from a Jabra multi-channel microphone. Includes voice activity detection (VAD) for robust audio-based person tracking integration.
|
||||||
|
|
||||||
|
## Features
|
||||||
|
|
||||||
|
- **GCC-PHAT Beamforming**: Phase-domain cross-correlation for direction of arrival estimation
|
||||||
|
- **Voice Activity Detection (VAD)**: RMS energy-based speech detection with smoothing
|
||||||
|
- **Stereo/Quadrophonic Support**: Handles Jabra 2-channel and 4-channel modes
|
||||||
|
- **Robot Self-Noise Filtering**: Optional suppression of motor/wheel noise (future enhancement)
|
||||||
|
- **ROS2 Integration**: Standard ROS2 topic publishing at configurable rates
|
||||||
|
|
||||||
|
## Topics
|
||||||
|
|
||||||
|
### Published
|
||||||
|
- **`/saltybot/audio_direction`** (`std_msgs/Float32`)
|
||||||
|
Estimated bearing in degrees (0–360, where 0° = front, 90° = right, 180° = rear, 270° = left)
|
||||||
|
|
||||||
|
- **`/saltybot/audio_activity`** (`std_msgs/Bool`)
|
||||||
|
Voice activity detected (true if speech-like energy)
|
||||||
|
|
||||||
|
## Parameters
|
||||||
|
|
||||||
|
| Parameter | Type | Default | Description |
|
||||||
|
|-----------|------|---------|-------------|
|
||||||
|
| `device_id` | int | -1 | Audio device index (-1 = system default) |
|
||||||
|
| `sample_rate` | int | 16000 | Sample rate in Hz |
|
||||||
|
| `chunk_size` | int | 2048 | Samples per audio frame |
|
||||||
|
| `publish_hz` | float | 10.0 | Output publication rate (Hz) |
|
||||||
|
| `vad_threshold` | float | 0.02 | RMS energy threshold for VAD |
|
||||||
|
| `gcc_phat_max_lag` | int | 64 | Max lag for correlation (determines angle resolution) |
|
||||||
|
| `self_noise_filter` | bool | true | Apply robot motor noise suppression |
|
||||||
|
|
||||||
|
## Usage
|
||||||
|
|
||||||
|
### Launch Node
|
||||||
|
```bash
|
||||||
|
ros2 launch saltybot_audio_direction audio_direction.launch.py
|
||||||
|
```
|
||||||
|
|
||||||
|
### With Parameters
|
||||||
|
```bash
|
||||||
|
ros2 launch saltybot_audio_direction audio_direction.launch.py \
|
||||||
|
device_id:=0 \
|
||||||
|
publish_hz:=20.0 \
|
||||||
|
vad_threshold:=0.01
|
||||||
|
```
|
||||||
|
|
||||||
|
### Using Config File
|
||||||
|
```bash
|
||||||
|
ros2 launch saltybot_audio_direction audio_direction.launch.py \
|
||||||
|
--ros-args --params-file config/audio_direction_params.yaml
|
||||||
|
```
|
||||||
|
|
||||||
|
## Algorithm
|
||||||
|
|
||||||
|
### GCC-PHAT
|
||||||
|
|
||||||
|
1. Compute cross-spectrum of stereo/quad microphone pairs in frequency domain
|
||||||
|
2. Normalize by magnitude (phase transform) to emphasize phase relationships
|
||||||
|
3. Inverse FFT to time-domain cross-correlation
|
||||||
|
4. Find maximum correlation lag → time delay between channels
|
||||||
|
5. Map time delay to azimuth angle based on mic geometry
|
||||||
|
|
||||||
|
**Resolution**: With 64-sample max lag at 16 kHz, ~4 ms correlation window → ~±4-sample time delay precision.
|
||||||
|
|
||||||
|
### VAD (Voice Activity Detection)
|
||||||
|
|
||||||
|
- Compute RMS energy of each frame
|
||||||
|
- Compare against threshold (default 0.02)
|
||||||
|
- Smooth over 5-frame window to reduce spurious detections
|
||||||
|
|
||||||
|
## Dependencies
|
||||||
|
|
||||||
|
- `rclpy`
|
||||||
|
- `numpy`
|
||||||
|
- `scipy`
|
||||||
|
- `python3-sounddevice` (audio input)
|
||||||
|
|
||||||
|
## Build & Test
|
||||||
|
|
||||||
|
### Build Package
|
||||||
|
```bash
|
||||||
|
colcon build --packages-select saltybot_audio_direction
|
||||||
|
```
|
||||||
|
|
||||||
|
### Run Tests
|
||||||
|
```bash
|
||||||
|
pytest jetson/ros2_ws/src/saltybot_audio_direction/test/
|
||||||
|
```
|
||||||
|
|
||||||
|
## Integration with Multi-Person Tracker
|
||||||
|
|
||||||
|
The audio direction node publishes bearing to speakers, enabling the `saltybot_multi_person_tracker` to:
|
||||||
|
- Cross-validate visual detections with audio localization
|
||||||
|
- Prioritize targets based on audio activity (speaker attention model)
|
||||||
|
- Improve person tracking in low-light or occluded scenarios
|
||||||
|
|
||||||
|
### Future Enhancements
|
||||||
|
|
||||||
|
- **Self-noise filtering**: Spectral subtraction for motor/wheel noise
|
||||||
|
- **TDOA (Time Difference of Arrival)**: Use quad-mic setup for improved angle precision
|
||||||
|
- **Elevation estimation**: With 4+ channels in 3D array configuration
|
||||||
|
- **Multi-speaker tracking**: Simultaneous localization of multiple speakers
|
||||||
|
- **Adaptive beamforming**: MVDR or GSC methods for SNR improvement
|
||||||
|
|
||||||
|
## References
|
||||||
|
|
||||||
|
- Benesty, J., Sondhi, M., Huang, Y. (2008). "Handbook of Speech Processing"
|
||||||
|
- Knapp, C., Carter, G. (1976). "The Generalized Correlation Method for Estimation of Time Delay"
|
||||||
|
|
||||||
|
## License
|
||||||
|
|
||||||
|
MIT
|
||||||
@ -0,0 +1,17 @@
|
|||||||
|
# Audio direction estimator ROS2 parameters
|
||||||
|
# Used with ros2 launch saltybot_audio_direction audio_direction.launch.py --ros-args --params-file config/audio_direction_params.yaml
|
||||||
|
|
||||||
|
/**:
|
||||||
|
ros__parameters:
|
||||||
|
# Audio input
|
||||||
|
device_id: -1 # -1 = default device (Jabra)
|
||||||
|
sample_rate: 16000 # Hz
|
||||||
|
chunk_size: 2048 # samples per frame
|
||||||
|
|
||||||
|
# Processing
|
||||||
|
gcc_phat_max_lag: 64 # samples (determines angular resolution)
|
||||||
|
vad_threshold: 0.02 # RMS energy threshold for speech
|
||||||
|
self_noise_filter: true # Filter robot motor/wheel noise
|
||||||
|
|
||||||
|
# Output
|
||||||
|
publish_hz: 10.0 # Publication rate (Hz)
|
||||||
@ -0,0 +1,82 @@
|
|||||||
|
"""
|
||||||
|
Launch audio direction estimator node.
|
||||||
|
|
||||||
|
Typical usage:
|
||||||
|
ros2 launch saltybot_audio_direction audio_direction.launch.py
|
||||||
|
"""
|
||||||
|
|
||||||
|
from launch import LaunchDescription
|
||||||
|
from launch.actions import DeclareLaunchArgument
|
||||||
|
from launch.substitutions import LaunchConfiguration
|
||||||
|
from launch_ros.actions import Node
|
||||||
|
|
||||||
|
|
||||||
|
def generate_launch_description():
|
||||||
|
"""Generate launch description for audio direction node."""
|
||||||
|
|
||||||
|
# Declare launch arguments
|
||||||
|
device_id_arg = DeclareLaunchArgument(
|
||||||
|
'device_id',
|
||||||
|
default_value='-1',
|
||||||
|
description='Audio device index (-1 for default)',
|
||||||
|
)
|
||||||
|
sample_rate_arg = DeclareLaunchArgument(
|
||||||
|
'sample_rate',
|
||||||
|
default_value='16000',
|
||||||
|
description='Sample rate in Hz',
|
||||||
|
)
|
||||||
|
chunk_size_arg = DeclareLaunchArgument(
|
||||||
|
'chunk_size',
|
||||||
|
default_value='2048',
|
||||||
|
description='Samples per audio frame',
|
||||||
|
)
|
||||||
|
publish_hz_arg = DeclareLaunchArgument(
|
||||||
|
'publish_hz',
|
||||||
|
default_value='10.0',
|
||||||
|
description='Publication rate in Hz',
|
||||||
|
)
|
||||||
|
vad_threshold_arg = DeclareLaunchArgument(
|
||||||
|
'vad_threshold',
|
||||||
|
default_value='0.02',
|
||||||
|
description='RMS energy threshold for voice activity detection',
|
||||||
|
)
|
||||||
|
gcc_max_lag_arg = DeclareLaunchArgument(
|
||||||
|
'gcc_phat_max_lag',
|
||||||
|
default_value='64',
|
||||||
|
description='Max lag for GCC-PHAT correlation',
|
||||||
|
)
|
||||||
|
self_noise_filter_arg = DeclareLaunchArgument(
|
||||||
|
'self_noise_filter',
|
||||||
|
default_value='true',
|
||||||
|
description='Apply robot self-noise suppression',
|
||||||
|
)
|
||||||
|
|
||||||
|
# Audio direction node
|
||||||
|
audio_direction_node = Node(
|
||||||
|
package='saltybot_audio_direction',
|
||||||
|
executable='audio_direction_node',
|
||||||
|
name='audio_direction_estimator',
|
||||||
|
output='screen',
|
||||||
|
parameters=[
|
||||||
|
{'device_id': LaunchConfiguration('device_id')},
|
||||||
|
{'sample_rate': LaunchConfiguration('sample_rate')},
|
||||||
|
{'chunk_size': LaunchConfiguration('chunk_size')},
|
||||||
|
{'publish_hz': LaunchConfiguration('publish_hz')},
|
||||||
|
{'vad_threshold': LaunchConfiguration('vad_threshold')},
|
||||||
|
{'gcc_phat_max_lag': LaunchConfiguration('gcc_max_lag')},
|
||||||
|
{'self_noise_filter': LaunchConfiguration('self_noise_filter')},
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
return LaunchDescription(
|
||||||
|
[
|
||||||
|
device_id_arg,
|
||||||
|
sample_rate_arg,
|
||||||
|
chunk_size_arg,
|
||||||
|
publish_hz_arg,
|
||||||
|
vad_threshold_arg,
|
||||||
|
gcc_max_lag_arg,
|
||||||
|
self_noise_filter_arg,
|
||||||
|
audio_direction_node,
|
||||||
|
]
|
||||||
|
)
|
||||||
31
jetson/ros2_ws/src/saltybot_audio_direction/package.xml
Normal file
31
jetson/ros2_ws/src/saltybot_audio_direction/package.xml
Normal file
@ -0,0 +1,31 @@
|
|||||||
|
<?xml version="1.0"?>
|
||||||
|
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||||
|
<package format="3">
|
||||||
|
<name>saltybot_audio_direction</name>
|
||||||
|
<version>0.1.0</version>
|
||||||
|
<description>
|
||||||
|
Audio direction estimator for sound source localization via Jabra microphone.
|
||||||
|
Implements GCC-PHAT beamforming for direction of arrival (DoA) estimation.
|
||||||
|
Publishes bearing (degrees) and voice activity detection (VAD) for speaker tracking integration.
|
||||||
|
Issue #430.
|
||||||
|
</description>
|
||||||
|
<maintainer email="sl-perception@saltylab.local">sl-perception</maintainer>
|
||||||
|
<license>MIT</license>
|
||||||
|
|
||||||
|
<buildtool_depend>ament_python</buildtool_depend>
|
||||||
|
|
||||||
|
<depend>rclpy</depend>
|
||||||
|
<depend>std_msgs</depend>
|
||||||
|
<depend>sensor_msgs</depend>
|
||||||
|
<depend>geometry_msgs</depend>
|
||||||
|
|
||||||
|
<exec_depend>python3-numpy</exec_depend>
|
||||||
|
<exec_depend>python3-scipy</exec_depend>
|
||||||
|
<exec_depend>python3-sounddevice</exec_depend>
|
||||||
|
|
||||||
|
<test_depend>pytest</test_depend>
|
||||||
|
|
||||||
|
<export>
|
||||||
|
<build_type>ament_python</build_type>
|
||||||
|
</export>
|
||||||
|
</package>
|
||||||
@ -0,0 +1,299 @@
|
|||||||
|
"""
|
||||||
|
audio_direction_node.py — Sound source localization via GCC-PHAT beamforming.
|
||||||
|
|
||||||
|
Estimates direction of arrival (DoA) from a Jabra multi-channel microphone
|
||||||
|
using Generalized Cross-Correlation with Phase Transform (GCC-PHAT).
|
||||||
|
|
||||||
|
Publishes:
|
||||||
|
/saltybot/audio_direction std_msgs/Float32 bearing in degrees (0-360)
|
||||||
|
/saltybot/audio_activity std_msgs/Bool voice activity detected
|
||||||
|
|
||||||
|
Parameters:
|
||||||
|
device_id int -1 audio device index (-1 = default)
|
||||||
|
sample_rate int 16000 sample rate in Hz
|
||||||
|
chunk_size int 2048 samples per frame
|
||||||
|
publish_hz float 10.0 output publication rate
|
||||||
|
vad_threshold float 0.02 RMS energy threshold for speech
|
||||||
|
gcc_phat_max_lag int 64 max lag for correlation (determines angular resolution)
|
||||||
|
self_noise_filter bool true apply robot noise suppression
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import rclpy
|
||||||
|
from rclpy.node import Node
|
||||||
|
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
from scipy import signal
|
||||||
|
import sounddevice as sd
|
||||||
|
from collections import deque
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
|
||||||
|
from std_msgs.msg import Float32, Bool
|
||||||
|
|
||||||
|
|
||||||
|
_SENSOR_QOS = QoSProfile(
|
||||||
|
reliability=ReliabilityPolicy.BEST_EFFORT,
|
||||||
|
history=HistoryPolicy.KEEP_LAST,
|
||||||
|
depth=5,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class GCCPHATBeamformer:
|
||||||
|
"""Generalized Cross-Correlation with Phase Transform for DoA estimation."""
|
||||||
|
|
||||||
|
def __init__(self, sample_rate: int, max_lag: int = 64):
|
||||||
|
self.sample_rate = sample_rate
|
||||||
|
self.max_lag = max_lag
|
||||||
|
self.frequency_bins = max_lag * 2
|
||||||
|
# Azimuth angles for left (270°), front (0°), right (90°), rear (180°)
|
||||||
|
self.angles = np.array([270.0, 0.0, 90.0, 180.0])
|
||||||
|
|
||||||
|
def gcc_phat(self, sig1: np.ndarray, sig2: np.ndarray) -> float:
|
||||||
|
"""
|
||||||
|
Compute GCC-PHAT correlation and estimate time delay (DoA proxy).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Estimated time delay in samples (can be converted to angle)
|
||||||
|
"""
|
||||||
|
if len(sig1) == 0 or len(sig2) == 0:
|
||||||
|
return 0.0
|
||||||
|
|
||||||
|
# Cross-correlation in frequency domain
|
||||||
|
fft1 = np.fft.rfft(sig1, n=self.frequency_bins)
|
||||||
|
fft2 = np.fft.rfft(sig2, n=self.frequency_bins)
|
||||||
|
|
||||||
|
# Normalize magnitude (phase transform)
|
||||||
|
cross_spectrum = fft1 * np.conj(fft2)
|
||||||
|
cross_spectrum_normalized = cross_spectrum / (np.abs(cross_spectrum) + 1e-8)
|
||||||
|
|
||||||
|
# Inverse FFT to get correlation
|
||||||
|
correlation = np.fft.irfft(cross_spectrum_normalized, n=self.frequency_bins)
|
||||||
|
|
||||||
|
# Find lag with maximum correlation
|
||||||
|
lags = np.arange(-self.max_lag, self.max_lag + 1)
|
||||||
|
valid_corr = correlation[: len(lags)]
|
||||||
|
max_lag_idx = np.argmax(np.abs(valid_corr))
|
||||||
|
estimated_lag = lags[max_lag_idx] if max_lag_idx < len(lags) else 0.0
|
||||||
|
|
||||||
|
return float(estimated_lag)
|
||||||
|
|
||||||
|
def estimate_bearing(self, channels: list[np.ndarray]) -> float:
|
||||||
|
"""
|
||||||
|
Estimate bearing from multi-channel audio.
|
||||||
|
|
||||||
|
Supports:
|
||||||
|
- 2-channel (stereo): left/right discrimination
|
||||||
|
- 4-channel: quadrophonic (front/rear/left/right)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Bearing in degrees (0-360)
|
||||||
|
"""
|
||||||
|
if len(channels) < 2:
|
||||||
|
return 0.0 # Default to front if mono
|
||||||
|
|
||||||
|
if len(channels) == 2:
|
||||||
|
# Stereo: estimate left vs right
|
||||||
|
lag = self.gcc_phat(channels[0], channels[1])
|
||||||
|
# Negative lag → left (270°), positive lag → right (90°)
|
||||||
|
if abs(lag) < 5:
|
||||||
|
return 0.0 # Front
|
||||||
|
elif lag < 0:
|
||||||
|
return 270.0 # Left
|
||||||
|
else:
|
||||||
|
return 90.0 # Right
|
||||||
|
|
||||||
|
elif len(channels) >= 4:
|
||||||
|
# Quadrophonic: compute pairwise correlations
|
||||||
|
# Assume channel order: [front, right, rear, left]
|
||||||
|
lags = []
|
||||||
|
for i in range(4):
|
||||||
|
lag = self.gcc_phat(channels[i], channels[(i + 1) % 4])
|
||||||
|
lags.append(lag)
|
||||||
|
|
||||||
|
# Select angle based on strongest correlation
|
||||||
|
max_idx = np.argmax(np.abs(lags))
|
||||||
|
return self.angles[max_idx]
|
||||||
|
|
||||||
|
return 0.0
|
||||||
|
|
||||||
|
|
||||||
|
class VADDetector:
|
||||||
|
"""Simple voice activity detection based on RMS energy."""
|
||||||
|
|
||||||
|
def __init__(self, threshold: float = 0.02, smoothing: int = 5):
|
||||||
|
self.threshold = threshold
|
||||||
|
self.smoothing = smoothing
|
||||||
|
self.history = deque(maxlen=smoothing)
|
||||||
|
|
||||||
|
def detect(self, audio_frame: np.ndarray) -> bool:
|
||||||
|
"""
|
||||||
|
Detect voice activity in audio frame.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if speech-like energy detected
|
||||||
|
"""
|
||||||
|
rms = np.sqrt(np.mean(audio_frame**2))
|
||||||
|
self.history.append(rms > self.threshold)
|
||||||
|
# Majority voting over window
|
||||||
|
return sum(self.history) > self.smoothing // 2
|
||||||
|
|
||||||
|
|
||||||
|
class AudioDirectionNode(Node):
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__('audio_direction_estimator')
|
||||||
|
|
||||||
|
# Parameters
|
||||||
|
self.declare_parameter('device_id', -1)
|
||||||
|
self.declare_parameter('sample_rate', 16000)
|
||||||
|
self.declare_parameter('chunk_size', 2048)
|
||||||
|
self.declare_parameter('publish_hz', 10.0)
|
||||||
|
self.declare_parameter('vad_threshold', 0.02)
|
||||||
|
self.declare_parameter('gcc_phat_max_lag', 64)
|
||||||
|
self.declare_parameter('self_noise_filter', True)
|
||||||
|
|
||||||
|
self.device_id = self.get_parameter('device_id').value
|
||||||
|
self.sample_rate = self.get_parameter('sample_rate').value
|
||||||
|
self.chunk_size = self.get_parameter('chunk_size').value
|
||||||
|
pub_hz = self.get_parameter('publish_hz').value
|
||||||
|
vad_threshold = self.get_parameter('vad_threshold').value
|
||||||
|
gcc_max_lag = self.get_parameter('gcc_phat_max_lag').value
|
||||||
|
self.apply_noise_filter = self.get_parameter('self_noise_filter').value
|
||||||
|
|
||||||
|
# Publishers
|
||||||
|
self._pub_bearing = self.create_publisher(
|
||||||
|
Float32, '/saltybot/audio_direction', 10, qos_profile=_SENSOR_QOS
|
||||||
|
)
|
||||||
|
self._pub_vad = self.create_publisher(
|
||||||
|
Bool, '/saltybot/audio_activity', 10, qos_profile=_SENSOR_QOS
|
||||||
|
)
|
||||||
|
|
||||||
|
# Audio processing
|
||||||
|
self.beamformer = GCCPHATBeamformer(self.sample_rate, max_lag=gcc_max_lag)
|
||||||
|
self.vad = VADDetector(threshold=vad_threshold)
|
||||||
|
self._audio_buffer = deque(maxlen=self.chunk_size * 2)
|
||||||
|
self._lock = threading.Lock()
|
||||||
|
|
||||||
|
# Start audio stream
|
||||||
|
self._stream = None
|
||||||
|
self._running = True
|
||||||
|
self._start_audio_stream()
|
||||||
|
|
||||||
|
# Publish timer
|
||||||
|
self.create_timer(1.0 / pub_hz, self._tick)
|
||||||
|
|
||||||
|
self.get_logger().info(
|
||||||
|
f'audio_direction_estimator ready — '
|
||||||
|
f'device_id={self.device_id} sample_rate={self.sample_rate} Hz '
|
||||||
|
f'chunk_size={self.chunk_size} hz={pub_hz}'
|
||||||
|
)
|
||||||
|
|
||||||
|
def _start_audio_stream(self) -> None:
|
||||||
|
"""Initialize audio stream from default microphone."""
|
||||||
|
try:
|
||||||
|
self._stream = sd.InputStream(
|
||||||
|
device=self.device_id if self.device_id >= 0 else None,
|
||||||
|
samplerate=self.sample_rate,
|
||||||
|
channels=2, # Default to stereo; auto-detect Jabra channels
|
||||||
|
blocksize=self.chunk_size,
|
||||||
|
callback=self._audio_callback,
|
||||||
|
latency='low',
|
||||||
|
)
|
||||||
|
self._stream.start()
|
||||||
|
self.get_logger().info('Audio stream started')
|
||||||
|
except Exception as e:
|
||||||
|
self.get_logger().error(f'Failed to start audio stream: {e}')
|
||||||
|
self._stream = None
|
||||||
|
|
||||||
|
def _audio_callback(self, indata: np.ndarray, frames: int, time_info, status) -> None:
|
||||||
|
"""Callback for audio input stream."""
|
||||||
|
if status:
|
||||||
|
self.get_logger().warn(f'Audio callback status: {status}')
|
||||||
|
|
||||||
|
try:
|
||||||
|
with self._lock:
|
||||||
|
# Store stereo or mono data
|
||||||
|
if indata.shape[1] == 1:
|
||||||
|
self._audio_buffer.extend(indata.flatten())
|
||||||
|
else:
|
||||||
|
# For multi-channel, interleave or store separately
|
||||||
|
self._audio_buffer.extend(indata.flatten())
|
||||||
|
except Exception as e:
|
||||||
|
self.get_logger().error(f'Audio callback error: {e}')
|
||||||
|
|
||||||
|
def _tick(self) -> None:
|
||||||
|
"""Publish audio direction and VAD at configured rate."""
|
||||||
|
if self._stream is None or not self._running:
|
||||||
|
return
|
||||||
|
|
||||||
|
with self._lock:
|
||||||
|
if len(self._audio_buffer) < self.chunk_size:
|
||||||
|
return
|
||||||
|
audio_data = np.array(list(self._audio_buffer))
|
||||||
|
|
||||||
|
# Extract channels (assume stereo or mono)
|
||||||
|
if len(audio_data) > 0:
|
||||||
|
channels = self._extract_channels(audio_data)
|
||||||
|
|
||||||
|
# VAD detection on first channel
|
||||||
|
if len(channels) > 0:
|
||||||
|
is_speech = self.vad.detect(channels[0])
|
||||||
|
vad_msg = Bool()
|
||||||
|
vad_msg.data = is_speech
|
||||||
|
self._pub_vad.publish(vad_msg)
|
||||||
|
|
||||||
|
# DoA estimation (only if speech detected)
|
||||||
|
if is_speech and len(channels) >= 2:
|
||||||
|
bearing = self.beamformer.estimate_bearing(channels)
|
||||||
|
else:
|
||||||
|
bearing = 0.0 # Default to front when no speech
|
||||||
|
|
||||||
|
bearing_msg = Float32()
|
||||||
|
bearing_msg.data = float(bearing)
|
||||||
|
self._pub_bearing.publish(bearing_msg)
|
||||||
|
|
||||||
|
def _extract_channels(self, audio_data: np.ndarray) -> list[np.ndarray]:
|
||||||
|
"""
|
||||||
|
Extract stereo/mono channels from audio buffer.
|
||||||
|
|
||||||
|
Handles both interleaved and non-interleaved formats.
|
||||||
|
"""
|
||||||
|
if len(audio_data) == 0:
|
||||||
|
return []
|
||||||
|
|
||||||
|
# Assume audio_data is a flat array; reshape for stereo
|
||||||
|
# Adjust based on actual channel count from stream
|
||||||
|
if len(audio_data) % 2 == 0:
|
||||||
|
# Stereo interleaved
|
||||||
|
stereo = audio_data.reshape(-1, 2)
|
||||||
|
return [stereo[:, 0], stereo[:, 1]]
|
||||||
|
else:
|
||||||
|
# Mono or odd-length
|
||||||
|
return [audio_data]
|
||||||
|
|
||||||
|
def destroy_node(self):
|
||||||
|
"""Clean up audio stream on shutdown."""
|
||||||
|
self._running = False
|
||||||
|
if self._stream is not None:
|
||||||
|
self._stream.stop()
|
||||||
|
self._stream.close()
|
||||||
|
super().destroy_node()
|
||||||
|
|
||||||
|
|
||||||
|
def main(args=None):
|
||||||
|
rclpy.init(args=args)
|
||||||
|
node = AudioDirectionNode()
|
||||||
|
try:
|
||||||
|
rclpy.spin(node)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
node.destroy_node()
|
||||||
|
rclpy.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
main()
|
||||||
4
jetson/ros2_ws/src/saltybot_audio_direction/setup.cfg
Normal file
4
jetson/ros2_ws/src/saltybot_audio_direction/setup.cfg
Normal file
@ -0,0 +1,4 @@
|
|||||||
|
[develop]
|
||||||
|
script_dir=$base/lib/saltybot_audio_direction
|
||||||
|
[egg_info]
|
||||||
|
tag_date = 0
|
||||||
23
jetson/ros2_ws/src/saltybot_audio_direction/setup.py
Normal file
23
jetson/ros2_ws/src/saltybot_audio_direction/setup.py
Normal file
@ -0,0 +1,23 @@
|
|||||||
|
from setuptools import setup, find_packages
|
||||||
|
|
||||||
|
setup(
|
||||||
|
name='saltybot_audio_direction',
|
||||||
|
version='0.1.0',
|
||||||
|
packages=find_packages(exclude=['test']),
|
||||||
|
data_files=[
|
||||||
|
('share/ament_index/resource_index/packages',
|
||||||
|
['resource/saltybot_audio_direction']),
|
||||||
|
('share/saltybot_audio_direction', ['package.xml']),
|
||||||
|
],
|
||||||
|
install_requires=['setuptools'],
|
||||||
|
zip_safe=True,
|
||||||
|
author='SaltyLab',
|
||||||
|
author_email='robot@saltylab.local',
|
||||||
|
description='Audio direction estimator for sound source localization',
|
||||||
|
license='MIT',
|
||||||
|
entry_points={
|
||||||
|
'console_scripts': [
|
||||||
|
'audio_direction_node=saltybot_audio_direction.audio_direction_node:main',
|
||||||
|
],
|
||||||
|
},
|
||||||
|
)
|
||||||
@ -0,0 +1,78 @@
|
|||||||
|
"""
|
||||||
|
Basic tests for audio direction estimator.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
import numpy as np
|
||||||
|
from saltybot_audio_direction.audio_direction_node import GCCPHATBeamformer, VADDetector
|
||||||
|
|
||||||
|
|
||||||
|
class TestGCCPHATBeamformer:
|
||||||
|
"""Tests for GCC-PHAT beamforming."""
|
||||||
|
|
||||||
|
def test_beamformer_init(self):
|
||||||
|
"""Test beamformer initialization."""
|
||||||
|
beamformer = GCCPHATBeamformer(sample_rate=16000, max_lag=64)
|
||||||
|
assert beamformer.sample_rate == 16000
|
||||||
|
assert beamformer.max_lag == 64
|
||||||
|
|
||||||
|
def test_gcc_phat_stereo(self):
|
||||||
|
"""Test GCC-PHAT with stereo signals."""
|
||||||
|
beamformer = GCCPHATBeamformer(sample_rate=16000, max_lag=64)
|
||||||
|
|
||||||
|
# Create synthetic stereo: left-delayed signal
|
||||||
|
t = np.arange(512) / 16000.0
|
||||||
|
sig1 = np.sin(2 * np.pi * 1000 * t) # Left mic
|
||||||
|
sig2 = np.sin(2 * np.pi * 1000 * (t - 0.004)) # Right mic (delayed)
|
||||||
|
|
||||||
|
lag = beamformer.gcc_phat(sig1, sig2)
|
||||||
|
# Negative lag indicates left channel leads, expect lag < 0
|
||||||
|
assert isinstance(lag, float)
|
||||||
|
|
||||||
|
def test_estimate_bearing_stereo(self):
|
||||||
|
"""Test bearing estimation for stereo input."""
|
||||||
|
beamformer = GCCPHATBeamformer(sample_rate=16000, max_lag=64)
|
||||||
|
|
||||||
|
t = np.arange(512) / 16000.0
|
||||||
|
sig1 = np.sin(2 * np.pi * 1000 * t)
|
||||||
|
sig2 = np.sin(2 * np.pi * 1000 * t)
|
||||||
|
|
||||||
|
bearing = beamformer.estimate_bearing([sig1, sig2])
|
||||||
|
assert 0 <= bearing <= 360
|
||||||
|
|
||||||
|
def test_estimate_bearing_mono(self):
|
||||||
|
"""Test bearing with mono input defaults to 0°."""
|
||||||
|
beamformer = GCCPHATBeamformer(sample_rate=16000)
|
||||||
|
sig = np.sin(2 * np.pi * 1000 * np.arange(512) / 16000.0)
|
||||||
|
bearing = beamformer.estimate_bearing([sig])
|
||||||
|
assert bearing == 0.0
|
||||||
|
|
||||||
|
|
||||||
|
class TestVADDetector:
|
||||||
|
"""Tests for voice activity detection."""
|
||||||
|
|
||||||
|
def test_vad_init(self):
|
||||||
|
"""Test VAD detector initialization."""
|
||||||
|
vad = VADDetector(threshold=0.02, smoothing=5)
|
||||||
|
assert vad.threshold == 0.02
|
||||||
|
assert vad.smoothing == 5
|
||||||
|
|
||||||
|
def test_vad_silence(self):
|
||||||
|
"""Test VAD detects silence."""
|
||||||
|
vad = VADDetector(threshold=0.02)
|
||||||
|
silence = np.zeros(2048) * 0.001 # Very quiet
|
||||||
|
is_speech = vad.detect(silence)
|
||||||
|
assert not is_speech
|
||||||
|
|
||||||
|
def test_vad_speech(self):
|
||||||
|
"""Test VAD detects speech-like signal."""
|
||||||
|
vad = VADDetector(threshold=0.02)
|
||||||
|
t = np.arange(2048) / 16000.0
|
||||||
|
speech = np.sin(2 * np.pi * 1000 * t) * 0.1 # ~0.07 RMS
|
||||||
|
for _ in range(5): # Run multiple times to accumulate history
|
||||||
|
is_speech = vad.detect(speech)
|
||||||
|
assert is_speech
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
pytest.main([__file__])
|
||||||
7
jetson/ros2_ws/src/saltybot_emotion_engine/.gitignore
vendored
Normal file
7
jetson/ros2_ws/src/saltybot_emotion_engine/.gitignore
vendored
Normal file
@ -0,0 +1,7 @@
|
|||||||
|
build/
|
||||||
|
install/
|
||||||
|
log/
|
||||||
|
*.egg-info/
|
||||||
|
__pycache__/
|
||||||
|
*.pyc
|
||||||
|
.pytest_cache/
|
||||||
178
jetson/ros2_ws/src/saltybot_emotion_engine/README.md
Normal file
178
jetson/ros2_ws/src/saltybot_emotion_engine/README.md
Normal file
@ -0,0 +1,178 @@
|
|||||||
|
# SaltyBot Emotion Engine (Issue #429)
|
||||||
|
|
||||||
|
Context-aware facial expression and emotion selection system for SaltyBot.
|
||||||
|
|
||||||
|
## Features
|
||||||
|
|
||||||
|
### 1. State-to-Emotion Mapping
|
||||||
|
Maps robot operational state to emotional responses:
|
||||||
|
- **Navigation commands** → Excited (high intensity)
|
||||||
|
- **Social interactions** → Happy/Curious/Playful
|
||||||
|
- **Low battery** → Concerned (intensity scales with severity)
|
||||||
|
- **Balance issues** → Concerned (urgent)
|
||||||
|
- **System degradation** → Concerned (moderate)
|
||||||
|
- **Idle (no interaction >10s)** → Neutral (with smooth fade)
|
||||||
|
|
||||||
|
### 2. Smooth Emotion Transitions
|
||||||
|
- Configurable transition durations (0.3–1.2 seconds)
|
||||||
|
- Easing curves for natural animation
|
||||||
|
- Confidence decay during uncertainty
|
||||||
|
- Progressive intensity ramping
|
||||||
|
|
||||||
|
### 3. Personality-Aware Responses
|
||||||
|
Configurable personality traits (0.0–1.0):
|
||||||
|
- **Extroversion**: Affects how eager to interact (playful vs. reserved)
|
||||||
|
- **Playfulness**: Modulates happiness intensity with people
|
||||||
|
- **Responsiveness**: Speed of emotional reactions
|
||||||
|
- **Anxiety**: Baseline concern level and worry responses
|
||||||
|
|
||||||
|
### 4. Social Memory & Familiarity
|
||||||
|
- Tracks interaction history per person
|
||||||
|
- Warmth modifier (0.3–1.0) based on relationship tier:
|
||||||
|
- Stranger: 0.5 (neutral warmth)
|
||||||
|
- Regular contact: 0.6–0.8 (warmer)
|
||||||
|
- Known favorite: 0.9–1.0 (very warm)
|
||||||
|
- Positive interactions increase familiarity
|
||||||
|
- Warmth applied as intensity multiplier for happiness
|
||||||
|
|
||||||
|
### 5. Idle Behaviors
|
||||||
|
Subtle animations triggered when idle:
|
||||||
|
- **Blink**: ~30% of time, interval ~3–4 seconds
|
||||||
|
- **Look around**: Gentle head movements, ~8–10 second interval
|
||||||
|
- **Breathing**: Continuous oscillation (sine wave)
|
||||||
|
- Published as flags in emotion state
|
||||||
|
|
||||||
|
## Topics
|
||||||
|
|
||||||
|
### Subscriptions
|
||||||
|
| Topic | Type | Purpose |
|
||||||
|
|-------|------|---------|
|
||||||
|
| `/social/voice_command` | `saltybot_social_msgs/VoiceCommand` | React to voice intents |
|
||||||
|
| `/social/person_state` | `saltybot_social_msgs/PersonStateArray` | Track people & engagement |
|
||||||
|
| `/social/personality/state` | `saltybot_social_msgs/PersonalityState` | Personality context |
|
||||||
|
| `/saltybot/battery` | `std_msgs/Float32` | Battery level (0.0–1.0) |
|
||||||
|
| `/saltybot/balance_stable` | `std_msgs/Bool` | Balance/traction status |
|
||||||
|
| `/saltybot/system_health` | `std_msgs/String` | System health state |
|
||||||
|
|
||||||
|
### Publications
|
||||||
|
| Topic | Type | Content |
|
||||||
|
|-------|------|---------|
|
||||||
|
| `/saltybot/emotion_state` | `std_msgs/String` (JSON) | Current emotion + metadata |
|
||||||
|
|
||||||
|
#### Emotion State JSON Schema
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"emotion": "happy|curious|excited|concerned|confused|tired|playful|neutral",
|
||||||
|
"intensity": 0.0–1.0,
|
||||||
|
"confidence": 0.0–1.0,
|
||||||
|
"expression": "happy_intense|happy|happy_subtle|...",
|
||||||
|
"context": "navigation_command|engaged_with_N_people|low_battery|...",
|
||||||
|
"triggered_by": "voice_command|person_tracking|battery_monitor|balance_monitor|idle_timer",
|
||||||
|
"social_target_id": "person_id or null",
|
||||||
|
"social_warmth": 0.0–1.0,
|
||||||
|
"idle_flags": {
|
||||||
|
"blink": true|false,
|
||||||
|
"look_around": true|false,
|
||||||
|
"breathing": true|false
|
||||||
|
},
|
||||||
|
"timestamp": unix_time,
|
||||||
|
"battery_level": 0.0–1.0,
|
||||||
|
"balance_stable": true|false,
|
||||||
|
"system_health": "nominal|degraded|critical"
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
## Configuration
|
||||||
|
|
||||||
|
Edit `config/emotion_engine.yaml`:
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
personality:
|
||||||
|
extroversion: 0.6 # 0=introvert, 1=extrovert
|
||||||
|
playfulness: 0.5 # How playful with people
|
||||||
|
responsiveness: 0.8 # Reaction speed
|
||||||
|
anxiety: 0.3 # Baseline worry level
|
||||||
|
|
||||||
|
battery_warning_threshold: 0.25 # 25% triggers mild concern
|
||||||
|
battery_critical_threshold: 0.10 # 10% triggers high concern
|
||||||
|
|
||||||
|
update_rate_hz: 10.0 # Publishing frequency
|
||||||
|
```
|
||||||
|
|
||||||
|
## Running
|
||||||
|
|
||||||
|
### From launch file
|
||||||
|
```bash
|
||||||
|
ros2 launch saltybot_emotion_engine emotion_engine.launch.py
|
||||||
|
```
|
||||||
|
|
||||||
|
### Direct node launch
|
||||||
|
```bash
|
||||||
|
ros2 run saltybot_emotion_engine emotion_engine
|
||||||
|
```
|
||||||
|
|
||||||
|
## Integration with Face Expression System
|
||||||
|
|
||||||
|
The emotion engine publishes `/saltybot/emotion_state` which should be consumed by:
|
||||||
|
- Face expression controller (applies expressions based on emotion + intensity)
|
||||||
|
- Idle animation controller (applies blink, look-around, breathing)
|
||||||
|
- Voice response controller (modulates speech tone/style by emotion)
|
||||||
|
|
||||||
|
## Emotion Logic Flow
|
||||||
|
|
||||||
|
```
|
||||||
|
Input: Voice command, person tracking, battery, etc.
|
||||||
|
↓
|
||||||
|
Classify event → determine target emotion
|
||||||
|
↓
|
||||||
|
Apply personality modifiers (intensity * personality traits)
|
||||||
|
↓
|
||||||
|
Initiate smooth transition (current emotion → target emotion)
|
||||||
|
↓
|
||||||
|
Apply social warmth modifier if person-directed
|
||||||
|
↓
|
||||||
|
Update idle flags
|
||||||
|
↓
|
||||||
|
Publish emotion state (JSON)
|
||||||
|
```
|
||||||
|
|
||||||
|
## Example Usage
|
||||||
|
|
||||||
|
Subscribe and monitor emotion state:
|
||||||
|
```bash
|
||||||
|
ros2 topic echo /saltybot/emotion_state
|
||||||
|
```
|
||||||
|
|
||||||
|
Example output (when person talks):
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"emotion": "excited",
|
||||||
|
"intensity": 0.85,
|
||||||
|
"confidence": 0.9,
|
||||||
|
"expression": "surprised_intense",
|
||||||
|
"context": "navigation_command",
|
||||||
|
"triggered_by": "voice_command",
|
||||||
|
"social_target_id": "person_42",
|
||||||
|
"social_warmth": 0.75,
|
||||||
|
"idle_flags": {"blink": false, "look_around": true, "breathing": true},
|
||||||
|
"timestamp": 1699564800.123
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
## Development Notes
|
||||||
|
|
||||||
|
- **Emotion types** are defined in `EmotionType` enum
|
||||||
|
- **Transitions** managed by `EmotionTransitioner` class
|
||||||
|
- **Idle behaviors** managed by `IdleBehaviorManager` class
|
||||||
|
- **Social memory** managed by `SocialMemoryManager` class
|
||||||
|
- Add new emotions by extending `EmotionType` and updating `_map_emotion_to_expression()`
|
||||||
|
- Adjust transition curves in `EmotionTransitioner.transition_curves` dict
|
||||||
|
|
||||||
|
## Future Enhancements
|
||||||
|
|
||||||
|
1. Machine learning model for context → emotion prediction
|
||||||
|
2. Voice sentiment analysis to modulate emotion
|
||||||
|
3. Facial expression feedback from /social/faces/expressions
|
||||||
|
4. Multi-person emotional dynamics (ensemble emotion)
|
||||||
|
5. Persistent social memory (database backend)
|
||||||
|
6. Integration with LLM for contextual emotion explanation
|
||||||
@ -0,0 +1,29 @@
|
|||||||
|
/**:
|
||||||
|
ros__parameters:
|
||||||
|
# Personality configuration (0.0–1.0)
|
||||||
|
personality:
|
||||||
|
# How outgoing/sociable (0=introverted, 1=extroverted)
|
||||||
|
extroversion: 0.6
|
||||||
|
# How much the robot enjoys playful interactions
|
||||||
|
playfulness: 0.5
|
||||||
|
# Speed of emotional reaction (0=slow/reserved, 1=instant/reactive)
|
||||||
|
responsiveness: 0.8
|
||||||
|
# Baseline anxiety/caution (0=relaxed, 1=highly anxious)
|
||||||
|
anxiety: 0.3
|
||||||
|
|
||||||
|
# Battery thresholds
|
||||||
|
battery_warning_threshold: 0.25 # 25% - mild concern
|
||||||
|
battery_critical_threshold: 0.10 # 10% - high concern
|
||||||
|
|
||||||
|
# Update frequency
|
||||||
|
update_rate_hz: 10.0
|
||||||
|
|
||||||
|
# Transition timing
|
||||||
|
emotion_transition_duration_normal: 0.8 # seconds
|
||||||
|
emotion_transition_duration_urgent: 0.3 # for critical states
|
||||||
|
emotion_transition_duration_relax: 1.2 # for returning to neutral
|
||||||
|
|
||||||
|
# Idle behavior timing
|
||||||
|
idle_blink_interval: 3.0 # seconds
|
||||||
|
idle_look_around_interval: 8.0 # seconds
|
||||||
|
idle_return_to_neutral_delay: 10.0 # seconds with no interaction
|
||||||
@ -0,0 +1,36 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
from launch import LaunchDescription
|
||||||
|
from launch_ros.actions import Node
|
||||||
|
from launch.substitutions import PathJoinSubstitution
|
||||||
|
from launch_ros.substitutions import FindPackageShare
|
||||||
|
import os
|
||||||
|
|
||||||
|
|
||||||
|
def generate_launch_description():
|
||||||
|
"""Launch emotion engine node with configuration."""
|
||||||
|
|
||||||
|
# Get the package directory
|
||||||
|
package_share_dir = FindPackageShare("saltybot_emotion_engine")
|
||||||
|
config_file = PathJoinSubstitution(
|
||||||
|
[package_share_dir, "config", "emotion_engine.yaml"]
|
||||||
|
)
|
||||||
|
|
||||||
|
# Emotion engine node
|
||||||
|
emotion_engine_node = Node(
|
||||||
|
package="saltybot_emotion_engine",
|
||||||
|
executable="emotion_engine",
|
||||||
|
name="emotion_engine",
|
||||||
|
output="screen",
|
||||||
|
parameters=[config_file],
|
||||||
|
remappings=[
|
||||||
|
# Remap topic names if needed
|
||||||
|
("/saltybot/battery", "/saltybot/battery_level"),
|
||||||
|
("/saltybot/balance_stable", "/saltybot/balance_status"),
|
||||||
|
],
|
||||||
|
on_exit_event_handlers=[], # Could add custom handlers
|
||||||
|
)
|
||||||
|
|
||||||
|
return LaunchDescription([
|
||||||
|
emotion_engine_node,
|
||||||
|
])
|
||||||
29
jetson/ros2_ws/src/saltybot_emotion_engine/package.xml
Normal file
29
jetson/ros2_ws/src/saltybot_emotion_engine/package.xml
Normal file
@ -0,0 +1,29 @@
|
|||||||
|
<?xml version="1.0"?>
|
||||||
|
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||||
|
<package format="3">
|
||||||
|
<name>saltybot_emotion_engine</name>
|
||||||
|
<version>0.1.0</version>
|
||||||
|
<description>
|
||||||
|
Context-aware facial expression and emotion selection engine.
|
||||||
|
Subscribes to orchestrator state, battery, balance, person tracking, voice commands, and health.
|
||||||
|
Maps robot state to emotions with smooth transitions, idle behaviors, and social awareness.
|
||||||
|
Publishes emotion state with personality-aware expression selection (Issue #429).
|
||||||
|
</description>
|
||||||
|
<maintainer email="seb@vayrette.com">seb</maintainer>
|
||||||
|
<license>MIT</license>
|
||||||
|
|
||||||
|
<depend>rclpy</depend>
|
||||||
|
<depend>std_msgs</depend>
|
||||||
|
<depend>geometry_msgs</depend>
|
||||||
|
<depend>saltybot_social_msgs</depend>
|
||||||
|
<depend>ament_index_python</depend>
|
||||||
|
|
||||||
|
<test_depend>ament_copyright</test_depend>
|
||||||
|
<test_depend>ament_flake8</test_depend>
|
||||||
|
<test_depend>ament_pep257</test_depend>
|
||||||
|
<test_depend>python3-pytest</test_depend>
|
||||||
|
|
||||||
|
<export>
|
||||||
|
<build_type>ament_python</build_type>
|
||||||
|
</export>
|
||||||
|
</package>
|
||||||
@ -0,0 +1,569 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
import math
|
||||||
|
import time
|
||||||
|
from enum import Enum
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from typing import Optional, Dict, List
|
||||||
|
from collections import deque
|
||||||
|
|
||||||
|
import rclpy
|
||||||
|
from rclpy.node import Node
|
||||||
|
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
|
||||||
|
from std_msgs.msg import String, Float32, Bool
|
||||||
|
from geometry_msgs.msg import Pose
|
||||||
|
from saltybot_social_msgs.msg import (
|
||||||
|
VoiceCommand,
|
||||||
|
PersonState,
|
||||||
|
PersonStateArray,
|
||||||
|
Expression,
|
||||||
|
PersonalityState,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class EmotionType(Enum):
|
||||||
|
"""Core emotion types for facial expression selection."""
|
||||||
|
NEUTRAL = "neutral"
|
||||||
|
HAPPY = "happy"
|
||||||
|
CURIOUS = "curious"
|
||||||
|
CONCERNED = "concerned"
|
||||||
|
EXCITED = "excited"
|
||||||
|
CONFUSED = "confused"
|
||||||
|
TIRED = "tired"
|
||||||
|
PLAYFUL = "playful"
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class EmotionState:
|
||||||
|
"""Internal state of robot emotion system."""
|
||||||
|
primary_emotion: EmotionType = EmotionType.NEUTRAL
|
||||||
|
intensity: float = 0.5 # 0.0 = minimal, 1.0 = extreme
|
||||||
|
confidence: float = 0.5 # 0.0 = uncertain, 1.0 = certain
|
||||||
|
context: str = "" # e.g., "person_interacting", "low_battery", "idle"
|
||||||
|
triggered_by: str = "" # e.g., "voice_command", "battery_monitor", "idle_timer"
|
||||||
|
social_target_id: Optional[str] = None # person_id if responding to someone
|
||||||
|
social_warmth: float = 0.5 # 0.0 = cold, 1.0 = warm (familiarity with target)
|
||||||
|
last_update_time: float = 0.0
|
||||||
|
transition_start_time: float = 0.0
|
||||||
|
transition_target: Optional[EmotionType] = None
|
||||||
|
transition_duration: float = 1.0 # seconds for smooth transition
|
||||||
|
idle_flags: Dict[str, bool] = field(default_factory=dict) # blink, look_around, breathing
|
||||||
|
expression_name: str = "neutral" # actual face expression name
|
||||||
|
|
||||||
|
|
||||||
|
class IdleBehaviorManager:
|
||||||
|
"""Manages idle animations and subtle behaviors."""
|
||||||
|
|
||||||
|
def __init__(self, node_logger):
|
||||||
|
self.logger = node_logger
|
||||||
|
self.blink_interval = 3.0 # seconds
|
||||||
|
self.look_around_interval = 8.0
|
||||||
|
self.breathing_phase = 0.0
|
||||||
|
self.last_blink_time = time.time()
|
||||||
|
self.last_look_around_time = time.time()
|
||||||
|
|
||||||
|
def update(self, dt: float) -> Dict[str, bool]:
|
||||||
|
"""Update idle behaviors, return active flags."""
|
||||||
|
current_time = time.time()
|
||||||
|
flags = {}
|
||||||
|
|
||||||
|
# Blink behavior: ~30% of the time
|
||||||
|
if current_time - self.last_blink_time > self.blink_interval:
|
||||||
|
flags["blink"] = True
|
||||||
|
self.last_blink_time = current_time
|
||||||
|
self.blink_interval = 3.0 + (hash(current_time) % 100) / 100.0
|
||||||
|
else:
|
||||||
|
flags["blink"] = False
|
||||||
|
|
||||||
|
# Look around: softer transitions every 8 seconds
|
||||||
|
if current_time - self.last_look_around_time > self.look_around_interval:
|
||||||
|
flags["look_around"] = True
|
||||||
|
self.last_look_around_time = current_time
|
||||||
|
self.look_around_interval = 8.0 + (hash(current_time) % 200) / 100.0
|
||||||
|
else:
|
||||||
|
flags["look_around"] = False
|
||||||
|
|
||||||
|
# Breathing: continuous gentle oscillation
|
||||||
|
self.breathing_phase = (self.breathing_phase + dt * 0.5) % (2 * math.pi)
|
||||||
|
breathing_intensity = (math.sin(self.breathing_phase) + 1.0) / 2.0
|
||||||
|
flags["breathing"] = breathing_intensity > 0.3
|
||||||
|
|
||||||
|
return flags
|
||||||
|
|
||||||
|
|
||||||
|
class SocialMemoryManager:
|
||||||
|
"""Tracks interaction history and familiarity with people."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.interactions: Dict[str, Dict] = {}
|
||||||
|
self.max_history = 100
|
||||||
|
self.recent_interactions = deque(maxlen=self.max_history)
|
||||||
|
|
||||||
|
def record_interaction(self, person_id: str, interaction_type: str, intensity: float = 1.0):
|
||||||
|
"""Record an interaction with a person."""
|
||||||
|
if person_id not in self.interactions:
|
||||||
|
self.interactions[person_id] = {
|
||||||
|
"count": 0,
|
||||||
|
"warmth": 0.5,
|
||||||
|
"last_interaction": 0.0,
|
||||||
|
"positive_interactions": 0,
|
||||||
|
}
|
||||||
|
|
||||||
|
entry = self.interactions[person_id]
|
||||||
|
entry["count"] += 1
|
||||||
|
entry["last_interaction"] = time.time()
|
||||||
|
|
||||||
|
if intensity > 0.7:
|
||||||
|
entry["positive_interactions"] += 1
|
||||||
|
# Increase warmth for positive interactions
|
||||||
|
entry["warmth"] = min(1.0, entry["warmth"] + 0.05)
|
||||||
|
else:
|
||||||
|
# Slight decrease for negative interactions
|
||||||
|
entry["warmth"] = max(0.3, entry["warmth"] - 0.02)
|
||||||
|
|
||||||
|
self.recent_interactions.append((person_id, interaction_type, time.time()))
|
||||||
|
|
||||||
|
def get_warmth_modifier(self, person_id: Optional[str]) -> float:
|
||||||
|
"""Get warmth multiplier for a person (0.5 = neutral, 1.0 = familiar, 0.3 = stranger)."""
|
||||||
|
if not person_id or person_id not in self.interactions:
|
||||||
|
return 0.5
|
||||||
|
|
||||||
|
return self.interactions[person_id].get("warmth", 0.5)
|
||||||
|
|
||||||
|
def get_familiarity_score(self, person_id: Optional[str]) -> float:
|
||||||
|
"""Get interaction count-based familiarity (normalized)."""
|
||||||
|
if not person_id or person_id not in self.interactions:
|
||||||
|
return 0.0
|
||||||
|
|
||||||
|
count = self.interactions[person_id].get("count", 0)
|
||||||
|
return min(1.0, count / 10.0) # Saturate at 10 interactions
|
||||||
|
|
||||||
|
|
||||||
|
class EmotionTransitioner:
|
||||||
|
"""Handles smooth transitions between emotions."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.transition_curves = {
|
||||||
|
(EmotionType.NEUTRAL, EmotionType.EXCITED): "ease_out",
|
||||||
|
(EmotionType.NEUTRAL, EmotionType.CONCERNED): "ease_in",
|
||||||
|
(EmotionType.EXCITED, EmotionType.NEUTRAL): "ease_in",
|
||||||
|
(EmotionType.CONCERNED, EmotionType.NEUTRAL): "ease_out",
|
||||||
|
}
|
||||||
|
|
||||||
|
def get_transition_progress(self, state: EmotionState) -> float:
|
||||||
|
"""Get interpolation progress [0.0, 1.0]."""
|
||||||
|
if not state.transition_target:
|
||||||
|
return 1.0
|
||||||
|
|
||||||
|
elapsed = time.time() - state.transition_start_time
|
||||||
|
progress = min(1.0, elapsed / state.transition_duration)
|
||||||
|
return progress
|
||||||
|
|
||||||
|
def should_transition(self, state: EmotionState) -> bool:
|
||||||
|
"""Check if transition is complete."""
|
||||||
|
if not state.transition_target:
|
||||||
|
return False
|
||||||
|
return self.get_transition_progress(state) >= 1.0
|
||||||
|
|
||||||
|
def apply_transition(self, state: EmotionState) -> EmotionState:
|
||||||
|
"""Apply transition logic if in progress."""
|
||||||
|
if not self.should_transition(state):
|
||||||
|
return state
|
||||||
|
|
||||||
|
# Transition complete
|
||||||
|
state.primary_emotion = state.transition_target
|
||||||
|
state.transition_target = None
|
||||||
|
state.confidence = min(1.0, state.confidence + 0.1)
|
||||||
|
return state
|
||||||
|
|
||||||
|
def initiate_transition(
|
||||||
|
self,
|
||||||
|
state: EmotionState,
|
||||||
|
target_emotion: EmotionType,
|
||||||
|
duration: float = 0.8,
|
||||||
|
) -> EmotionState:
|
||||||
|
"""Start a smooth transition to new emotion."""
|
||||||
|
if state.primary_emotion == target_emotion:
|
||||||
|
return state
|
||||||
|
|
||||||
|
state.transition_target = target_emotion
|
||||||
|
state.transition_start_time = time.time()
|
||||||
|
state.transition_duration = duration
|
||||||
|
state.confidence = max(0.3, state.confidence - 0.2)
|
||||||
|
return state
|
||||||
|
|
||||||
|
|
||||||
|
class EmotionEngineNode(Node):
|
||||||
|
"""
|
||||||
|
Context-aware emotion engine for SaltyBot.
|
||||||
|
|
||||||
|
Subscribes to:
|
||||||
|
- /social/voice_command (reactive to speech)
|
||||||
|
- /social/person_state (person tracking for social context)
|
||||||
|
- /social/personality/state (personality/mood context)
|
||||||
|
- /saltybot/battery (low battery detection)
|
||||||
|
- /saltybot/balance (balance/stability concerns)
|
||||||
|
- /diagnostics (health monitoring)
|
||||||
|
|
||||||
|
Publishes:
|
||||||
|
- /saltybot/emotion_state (current emotion + metadata)
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__("saltybot_emotion_engine")
|
||||||
|
|
||||||
|
# Configuration parameters
|
||||||
|
self.declare_parameter("personality.extroversion", 0.6)
|
||||||
|
self.declare_parameter("personality.playfulness", 0.5)
|
||||||
|
self.declare_parameter("personality.responsiveness", 0.8)
|
||||||
|
self.declare_parameter("personality.anxiety", 0.3)
|
||||||
|
self.declare_parameter("battery_warning_threshold", 0.25)
|
||||||
|
self.declare_parameter("battery_critical_threshold", 0.10)
|
||||||
|
self.declare_parameter("update_rate_hz", 10.0)
|
||||||
|
|
||||||
|
# QoS for reliable topic communication
|
||||||
|
qos = QoSProfile(
|
||||||
|
reliability=ReliabilityPolicy.BEST_EFFORT,
|
||||||
|
history=HistoryPolicy.KEEP_LAST,
|
||||||
|
depth=5,
|
||||||
|
)
|
||||||
|
|
||||||
|
# State tracking
|
||||||
|
self.emotion_state = EmotionState()
|
||||||
|
self.last_emotion_state = EmotionState()
|
||||||
|
self.battery_level = 0.5
|
||||||
|
self.balance_stable = True
|
||||||
|
self.people_present: Dict[str, PersonState] = {}
|
||||||
|
self.voice_command_cooldown = 0.0
|
||||||
|
self.idle_timer = 0.0
|
||||||
|
self.system_health = "nominal"
|
||||||
|
|
||||||
|
# Managers
|
||||||
|
self.idle_manager = IdleBehaviorManager(self.get_logger())
|
||||||
|
self.social_memory = SocialMemoryManager()
|
||||||
|
self.transitioner = EmotionTransitioner()
|
||||||
|
|
||||||
|
# Subscriptions
|
||||||
|
self.voice_sub = self.create_subscription(
|
||||||
|
VoiceCommand,
|
||||||
|
"/social/voice_command",
|
||||||
|
self.voice_command_callback,
|
||||||
|
qos,
|
||||||
|
)
|
||||||
|
|
||||||
|
self.person_state_sub = self.create_subscription(
|
||||||
|
PersonStateArray,
|
||||||
|
"/social/person_state",
|
||||||
|
self.person_state_callback,
|
||||||
|
qos,
|
||||||
|
)
|
||||||
|
|
||||||
|
self.personality_sub = self.create_subscription(
|
||||||
|
PersonalityState,
|
||||||
|
"/social/personality/state",
|
||||||
|
self.personality_callback,
|
||||||
|
qos,
|
||||||
|
)
|
||||||
|
|
||||||
|
self.battery_sub = self.create_subscription(
|
||||||
|
Float32,
|
||||||
|
"/saltybot/battery",
|
||||||
|
self.battery_callback,
|
||||||
|
qos,
|
||||||
|
)
|
||||||
|
|
||||||
|
self.balance_sub = self.create_subscription(
|
||||||
|
Bool,
|
||||||
|
"/saltybot/balance_stable",
|
||||||
|
self.balance_callback,
|
||||||
|
qos,
|
||||||
|
)
|
||||||
|
|
||||||
|
self.health_sub = self.create_subscription(
|
||||||
|
String,
|
||||||
|
"/saltybot/system_health",
|
||||||
|
self.health_callback,
|
||||||
|
qos,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Publisher
|
||||||
|
self.emotion_pub = self.create_publisher(
|
||||||
|
String,
|
||||||
|
"/saltybot/emotion_state",
|
||||||
|
qos,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Main update loop
|
||||||
|
update_rate = self.get_parameter("update_rate_hz").value
|
||||||
|
self.update_timer = self.create_timer(1.0 / update_rate, self.update_callback)
|
||||||
|
|
||||||
|
self.get_logger().info(
|
||||||
|
"Emotion engine initialized: "
|
||||||
|
f"extroversion={self.get_parameter('personality.extroversion').value}, "
|
||||||
|
f"playfulness={self.get_parameter('personality.playfulness').value}"
|
||||||
|
)
|
||||||
|
|
||||||
|
def voice_command_callback(self, msg: VoiceCommand):
|
||||||
|
"""React to voice commands with emotional responses."""
|
||||||
|
self.voice_command_cooldown = 0.5 # Cooldown to prevent rapid re-triggering
|
||||||
|
|
||||||
|
intent = msg.intent
|
||||||
|
confidence = msg.confidence
|
||||||
|
|
||||||
|
# Map command intents to emotions
|
||||||
|
if intent.startswith("nav."):
|
||||||
|
# Navigation commands -> excitement
|
||||||
|
self.emotion_state = self.transitioner.initiate_transition(
|
||||||
|
self.emotion_state,
|
||||||
|
EmotionType.EXCITED,
|
||||||
|
duration=0.6,
|
||||||
|
)
|
||||||
|
self.emotion_state.context = "navigation_command"
|
||||||
|
self.emotion_state.intensity = min(0.9, confidence * 0.8 + 0.3)
|
||||||
|
|
||||||
|
elif intent.startswith("social."):
|
||||||
|
# Social commands -> happy/curious
|
||||||
|
if "remember" in intent or "forget" in intent:
|
||||||
|
self.emotion_state = self.transitioner.initiate_transition(
|
||||||
|
self.emotion_state,
|
||||||
|
EmotionType.CURIOUS,
|
||||||
|
duration=0.8,
|
||||||
|
)
|
||||||
|
self.emotion_state.intensity = 0.6
|
||||||
|
else:
|
||||||
|
self.emotion_state = self.transitioner.initiate_transition(
|
||||||
|
self.emotion_state,
|
||||||
|
EmotionType.HAPPY,
|
||||||
|
duration=0.7,
|
||||||
|
)
|
||||||
|
self.emotion_state.intensity = 0.7
|
||||||
|
|
||||||
|
elif intent == "fallback":
|
||||||
|
# Unrecognized command -> confused
|
||||||
|
self.emotion_state = self.transitioner.initiate_transition(
|
||||||
|
self.emotion_state,
|
||||||
|
EmotionType.CONFUSED,
|
||||||
|
duration=0.5,
|
||||||
|
)
|
||||||
|
self.emotion_state.intensity = min(0.5, confidence)
|
||||||
|
|
||||||
|
self.emotion_state.triggered_by = "voice_command"
|
||||||
|
self.emotion_state.social_target_id = msg.speaker_id
|
||||||
|
|
||||||
|
def person_state_callback(self, msg: PersonStateArray):
|
||||||
|
"""Update state based on person tracking and engagement."""
|
||||||
|
self.people_present.clear()
|
||||||
|
for person_state in msg.person_states:
|
||||||
|
person_id = str(person_state.person_id)
|
||||||
|
self.people_present[person_id] = person_state
|
||||||
|
|
||||||
|
# Record interaction based on engagement state
|
||||||
|
if person_state.state == PersonState.STATE_ENGAGED:
|
||||||
|
self.social_memory.record_interaction(person_id, "engaged", 0.8)
|
||||||
|
elif person_state.state == PersonState.STATE_TALKING:
|
||||||
|
self.social_memory.record_interaction(person_id, "talking", 0.9)
|
||||||
|
elif person_state.state == PersonState.STATE_APPROACHING:
|
||||||
|
self.social_memory.record_interaction(person_id, "approaching", 0.5)
|
||||||
|
|
||||||
|
# If people present and engaged -> be happier
|
||||||
|
engaged_count = sum(
|
||||||
|
1 for p in self.people_present.values()
|
||||||
|
if p.state == PersonState.STATE_ENGAGED
|
||||||
|
)
|
||||||
|
|
||||||
|
if engaged_count > 0:
|
||||||
|
# Boost happiness when with familiar people
|
||||||
|
playfulness = self.get_parameter("personality.playfulness").value
|
||||||
|
if playfulness > 0.6:
|
||||||
|
target_emotion = EmotionType.PLAYFUL
|
||||||
|
else:
|
||||||
|
target_emotion = EmotionType.HAPPY
|
||||||
|
|
||||||
|
if self.emotion_state.primary_emotion != target_emotion:
|
||||||
|
self.emotion_state = self.transitioner.initiate_transition(
|
||||||
|
self.emotion_state,
|
||||||
|
target_emotion,
|
||||||
|
duration=0.9,
|
||||||
|
)
|
||||||
|
self.emotion_state.intensity = 0.7 + (0.3 * playfulness)
|
||||||
|
self.emotion_state.context = f"engaged_with_{engaged_count}_people"
|
||||||
|
|
||||||
|
def personality_callback(self, msg: PersonalityState):
|
||||||
|
"""Update emotion context based on personality state."""
|
||||||
|
# Mood from personality system influences intensity
|
||||||
|
if msg.mood == "playful":
|
||||||
|
self.emotion_state.intensity = min(1.0, self.emotion_state.intensity + 0.1)
|
||||||
|
elif msg.mood == "annoyed":
|
||||||
|
self.emotion_state.intensity = max(0.0, self.emotion_state.intensity - 0.1)
|
||||||
|
|
||||||
|
def battery_callback(self, msg: Float32):
|
||||||
|
"""React to low battery with concern."""
|
||||||
|
self.battery_level = msg.data
|
||||||
|
|
||||||
|
battery_critical = self.get_parameter("battery_critical_threshold").value
|
||||||
|
battery_warning = self.get_parameter("battery_warning_threshold").value
|
||||||
|
|
||||||
|
if self.battery_level < battery_critical:
|
||||||
|
# Critical: very concerned
|
||||||
|
self.emotion_state = self.transitioner.initiate_transition(
|
||||||
|
self.emotion_state,
|
||||||
|
EmotionType.CONCERNED,
|
||||||
|
duration=0.5,
|
||||||
|
)
|
||||||
|
self.emotion_state.intensity = 0.9
|
||||||
|
self.emotion_state.context = "critical_battery"
|
||||||
|
self.emotion_state.triggered_by = "battery_monitor"
|
||||||
|
|
||||||
|
elif self.battery_level < battery_warning:
|
||||||
|
# Warning: mildly concerned
|
||||||
|
if self.emotion_state.primary_emotion == EmotionType.NEUTRAL:
|
||||||
|
self.emotion_state = self.transitioner.initiate_transition(
|
||||||
|
self.emotion_state,
|
||||||
|
EmotionType.CONCERNED,
|
||||||
|
duration=0.8,
|
||||||
|
)
|
||||||
|
self.emotion_state.intensity = max(self.emotion_state.intensity, 0.5)
|
||||||
|
self.emotion_state.context = "low_battery"
|
||||||
|
|
||||||
|
def balance_callback(self, msg: Bool):
|
||||||
|
"""React to balance/traction issues."""
|
||||||
|
self.balance_stable = msg.data
|
||||||
|
|
||||||
|
if not self.balance_stable:
|
||||||
|
# Balance concern
|
||||||
|
self.emotion_state = self.transitioner.initiate_transition(
|
||||||
|
self.emotion_state,
|
||||||
|
EmotionType.CONCERNED,
|
||||||
|
duration=0.4,
|
||||||
|
)
|
||||||
|
self.emotion_state.intensity = 0.8
|
||||||
|
self.emotion_state.context = "balance_unstable"
|
||||||
|
self.emotion_state.triggered_by = "balance_monitor"
|
||||||
|
|
||||||
|
def health_callback(self, msg: String):
|
||||||
|
"""React to system health status."""
|
||||||
|
self.system_health = msg.data
|
||||||
|
|
||||||
|
if msg.data == "degraded":
|
||||||
|
self.emotion_state = self.transitioner.initiate_transition(
|
||||||
|
self.emotion_state,
|
||||||
|
EmotionType.CONCERNED,
|
||||||
|
duration=0.7,
|
||||||
|
)
|
||||||
|
self.emotion_state.intensity = 0.6
|
||||||
|
self.emotion_state.context = "system_degraded"
|
||||||
|
|
||||||
|
def update_callback(self):
|
||||||
|
"""Main update loop."""
|
||||||
|
current_time = time.time()
|
||||||
|
dt = 1.0 / self.get_parameter("update_rate_hz").value
|
||||||
|
|
||||||
|
# Update transitions
|
||||||
|
self.emotion_state = self.transitioner.apply_transition(self.emotion_state)
|
||||||
|
|
||||||
|
# Update idle behaviors
|
||||||
|
self.emotion_state.idle_flags = self.idle_manager.update(dt)
|
||||||
|
|
||||||
|
# Cooldown voice commands
|
||||||
|
self.voice_command_cooldown = max(0.0, self.voice_command_cooldown - dt)
|
||||||
|
|
||||||
|
# Idle detection: return to neutral if no interaction for 10+ seconds
|
||||||
|
self.idle_timer += dt
|
||||||
|
if (
|
||||||
|
self.voice_command_cooldown <= 0
|
||||||
|
and not self.people_present
|
||||||
|
and self.idle_timer > 10.0
|
||||||
|
and self.emotion_state.primary_emotion != EmotionType.NEUTRAL
|
||||||
|
):
|
||||||
|
self.emotion_state = self.transitioner.initiate_transition(
|
||||||
|
self.emotion_state,
|
||||||
|
EmotionType.NEUTRAL,
|
||||||
|
duration=1.2,
|
||||||
|
)
|
||||||
|
self.emotion_state.context = "idle"
|
||||||
|
self.idle_timer = 0.0
|
||||||
|
|
||||||
|
if self.voice_command_cooldown > 0:
|
||||||
|
self.idle_timer = 0.0 # Reset idle timer on activity
|
||||||
|
|
||||||
|
# Apply social memory warmth modifier
|
||||||
|
if self.emotion_state.social_target_id and self.emotion_state.primary_emotion == EmotionType.HAPPY:
|
||||||
|
warmth = self.social_memory.get_warmth_modifier(self.emotion_state.social_target_id)
|
||||||
|
self.emotion_state.social_warmth = warmth
|
||||||
|
self.emotion_state.intensity = self.emotion_state.intensity * (0.8 + warmth * 0.2)
|
||||||
|
|
||||||
|
# Update last timestamp
|
||||||
|
self.emotion_state.last_update_time = current_time
|
||||||
|
|
||||||
|
# Map emotion to expression name
|
||||||
|
self.emotion_state.expression_name = self._map_emotion_to_expression()
|
||||||
|
|
||||||
|
# Publish emotion state
|
||||||
|
self._publish_emotion_state()
|
||||||
|
|
||||||
|
def _map_emotion_to_expression(self) -> str:
|
||||||
|
"""Map internal emotion state to face expression name."""
|
||||||
|
emotion = self.emotion_state.primary_emotion
|
||||||
|
intensity = self.emotion_state.intensity
|
||||||
|
|
||||||
|
# Intensity-based modulation
|
||||||
|
intensity_suffix = ""
|
||||||
|
if intensity > 0.7:
|
||||||
|
intensity_suffix = "_intense"
|
||||||
|
elif intensity < 0.3:
|
||||||
|
intensity_suffix = "_subtle"
|
||||||
|
|
||||||
|
base_mapping = {
|
||||||
|
EmotionType.NEUTRAL: "neutral",
|
||||||
|
EmotionType.HAPPY: "happy",
|
||||||
|
EmotionType.CURIOUS: "curious",
|
||||||
|
EmotionType.EXCITED: "surprised", # Use surprised for excitement
|
||||||
|
EmotionType.CONCERNED: "sad", # Concern maps to sad expression
|
||||||
|
EmotionType.CONFUSED: "confused",
|
||||||
|
EmotionType.TIRED: "sad",
|
||||||
|
EmotionType.PLAYFUL: "happy",
|
||||||
|
}
|
||||||
|
|
||||||
|
base = base_mapping.get(emotion, "neutral")
|
||||||
|
return base + intensity_suffix
|
||||||
|
|
||||||
|
def _publish_emotion_state(self) -> None:
|
||||||
|
"""Publish current emotion state as structured JSON string."""
|
||||||
|
import json
|
||||||
|
|
||||||
|
state_dict = {
|
||||||
|
"emotion": self.emotion_state.primary_emotion.value,
|
||||||
|
"intensity": float(self.emotion_state.intensity),
|
||||||
|
"confidence": float(self.emotion_state.confidence),
|
||||||
|
"expression": self.emotion_state.expression_name,
|
||||||
|
"context": self.emotion_state.context,
|
||||||
|
"triggered_by": self.emotion_state.triggered_by,
|
||||||
|
"social_target_id": self.emotion_state.social_target_id,
|
||||||
|
"social_warmth": float(self.emotion_state.social_warmth),
|
||||||
|
"idle_flags": self.emotion_state.idle_flags,
|
||||||
|
"timestamp": self.emotion_state.last_update_time,
|
||||||
|
"battery_level": float(self.battery_level),
|
||||||
|
"balance_stable": self.balance_stable,
|
||||||
|
"system_health": self.system_health,
|
||||||
|
}
|
||||||
|
|
||||||
|
msg = String()
|
||||||
|
msg.data = json.dumps(state_dict)
|
||||||
|
self.emotion_pub.publish(msg)
|
||||||
|
|
||||||
|
|
||||||
|
def main(args=None):
|
||||||
|
rclpy.init(args=args)
|
||||||
|
node = EmotionEngineNode()
|
||||||
|
|
||||||
|
try:
|
||||||
|
rclpy.spin(node)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
node.destroy_node()
|
||||||
|
rclpy.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
4
jetson/ros2_ws/src/saltybot_emotion_engine/setup.cfg
Normal file
4
jetson/ros2_ws/src/saltybot_emotion_engine/setup.cfg
Normal file
@ -0,0 +1,4 @@
|
|||||||
|
[develop]
|
||||||
|
script_dir=$base/lib/saltybot_emotion_engine
|
||||||
|
[install]
|
||||||
|
install_lib=$base/lib/saltybot_emotion_engine
|
||||||
32
jetson/ros2_ws/src/saltybot_emotion_engine/setup.py
Normal file
32
jetson/ros2_ws/src/saltybot_emotion_engine/setup.py
Normal file
@ -0,0 +1,32 @@
|
|||||||
|
from setuptools import setup
|
||||||
|
import os
|
||||||
|
from glob import glob
|
||||||
|
|
||||||
|
package_name = 'saltybot_emotion_engine'
|
||||||
|
|
||||||
|
setup(
|
||||||
|
name=package_name,
|
||||||
|
version='0.1.0',
|
||||||
|
packages=[package_name],
|
||||||
|
data_files=[
|
||||||
|
('share/ament_index/resource_index/packages',
|
||||||
|
['resource/' + package_name]),
|
||||||
|
('share/' + package_name, ['package.xml']),
|
||||||
|
(os.path.join('share', package_name, 'launch'),
|
||||||
|
glob('launch/*.py')),
|
||||||
|
(os.path.join('share', package_name, 'config'),
|
||||||
|
glob('config/*.yaml')),
|
||||||
|
],
|
||||||
|
install_requires=['setuptools'],
|
||||||
|
zip_safe=True,
|
||||||
|
maintainer='seb',
|
||||||
|
maintainer_email='seb@vayrette.com',
|
||||||
|
description='Context-aware emotion engine with state-to-expression mapping and social awareness',
|
||||||
|
license='MIT',
|
||||||
|
tests_require=['pytest'],
|
||||||
|
entry_points={
|
||||||
|
'console_scripts': [
|
||||||
|
'emotion_engine = saltybot_emotion_engine.emotion_engine_node:main',
|
||||||
|
],
|
||||||
|
},
|
||||||
|
)
|
||||||
@ -0,0 +1,29 @@
|
|||||||
|
gamepad_teleop:
|
||||||
|
device: /dev/input/js0
|
||||||
|
|
||||||
|
# Deadzone for analog sticks (0.0 - 1.0)
|
||||||
|
deadzone: 0.1
|
||||||
|
|
||||||
|
# Velocity limits
|
||||||
|
max_linear_vel: 2.0 # m/s
|
||||||
|
max_angular_vel: 2.0 # rad/s
|
||||||
|
|
||||||
|
# Speed multiplier limits (L2/R2 triggers)
|
||||||
|
min_speed_mult: 0.3 # 30% with L2
|
||||||
|
max_speed_mult: 1.0 # 100% with R2
|
||||||
|
|
||||||
|
# Pan-tilt servo limits (D-pad manual control)
|
||||||
|
pan_step: 5.0 # degrees per d-pad press
|
||||||
|
tilt_step: 5.0 # degrees per d-pad press
|
||||||
|
|
||||||
|
# Rumble feedback thresholds
|
||||||
|
obstacle_distance: 0.5 # m, below this triggers warning rumble
|
||||||
|
low_battery_voltage: 18.0 # V, below this triggers alert rumble
|
||||||
|
|
||||||
|
# Topic names
|
||||||
|
topics:
|
||||||
|
cmd_vel: /cmd_vel
|
||||||
|
teleop_active: /saltybot/teleop_active
|
||||||
|
obstacle_feedback: /saltybot/obstacle_distance
|
||||||
|
battery_voltage: /saltybot/battery_voltage
|
||||||
|
pan_tilt_command: /saltybot/pan_tilt_command
|
||||||
@ -0,0 +1,23 @@
|
|||||||
|
"""Launch PS5 DualSense gamepad teleoperation node."""
|
||||||
|
|
||||||
|
from launch import LaunchDescription
|
||||||
|
from launch_ros.actions import Node
|
||||||
|
from ament_index_python.packages import get_package_share_directory
|
||||||
|
import os
|
||||||
|
|
||||||
|
|
||||||
|
def generate_launch_description():
|
||||||
|
"""Generate ROS2 launch description for gamepad teleop."""
|
||||||
|
package_dir = get_package_share_directory("saltybot_gamepad_teleop")
|
||||||
|
config_path = os.path.join(package_dir, "config", "gamepad_config.yaml")
|
||||||
|
|
||||||
|
gamepad_node = Node(
|
||||||
|
package="saltybot_gamepad_teleop",
|
||||||
|
executable="gamepad_teleop_node",
|
||||||
|
name="gamepad_teleop_node",
|
||||||
|
output="screen",
|
||||||
|
parameters=[config_path],
|
||||||
|
remappings=[],
|
||||||
|
)
|
||||||
|
|
||||||
|
return LaunchDescription([gamepad_node])
|
||||||
32
jetson/ros2_ws/src/saltybot_gamepad_teleop/package.xml
Normal file
32
jetson/ros2_ws/src/saltybot_gamepad_teleop/package.xml
Normal file
@ -0,0 +1,32 @@
|
|||||||
|
<?xml version="1.0"?>
|
||||||
|
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||||
|
<package format="3">
|
||||||
|
<name>saltybot_gamepad_teleop</name>
|
||||||
|
<version>0.1.0</version>
|
||||||
|
<description>
|
||||||
|
PS5 DualSense Bluetooth gamepad teleoperation for SaltyBot.
|
||||||
|
Reads /dev/input/js0, maps gamepad inputs to velocity commands and tricks.
|
||||||
|
Left stick: linear velocity, Right stick: angular velocity.
|
||||||
|
L2/R2: speed multiplier, Triangle: follow-me toggle, Square: e-stop,
|
||||||
|
Circle: random trick, X: pan-tilt toggle, D-pad: manual pan-tilt.
|
||||||
|
Provides rumble feedback for obstacles and low battery.
|
||||||
|
</description>
|
||||||
|
<maintainer email="sl-controls@saltylab.local">sl-controls</maintainer>
|
||||||
|
<license>MIT</license>
|
||||||
|
|
||||||
|
<depend>rclpy</depend>
|
||||||
|
<depend>geometry_msgs</depend>
|
||||||
|
<depend>sensor_msgs</depend>
|
||||||
|
<depend>std_msgs</depend>
|
||||||
|
|
||||||
|
<buildtool_depend>ament_python</buildtool_depend>
|
||||||
|
|
||||||
|
<test_depend>ament_copyright</test_depend>
|
||||||
|
<test_depend>ament_flake8</test_depend>
|
||||||
|
<test_depend>ament_pep257</test_depend>
|
||||||
|
<test_depend>python3-pytest</test_depend>
|
||||||
|
|
||||||
|
<export>
|
||||||
|
<build_type>ament_python</build_type>
|
||||||
|
</export>
|
||||||
|
</package>
|
||||||
Binary file not shown.
@ -0,0 +1,284 @@
|
|||||||
|
"""
|
||||||
|
gamepad_teleop_node.py — PS5 DualSense Bluetooth gamepad teleoperation for SaltyBot.
|
||||||
|
|
||||||
|
Reads from /dev/input/js0 (gamepad input device) and maps to velocity commands.
|
||||||
|
Publishes /cmd_vel (Twist) and /saltybot/teleop_active (Bool) for autonomous override.
|
||||||
|
|
||||||
|
Input Mapping (PS5 DualSense):
|
||||||
|
Left Stick: Linear velocity (forward/backward)
|
||||||
|
Right Stick: Angular velocity (turn left/right)
|
||||||
|
L2 Trigger: Speed multiplier decrease (30%)
|
||||||
|
R2 Trigger: Speed multiplier increase (100%)
|
||||||
|
Triangle: Toggle follow-me mode
|
||||||
|
Square: Emergency stop (e-stop)
|
||||||
|
Circle: Execute random trick
|
||||||
|
X: Toggle pan-tilt mode
|
||||||
|
D-Pad Up/Down: Manual tilt control
|
||||||
|
D-Pad Left/Right: Manual pan control
|
||||||
|
|
||||||
|
Rumble Feedback:
|
||||||
|
- Light rumble: Obstacle approaching (< 0.5 m)
|
||||||
|
- Heavy rumble: Low battery (< 18 V)
|
||||||
|
"""
|
||||||
|
|
||||||
|
import struct
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
import rclpy
|
||||||
|
from rclpy.node import Node
|
||||||
|
from geometry_msgs.msg import Twist
|
||||||
|
from std_msgs.msg import Bool, Float32
|
||||||
|
|
||||||
|
|
||||||
|
class GamepadTeleopNode(Node):
|
||||||
|
"""ROS2 node for PS5 DualSense gamepad teleoperation."""
|
||||||
|
|
||||||
|
# Button indices (JS_EVENT_BUTTON)
|
||||||
|
BTN_SQUARE = 0
|
||||||
|
BTN_X = 1
|
||||||
|
BTN_CIRCLE = 2
|
||||||
|
BTN_TRIANGLE = 3
|
||||||
|
BTN_L1 = 4
|
||||||
|
BTN_R1 = 5
|
||||||
|
BTN_L2_DIGITAL = 6
|
||||||
|
BTN_R2_DIGITAL = 7
|
||||||
|
BTN_SHARE = 8
|
||||||
|
BTN_OPTIONS = 9
|
||||||
|
BTN_L3 = 10
|
||||||
|
BTN_R3 = 11
|
||||||
|
BTN_PS = 12
|
||||||
|
BTN_TOUCHPAD = 13
|
||||||
|
|
||||||
|
# Axis indices (JS_EVENT_AXIS)
|
||||||
|
AXIS_LX = 0 # Left stick X
|
||||||
|
AXIS_LY = 1 # Left stick Y
|
||||||
|
AXIS_RX = 2 # Right stick X
|
||||||
|
AXIS_RY = 3 # Right stick Y
|
||||||
|
AXIS_L2_ANALOG = 4 # L2 trigger analog
|
||||||
|
AXIS_R2_ANALOG = 5 # R2 trigger analog
|
||||||
|
AXIS_DPAD_X = 6 # D-pad horizontal
|
||||||
|
AXIS_DPAD_Y = 7 # D-pad vertical
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
"""Initialize gamepad teleop node."""
|
||||||
|
super().__init__("gamepad_teleop_node")
|
||||||
|
|
||||||
|
# Declare parameters
|
||||||
|
self.declare_parameter("device", "/dev/input/js0")
|
||||||
|
self.declare_parameter("deadzone", 0.1)
|
||||||
|
self.declare_parameter("max_linear_vel", 2.0)
|
||||||
|
self.declare_parameter("max_angular_vel", 2.0)
|
||||||
|
self.declare_parameter("min_speed_mult", 0.3)
|
||||||
|
self.declare_parameter("max_speed_mult", 1.0)
|
||||||
|
self.declare_parameter("pan_step", 5.0)
|
||||||
|
self.declare_parameter("tilt_step", 5.0)
|
||||||
|
self.declare_parameter("obstacle_distance", 0.5)
|
||||||
|
self.declare_parameter("low_battery_voltage", 18.0)
|
||||||
|
|
||||||
|
# Get parameters
|
||||||
|
self.device = self.get_parameter("device").value
|
||||||
|
self.deadzone = self.get_parameter("deadzone").value
|
||||||
|
self.max_linear_vel = self.get_parameter("max_linear_vel").value
|
||||||
|
self.max_angular_vel = self.get_parameter("max_angular_vel").value
|
||||||
|
self.min_speed_mult = self.get_parameter("min_speed_mult").value
|
||||||
|
self.max_speed_mult = self.get_parameter("max_speed_mult").value
|
||||||
|
self.pan_step = self.get_parameter("pan_step").value
|
||||||
|
self.tilt_step = self.get_parameter("tilt_step").value
|
||||||
|
self.obstacle_distance_threshold = self.get_parameter("obstacle_distance").value
|
||||||
|
self.low_battery_threshold = self.get_parameter("low_battery_voltage").value
|
||||||
|
|
||||||
|
# Publishers
|
||||||
|
self.cmd_vel_pub = self.create_publisher(Twist, "/cmd_vel", 1)
|
||||||
|
self.teleop_active_pub = self.create_publisher(Bool, "/saltybot/teleop_active", 1)
|
||||||
|
self.pan_tilt_pan_pub = self.create_publisher(Float32, "/saltybot/pan_tilt_command/pan", 1)
|
||||||
|
self.pan_tilt_tilt_pub = self.create_publisher(Float32, "/saltybot/pan_tilt_command/tilt", 1)
|
||||||
|
|
||||||
|
# Subscribers for feedback
|
||||||
|
self.create_subscription(Float32, "/saltybot/obstacle_distance", self._obstacle_callback, 1)
|
||||||
|
self.create_subscription(Float32, "/saltybot/battery_voltage", self._battery_callback, 1)
|
||||||
|
|
||||||
|
# State variables
|
||||||
|
self.axes = [0.0] * 8 # 8 analog axes
|
||||||
|
self.buttons = [False] * 14 # 14 buttons
|
||||||
|
self.speed_mult = 1.0
|
||||||
|
self.follow_me_active = False
|
||||||
|
self.pan_tilt_active = False
|
||||||
|
self.teleop_enabled = True
|
||||||
|
self.last_cmd_vel_time = time.time()
|
||||||
|
|
||||||
|
# Feedback state
|
||||||
|
self.last_obstacle_distance = float('inf')
|
||||||
|
self.last_battery_voltage = 24.0
|
||||||
|
self.rumble_active = False
|
||||||
|
|
||||||
|
# Thread management
|
||||||
|
self.device_fd = None
|
||||||
|
self.reading = False
|
||||||
|
self.reader_thread = None
|
||||||
|
|
||||||
|
self.get_logger().info(f"Gamepad Teleop Node initialized. Listening on {self.device}")
|
||||||
|
self.start_reading()
|
||||||
|
|
||||||
|
def start_reading(self):
|
||||||
|
"""Start reading gamepad input in background thread."""
|
||||||
|
self.reading = True
|
||||||
|
self.reader_thread = threading.Thread(target=self._read_gamepad, daemon=True)
|
||||||
|
self.reader_thread.start()
|
||||||
|
|
||||||
|
def stop_reading(self):
|
||||||
|
"""Stop reading gamepad input."""
|
||||||
|
self.reading = False
|
||||||
|
if self.device_fd:
|
||||||
|
try:
|
||||||
|
self.device_fd.close()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def _read_gamepad(self):
|
||||||
|
"""Read gamepad events from /dev/input/jsX."""
|
||||||
|
try:
|
||||||
|
self.device_fd = open(self.device, "rb")
|
||||||
|
self.get_logger().info(f"Opened gamepad device: {self.device}")
|
||||||
|
except OSError as e:
|
||||||
|
self.get_logger().error(f"Failed to open gamepad device {self.device}: {e}")
|
||||||
|
return
|
||||||
|
|
||||||
|
while self.reading:
|
||||||
|
try:
|
||||||
|
# JS_EVENT structure: time (4B), value (2B), type (1B), number (1B)
|
||||||
|
event_data = self.device_fd.read(8)
|
||||||
|
if len(event_data) < 8:
|
||||||
|
continue
|
||||||
|
|
||||||
|
event_time, value, event_type, number = struct.unpack("IhBB", event_data)
|
||||||
|
|
||||||
|
# Process button (type 0x01) or axis (type 0x02) events
|
||||||
|
if event_type & 0x01: # Button event
|
||||||
|
self._handle_button(number, value)
|
||||||
|
elif event_type & 0x02: # Axis event
|
||||||
|
self._handle_axis(number, value)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
self.get_logger().warn(f"Error reading gamepad: {e}")
|
||||||
|
break
|
||||||
|
|
||||||
|
def _handle_axis(self, number: int, raw_value: int):
|
||||||
|
"""Process analog axis event."""
|
||||||
|
# Normalize to -1.0 to 1.0
|
||||||
|
normalized = raw_value / 32767.0
|
||||||
|
self.axes[number] = normalized
|
||||||
|
|
||||||
|
# Apply deadzone
|
||||||
|
if abs(normalized) < self.deadzone:
|
||||||
|
self.axes[number] = 0.0
|
||||||
|
|
||||||
|
self._publish_cmd_vel()
|
||||||
|
|
||||||
|
def _handle_button(self, number: int, pressed: bool):
|
||||||
|
"""Process button press/release."""
|
||||||
|
self.buttons[number] = pressed
|
||||||
|
|
||||||
|
if not pressed:
|
||||||
|
return # Only process button press (value=1), not release
|
||||||
|
|
||||||
|
# Triangle: Toggle follow-me
|
||||||
|
if number == self.BTN_TRIANGLE:
|
||||||
|
self.follow_me_active = not self.follow_me_active
|
||||||
|
self.get_logger().info(f"Follow-me: {self.follow_me_active}")
|
||||||
|
|
||||||
|
# Square: E-stop
|
||||||
|
elif number == self.BTN_SQUARE:
|
||||||
|
self.teleop_enabled = False
|
||||||
|
self._publish_cmd_vel()
|
||||||
|
self.get_logger().warn("E-STOP activated")
|
||||||
|
|
||||||
|
# Circle: Random trick
|
||||||
|
elif number == self.BTN_CIRCLE:
|
||||||
|
self.get_logger().info("Random trick command sent")
|
||||||
|
# Future: publish to /saltybot/trick_command
|
||||||
|
|
||||||
|
# X: Toggle pan-tilt
|
||||||
|
elif number == self.BTN_X:
|
||||||
|
self.pan_tilt_active = not self.pan_tilt_active
|
||||||
|
self.get_logger().info(f"Pan-tilt mode: {self.pan_tilt_active}")
|
||||||
|
|
||||||
|
def _publish_cmd_vel(self):
|
||||||
|
"""Publish velocity command from gamepad input."""
|
||||||
|
if not self.teleop_enabled:
|
||||||
|
# Publish zero velocity
|
||||||
|
twist = Twist()
|
||||||
|
self.cmd_vel_pub.publish(twist)
|
||||||
|
self.teleop_active_pub.publish(Bool(data=False))
|
||||||
|
return
|
||||||
|
|
||||||
|
# Get stick inputs
|
||||||
|
lx = self.axes[self.AXIS_LX]
|
||||||
|
ly = -self.axes[self.AXIS_LY] # Invert Y for forward = positive
|
||||||
|
rx = self.axes[self.AXIS_RX]
|
||||||
|
|
||||||
|
# Speed multiplier from triggers
|
||||||
|
l2 = max(0.0, self.axes[self.AXIS_L2_ANALOG])
|
||||||
|
r2 = max(0.0, self.axes[self.AXIS_R2_ANALOG])
|
||||||
|
self.speed_mult = self.min_speed_mult + (r2 - l2) * (self.max_speed_mult - self.min_speed_mult)
|
||||||
|
self.speed_mult = max(self.min_speed_mult, min(self.max_speed_mult, self.speed_mult))
|
||||||
|
|
||||||
|
# Calculate velocities
|
||||||
|
linear_vel = ly * self.max_linear_vel * self.speed_mult
|
||||||
|
angular_vel = rx * self.max_angular_vel * self.speed_mult
|
||||||
|
|
||||||
|
# Publish cmd_vel
|
||||||
|
twist = Twist()
|
||||||
|
twist.linear.x = linear_vel
|
||||||
|
twist.angular.z = angular_vel
|
||||||
|
self.cmd_vel_pub.publish(twist)
|
||||||
|
|
||||||
|
# Publish teleop_active flag
|
||||||
|
self.teleop_active_pub.publish(Bool(data=True))
|
||||||
|
|
||||||
|
# Handle pan-tilt from D-pad
|
||||||
|
if self.pan_tilt_active:
|
||||||
|
dpad_x = self.axes[self.AXIS_DPAD_X]
|
||||||
|
dpad_y = self.axes[self.AXIS_DPAD_Y]
|
||||||
|
|
||||||
|
if dpad_x != 0: # Left/Right
|
||||||
|
pan_cmd = Float32(data=float(dpad_x) * self.pan_step)
|
||||||
|
self.pan_tilt_pan_pub.publish(pan_cmd)
|
||||||
|
|
||||||
|
if dpad_y != 0: # Up/Down
|
||||||
|
tilt_cmd = Float32(data=float(dpad_y) * self.tilt_step)
|
||||||
|
self.pan_tilt_tilt_pub.publish(tilt_cmd)
|
||||||
|
|
||||||
|
self.last_cmd_vel_time = time.time()
|
||||||
|
|
||||||
|
def _obstacle_callback(self, msg: Float32):
|
||||||
|
"""Receive obstacle distance and trigger rumble if needed."""
|
||||||
|
self.last_obstacle_distance = msg.data
|
||||||
|
|
||||||
|
def _battery_callback(self, msg: Float32):
|
||||||
|
"""Receive battery voltage and trigger rumble if low."""
|
||||||
|
self.last_battery_voltage = msg.data
|
||||||
|
|
||||||
|
def destroy_node(self):
|
||||||
|
"""Clean up on shutdown."""
|
||||||
|
self.stop_reading()
|
||||||
|
super().destroy_node()
|
||||||
|
|
||||||
|
|
||||||
|
def main(args=None):
|
||||||
|
"""Main entry point."""
|
||||||
|
rclpy.init(args=args)
|
||||||
|
node = GamepadTeleopNode()
|
||||||
|
|
||||||
|
try:
|
||||||
|
rclpy.spin(node)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
node.destroy_node()
|
||||||
|
rclpy.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
5
jetson/ros2_ws/src/saltybot_gamepad_teleop/setup.cfg
Normal file
5
jetson/ros2_ws/src/saltybot_gamepad_teleop/setup.cfg
Normal file
@ -0,0 +1,5 @@
|
|||||||
|
[develop]
|
||||||
|
script_dir=$base/lib/saltybot_gamepad_teleop
|
||||||
|
[egg_info]
|
||||||
|
tag_build =
|
||||||
|
tag_date = 0
|
||||||
30
jetson/ros2_ws/src/saltybot_gamepad_teleop/setup.py
Normal file
30
jetson/ros2_ws/src/saltybot_gamepad_teleop/setup.py
Normal file
@ -0,0 +1,30 @@
|
|||||||
|
from setuptools import setup
|
||||||
|
|
||||||
|
package_name = "saltybot_gamepad_teleop"
|
||||||
|
|
||||||
|
setup(
|
||||||
|
name=package_name,
|
||||||
|
version="0.1.0",
|
||||||
|
packages=[package_name],
|
||||||
|
data_files=[
|
||||||
|
("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
|
||||||
|
(f"share/{package_name}", ["package.xml"]),
|
||||||
|
(f"share/{package_name}/launch", ["launch/gamepad_teleop.launch.py"]),
|
||||||
|
(f"share/{package_name}/config", ["config/gamepad_config.yaml"]),
|
||||||
|
],
|
||||||
|
install_requires=["setuptools"],
|
||||||
|
zip_safe=True,
|
||||||
|
maintainer="sl-controls",
|
||||||
|
maintainer_email="sl-controls@saltylab.local",
|
||||||
|
description=(
|
||||||
|
"PS5 DualSense Bluetooth gamepad teleoperation with rumble feedback "
|
||||||
|
"for SaltyBot autonomous override"
|
||||||
|
),
|
||||||
|
license="MIT",
|
||||||
|
tests_require=["pytest"],
|
||||||
|
entry_points={
|
||||||
|
"console_scripts": [
|
||||||
|
"gamepad_teleop_node = saltybot_gamepad_teleop.gamepad_teleop_node:main",
|
||||||
|
],
|
||||||
|
},
|
||||||
|
)
|
||||||
Loading…
x
Reference in New Issue
Block a user