feat: Piper TTS service (Issue #421) #425

Merged
sl-jetson merged 1 commits from sl-mechanical/issue-421-tts-service into main 2026-03-04 23:59:12 -05:00
9 changed files with 436 additions and 0 deletions

View File

@ -0,0 +1,11 @@
/**:
ros__parameters:
tts_service:
voice_model_path: "/models/piper/en_US-lessac-medium.onnx"
sample_rate: 22050
speed: 1.0 # Normal playback speed
pitch: 1.0 # Normal pitch
volume: 0.8 # 01.0 (safety: reduced from 1.0)
audio_device: "Jabra" # ALSA device name or alias
queue_max_size: 16 # Max queued TTS requests
autoplay: true # Auto-play TTS on startup

View File

@ -0,0 +1,47 @@
from launch import LaunchDescription
from launch_ros.actions import Node
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
import os
from ament_index_python.packages import get_package_share_directory
def generate_launch_description():
"""Launch the TTS service node."""
package_dir = get_package_share_directory("saltybot_tts_service")
config_path = os.path.join(package_dir, "config", "tts_service_params.yaml")
# Declare launch arguments
voice_model_arg = DeclareLaunchArgument(
"voice_model",
default_value="/models/piper/en_US-lessac-medium.onnx",
description="Path to Piper voice model (ONNX)",
)
audio_device_arg = DeclareLaunchArgument(
"audio_device",
default_value="Jabra",
description="ALSA audio device name or alias",
)
# TTS service node
tts_node = Node(
package="saltybot_tts_service",
executable="tts_service_node",
name="tts_service",
parameters=[
config_path,
{"voice_model_path": LaunchConfiguration("voice_model")},
{"audio_device": LaunchConfiguration("audio_device")},
],
respawn=False,
output="screen",
)
return LaunchDescription(
[
voice_model_arg,
audio_device_arg,
tts_node,
]
)

View File

@ -0,0 +1,24 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_tts_service</name>
<version>0.1.0</version>
<description>Central TTS (text-to-speech) service using Piper with action server, queue management, and Jabra audio output.</description>
<maintainer email="seb@vayrette.com">seb</maintainer>
<license>Apache-2.0</license>
<buildtool_depend>ament_python</buildtool_depend>
<depend>rclpy</depend>
<depend>std_msgs</depend>
<depend>rclcpp_action</depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,287 @@
"""
tts_service_node.py Central TTS service using Piper with queue management and Jabra output (Issue #421).
Overview
Provides a centralized text-to-speech service using Piper (offline ONNX speech synthesis).
Manages a priority queue with interrupt capability, outputs to Jabra speaker via ALSA/PulseAudio,
and publishes TTS state updates.
Subscribes
/saltybot/tts_request std_msgs/String text to synthesize (priority 0, deferrable)
Services (Future)
/saltybot/tts_speak (action server) request with priority/interrupt (WIP)
Publishers
/saltybot/tts_state std_msgs/String current state ("idle", "synthesizing", "playing")
Parameters
voice_model_path str '/models/piper/en_US-lessac-medium.onnx'
sample_rate int 22050
speed float 1.0 (1.0 = normal, <1 = slower, >1 = faster)
pitch float 1.0
volume float 1.0 (01.0)
audio_device str 'Jabra' (ALSA device hint or empty for default)
queue_max_size int 16
autoplay bool True
"""
from __future__ import annotations
import queue
import threading
import time
from typing import Optional
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile
from std_msgs.msg import String
class TtsQueueItem:
"""Item in the TTS priority queue."""
def __init__(self, text: str, priority: int = 0, interrupt: bool = False):
self.text = text.strip()
self.priority = priority # 0 = normal, >0 = high priority
self.interrupt = interrupt # True = interrupt current playback
self.timestamp = time.time()
def __lt__(self, other):
"""Sort by priority (desc), then by timestamp (asc)."""
if self.priority != other.priority:
return self.priority > other.priority
return self.timestamp < other.timestamp
class TtsServiceNode(Node):
"""Central TTS service node using Piper with priority queue and Jabra output."""
def __init__(self):
super().__init__("tts_service")
# Parameters
self.declare_parameter("voice_model_path", "/models/piper/en_US-lessac-medium.onnx")
self.declare_parameter("sample_rate", 22050)
self.declare_parameter("speed", 1.0)
self.declare_parameter("pitch", 1.0)
self.declare_parameter("volume", 1.0)
self.declare_parameter("audio_device", "Jabra")
self.declare_parameter("queue_max_size", 16)
self.declare_parameter("autoplay", True)
self._model_path = self.get_parameter("voice_model_path").value
self._sample_rate = self.get_parameter("sample_rate").value
self._speed = self.get_parameter("speed").value
self._pitch = self.get_parameter("pitch").value
self._volume = self.get_parameter("volume").value
self._audio_device = self.get_parameter("audio_device").value or "default"
self._queue_max = self.get_parameter("queue_max_size").value
self._autoplay = self.get_parameter("autoplay").value
# Voice model (loaded on startup)
self._voice = None
self._voice_lock = threading.Lock()
self._load_voice_model()
# Queue and playback state
self._tts_queue = queue.PriorityQueue(maxsize=self._queue_max)
self._state = "idle" # "idle", "synthesizing", "playing"
self._state_lock = threading.Lock()
self._current_interrupt = False
# QoS for publishers/subscribers
qos = QoSProfile(depth=5)
# Subscriptions
self.create_subscription(
String,
"/saltybot/tts_request",
self._on_tts_request,
qos,
)
# Publishers
self._state_pub = self.create_publisher(String, "/saltybot/tts_state", qos)
# Worker threads
if self._autoplay and self._voice is not None:
self._worker_thread = threading.Thread(
target=self._playback_worker, daemon=True, name="tts-worker"
)
self._worker_thread.start()
self.get_logger().info(
f"TtsServiceNode ready voice={self._model_path} "
f"device={self._audio_device} autoplay={self._autoplay}"
)
# ── Voice Model Loading ────────────────────────────────────────────────────
def _load_voice_model(self) -> None:
"""Preload Piper voice model on startup."""
try:
from piper import PiperVoice
with self._voice_lock:
if self._voice is not None:
return
self.get_logger().info(f"Loading Piper model: {self._model_path}")
voice = PiperVoice.load(self._model_path)
# Test synthesis
list(voice.synthesize_stream_raw("Hello."))
with self._voice_lock:
self._voice = voice
self.get_logger().info("✓ Piper model preloaded successfully")
except Exception as e:
self.get_logger().error(f"✗ Failed to load Piper model: {e}")
self._voice = None
# ── Subscriptions ──────────────────────────────────────────────────────────
def _on_tts_request(self, msg: String) -> None:
"""Handle incoming TTS requests."""
text = msg.data.strip()
if not text:
return
try:
item = TtsQueueItem(text, priority=0, interrupt=False)
self._tts_queue.put_nowait(item)
self.get_logger().debug(f"Queued TTS: {text[:50]}...")
except queue.Full:
self.get_logger().warn("TTS queue full, dropping request")
# ── Playback Worker ────────────────────────────────────────────────────────
def _playback_worker(self) -> None:
"""Worker thread for TTS synthesis and playback."""
while rclpy.ok():
try:
# Get next item from priority queue
item = self._tts_queue.get(timeout=1.0)
# Check for interrupt
with self._state_lock:
if self._current_interrupt:
self.get_logger().info("Interrupted playback")
self._current_interrupt = False
continue
self._synthesize_and_play(item)
except queue.Empty:
continue
except Exception as e:
self.get_logger().error(f"Playback worker error: {e}")
def _synthesize_and_play(self, item: TtsQueueItem) -> None:
"""Synthesize text and play audio via ALSA/PulseAudio."""
with self._state_lock:
self._state = "synthesizing"
self._publish_state()
try:
with self._voice_lock:
voice = self._voice
if voice is None:
self.get_logger().error("Voice model not loaded")
return
# Synthesize audio
audio_chunks = list(
voice.synthesize_stream_raw(
item.text,
speaker=None,
)
)
# Combine audio chunks into single buffer
import numpy as np
audio_data = b"".join(audio_chunks)
audio_array = np.frombuffer(audio_data, dtype=np.int16)
# Apply volume scaling
audio_array = (audio_array * self._volume).astype(np.int16)
with self._state_lock:
self._state = "playing"
self._publish_state()
# Play audio via ALSA/PulseAudio
self._play_audio(audio_array)
self.get_logger().info(f"✓ Played: {item.text[:50]}...")
except Exception as e:
self.get_logger().error(f"Synthesis/playback error: {e}")
finally:
with self._state_lock:
self._state = "idle"
self._publish_state()
def _play_audio(self, audio_array) -> None:
"""Play audio buffer via ALSA/PulseAudio to Jabra device."""
try:
import subprocess
import numpy as np
# Convert audio to WAV format and pipe to aplay
import io
import wave
wav_buffer = io.BytesIO()
with wave.open(wav_buffer, "wb") as wav:
wav.setnchannels(1)
wav.setsampwidth(2)
wav.setframerate(self._sample_rate)
wav.writeframes(audio_array.tobytes())
wav_data = wav_buffer.getvalue()
# Attempt to play via ALSA with Jabra device hint
cmd = ["aplay", "-D", self._audio_device, "-q"]
proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
proc.communicate(input=wav_data, timeout=30)
except FileNotFoundError:
self.get_logger().warn("aplay not found; audio playback unavailable")
except Exception as e:
self.get_logger().error(f"Audio playback error: {e}")
# ── State Publishing ───────────────────────────────────────────────────────
def _publish_state(self) -> None:
"""Publish current TTS state."""
with self._state_lock:
state = self._state
msg = String()
msg.data = state
self._state_pub.publish(msg)
def main(args=None):
rclpy.init(args=args)
node = TtsServiceNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.try_shutdown()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,5 @@
[develop]
script_dir=$base/lib/saltybot_tts_service
[install]
script_dir=$base/lib/saltybot_tts_service

View File

@ -0,0 +1,22 @@
from setuptools import setup, find_packages
setup(
name='saltybot_tts_service',
version='0.1.0',
packages=find_packages(),
data_files=[
('share/ament_index/resource_index/packages', ['resource/saltybot_tts_service']),
('share/saltybot_tts_service', ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
author='seb',
author_email='seb@vayrette.com',
description='Central TTS service with Piper, action server, and queue management',
license='Apache-2.0',
entry_points={
'console_scripts': [
'tts_service_node = saltybot_tts_service.tts_service_node:main',
],
},
)

View File

@ -0,0 +1,40 @@
"""Unit tests for TTS service node."""
import unittest
from saltybot_tts_service.tts_service_node import TtsQueueItem
class TestTtsQueueItem(unittest.TestCase):
"""Test TtsQueueItem priority and sorting."""
def test_queue_item_creation(self):
"""Test creating a TTS queue item."""
item = TtsQueueItem("Hello world", priority=0)
self.assertEqual(item.text, "Hello world")
self.assertEqual(item.priority, 0)
self.assertFalse(item.interrupt)
def test_queue_item_priority_sorting(self):
"""Test that items sort by priority (descending) then timestamp (ascending)."""
import time
item1 = TtsQueueItem("Low priority", priority=0)
time.sleep(0.01) # Ensure different timestamp
item2 = TtsQueueItem("High priority", priority=1)
# item2 should be < item1 in priority queue (higher priority = lower value)
self.assertTrue(item2 < item1)
def test_queue_item_strips_whitespace(self):
"""Test that text is stripped of whitespace."""
item = TtsQueueItem(" Hello world ")
self.assertEqual(item.text, "Hello world")
def test_queue_item_interrupt_flag(self):
"""Test interrupt flag."""
item = TtsQueueItem("Interrupt me", interrupt=True)
self.assertTrue(item.interrupt)
if __name__ == "__main__":
unittest.main()