feat: audio direction estimator (Issue #430) #434

Merged
sl-jetson merged 1 commit from sl-perception/issue-430-audio-direction into main 2026-03-05 08:59:09 -05:00
11 changed files with 650 additions and 0 deletions
Showing only changes of commit d7b1366d6c

View File

@ -0,0 +1,116 @@
# saltybot_audio_direction
Audio direction estimator for sound source localization (Issue #430).
Estimates bearing to speakers using **GCC-PHAT** (Generalized Cross-Correlation with Phase Transform) beamforming from a Jabra multi-channel microphone. Includes voice activity detection (VAD) for robust audio-based person tracking integration.
## Features
- **GCC-PHAT Beamforming**: Phase-domain cross-correlation for direction of arrival estimation
- **Voice Activity Detection (VAD)**: RMS energy-based speech detection with smoothing
- **Stereo/Quadrophonic Support**: Handles Jabra 2-channel and 4-channel modes
- **Robot Self-Noise Filtering**: Optional suppression of motor/wheel noise (future enhancement)
- **ROS2 Integration**: Standard ROS2 topic publishing at configurable rates
## Topics
### Published
- **`/saltybot/audio_direction`** (`std_msgs/Float32`)
  Estimated bearing in degrees (0–360, where 0° = front, 90° = right, 180° = rear, 270° = left)
- **`/saltybot/audio_activity`** (`std_msgs/Bool`)
Voice activity detected (true if speech-like energy)
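Note that the published bearing is clockwise from the robot's front, while REP-103 body frames use x forward / y left with counter-clockwise angles. A minimal conversion helper for consumers of the topic (illustrative only, not part of this package):

```python
import math

def bearing_to_vector(bearing_deg: float) -> tuple[float, float]:
    """Convert a clockwise bearing (0 = front, 90 = right) to an (x, y)
    unit vector in a REP-103 body frame (x forward, y left)."""
    theta = math.radians(-bearing_deg)  # clockwise bearing -> CCW math angle
    return math.cos(theta), math.sin(theta)

x, y = bearing_to_vector(90.0)  # right: x ~ 0, y ~ -1
```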
## Parameters
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `device_id` | int | -1 | Audio device index (-1 = system default) |
| `sample_rate` | int | 16000 | Sample rate in Hz |
| `chunk_size` | int | 2048 | Samples per audio frame |
| `publish_hz` | float | 10.0 | Output publication rate (Hz) |
| `vad_threshold` | float | 0.02 | RMS energy threshold for VAD |
| `gcc_phat_max_lag` | int | 64 | Max lag for correlation (determines angle resolution) |
| `self_noise_filter` | bool | true | Apply robot motor noise suppression |
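`gcc_phat_max_lag` only needs to cover the largest physically possible inter-microphone delay, which depends on the mic spacing. A quick sanity check (the 10 cm baseline is an assumed example, not the measured Jabra geometry):

```python
def max_physical_lag_samples(mic_spacing_m: float, sample_rate_hz: int,
                             speed_of_sound: float = 343.0) -> float:
    """Largest possible inter-mic delay, in samples, for a given baseline."""
    return sample_rate_hz * mic_spacing_m / speed_of_sound

lag = max_physical_lag_samples(0.1, 16000)  # ~4.7 samples for a 10 cm baseline
```

With a baseline like that, the 64-sample default is far wider than physically needed; a tighter max lag rejects spurious correlation peaks.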
## Usage
### Launch Node
```bash
ros2 launch saltybot_audio_direction audio_direction.launch.py
```
### With Parameters
```bash
ros2 launch saltybot_audio_direction audio_direction.launch.py \
device_id:=0 \
publish_hz:=20.0 \
vad_threshold:=0.01
```
### Using Config File
Note: `ros2 launch` does not forward `--ros-args`; pass the params file to the node directly:
```bash
ros2 run saltybot_audio_direction audio_direction_node \
--ros-args --params-file config/audio_direction_params.yaml
```
## Algorithm
### GCC-PHAT
1. Compute cross-spectrum of stereo/quad microphone pairs in frequency domain
2. Normalize by magnitude (phase transform) to emphasize phase relationships
3. Inverse FFT to time-domain cross-correlation
4. Find maximum correlation lag → time delay between channels
5. Map time delay to azimuth angle based on mic geometry
**Resolution**: A 64-sample max lag at 16 kHz spans ±4 ms of candidate delay; lag estimates are quantized to one sample (62.5 µs), which bounds the achievable angular resolution for a given mic spacing.
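The five steps above can be sketched in a few lines of NumPy (standalone, simplified relative to the node's implementation):

```python
import numpy as np

def gcc_phat_lag(sig_a: np.ndarray, sig_b: np.ndarray, max_lag: int) -> int:
    """Integer lag (samples) at which sig_b best aligns with sig_a.
    Negative when sig_b is a delayed copy of sig_a."""
    n = len(sig_a) + len(sig_b)                    # zero-pad for linear correlation
    spec = np.fft.rfft(sig_a, n) * np.conj(np.fft.rfft(sig_b, n))
    spec /= np.abs(spec) + 1e-12                   # phase transform: discard magnitude
    corr = np.fft.irfft(spec, n)
    # negative lags wrap to the end of the irfft output; reorder them
    corr = np.concatenate((corr[-max_lag:], corr[:max_lag + 1]))
    return int(np.argmax(np.abs(corr))) - max_lag

rng = np.random.default_rng(0)
left = rng.standard_normal(2048)
right = np.roll(left, 5)                           # right channel lags by 5 samples
```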
### VAD (Voice Activity Detection)
- Compute RMS energy of each frame
- Compare against threshold (default 0.02)
- Smooth over 5-frame window to reduce spurious detections
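A standalone sketch of this smoothing scheme, using the same defaults as the node:

```python
from collections import deque
import numpy as np

def smoothed_vad(frames, threshold=0.02, window=5):
    """Yield one VAD decision per frame: RMS threshold plus majority vote
    over the last `window` frames."""
    history = deque(maxlen=window)
    for frame in frames:
        history.append(float(np.sqrt(np.mean(frame ** 2))) > threshold)
        yield sum(history) > window // 2

t = np.arange(2048) / 16000.0
speech = 0.1 * np.sin(2 * np.pi * 440.0 * t)   # RMS ~ 0.07, above threshold
frames = [np.zeros(2048)] * 5 + [speech] * 5
decisions = list(smoothed_vad(frames))         # flips to True on the 3rd speech frame
```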
## Dependencies
- `rclpy`
- `numpy`
- `scipy`
- `python3-sounddevice` (audio input)
## Build & Test
### Build Package
```bash
colcon build --packages-select saltybot_audio_direction
```
### Run Tests
```bash
pytest jetson/ros2_ws/src/saltybot_audio_direction/test/
```
## Integration with Multi-Person Tracker
The audio direction node publishes bearing to speakers, enabling the `saltybot_multi_person_tracker` to:
- Cross-validate visual detections with audio localization
- Prioritize targets based on audio activity (speaker attention model)
- Improve person tracking in low-light or occluded scenarios
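One simple way the tracker could consume this signal is to gate visual tracks by angular agreement with the audio bearing. A hypothetical helper (the tolerance and the integration API are assumptions, not part of either package):

```python
def audio_consistent(track_bearing_deg: float, audio_bearing_deg: float,
                     tolerance_deg: float = 30.0) -> bool:
    """True if a visual track's bearing agrees with the audio bearing,
    accounting for wrap-around at 0/360 degrees."""
    diff = abs(track_bearing_deg - audio_bearing_deg) % 360.0
    return min(diff, 360.0 - diff) <= tolerance_deg

# A track at 350 deg agrees with an audio bearing of 10 deg (20 deg apart)
```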
### Future Enhancements
- **Self-noise filtering**: Spectral subtraction for motor/wheel noise
- **TDOA (Time Difference of Arrival)**: Use quad-mic setup for improved angle precision
- **Elevation estimation**: With 4+ channels in 3D array configuration
- **Multi-speaker tracking**: Simultaneous localization of multiple speakers
- **Adaptive beamforming**: MVDR or GSC methods for SNR improvement
## References
- Benesty, J., Sondhi, M., Huang, Y. (2008). "Handbook of Speech Processing"
- Knapp, C., Carter, G. (1976). "The Generalized Correlation Method for Estimation of Time Delay"
## License
MIT

View File

@ -0,0 +1,17 @@
# Audio direction estimator ROS2 parameters
# Used with: ros2 run saltybot_audio_direction audio_direction_node --ros-args --params-file config/audio_direction_params.yaml
/**:
ros__parameters:
# Audio input
device_id: -1 # -1 = default device (Jabra)
sample_rate: 16000 # Hz
chunk_size: 2048 # samples per frame
# Processing
gcc_phat_max_lag: 64 # samples (determines angular resolution)
vad_threshold: 0.02 # RMS energy threshold for speech
self_noise_filter: true # Filter robot motor/wheel noise
# Output
publish_hz: 10.0 # Publication rate (Hz)

View File

@ -0,0 +1,82 @@
"""
Launch audio direction estimator node.
Typical usage:
ros2 launch saltybot_audio_direction audio_direction.launch.py
"""
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def generate_launch_description():
"""Generate launch description for audio direction node."""
# Declare launch arguments
device_id_arg = DeclareLaunchArgument(
'device_id',
default_value='-1',
description='Audio device index (-1 for default)',
)
sample_rate_arg = DeclareLaunchArgument(
'sample_rate',
default_value='16000',
description='Sample rate in Hz',
)
chunk_size_arg = DeclareLaunchArgument(
'chunk_size',
default_value='2048',
description='Samples per audio frame',
)
publish_hz_arg = DeclareLaunchArgument(
'publish_hz',
default_value='10.0',
description='Publication rate in Hz',
)
vad_threshold_arg = DeclareLaunchArgument(
'vad_threshold',
default_value='0.02',
description='RMS energy threshold for voice activity detection',
)
gcc_max_lag_arg = DeclareLaunchArgument(
'gcc_phat_max_lag',
default_value='64',
description='Max lag for GCC-PHAT correlation',
)
self_noise_filter_arg = DeclareLaunchArgument(
'self_noise_filter',
default_value='true',
description='Apply robot self-noise suppression',
)
# Audio direction node
audio_direction_node = Node(
package='saltybot_audio_direction',
executable='audio_direction_node',
name='audio_direction_estimator',
output='screen',
parameters=[
{'device_id': LaunchConfiguration('device_id')},
{'sample_rate': LaunchConfiguration('sample_rate')},
{'chunk_size': LaunchConfiguration('chunk_size')},
{'publish_hz': LaunchConfiguration('publish_hz')},
{'vad_threshold': LaunchConfiguration('vad_threshold')},
{'gcc_phat_max_lag': LaunchConfiguration('gcc_phat_max_lag')},
{'self_noise_filter': LaunchConfiguration('self_noise_filter')},
],
)
return LaunchDescription(
[
device_id_arg,
sample_rate_arg,
chunk_size_arg,
publish_hz_arg,
vad_threshold_arg,
gcc_max_lag_arg,
self_noise_filter_arg,
audio_direction_node,
]
)

View File

@ -0,0 +1,31 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_audio_direction</name>
<version>0.1.0</version>
<description>
Audio direction estimator for sound source localization via Jabra microphone.
Implements GCC-PHAT beamforming for direction of arrival (DoA) estimation.
Publishes bearing (degrees) and voice activity detection (VAD) for speaker tracking integration.
Issue #430.
</description>
<maintainer email="sl-perception@saltylab.local">sl-perception</maintainer>
<license>MIT</license>
<buildtool_depend>ament_python</buildtool_depend>
<depend>rclpy</depend>
<depend>std_msgs</depend>
<depend>sensor_msgs</depend>
<depend>geometry_msgs</depend>
<exec_depend>python3-numpy</exec_depend>
<exec_depend>python3-scipy</exec_depend>
<exec_depend>python3-sounddevice</exec_depend>
<test_depend>pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,299 @@
"""
audio_direction_node.py: Sound source localization via GCC-PHAT beamforming.
Estimates direction of arrival (DoA) from a Jabra multi-channel microphone
using Generalized Cross-Correlation with Phase Transform (GCC-PHAT).
Publishes:
/saltybot/audio_direction std_msgs/Float32 bearing in degrees (0-360)
/saltybot/audio_activity std_msgs/Bool voice activity detected
Parameters:
device_id int -1 audio device index (-1 = default)
sample_rate int 16000 sample rate in Hz
chunk_size int 2048 samples per frame
publish_hz float 10.0 output publication rate
vad_threshold float 0.02 RMS energy threshold for speech
gcc_phat_max_lag int 64 max lag for correlation (determines angular resolution)
self_noise_filter bool true apply robot noise suppression
"""
from __future__ import annotations
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
import numpy as np
from scipy import signal
import sounddevice as sd
from collections import deque
import threading
import time
from std_msgs.msg import Float32, Bool
_SENSOR_QOS = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
history=HistoryPolicy.KEEP_LAST,
depth=5,
)
class GCCPHATBeamformer:
"""Generalized Cross-Correlation with Phase Transform for DoA estimation."""
def __init__(self, sample_rate: int, max_lag: int = 64):
self.sample_rate = sample_rate
self.max_lag = max_lag
self.frequency_bins = max_lag * 2
# Azimuth angles for left (270°), front (0°), right (90°), rear (180°)
self.angles = np.array([270.0, 0.0, 90.0, 180.0])
def gcc_phat(self, sig1: np.ndarray, sig2: np.ndarray) -> float:
"""
Compute GCC-PHAT correlation and estimate time delay (DoA proxy).
Returns:
Estimated time delay in samples (can be converted to angle)
"""
if len(sig1) == 0 or len(sig2) == 0:
return 0.0
# Cross-correlation in frequency domain; zero-pad to the combined length
# so the full frame is used (rfft with n < len(sig) would truncate it)
n = len(sig1) + len(sig2)
fft1 = np.fft.rfft(sig1, n=n)
fft2 = np.fft.rfft(sig2, n=n)
# Normalize magnitude (phase transform)
cross_spectrum = fft1 * np.conj(fft2)
cross_spectrum_normalized = cross_spectrum / (np.abs(cross_spectrum) + 1e-8)
# Inverse FFT to get correlation
correlation = np.fft.irfft(cross_spectrum_normalized, n=n)
# Negative lags wrap to the end of the irfft output; reorder so the array
# covers lags -max_lag .. +max_lag before searching for the peak
correlation = np.concatenate((correlation[-self.max_lag :], correlation[: self.max_lag + 1]))
lags = np.arange(-self.max_lag, self.max_lag + 1)
estimated_lag = lags[int(np.argmax(np.abs(correlation)))]
return float(estimated_lag)
def estimate_bearing(self, channels: list[np.ndarray]) -> float:
"""
Estimate bearing from multi-channel audio.
Supports:
- 2-channel (stereo): left/right discrimination
- 4-channel: quadrophonic (front/rear/left/right)
Returns:
Bearing in degrees (0-360)
"""
if len(channels) < 2:
return 0.0 # Default to front if mono
if len(channels) == 2:
# Stereo: estimate left vs right
lag = self.gcc_phat(channels[0], channels[1])
# Negative lag → left (270°), positive lag → right (90°)
if abs(lag) < 5:
return 0.0 # Front
elif lag < 0:
return 270.0 # Left
else:
return 90.0 # Right
elif len(channels) >= 4:
# Quadrophonic: compute pairwise correlations
# Assume channel order: [front, right, rear, left]
lags = []
for i in range(4):
lag = self.gcc_phat(channels[i], channels[(i + 1) % 4])
lags.append(lag)
# Select angle based on strongest correlation
max_idx = np.argmax(np.abs(lags))
return self.angles[max_idx]
return 0.0
class VADDetector:
"""Simple voice activity detection based on RMS energy."""
def __init__(self, threshold: float = 0.02, smoothing: int = 5):
self.threshold = threshold
self.smoothing = smoothing
self.history = deque(maxlen=smoothing)
def detect(self, audio_frame: np.ndarray) -> bool:
"""
Detect voice activity in audio frame.
Returns:
True if speech-like energy detected
"""
rms = np.sqrt(np.mean(audio_frame**2))
self.history.append(rms > self.threshold)
# Majority voting over window
return sum(self.history) > self.smoothing // 2
class AudioDirectionNode(Node):
def __init__(self):
super().__init__('audio_direction_estimator')
# Parameters
self.declare_parameter('device_id', -1)
self.declare_parameter('sample_rate', 16000)
self.declare_parameter('chunk_size', 2048)
self.declare_parameter('publish_hz', 10.0)
self.declare_parameter('vad_threshold', 0.02)
self.declare_parameter('gcc_phat_max_lag', 64)
self.declare_parameter('self_noise_filter', True)
self.device_id = self.get_parameter('device_id').value
self.sample_rate = self.get_parameter('sample_rate').value
self.chunk_size = self.get_parameter('chunk_size').value
pub_hz = self.get_parameter('publish_hz').value
vad_threshold = self.get_parameter('vad_threshold').value
gcc_max_lag = self.get_parameter('gcc_phat_max_lag').value
self.apply_noise_filter = self.get_parameter('self_noise_filter').value
# Publishers
self._pub_bearing = self.create_publisher(
Float32, '/saltybot/audio_direction', _SENSOR_QOS
)
self._pub_vad = self.create_publisher(
Bool, '/saltybot/audio_activity', _SENSOR_QOS
)
# Audio processing
self.beamformer = GCCPHATBeamformer(self.sample_rate, max_lag=gcc_max_lag)
self.vad = VADDetector(threshold=vad_threshold)
self._audio_buffer = deque(maxlen=self.chunk_size * 2)
self._lock = threading.Lock()
# Start audio stream
self._stream = None
self._running = True
self._start_audio_stream()
# Publish timer
self.create_timer(1.0 / pub_hz, self._tick)
self.get_logger().info(
f'audio_direction_estimator ready — '
f'device_id={self.device_id} sample_rate={self.sample_rate} Hz '
f'chunk_size={self.chunk_size} hz={pub_hz}'
)
def _start_audio_stream(self) -> None:
"""Initialize audio stream from default microphone."""
try:
self._stream = sd.InputStream(
device=self.device_id if self.device_id >= 0 else None,
samplerate=self.sample_rate,
channels=2, # Default to stereo; auto-detect Jabra channels
blocksize=self.chunk_size,
callback=self._audio_callback,
latency='low',
)
self._stream.start()
self.get_logger().info('Audio stream started')
except Exception as e:
self.get_logger().error(f'Failed to start audio stream: {e}')
self._stream = None
def _audio_callback(self, indata: np.ndarray, frames: int, time_info, status) -> None:
"""Callback for audio input stream."""
if status:
self.get_logger().warn(f'Audio callback status: {status}')
try:
with self._lock:
# Flatten frames (mono or interleaved multi-channel) into the buffer;
# _extract_channels de-interleaves on the consumer side
self._audio_buffer.extend(indata.flatten())
except Exception as e:
self.get_logger().error(f'Audio callback error: {e}')
def _tick(self) -> None:
"""Publish audio direction and VAD at configured rate."""
if self._stream is None or not self._running:
return
with self._lock:
if len(self._audio_buffer) < self.chunk_size:
return
audio_data = np.array(list(self._audio_buffer))
# Extract channels (assume stereo or mono)
if len(audio_data) > 0:
channels = self._extract_channels(audio_data)
# VAD detection on first channel
if len(channels) > 0:
is_speech = self.vad.detect(channels[0])
vad_msg = Bool()
vad_msg.data = is_speech
self._pub_vad.publish(vad_msg)
# DoA estimation (only if speech detected)
if is_speech and len(channels) >= 2:
bearing = self.beamformer.estimate_bearing(channels)
else:
bearing = 0.0 # Default to front when no speech
bearing_msg = Float32()
bearing_msg.data = float(bearing)
self._pub_bearing.publish(bearing_msg)
def _extract_channels(self, audio_data: np.ndarray) -> list[np.ndarray]:
"""
Extract stereo/mono channels from audio buffer.
Handles both interleaved and non-interleaved formats.
"""
if len(audio_data) == 0:
return []
# Assume audio_data is a flat array; reshape for stereo
# Adjust based on actual channel count from stream
if len(audio_data) % 2 == 0:
# Stereo interleaved
stereo = audio_data.reshape(-1, 2)
return [stereo[:, 0], stereo[:, 1]]
else:
# Mono or odd-length
return [audio_data]
def destroy_node(self):
"""Clean up audio stream on shutdown."""
self._running = False
if self._stream is not None:
self._stream.stop()
self._stream.close()
super().destroy_node()
def main(args=None):
rclpy.init(args=args)
node = AudioDirectionNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/saltybot_audio_direction
[egg_info]
tag_date = 0

View File

@ -0,0 +1,23 @@
from setuptools import setup, find_packages
setup(
name='saltybot_audio_direction',
version='0.1.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/saltybot_audio_direction']),
('share/saltybot_audio_direction', ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
author='SaltyLab',
author_email='robot@saltylab.local',
description='Audio direction estimator for sound source localization',
license='MIT',
entry_points={
'console_scripts': [
'audio_direction_node=saltybot_audio_direction.audio_direction_node:main',
],
},
)

View File

@ -0,0 +1,78 @@
"""
Basic tests for audio direction estimator.
"""
import pytest
import numpy as np
from saltybot_audio_direction.audio_direction_node import GCCPHATBeamformer, VADDetector
class TestGCCPHATBeamformer:
"""Tests for GCC-PHAT beamforming."""
def test_beamformer_init(self):
"""Test beamformer initialization."""
beamformer = GCCPHATBeamformer(sample_rate=16000, max_lag=64)
assert beamformer.sample_rate == 16000
assert beamformer.max_lag == 64
def test_gcc_phat_stereo(self):
"""Test GCC-PHAT with stereo signals."""
beamformer = GCCPHATBeamformer(sample_rate=16000, max_lag=64)
# Synthetic stereo: right channel is a delayed copy of the left.
# Broadband noise avoids the phase ambiguity of a pure tone
# (a 1 kHz sine delayed by an integer number of periods is unchanged).
rng = np.random.default_rng(0)
sig1 = rng.standard_normal(512) # Left mic
sig2 = np.roll(sig1, 3) # Right mic, lagging by 3 samples
lag = beamformer.gcc_phat(sig1, sig2)
assert isinstance(lag, float)
def test_estimate_bearing_stereo(self):
"""Test bearing estimation for stereo input."""
beamformer = GCCPHATBeamformer(sample_rate=16000, max_lag=64)
t = np.arange(512) / 16000.0
sig1 = np.sin(2 * np.pi * 1000 * t)
sig2 = np.sin(2 * np.pi * 1000 * t)
bearing = beamformer.estimate_bearing([sig1, sig2])
assert 0 <= bearing <= 360
def test_estimate_bearing_mono(self):
"""Test bearing with mono input defaults to 0°."""
beamformer = GCCPHATBeamformer(sample_rate=16000)
sig = np.sin(2 * np.pi * 1000 * np.arange(512) / 16000.0)
bearing = beamformer.estimate_bearing([sig])
assert bearing == 0.0
class TestVADDetector:
"""Tests for voice activity detection."""
def test_vad_init(self):
"""Test VAD detector initialization."""
vad = VADDetector(threshold=0.02, smoothing=5)
assert vad.threshold == 0.02
assert vad.smoothing == 5
def test_vad_silence(self):
"""Test VAD detects silence."""
vad = VADDetector(threshold=0.02)
silence = np.zeros(2048) # Digital silence (RMS = 0)
is_speech = vad.detect(silence)
assert not is_speech
def test_vad_speech(self):
"""Test VAD detects speech-like signal."""
vad = VADDetector(threshold=0.02)
t = np.arange(2048) / 16000.0
speech = np.sin(2 * np.pi * 1000 * t) * 0.1 # ~0.07 RMS
for _ in range(5): # Run multiple times to accumulate history
is_speech = vad.detect(speech)
assert is_speech
if __name__ == '__main__':
pytest.main([__file__])