feat: audio direction estimator (Issue #430) #434
jetson/ros2_ws/src/saltybot_audio_direction/README.md (new file)
# saltybot_audio_direction

Audio direction estimator for sound source localization (Issue #430).

Estimates bearing to speakers using **GCC-PHAT** (Generalized Cross-Correlation with Phase Transform) beamforming from a Jabra multi-channel microphone. Includes voice activity detection (VAD) for robust audio-based person tracking integration.

## Features

- **GCC-PHAT Beamforming**: Phase-domain cross-correlation for direction-of-arrival estimation
- **Voice Activity Detection (VAD)**: RMS energy-based speech detection with smoothing
- **Stereo/Quadraphonic Support**: Handles Jabra 2-channel and 4-channel modes
- **Robot Self-Noise Filtering**: Optional suppression of motor/wheel noise (future enhancement)
- **ROS2 Integration**: Standard ROS2 topic publishing at configurable rates

## Topics

### Published

- **`/saltybot/audio_direction`** (`std_msgs/Float32`)

  Estimated bearing in degrees (0–360, where 0° = front, 90° = right, 180° = rear, 270° = left)

- **`/saltybot/audio_activity`** (`std_msgs/Bool`)

  Voice activity detected (true if speech-like energy)

## Parameters

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `device_id` | int | -1 | Audio device index (-1 = system default) |
| `sample_rate` | int | 16000 | Sample rate in Hz |
| `chunk_size` | int | 2048 | Samples per audio frame |
| `publish_hz` | float | 10.0 | Output publication rate (Hz) |
| `vad_threshold` | float | 0.02 | RMS energy threshold for VAD |
| `gcc_phat_max_lag` | int | 64 | Max lag for correlation (determines angle resolution) |
| `self_noise_filter` | bool | true | Apply robot motor noise suppression |

## Usage

### Launch Node

```bash
ros2 launch saltybot_audio_direction audio_direction.launch.py
```

### With Parameters

```bash
ros2 launch saltybot_audio_direction audio_direction.launch.py \
    device_id:=0 \
    publish_hz:=20.0 \
    vad_threshold:=0.01
```

### Using Config File

`--ros-args --params-file` applies to a node, not to `ros2 launch`, so run the executable directly to load the YAML file:

```bash
ros2 run saltybot_audio_direction audio_direction_node \
    --ros-args --params-file config/audio_direction_params.yaml
```

## Algorithm

### GCC-PHAT

1. Compute cross-spectrum of stereo/quad microphone pairs in the frequency domain
2. Normalize by magnitude (phase transform) to emphasize phase relationships
3. Inverse FFT to obtain the time-domain cross-correlation
4. Find the maximum-correlation lag → time delay between channels
5. Map the time delay to an azimuth angle based on mic geometry
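Steps 1–4 can be sketched in a few lines of NumPy. This is a standalone illustration, not the node's exact implementation; the signal lengths, padding, and `gcc_phat_delay` name are assumptions for the example:

```python
import numpy as np

def gcc_phat_delay(sig1, sig2, max_lag=64):
    """Return the estimated delay of sig1 relative to sig2, in samples."""
    n = len(sig1) + len(sig2)                  # zero-pad to avoid circular wrap
    cross = np.fft.rfft(sig1, n=n) * np.conj(np.fft.rfft(sig2, n=n))
    cross /= np.abs(cross) + 1e-8              # phase transform: keep phase only
    cc = np.fft.irfft(cross, n=n)
    # Reorder so index i corresponds to lag i - max_lag, then pick the peak
    cc = np.concatenate((cc[-max_lag:], cc[:max_lag + 1]))
    return int(np.argmax(np.abs(cc))) - max_lag

rng = np.random.default_rng(0)
base = rng.standard_normal(1024)
delayed = np.concatenate((np.zeros(10), base[:-10]))  # base delayed by 10 samples
print(gcc_phat_delay(delayed, base))  # 10
```

Broadband noise is used deliberately: a pure tone delayed by a whole number of periods would be indistinguishable from the undelayed signal.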

**Resolution**: A 64-sample max lag at 16 kHz covers inter-channel delays up to ±4 ms, with time-delay granularity of one sample (62.5 µs).
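For step 5, a common far-field model relates the delay to azimuth via τ = d·sin(θ)/c. The 10 cm mic spacing below is an assumed value for illustration, not the Jabra's actual geometry:

```python
import numpy as np

def lag_to_azimuth(lag_samples, sample_rate=16000, mic_spacing_m=0.10, c=343.0):
    """Far-field delay-to-angle mapping for one mic pair: tau = d*sin(theta)/c."""
    tau = lag_samples / sample_rate                  # delay in seconds
    s = np.clip(c * tau / mic_spacing_m, -1.0, 1.0)  # clamp to a valid sine
    return float(np.degrees(np.arcsin(s)))

print(lag_to_azimuth(0))             # 0.0 (source dead ahead)
print(round(lag_to_azimuth(1), 1))   # 12.4 (one-sample lag at this spacing)
```

This also shows why the one-sample lag granularity translates into roughly 12° angular steps near broadside for a 10 cm pair; wider spacing gives finer angles.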

### VAD (Voice Activity Detection)

- Compute the RMS energy of each frame
- Compare against a threshold (default 0.02)
- Smooth over a 5-frame window to reduce spurious detections

## Dependencies

- `rclpy`
- `numpy`
- `scipy`
- `python3-sounddevice` (audio input)

## Build & Test

### Build Package

```bash
colcon build --packages-select saltybot_audio_direction
```

### Run Tests

```bash
pytest jetson/ros2_ws/src/saltybot_audio_direction/test/
```

## Integration with Multi-Person Tracker

The audio direction node publishes bearing to speakers, enabling the `saltybot_multi_person_tracker` to:

- Cross-validate visual detections with audio localization
- Prioritize targets based on audio activity (speaker attention model)
- Improve person tracking in low-light or occluded scenarios
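At its simplest, the cross-validation in the first bullet reduces to a circular angular-distance check. `bearings_agree` and its 30° tolerance are illustrative, not the tracker's actual API:

```python
def bearings_agree(visual_deg: float, audio_deg: float, tol_deg: float = 30.0) -> bool:
    """True if two bearings (degrees, 0-360) are within tol_deg on the circle."""
    diff = abs(visual_deg - audio_deg) % 360.0
    return min(diff, 360.0 - diff) <= tol_deg   # shortest way around the circle

print(bearings_agree(350.0, 10.0))   # True: only 20 degrees apart across the 0 wrap
print(bearings_agree(90.0, 270.0))   # False: opposite sides
```

Taking the minimum of the two arc lengths handles the 360°→0° wraparound that a plain subtraction would get wrong.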

### Future Enhancements

- **Self-noise filtering**: Spectral subtraction for motor/wheel noise
- **TDOA (Time Difference of Arrival)**: Use the quad-mic setup for improved angle precision
- **Elevation estimation**: With 4+ channels in a 3D array configuration
- **Multi-speaker tracking**: Simultaneous localization of multiple speakers
- **Adaptive beamforming**: MVDR or GSC methods for SNR improvement

## References

- Benesty, J., Sondhi, M., Huang, Y. (2008). *Springer Handbook of Speech Processing*.
- Knapp, C., Carter, G. (1976). "The Generalized Correlation Method for Estimation of Time Delay".

## License

MIT
jetson/ros2_ws/src/saltybot_audio_direction/config/audio_direction_params.yaml (new file)
# Audio direction estimator ROS2 parameters
# Load with: ros2 run saltybot_audio_direction audio_direction_node --ros-args --params-file config/audio_direction_params.yaml

/**:
  ros__parameters:
    # Audio input
    device_id: -1            # -1 = default device (Jabra)
    sample_rate: 16000       # Hz
    chunk_size: 2048         # samples per frame

    # Processing
    gcc_phat_max_lag: 64     # samples (determines angular resolution)
    vad_threshold: 0.02      # RMS energy threshold for speech
    self_noise_filter: true  # Filter robot motor/wheel noise

    # Output
    publish_hz: 10.0         # Publication rate (Hz)
jetson/ros2_ws/src/saltybot_audio_direction/launch/audio_direction.launch.py (new file)
"""
Launch audio direction estimator node.

Typical usage:
    ros2 launch saltybot_audio_direction audio_direction.launch.py
"""

from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node


def generate_launch_description():
    """Generate launch description for audio direction node."""
    # Declare launch arguments
    device_id_arg = DeclareLaunchArgument(
        'device_id',
        default_value='-1',
        description='Audio device index (-1 for default)',
    )
    sample_rate_arg = DeclareLaunchArgument(
        'sample_rate',
        default_value='16000',
        description='Sample rate in Hz',
    )
    chunk_size_arg = DeclareLaunchArgument(
        'chunk_size',
        default_value='2048',
        description='Samples per audio frame',
    )
    publish_hz_arg = DeclareLaunchArgument(
        'publish_hz',
        default_value='10.0',
        description='Publication rate in Hz',
    )
    vad_threshold_arg = DeclareLaunchArgument(
        'vad_threshold',
        default_value='0.02',
        description='RMS energy threshold for voice activity detection',
    )
    gcc_max_lag_arg = DeclareLaunchArgument(
        'gcc_phat_max_lag',
        default_value='64',
        description='Max lag for GCC-PHAT correlation',
    )
    self_noise_filter_arg = DeclareLaunchArgument(
        'self_noise_filter',
        default_value='true',
        description='Apply robot self-noise suppression',
    )

    # Audio direction node
    audio_direction_node = Node(
        package='saltybot_audio_direction',
        executable='audio_direction_node',
        name='audio_direction_estimator',
        output='screen',
        parameters=[
            {'device_id': LaunchConfiguration('device_id')},
            {'sample_rate': LaunchConfiguration('sample_rate')},
            {'chunk_size': LaunchConfiguration('chunk_size')},
            {'publish_hz': LaunchConfiguration('publish_hz')},
            {'vad_threshold': LaunchConfiguration('vad_threshold')},
            # Must reference the declared argument name 'gcc_phat_max_lag'
            {'gcc_phat_max_lag': LaunchConfiguration('gcc_phat_max_lag')},
            {'self_noise_filter': LaunchConfiguration('self_noise_filter')},
        ],
    )

    return LaunchDescription(
        [
            device_id_arg,
            sample_rate_arg,
            chunk_size_arg,
            publish_hz_arg,
            vad_threshold_arg,
            gcc_max_lag_arg,
            self_noise_filter_arg,
            audio_direction_node,
        ]
    )
jetson/ros2_ws/src/saltybot_audio_direction/package.xml (new file)
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
  <name>saltybot_audio_direction</name>
  <version>0.1.0</version>
  <description>
    Audio direction estimator for sound source localization via Jabra microphone.
    Implements GCC-PHAT beamforming for direction of arrival (DoA) estimation.
    Publishes bearing (degrees) and voice activity detection (VAD) for speaker tracking integration.
    Issue #430.
  </description>
  <maintainer email="sl-perception@saltylab.local">sl-perception</maintainer>
  <license>MIT</license>

  <buildtool_depend>ament_python</buildtool_depend>

  <depend>rclpy</depend>
  <depend>std_msgs</depend>
  <depend>sensor_msgs</depend>
  <depend>geometry_msgs</depend>

  <exec_depend>python3-numpy</exec_depend>
  <exec_depend>python3-scipy</exec_depend>
  <exec_depend>python3-sounddevice</exec_depend>

  <test_depend>python3-pytest</test_depend>

  <export>
    <build_type>ament_python</build_type>
  </export>
</package>
jetson/ros2_ws/src/saltybot_audio_direction/saltybot_audio_direction/audio_direction_node.py (new file)
"""
audio_direction_node.py — Sound source localization via GCC-PHAT beamforming.

Estimates direction of arrival (DoA) from a Jabra multi-channel microphone
using Generalized Cross-Correlation with Phase Transform (GCC-PHAT).

Publishes:
    /saltybot/audio_direction   std_msgs/Float32   bearing in degrees (0-360)
    /saltybot/audio_activity    std_msgs/Bool      voice activity detected

Parameters:
    device_id          int    -1     audio device index (-1 = default)
    sample_rate        int    16000  sample rate in Hz
    chunk_size         int    2048   samples per frame
    publish_hz         float  10.0   output publication rate
    vad_threshold      float  0.02   RMS energy threshold for speech
    gcc_phat_max_lag   int    64     max lag for correlation (determines angular resolution)
    self_noise_filter  bool   true   apply robot noise suppression
"""

from __future__ import annotations

import threading
from collections import deque

import numpy as np
import sounddevice as sd

import rclpy
from rclpy.node import Node
from rclpy.qos import HistoryPolicy, QoSProfile, ReliabilityPolicy

from std_msgs.msg import Float32, Bool


_SENSOR_QOS = QoSProfile(
    reliability=ReliabilityPolicy.BEST_EFFORT,
    history=HistoryPolicy.KEEP_LAST,
    depth=5,
)

class GCCPHATBeamformer:
    """Generalized Cross-Correlation with Phase Transform for DoA estimation."""

    def __init__(self, sample_rate: int, max_lag: int = 64):
        self.sample_rate = sample_rate
        self.max_lag = max_lag
        self.frequency_bins = max_lag * 2
        # Coarse azimuths for left (270°), front (0°), right (90°), rear (180°)
        self.angles = np.array([270.0, 0.0, 90.0, 180.0])

    def gcc_phat(self, sig1: np.ndarray, sig2: np.ndarray) -> float:
        """
        Compute GCC-PHAT correlation and estimate time delay (DoA proxy).

        Returns:
            Estimated time delay in samples (can be converted to angle)
        """
        if len(sig1) == 0 or len(sig2) == 0:
            return 0.0

        # Cross-correlation in frequency domain
        fft1 = np.fft.rfft(sig1, n=self.frequency_bins)
        fft2 = np.fft.rfft(sig2, n=self.frequency_bins)

        # Normalize magnitude (phase transform)
        cross_spectrum = fft1 * np.conj(fft2)
        cross_spectrum_normalized = cross_spectrum / (np.abs(cross_spectrum) + 1e-8)

        # Inverse FFT to get correlation; negative lags wrap to the end of the array
        correlation = np.fft.irfft(cross_spectrum_normalized, n=self.frequency_bins)

        # Reorder so index i corresponds to lag i - max_lag, then pick the peak
        correlation = np.concatenate(
            (correlation[-self.max_lag:], correlation[: self.max_lag])
        )
        estimated_lag = int(np.argmax(np.abs(correlation))) - self.max_lag

        return float(estimated_lag)

    def estimate_bearing(self, channels: list[np.ndarray]) -> float:
        """
        Estimate bearing from multi-channel audio.

        Supports:
        - 2-channel (stereo): left/right discrimination
        - 4-channel: quadraphonic (front/rear/left/right)

        Returns:
            Bearing in degrees (0-360)
        """
        if len(channels) < 2:
            return 0.0  # Default to front if mono

        if len(channels) == 2:
            # Stereo: estimate left vs right
            lag = self.gcc_phat(channels[0], channels[1])
            # Negative lag → left (270°), positive lag → right (90°)
            if abs(lag) < 5:
                return 0.0  # Front
            elif lag < 0:
                return 270.0  # Left
            else:
                return 90.0  # Right

        elif len(channels) >= 4:
            # Quadraphonic: compute pairwise correlations
            # Assume channel order: [front, right, rear, left]
            lags = []
            for i in range(4):
                lag = self.gcc_phat(channels[i], channels[(i + 1) % 4])
                lags.append(lag)

            # Select angle based on strongest correlation
            max_idx = int(np.argmax(np.abs(lags)))
            return float(self.angles[max_idx])

        return 0.0


class VADDetector:
    """Simple voice activity detection based on RMS energy."""

    def __init__(self, threshold: float = 0.02, smoothing: int = 5):
        self.threshold = threshold
        self.smoothing = smoothing
        self.history = deque(maxlen=smoothing)

    def detect(self, audio_frame: np.ndarray) -> bool:
        """
        Detect voice activity in audio frame.

        Returns:
            True if speech-like energy detected
        """
        rms = np.sqrt(np.mean(audio_frame**2))
        self.history.append(rms > self.threshold)
        # Majority voting over window
        return sum(self.history) > self.smoothing // 2


class AudioDirectionNode(Node):
    """Estimates audio direction of arrival and publishes bearing + VAD."""

    def __init__(self):
        super().__init__('audio_direction_estimator')

        # Parameters
        self.declare_parameter('device_id', -1)
        self.declare_parameter('sample_rate', 16000)
        self.declare_parameter('chunk_size', 2048)
        self.declare_parameter('publish_hz', 10.0)
        self.declare_parameter('vad_threshold', 0.02)
        self.declare_parameter('gcc_phat_max_lag', 64)
        self.declare_parameter('self_noise_filter', True)

        self.device_id = self.get_parameter('device_id').value
        self.sample_rate = self.get_parameter('sample_rate').value
        self.chunk_size = self.get_parameter('chunk_size').value
        pub_hz = self.get_parameter('publish_hz').value
        vad_threshold = self.get_parameter('vad_threshold').value
        gcc_max_lag = self.get_parameter('gcc_phat_max_lag').value
        self.apply_noise_filter = self.get_parameter('self_noise_filter').value

        # Publishers (the QoS profile replaces the queue-depth argument;
        # passing both raises a TypeError in rclpy)
        self._pub_bearing = self.create_publisher(
            Float32, '/saltybot/audio_direction', _SENSOR_QOS
        )
        self._pub_vad = self.create_publisher(
            Bool, '/saltybot/audio_activity', _SENSOR_QOS
        )

        # Audio processing
        self.beamformer = GCCPHATBeamformer(self.sample_rate, max_lag=gcc_max_lag)
        self.vad = VADDetector(threshold=vad_threshold)
        self._audio_buffer = deque(maxlen=self.chunk_size * 2)
        self._lock = threading.Lock()

        # Start audio stream
        self._stream = None
        self._running = True
        self._start_audio_stream()

        # Publish timer
        self.create_timer(1.0 / pub_hz, self._tick)

        self.get_logger().info(
            f'audio_direction_estimator ready — '
            f'device_id={self.device_id} sample_rate={self.sample_rate} Hz '
            f'chunk_size={self.chunk_size} publish_hz={pub_hz}'
        )

    def _start_audio_stream(self) -> None:
        """Initialize audio stream from default microphone."""
        try:
            self._stream = sd.InputStream(
                device=self.device_id if self.device_id >= 0 else None,
                samplerate=self.sample_rate,
                channels=2,  # Default to stereo; auto-detect Jabra channels
                blocksize=self.chunk_size,
                callback=self._audio_callback,
                latency='low',
            )
            self._stream.start()
            self.get_logger().info('Audio stream started')
        except Exception as e:
            self.get_logger().error(f'Failed to start audio stream: {e}')
            self._stream = None

    def _audio_callback(self, indata: np.ndarray, frames: int, time_info, status) -> None:
        """Callback for audio input stream."""
        if status:
            self.get_logger().warn(f'Audio callback status: {status}')

        try:
            with self._lock:
                # indata is (frames, channels); flattening keeps samples interleaved
                self._audio_buffer.extend(indata.flatten())
        except Exception as e:
            self.get_logger().error(f'Audio callback error: {e}')

    def _tick(self) -> None:
        """Publish audio direction and VAD at configured rate."""
        if self._stream is None or not self._running:
            return

        with self._lock:
            if len(self._audio_buffer) < self.chunk_size:
                return
            audio_data = np.array(list(self._audio_buffer))

        # Extract channels (assume stereo or mono)
        channels = self._extract_channels(audio_data)
        if not channels:
            return

        # VAD detection on first channel
        is_speech = self.vad.detect(channels[0])
        vad_msg = Bool()
        vad_msg.data = is_speech
        self._pub_vad.publish(vad_msg)

        # DoA estimation (only if speech detected)
        if is_speech and len(channels) >= 2:
            bearing = self.beamformer.estimate_bearing(channels)
        else:
            bearing = 0.0  # Default to front when no speech

        bearing_msg = Float32()
        bearing_msg.data = float(bearing)
        self._pub_bearing.publish(bearing_msg)

    def _extract_channels(self, audio_data: np.ndarray) -> list[np.ndarray]:
        """
        Extract stereo/mono channels from the interleaved audio buffer.
        """
        if len(audio_data) == 0:
            return []

        # The buffer holds interleaved samples; an even length is assumed to be
        # stereo (adjust if the stream is opened with a different channel count)
        if len(audio_data) % 2 == 0:
            stereo = audio_data.reshape(-1, 2)
            return [stereo[:, 0], stereo[:, 1]]
        else:
            # Mono or odd-length
            return [audio_data]

    def destroy_node(self):
        """Clean up audio stream on shutdown."""
        self._running = False
        if self._stream is not None:
            self._stream.stop()
            self._stream.close()
        super().destroy_node()


def main(args=None):
    rclpy.init(args=args)
    node = AudioDirectionNode()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()
jetson/ros2_ws/src/saltybot_audio_direction/setup.cfg (new file)
[develop]
script_dir=$base/lib/saltybot_audio_direction
[install]
install_scripts=$base/lib/saltybot_audio_direction
[egg_info]
tag_date = 0
jetson/ros2_ws/src/saltybot_audio_direction/setup.py (new file)
from setuptools import setup, find_packages

setup(
    name='saltybot_audio_direction',
    version='0.1.0',
    packages=find_packages(exclude=['test']),
    data_files=[
        ('share/ament_index/resource_index/packages',
         ['resource/saltybot_audio_direction']),
        ('share/saltybot_audio_direction', ['package.xml']),
        # Install launch and config files so ros2 launch can find them
        ('share/saltybot_audio_direction/launch', ['launch/audio_direction.launch.py']),
        ('share/saltybot_audio_direction/config', ['config/audio_direction_params.yaml']),
    ],
    install_requires=['setuptools'],
    zip_safe=True,
    author='SaltyLab',
    author_email='robot@saltylab.local',
    description='Audio direction estimator for sound source localization',
    license='MIT',
    entry_points={
        'console_scripts': [
            'audio_direction_node=saltybot_audio_direction.audio_direction_node:main',
        ],
    },
)
jetson/ros2_ws/src/saltybot_audio_direction/test/ (new file)
"""
Basic tests for audio direction estimator.
"""

import pytest
import numpy as np
from saltybot_audio_direction.audio_direction_node import GCCPHATBeamformer, VADDetector


class TestGCCPHATBeamformer:
    """Tests for GCC-PHAT beamforming."""

    def test_beamformer_init(self):
        """Test beamformer initialization."""
        beamformer = GCCPHATBeamformer(sample_rate=16000, max_lag=64)
        assert beamformer.sample_rate == 16000
        assert beamformer.max_lag == 64

    def test_gcc_phat_stereo(self):
        """Test GCC-PHAT with stereo signals."""
        beamformer = GCCPHATBeamformer(sample_rate=16000, max_lag=64)

        # Synthetic stereo: broadband noise so the delay is unambiguous
        # (a pure tone delayed by a whole number of periods would be
        # indistinguishable from the undelayed signal)
        rng = np.random.default_rng(0)
        sig1 = rng.standard_normal(512)                  # Left mic
        sig2 = np.concatenate((np.zeros(8), sig1[:-8]))  # Right mic (delayed 8 samples)

        lag = beamformer.gcc_phat(sig1, sig2)
        assert isinstance(lag, float)

    def test_estimate_bearing_stereo(self):
        """Test bearing estimation for stereo input."""
        beamformer = GCCPHATBeamformer(sample_rate=16000, max_lag=64)

        t = np.arange(512) / 16000.0
        sig1 = np.sin(2 * np.pi * 1000 * t)
        sig2 = np.sin(2 * np.pi * 1000 * t)

        bearing = beamformer.estimate_bearing([sig1, sig2])
        assert 0 <= bearing <= 360

    def test_estimate_bearing_mono(self):
        """Test bearing with mono input defaults to 0°."""
        beamformer = GCCPHATBeamformer(sample_rate=16000)
        sig = np.sin(2 * np.pi * 1000 * np.arange(512) / 16000.0)
        bearing = beamformer.estimate_bearing([sig])
        assert bearing == 0.0


class TestVADDetector:
    """Tests for voice activity detection."""

    def test_vad_init(self):
        """Test VAD detector initialization."""
        vad = VADDetector(threshold=0.02, smoothing=5)
        assert vad.threshold == 0.02
        assert vad.smoothing == 5

    def test_vad_silence(self):
        """Test VAD rejects silence."""
        vad = VADDetector(threshold=0.02)
        silence = np.zeros(2048)  # note: np.zeros(...) * 0.001 is still all zeros
        is_speech = vad.detect(silence)
        assert not is_speech

    def test_vad_speech(self):
        """Test VAD detects speech-like signal."""
        vad = VADDetector(threshold=0.02)
        t = np.arange(2048) / 16000.0
        speech = np.sin(2 * np.pi * 1000 * t) * 0.1  # ~0.07 RMS
        for _ in range(5):  # Run multiple times to accumulate history
            is_speech = vad.detect(speech)
        assert is_speech


if __name__ == '__main__':
    pytest.main([__file__])