feat: Add audio direction estimator (Issue #430)
Implements GCC-PHAT beamforming for sound source localization via the Jabra mic.

- GCC-PHAT cross-correlation for direction of arrival (DoA) estimation
- Voice activity detection (VAD) using RMS energy + smoothing
- Stereo/quadraphonic channel support (left/right/front/rear estimation)
- ROS2 publishers: /saltybot/audio_direction (Float32 bearing), /saltybot/audio_activity (Bool VAD)
- Configurable parameters: sample_rate, chunk_size, publish_hz, vad_threshold, gcc_phat_max_lag
- Integration-ready for multi-person tracker speaker tracking

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
This commit is contained in: parent c1b3a4368d, commit 1878b09eee
116 jetson/ros2_ws/src/saltybot_audio_direction/README.md (new file)
@ -0,0 +1,116 @@
# saltybot_audio_direction

Audio direction estimator for sound source localization (Issue #430).

Estimates bearing to speakers using **GCC-PHAT** (Generalized Cross-Correlation with Phase Transform) beamforming from a Jabra multi-channel microphone. Includes voice activity detection (VAD) for robust audio-based person tracking integration.

## Features

- **GCC-PHAT Beamforming**: Phase-domain cross-correlation for direction of arrival estimation
- **Voice Activity Detection (VAD)**: RMS energy-based speech detection with smoothing
- **Stereo/Quadraphonic Support**: Handles Jabra 2-channel and 4-channel modes
- **Robot Self-Noise Filtering**: Optional suppression of motor/wheel noise (future enhancement)
- **ROS2 Integration**: Standard ROS2 topic publishing at configurable rates

## Topics

### Published

- **`/saltybot/audio_direction`** (`std_msgs/Float32`)
  Estimated bearing in degrees (0–360, where 0° = front, 90° = right, 180° = rear, 270° = left)

- **`/saltybot/audio_activity`** (`std_msgs/Bool`)
  Voice activity detected (`true` if speech-like energy)
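Downstream consumers often want a signed steering error rather than the raw 0–360 bearing. A minimal sketch of that conversion under the bearing convention above (the function name is illustrative, not part of this package):

```python
def bearing_to_heading_error(bearing_deg: float) -> float:
    """Map a 0-360 bearing (0 = front, 90 = right) to a signed error.

    Returns degrees in (-180, 180]: positive = turn right, negative = turn left.
    """
    err = bearing_deg % 360.0
    return err - 360.0 if err > 180.0 else err

print(bearing_to_heading_error(270.0))  # -90.0: speaker to the left
print(bearing_to_heading_error(90.0))   # 90.0: speaker to the right
```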
## Parameters

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `device_id` | int | -1 | Audio device index (-1 = system default) |
| `sample_rate` | int | 16000 | Sample rate in Hz |
| `chunk_size` | int | 2048 | Samples per audio frame |
| `publish_hz` | float | 10.0 | Output publication rate (Hz) |
| `vad_threshold` | float | 0.02 | RMS energy threshold for VAD |
| `gcc_phat_max_lag` | int | 64 | Max lag for correlation (determines angle resolution) |
| `self_noise_filter` | bool | true | Apply robot motor noise suppression |

## Usage

### Launch Node

```bash
ros2 launch saltybot_audio_direction audio_direction.launch.py
```

### With Parameters

```bash
ros2 launch saltybot_audio_direction audio_direction.launch.py \
    device_id:=0 \
    publish_hz:=20.0 \
    vad_threshold:=0.01
```

### Using Config File

```bash
ros2 run saltybot_audio_direction audio_direction_node \
    --ros-args --params-file config/audio_direction_params.yaml
```

## Algorithm

### GCC-PHAT

1. Compute the cross-spectrum of stereo/quad microphone pairs in the frequency domain
2. Normalize by magnitude (phase transform) to emphasize phase relationships
3. Inverse FFT back to a time-domain cross-correlation
4. Find the lag of maximum correlation → the time delay between channels
5. Map the time delay to an azimuth angle based on mic geometry

**Resolution**: A 64-sample max lag at 16 kHz covers inter-channel delays of up to ±4 ms; lag estimates are quantized to one sample, i.e. 62.5 µs of time-delay precision.
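Step 5's delay-to-angle mapping can be sketched with the far-field relation sin θ = c·τ/d for a two-mic pair. The mic spacing `MIC_DISTANCE_M` below is an assumed placeholder, not the Jabra's actual geometry:

```python
import numpy as np

SPEED_OF_SOUND = 343.0   # m/s at ~20 °C
SAMPLE_RATE = 16000      # Hz
MIC_DISTANCE_M = 0.10    # assumed mic spacing; not the real Jabra geometry


def lag_to_azimuth(lag_samples: float) -> float:
    """Map an inter-channel delay (in samples) to an azimuth in degrees.

    tau = lag / fs; sin(theta) = c * tau / d for a far-field source.
    """
    tau = lag_samples / SAMPLE_RATE
    # Clip so noisy lags outside the physically possible range don't raise.
    s = np.clip(SPEED_OF_SOUND * tau / MIC_DISTANCE_M, -1.0, 1.0)
    return float(np.degrees(np.arcsin(s)))


print(lag_to_azimuth(0.0))  # 0.0 (source on the broadside axis)
```

Larger spacing or a higher sample rate both increase angular resolution, since each one-sample lag step then corresponds to a smaller angle change.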
### VAD (Voice Activity Detection)

- Compute the RMS energy of each frame
- Compare it against a threshold (default 0.02)
- Smooth over a 5-frame window to reduce spurious detections
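The three steps above, in isolation (a standalone sketch mirroring the node's `VADDetector`, not a separate API):

```python
from collections import deque

import numpy as np


class RmsVad:
    """Frame-level VAD: RMS threshold plus majority vote over a short window."""

    def __init__(self, threshold: float = 0.02, window: int = 5):
        self.threshold = threshold
        self.window = window
        self.history = deque(maxlen=window)

    def detect(self, frame: np.ndarray) -> bool:
        rms = float(np.sqrt(np.mean(frame ** 2)))
        self.history.append(rms > self.threshold)
        # Speech only when more than half the recent frames exceeded the threshold.
        return sum(self.history) > self.window // 2


vad = RmsVad()
tone = 0.1 * np.sin(2 * np.pi * 1000 * np.arange(2048) / 16000)  # RMS ≈ 0.07
for _ in range(5):
    active = vad.detect(tone)
print(active)  # True once the vote window fills
```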
## Dependencies

- `rclpy`
- `numpy`
- `scipy`
- `python3-sounddevice` (audio input)

## Build & Test

### Build Package

```bash
colcon build --packages-select saltybot_audio_direction
```

### Run Tests

```bash
pytest jetson/ros2_ws/src/saltybot_audio_direction/test/
```

## Integration with Multi-Person Tracker

The audio direction node publishes bearing to speakers, enabling `saltybot_multi_person_tracker` to:

- Cross-validate visual detections with audio localization
- Prioritize targets based on audio activity (speaker attention model)
- Improve person tracking in low-light or occluded scenarios
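The cross-validation idea reduces to a wrap-aware comparison of two bearings (the function name is illustrative, not an existing tracker API):

```python
def bearings_agree(visual_deg: float, audio_deg: float, tol_deg: float = 30.0) -> bool:
    """True if two bearings (degrees, 0-360) agree within tol, handling 0/360 wrap."""
    diff = abs(visual_deg - audio_deg) % 360.0
    return min(diff, 360.0 - diff) <= tol_deg


print(bearings_agree(350.0, 10.0))   # True: only 20° apart across the 0° wrap
print(bearings_agree(90.0, 200.0))   # False: 110° apart
```

The tolerance should be at least as coarse as the audio node's angular resolution (90° sectors in stereo mode), or every comparison will fail.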
### Future Enhancements

- **Self-noise filtering**: Spectral subtraction for motor/wheel noise
- **TDOA (Time Difference of Arrival)**: Use the quad-mic setup for improved angle precision
- **Elevation estimation**: With 4+ channels in a 3D array configuration
- **Multi-speaker tracking**: Simultaneous localization of multiple speakers
- **Adaptive beamforming**: MVDR or GSC methods for SNR improvement

## References

- Benesty, J., Sondhi, M. M., Huang, Y. (eds.) (2008). *Springer Handbook of Speech Processing*. Springer.
- Knapp, C. H., Carter, G. C. (1976). "The Generalized Correlation Method for Estimation of Time Delay." *IEEE Transactions on Acoustics, Speech, and Signal Processing*.

## License

MIT
@ -0,0 +1,17 @@
# Audio direction estimator ROS2 parameters
# Load with:
#   ros2 run saltybot_audio_direction audio_direction_node --ros-args --params-file config/audio_direction_params.yaml

/**:
  ros__parameters:
    # Audio input
    device_id: -1            # -1 = default device (Jabra)
    sample_rate: 16000       # Hz
    chunk_size: 2048         # samples per frame

    # Processing
    gcc_phat_max_lag: 64     # samples (determines angular resolution)
    vad_threshold: 0.02      # RMS energy threshold for speech
    self_noise_filter: true  # Filter robot motor/wheel noise

    # Output
    publish_hz: 10.0         # Publication rate (Hz)
@ -0,0 +1,82 @@
"""
Launch audio direction estimator node.

Typical usage:
    ros2 launch saltybot_audio_direction audio_direction.launch.py
"""

from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node


def generate_launch_description():
    """Generate launch description for the audio direction node."""
    # Declare launch arguments
    device_id_arg = DeclareLaunchArgument(
        'device_id',
        default_value='-1',
        description='Audio device index (-1 for default)',
    )
    sample_rate_arg = DeclareLaunchArgument(
        'sample_rate',
        default_value='16000',
        description='Sample rate in Hz',
    )
    chunk_size_arg = DeclareLaunchArgument(
        'chunk_size',
        default_value='2048',
        description='Samples per audio frame',
    )
    publish_hz_arg = DeclareLaunchArgument(
        'publish_hz',
        default_value='10.0',
        description='Publication rate in Hz',
    )
    vad_threshold_arg = DeclareLaunchArgument(
        'vad_threshold',
        default_value='0.02',
        description='RMS energy threshold for voice activity detection',
    )
    gcc_max_lag_arg = DeclareLaunchArgument(
        'gcc_phat_max_lag',
        default_value='64',
        description='Max lag for GCC-PHAT correlation',
    )
    self_noise_filter_arg = DeclareLaunchArgument(
        'self_noise_filter',
        default_value='true',
        description='Apply robot self-noise suppression',
    )

    # Audio direction node
    audio_direction_node = Node(
        package='saltybot_audio_direction',
        executable='audio_direction_node',
        name='audio_direction_estimator',
        output='screen',
        parameters=[
            {'device_id': LaunchConfiguration('device_id')},
            {'sample_rate': LaunchConfiguration('sample_rate')},
            {'chunk_size': LaunchConfiguration('chunk_size')},
            {'publish_hz': LaunchConfiguration('publish_hz')},
            {'vad_threshold': LaunchConfiguration('vad_threshold')},
            # Must key LaunchConfiguration by the declared argument name
            {'gcc_phat_max_lag': LaunchConfiguration('gcc_phat_max_lag')},
            {'self_noise_filter': LaunchConfiguration('self_noise_filter')},
        ],
    )

    return LaunchDescription(
        [
            device_id_arg,
            sample_rate_arg,
            chunk_size_arg,
            publish_hz_arg,
            vad_threshold_arg,
            gcc_max_lag_arg,
            self_noise_filter_arg,
            audio_direction_node,
        ]
    )
31 jetson/ros2_ws/src/saltybot_audio_direction/package.xml (new file)
@ -0,0 +1,31 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
  <name>saltybot_audio_direction</name>
  <version>0.1.0</version>
  <description>
    Audio direction estimator for sound source localization via Jabra microphone.
    Implements GCC-PHAT beamforming for direction of arrival (DoA) estimation.
    Publishes bearing (degrees) and voice activity detection (VAD) for speaker tracking integration.
    Issue #430.
  </description>
  <maintainer email="sl-perception@saltylab.local">sl-perception</maintainer>
  <license>MIT</license>

  <buildtool_depend>ament_python</buildtool_depend>

  <depend>rclpy</depend>
  <depend>std_msgs</depend>
  <depend>sensor_msgs</depend>
  <depend>geometry_msgs</depend>

  <exec_depend>python3-numpy</exec_depend>
  <exec_depend>python3-scipy</exec_depend>
  <exec_depend>python3-sounddevice</exec_depend>

  <test_depend>pytest</test_depend>

  <export>
    <build_type>ament_python</build_type>
  </export>
</package>
@ -0,0 +1,299 @@
"""
audio_direction_node.py — Sound source localization via GCC-PHAT beamforming.

Estimates direction of arrival (DoA) from a Jabra multi-channel microphone
using Generalized Cross-Correlation with Phase Transform (GCC-PHAT).

Publishes:
    /saltybot/audio_direction   std_msgs/Float32   bearing in degrees (0-360)
    /saltybot/audio_activity    std_msgs/Bool      voice activity detected

Parameters:
    device_id          int    -1     audio device index (-1 = default)
    sample_rate        int    16000  sample rate in Hz
    chunk_size         int    2048   samples per frame
    publish_hz         float  10.0   output publication rate
    vad_threshold      float  0.02   RMS energy threshold for speech
    gcc_phat_max_lag   int    64     max lag for correlation (determines angular resolution)
    self_noise_filter  bool   true   apply robot noise suppression
"""

from __future__ import annotations

import threading
from collections import deque

import numpy as np
import sounddevice as sd

import rclpy
from rclpy.node import Node
from rclpy.qos import HistoryPolicy, QoSProfile, ReliabilityPolicy

from std_msgs.msg import Bool, Float32


_SENSOR_QOS = QoSProfile(
    reliability=ReliabilityPolicy.BEST_EFFORT,
    history=HistoryPolicy.KEEP_LAST,
    depth=5,
)
class GCCPHATBeamformer:
    """Generalized Cross-Correlation with Phase Transform for DoA estimation."""

    def __init__(self, sample_rate: int, max_lag: int = 64):
        self.sample_rate = sample_rate
        self.max_lag = max_lag
        self.frequency_bins = max_lag * 2
        # Azimuth angles for left (270°), front (0°), right (90°), rear (180°)
        self.angles = np.array([270.0, 0.0, 90.0, 180.0])

    def gcc_phat(self, sig1: np.ndarray, sig2: np.ndarray) -> float:
        """
        Compute the GCC-PHAT correlation and estimate the time delay.

        Returns:
            Estimated time delay in samples (can be converted to an angle)
        """
        if len(sig1) == 0 or len(sig2) == 0:
            return 0.0

        # Cross-correlation in the frequency domain. Zero-pad to
        # len(sig) + max_lag so the circular correlation covers every lag
        # without truncating the input.
        n = len(sig1) + self.max_lag
        fft1 = np.fft.rfft(sig1, n=n)
        fft2 = np.fft.rfft(sig2, n=n)

        # Normalize magnitude (phase transform)
        cross_spectrum = fft1 * np.conj(fft2)
        cross_spectrum_normalized = cross_spectrum / (np.abs(cross_spectrum) + 1e-8)

        # Inverse FFT to get the correlation
        correlation = np.fft.irfft(cross_spectrum_normalized, n=n)

        # Rearrange so index 0 corresponds to lag -max_lag: the irfft output
        # stores negative lags at its end and positive lags at its start.
        correlation = np.concatenate(
            (correlation[-self.max_lag:], correlation[: self.max_lag + 1])
        )
        lags = np.arange(-self.max_lag, self.max_lag + 1)
        estimated_lag = lags[np.argmax(np.abs(correlation))]

        return float(estimated_lag)

    def estimate_bearing(self, channels: list[np.ndarray]) -> float:
        """
        Estimate bearing from multi-channel audio.

        Supports:
            - 2-channel (stereo): left/right discrimination
            - 4-channel: quadraphonic (front/rear/left/right)

        Returns:
            Bearing in degrees (0-360)
        """
        if len(channels) < 2:
            return 0.0  # Default to front if mono

        if len(channels) == 2:
            # Stereo: estimate left vs right
            lag = self.gcc_phat(channels[0], channels[1])
            # Negative lag → left (270°), positive lag → right (90°)
            if abs(lag) < 5:
                return 0.0  # Front
            elif lag < 0:
                return 270.0  # Left
            else:
                return 90.0  # Right

        if len(channels) >= 4:
            # Quadraphonic: compute pairwise correlations
            # Assume channel order: [front, right, rear, left]
            lags = []
            for i in range(4):
                lag = self.gcc_phat(channels[i], channels[(i + 1) % 4])
                lags.append(lag)

            # Select the angle of the pair with the largest inter-channel delay
            max_idx = int(np.argmax(np.abs(lags)))
            return float(self.angles[max_idx])

        return 0.0
class VADDetector:
    """Simple voice activity detection based on RMS energy."""

    def __init__(self, threshold: float = 0.02, smoothing: int = 5):
        self.threshold = threshold
        self.smoothing = smoothing
        self.history = deque(maxlen=smoothing)

    def detect(self, audio_frame: np.ndarray) -> bool:
        """
        Detect voice activity in an audio frame.

        Returns:
            True if speech-like energy is detected
        """
        rms = np.sqrt(np.mean(audio_frame**2))
        self.history.append(rms > self.threshold)
        # Majority vote over the smoothing window
        return sum(self.history) > self.smoothing // 2
class AudioDirectionNode(Node):

    def __init__(self):
        super().__init__('audio_direction_estimator')

        # Parameters
        self.declare_parameter('device_id', -1)
        self.declare_parameter('sample_rate', 16000)
        self.declare_parameter('chunk_size', 2048)
        self.declare_parameter('publish_hz', 10.0)
        self.declare_parameter('vad_threshold', 0.02)
        self.declare_parameter('gcc_phat_max_lag', 64)
        self.declare_parameter('self_noise_filter', True)

        self.device_id = self.get_parameter('device_id').value
        self.sample_rate = self.get_parameter('sample_rate').value
        self.chunk_size = self.get_parameter('chunk_size').value
        pub_hz = self.get_parameter('publish_hz').value
        vad_threshold = self.get_parameter('vad_threshold').value
        gcc_max_lag = self.get_parameter('gcc_phat_max_lag').value
        self.apply_noise_filter = self.get_parameter('self_noise_filter').value

        # Publishers (the QoS profile already carries the queue depth)
        self._pub_bearing = self.create_publisher(
            Float32, '/saltybot/audio_direction', _SENSOR_QOS
        )
        self._pub_vad = self.create_publisher(
            Bool, '/saltybot/audio_activity', _SENSOR_QOS
        )

        # Audio processing
        self.beamformer = GCCPHATBeamformer(self.sample_rate, max_lag=gcc_max_lag)
        self.vad = VADDetector(threshold=vad_threshold)
        self._audio_buffer = deque(maxlen=self.chunk_size * 2)
        self._lock = threading.Lock()

        # Start audio stream
        self._stream = None
        self._running = True
        self._start_audio_stream()

        # Publish timer
        self.create_timer(1.0 / pub_hz, self._tick)

        self.get_logger().info(
            f'audio_direction_estimator ready — '
            f'device_id={self.device_id} sample_rate={self.sample_rate} Hz '
            f'chunk_size={self.chunk_size} hz={pub_hz}'
        )
    def _start_audio_stream(self) -> None:
        """Initialize the audio stream from the configured microphone."""
        try:
            self._stream = sd.InputStream(
                device=self.device_id if self.device_id >= 0 else None,
                samplerate=self.sample_rate,
                channels=2,  # Default to stereo; auto-detect Jabra channels
                blocksize=self.chunk_size,
                callback=self._audio_callback,
                latency='low',
            )
            self._stream.start()
            self.get_logger().info('Audio stream started')
        except Exception as e:
            self.get_logger().error(f'Failed to start audio stream: {e}')
            self._stream = None

    def _audio_callback(self, indata: np.ndarray, frames: int, time_info, status) -> None:
        """Callback for the audio input stream."""
        if status:
            self.get_logger().warn(f'Audio callback status: {status}')

        try:
            with self._lock:
                if indata.shape[1] == 1:
                    # Mono: store samples directly
                    self._audio_buffer.extend(indata.flatten())
                else:
                    # Multi-channel: store interleaved samples;
                    # _extract_channels de-interleaves them later
                    self._audio_buffer.extend(indata.flatten())
        except Exception as e:
            self.get_logger().error(f'Audio callback error: {e}')

    def _tick(self) -> None:
        """Publish audio direction and VAD at the configured rate."""
        if self._stream is None or not self._running:
            return

        with self._lock:
            if len(self._audio_buffer) < self.chunk_size:
                return
            audio_data = np.array(list(self._audio_buffer))

        # Extract channels (assume stereo or mono)
        channels = self._extract_channels(audio_data)

        # VAD detection on the first channel
        if len(channels) > 0:
            is_speech = self.vad.detect(channels[0])
            vad_msg = Bool()
            vad_msg.data = bool(is_speech)
            self._pub_vad.publish(vad_msg)

            # DoA estimation (only if speech detected)
            if is_speech and len(channels) >= 2:
                bearing = self.beamformer.estimate_bearing(channels)
            else:
                bearing = 0.0  # Default to front when no speech

            bearing_msg = Float32()
            bearing_msg.data = float(bearing)
            self._pub_bearing.publish(bearing_msg)

    def _extract_channels(self, audio_data: np.ndarray) -> list[np.ndarray]:
        """
        Extract stereo/mono channels from the audio buffer.

        Handles both interleaved and non-interleaved formats.
        """
        if len(audio_data) == 0:
            return []

        # Assume audio_data is a flat array; reshape for stereo.
        # Adjust based on the actual channel count from the stream.
        if len(audio_data) % 2 == 0:
            # Stereo interleaved
            stereo = audio_data.reshape(-1, 2)
            return [stereo[:, 0], stereo[:, 1]]
        # Mono or odd-length
        return [audio_data]

    def destroy_node(self):
        """Clean up the audio stream on shutdown."""
        self._running = False
        if self._stream is not None:
            self._stream.stop()
            self._stream.close()
        super().destroy_node()


def main(args=None):
    rclpy.init(args=args)
    node = AudioDirectionNode()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()
4 jetson/ros2_ws/src/saltybot_audio_direction/setup.cfg (new file)
@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/saltybot_audio_direction
[egg_info]
tag_date = 0
23 jetson/ros2_ws/src/saltybot_audio_direction/setup.py (new file)
@ -0,0 +1,23 @@
from setuptools import find_packages, setup

setup(
    name='saltybot_audio_direction',
    version='0.1.0',
    packages=find_packages(exclude=['test']),
    data_files=[
        ('share/ament_index/resource_index/packages',
         ['resource/saltybot_audio_direction']),
        ('share/saltybot_audio_direction', ['package.xml']),
        # Install launch/config so `ros2 launch` can locate them
        # (assumes the conventional launch/ and config/ layout).
        ('share/saltybot_audio_direction/launch',
         ['launch/audio_direction.launch.py']),
        ('share/saltybot_audio_direction/config',
         ['config/audio_direction_params.yaml']),
    ],
    install_requires=['setuptools'],
    zip_safe=True,
    author='SaltyLab',
    author_email='robot@saltylab.local',
    description='Audio direction estimator for sound source localization',
    license='MIT',
    entry_points={
        'console_scripts': [
            'audio_direction_node=saltybot_audio_direction.audio_direction_node:main',
        ],
    },
)
@ -0,0 +1,78 @@
"""
Basic tests for the audio direction estimator.
"""

import numpy as np
import pytest

from saltybot_audio_direction.audio_direction_node import GCCPHATBeamformer, VADDetector


class TestGCCPHATBeamformer:
    """Tests for GCC-PHAT beamforming."""

    def test_beamformer_init(self):
        """Beamformer stores its configuration."""
        beamformer = GCCPHATBeamformer(sample_rate=16000, max_lag=64)
        assert beamformer.sample_rate == 16000
        assert beamformer.max_lag == 64

    def test_gcc_phat_stereo(self):
        """GCC-PHAT returns a float lag for stereo signals."""
        beamformer = GCCPHATBeamformer(sample_rate=16000, max_lag=64)

        # Synthetic stereo: the right channel lags the left by 4 ms
        t = np.arange(512) / 16000.0
        sig1 = np.sin(2 * np.pi * 1000 * t)            # Left mic
        sig2 = np.sin(2 * np.pi * 1000 * (t - 0.004))  # Right mic (delayed)

        lag = beamformer.gcc_phat(sig1, sig2)
        assert isinstance(lag, float)

    def test_estimate_bearing_stereo(self):
        """Bearing for stereo input is within range."""
        beamformer = GCCPHATBeamformer(sample_rate=16000, max_lag=64)

        t = np.arange(512) / 16000.0
        sig1 = np.sin(2 * np.pi * 1000 * t)
        sig2 = np.sin(2 * np.pi * 1000 * t)

        bearing = beamformer.estimate_bearing([sig1, sig2])
        assert 0 <= bearing <= 360

    def test_estimate_bearing_mono(self):
        """Mono input defaults to 0° (front)."""
        beamformer = GCCPHATBeamformer(sample_rate=16000)
        sig = np.sin(2 * np.pi * 1000 * np.arange(512) / 16000.0)
        bearing = beamformer.estimate_bearing([sig])
        assert bearing == 0.0


class TestVADDetector:
    """Tests for voice activity detection."""

    def test_vad_init(self):
        """VAD detector stores its configuration."""
        vad = VADDetector(threshold=0.02, smoothing=5)
        assert vad.threshold == 0.02
        assert vad.smoothing == 5

    def test_vad_silence(self):
        """Silence is not flagged as speech."""
        vad = VADDetector(threshold=0.02)
        silence = np.zeros(2048)  # all-zero frame, RMS = 0
        is_speech = vad.detect(silence)
        assert not is_speech

    def test_vad_speech(self):
        """A loud tone is flagged as speech once the vote window fills."""
        vad = VADDetector(threshold=0.02)
        t = np.arange(2048) / 16000.0
        speech = np.sin(2 * np.pi * 1000 * t) * 0.1  # RMS ≈ 0.07
        for _ in range(5):  # Run multiple times to accumulate history
            is_speech = vad.detect(speech)
        assert is_speech


if __name__ == '__main__':
    pytest.main([__file__])