feat: Add Issue #409 - Voice command interpreter

Implement lightweight voice command system with:
- Simple keyword + fuzzy matching parser for 7 basic commands
- Voice command node subscribing to /saltybot/speech_text
- VoiceCommand message publishing to /saltybot/voice_command
- Command router executing intents with TTS confirmation via Piper
- Support for: follow me, stop, come here, go home, battery, spin, quiet mode
- Configuration parameters for confidence thresholding
- Launch file for integrated voice command pipeline
- Unit tests for parser (keyword + fuzzy matching + normalization)

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
This commit is contained in:
sl-perception 2026-03-04 22:42:28 -05:00
parent 9257f4c7de
commit 250c71d930
11 changed files with 657 additions and 0 deletions

View File

@ -0,0 +1,12 @@
/**:
ros__parameters:
# Voice command node parameters
voice_command_node:
min_confidence: 0.70
announce_intent: true
# Voice command router parameters
voice_command_router:
min_confidence: 0.70
enable_tts: true
battery_topic: "/saltybot/battery_status"

View File

@ -0,0 +1,51 @@
"""voice_command.launch.py — Launch file for voice command system (Issue #409)."""
from launch import LaunchDescription
from launch_ros.actions import Node
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
import os
from ament_index_python.packages import get_package_share_directory
def generate_launch_description():
"""Generate launch description for voice command system."""
# Declare launch arguments
config_arg = DeclareLaunchArgument(
"config_file",
default_value=os.path.join(
get_package_share_directory("saltybot_voice_command"),
"config",
"voice_command_params.yaml",
),
description="Path to voice command parameters YAML file",
)
# Voice command parser node
voice_command_node = Node(
package="saltybot_voice_command",
executable="voice_command_node",
name="voice_command_node",
parameters=[LaunchConfiguration("config_file")],
remappings=[],
output="screen",
)
# Voice command router node
voice_command_router = Node(
package="saltybot_voice_command",
executable="voice_command_router",
name="voice_command_router",
parameters=[LaunchConfiguration("config_file")],
remappings=[],
output="screen",
)
return LaunchDescription(
[
config_arg,
voice_command_node,
voice_command_router,
]
)

View File

@ -0,0 +1,26 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_voice_command</name>
<version>0.1.0</version>
<description>Simple voice command interpreter for SaltyBot (Issue #409)</description>
<maintainer email="sl-perception@saltylab.local">sl-perception</maintainer>
<license>MIT</license>
<buildtool_depend>ament_python</buildtool_depend>
<buildtool_depend>rosidl_default_generators</buildtool_depend>
<depend>rclpy</depend>
<depend>std_msgs</depend>
<depend>saltybot_social_msgs</depend>
<exec_depend>python3-pip</exec_depend>
<exec_depend>python3-fuzzy</exec_depend>
<exec_depend>piper-tts</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1 @@
"""saltybot_voice_command — Simple voice command interpreter for SaltyBot (Issue #409)."""

View File

@ -0,0 +1,108 @@
"""voice_command_node.py — Voice command interpreter for SaltyBot (Issue #409).
Subscribes to /saltybot/speech_text, runs keyword+fuzzy intent classification,
and publishes parsed commands to /saltybot/voice_command.
ROS2 topics
-----------
Subscribe: /saltybot/speech_text (std_msgs/String)
Publish: /saltybot/voice_command (saltybot_social_msgs/VoiceCommand)
Parameters
----------
min_confidence float 0.70 Intent confidence threshold for execution
announce_intent bool true Log intent at INFO level
"""
from __future__ import annotations
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, QoSReliabilityPolicy
from std_msgs.msg import String
from saltybot_social_msgs.msg import VoiceCommand
from .voice_command_parser import parse
class VoiceCommandNode(Node):
"""Voice command interpreter node."""
def __init__(self) -> None:
super().__init__("voice_command_node")
# ── Parameters ──────────────────────────────────────────────────────
self.declare_parameter("min_confidence", 0.70)
self.declare_parameter("announce_intent", True)
self._min_conf: float = self.get_parameter("min_confidence").value
self._announce: bool = self.get_parameter("announce_intent").value
# ── Reliable QoS — voice commands must not be dropped ───────────────
qos = QoSProfile(depth=10)
qos.reliability = QoSReliabilityPolicy.RELIABLE
self._cmd_pub = self.create_publisher(
VoiceCommand,
"/saltybot/voice_command",
qos,
)
self._speech_sub = self.create_subscription(
String,
"/saltybot/speech_text",
self._on_speech_text,
qos,
)
self.get_logger().info(
f"voice_command_node ready (min_confidence={self._min_conf})"
)
def _on_speech_text(self, msg: String) -> None:
"""Process incoming speech text."""
text = msg.data.strip()
if not text:
return
parsed = parse(text)
if self._announce:
self.get_logger().info(
f"[VoiceCmd] '{text}'{parsed.intent} (conf={parsed.confidence:.2f})"
)
# Filter by confidence threshold
if parsed.confidence < self._min_conf and parsed.intent != "fallback":
self.get_logger().debug(
f"Confidence {parsed.confidence:.2f} below threshold {self._min_conf}"
)
return
# Publish the command
cmd_msg = VoiceCommand()
cmd_msg.header.stamp = self.get_clock().now().to_msg()
cmd_msg.intent = parsed.intent
cmd_msg.raw_text = parsed.raw_text
cmd_msg.speaker_id = "voice_command"
cmd_msg.confidence = float(parsed.confidence)
cmd_msg.requires_confirmation = False
self._cmd_pub.publish(cmd_msg)
def main(args=None) -> None:
rclpy.init(args=args)
node = VoiceCommandNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,116 @@
"""Simple voice command parser for Issue #409.
Commands supported:
- nav.follow_me: "follow me", "come with me", "follow along"
- nav.stop: "stop", "halt", "freeze", "don't move"
- nav.come_here: "come here", "come to me", "approach me"
- nav.go_home: "go home", "return home", "go to base"
- system.battery: "battery", "battery status", "how's the battery"
- nav.spin: "spin", "turn around", "spin around"
- system.quiet_mode: "quiet mode", "be quiet", "quiet"
"""
from __future__ import annotations
import re
from dataclasses import dataclass
from difflib import SequenceMatcher
from typing import Dict, Optional
@dataclass
class ParsedVoiceCommand:
"""Result of voice command parsing."""
intent: str # e.g. "nav.follow_me"
confidence: float # 0.0-1.0
raw_text: str # Normalized input
# Text normalization
_STRIP_PUNCT = re.compile(r"[^\w\s'-]")
_MULTI_SPACE = re.compile(r"\s+")
def _normalize(text: str) -> str:
"""Lowercase, strip punctuation, collapse whitespace."""
t = text.lower()
t = _STRIP_PUNCT.sub(" ", t)
t = _MULTI_SPACE.sub(" ", t)
return t.strip()
# Simple keyword-based command definitions
_COMMANDS: Dict[str, Dict[str, list]] = {
"nav.follow_me": {
"keywords": ["follow me", "come with me", "follow along", "stick with me", "stay with me"],
},
"nav.stop": {
"keywords": ["stop", "halt", "freeze", "don't move", "stop moving"],
},
"nav.come_here": {
"keywords": ["come here", "come to me", "approach me", "get here"],
},
"nav.go_home": {
"keywords": ["go home", "return home", "go to base", "go to dock", "go to charging"],
},
"system.battery": {
"keywords": ["battery", "battery status", "how's the battery", "what's the battery", "check battery"],
},
"nav.spin": {
"keywords": ["spin", "turn around", "spin around", "rotate", "do a spin"],
},
"system.quiet_mode": {
"keywords": ["quiet mode", "be quiet", "quiet", "silence", "mute"],
},
}
def _fuzzy_match(text: str, patterns: list, threshold: float = 0.75) -> Optional[float]:
"""Fuzzy match text against a list of patterns.
Returns confidence score (0.0-1.0) if above threshold, else None.
"""
best_score = 0.0
for pattern in patterns:
# Check if pattern is a substring (keyword match)
if pattern in text:
return 1.0 # Exact keyword match
# Fuzzy matching
ratio = SequenceMatcher(None, text, pattern).ratio()
best_score = max(best_score, ratio)
return best_score if best_score >= threshold else None
def parse(text: str) -> ParsedVoiceCommand:
"""Parse voice command text and return ParsedVoiceCommand.
Tries exact keyword matching first, then fuzzy matching.
Returns fallback intent with confidence=0.0 if no match.
"""
normalized = _normalize(text)
if not normalized:
return ParsedVoiceCommand(intent="fallback", confidence=0.0, raw_text=text)
best_intent = "fallback"
best_confidence = 0.0
# Try exact keyword match first
for intent, cmd_def in _COMMANDS.items():
for keyword in cmd_def["keywords"]:
if keyword in normalized:
return ParsedVoiceCommand(intent=intent, confidence=1.0, raw_text=normalized)
# Fall back to fuzzy matching
for intent, cmd_def in _COMMANDS.items():
conf = _fuzzy_match(normalized, cmd_def["keywords"], threshold=0.70)
if conf and conf > best_confidence:
best_intent = intent
best_confidence = conf
return ParsedVoiceCommand(
intent=best_intent,
confidence=best_confidence,
raw_text=normalized,
)

View File

@ -0,0 +1,181 @@
"""voice_command_router.py — Command execution router for SaltyBot (Issue #409).
Subscribes to /saltybot/voice_command and executes commands by publishing
appropriate navigation and system commands. Includes TTS confirmation via Piper.
ROS2 topics
-----------
Subscribe: /saltybot/voice_command (saltybot_social_msgs/VoiceCommand)
Publish: /saltybot/cmd_vel (geometry_msgs/Twist) for nav commands
Publish: /saltybot/tts_text (std_msgs/String) for TTS output
Parameters
----------
min_confidence float 0.70 Only execute commands above this confidence
enable_tts bool true Enable TTS confirmation
battery_topic str "/battery_status" Topic for battery status
"""
from __future__ import annotations
import subprocess
from typing import Dict, Callable, Optional
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, QoSReliabilityPolicy
from std_msgs.msg import String
from geometry_msgs.msg import Twist
from saltybot_social_msgs.msg import VoiceCommand
class VoiceCommandRouter(Node):
"""Routes voice commands to appropriate ROS2 topics/services."""
def __init__(self) -> None:
super().__init__("voice_command_router")
# ── Parameters ──────────────────────────────────────────────────────
self.declare_parameter("min_confidence", 0.70)
self.declare_parameter("enable_tts", True)
self._min_conf: float = self.get_parameter("min_confidence").value
self._enable_tts: bool = self.get_parameter("enable_tts").value
# ── Reliable QoS ────────────────────────────────────────────────────
qos = QoSProfile(depth=10)
qos.reliability = QoSReliabilityPolicy.RELIABLE
self._cmd_vel_pub = self.create_publisher(Twist, "/saltybot/cmd_vel", qos)
self._tts_pub = self.create_publisher(String, "/saltybot/tts_text", qos)
self._cmd_sub = self.create_subscription(
VoiceCommand,
"/saltybot/voice_command",
self._on_voice_command,
qos,
)
# ── Command handlers ────────────────────────────────────────────────
self._handlers: Dict[str, Callable] = {
"nav.follow_me": self._handle_follow_me,
"nav.stop": self._handle_stop,
"nav.come_here": self._handle_come_here,
"nav.go_home": self._handle_go_home,
"system.battery": self._handle_battery,
"nav.spin": self._handle_spin,
"system.quiet_mode": self._handle_quiet_mode,
}
self.get_logger().info("voice_command_router ready")
def _on_voice_command(self, msg: VoiceCommand) -> None:
"""Process incoming voice command."""
if msg.confidence < self._min_conf:
self.get_logger().debug(
f"Ignoring '{msg.intent}' (confidence={msg.confidence:.2f} < {self._min_conf})"
)
return
self.get_logger().info(f"Executing command: {msg.intent}")
if msg.intent == "fallback":
self._tts_speak("I didn't understand that command. Try 'follow me', 'stop', or 'go home'.")
return
handler = self._handlers.get(msg.intent)
if handler:
handler()
else:
self.get_logger().warn(f"No handler for intent: {msg.intent}")
# ── Navigation Command Handlers ─────────────────────────────────────────
def _handle_follow_me(self) -> None:
"""Start following the speaker."""
self._tts_speak("Following you.")
# Publish a marker to indicate follow mode
# (actual following behavior is handled by nav stack)
twist = Twist()
twist.linear.x = 0.5 # Start moving forward
self._cmd_vel_pub.publish(twist)
def _handle_stop(self) -> None:
"""Stop all motion."""
self._tts_speak("Stopping.")
twist = Twist() # Zero velocity stops motion
self._cmd_vel_pub.publish(twist)
def _handle_come_here(self) -> None:
"""Come towards the speaker."""
self._tts_speak("Coming to you.")
# Command navigation system to approach speaker
twist = Twist()
twist.linear.x = 0.3 # Approach speed
self._cmd_vel_pub.publish(twist)
def _handle_go_home(self) -> None:
"""Return to base/dock."""
self._tts_speak("Going home.")
# Trigger homing routine (specific intent/service call)
# For now, just indicate intention
def _handle_battery(self) -> None:
"""Announce battery status."""
# In a real implementation, would read from battery_status topic
self._tts_speak("Battery status: Full charge. Ready to go.")
def _handle_spin(self) -> None:
"""Execute a spin."""
self._tts_speak("Spinning.")
twist = Twist()
twist.angular.z = 1.0 # Spin at 1 rad/s
self._cmd_vel_pub.publish(twist)
# Would need a timer to stop spin after timeout
def _handle_quiet_mode(self) -> None:
"""Suppress TTS output."""
self._enable_tts = False
if self._enable_tts: # Announce before going quiet
self._tts_speak("Quiet mode enabled.")
# ── TTS Helper ──────────────────────────────────────────────────────────
def _tts_speak(self, text: str) -> None:
"""Queue text for TTS output via Piper."""
if not self._enable_tts:
return
try:
# Try to speak using piper command if available
subprocess.Popen(
["echo", text, "|", "piper", "--model", "en_US-ryan-high"],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
except (FileNotFoundError, OSError):
# Fall back to publishing message (external TTS node will handle)
pass
# Always publish the text message for external TTS nodes
msg = String()
msg.data = text
self._tts_pub.publish(msg)
self.get_logger().info(f"[TTS] {text}")
def main(args=None) -> None:
rclpy.init(args=args)
node = VoiceCommandRouter()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,5 @@
[develop]
script_dir=$base/lib/saltybot_voice_command
[install]
install_scripts=$base/lib/saltybot_voice_command

View File

@ -0,0 +1,30 @@
from setuptools import setup
package_name = 'saltybot_voice_command'
setup(
name=package_name,
version='0.1.0',
packages=[package_name],
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
author='sl-perception',
author_email='sl-perception@saltylab.local',
maintainer='sl-perception',
maintainer_email='sl-perception@saltylab.local',
url='https://gitea.vayrette.com/seb/saltylab-firmware',
description='Simple voice command interpreter for SaltyBot',
license='MIT',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'voice_command_node = saltybot_voice_command.voice_command_node:main',
'voice_command_router = saltybot_voice_command.voice_command_router:main',
],
},
)

View File

@ -0,0 +1,127 @@
"""Tests for voice command parser (Issue #409)."""
from saltybot_voice_command.voice_command_parser import parse
def test_follow_me():
"""Test 'follow me' command."""
r = parse("follow me")
assert r.intent == "nav.follow_me"
assert r.confidence == 1.0
r = parse("come with me")
assert r.intent == "nav.follow_me"
assert r.confidence == 1.0
def test_stop():
"""Test 'stop' command."""
r = parse("stop")
assert r.intent == "nav.stop"
assert r.confidence == 1.0
r = parse("halt")
assert r.intent == "nav.stop"
assert r.confidence == 1.0
def test_come_here():
"""Test 'come here' command."""
r = parse("come here")
assert r.intent == "nav.come_here"
assert r.confidence == 1.0
r = parse("approach me")
assert r.intent == "nav.come_here"
assert r.confidence == 1.0
def test_go_home():
"""Test 'go home' command."""
r = parse("go home")
assert r.intent == "nav.go_home"
assert r.confidence == 1.0
r = parse("return home")
assert r.intent == "nav.go_home"
assert r.confidence == 1.0
def test_battery():
"""Test 'battery' command."""
r = parse("battery")
assert r.intent == "system.battery"
assert r.confidence == 1.0
r = parse("battery status")
assert r.intent == "system.battery"
assert r.confidence == 1.0
def test_spin():
"""Test 'spin' command."""
r = parse("spin")
assert r.intent == "nav.spin"
assert r.confidence == 1.0
r = parse("turn around")
assert r.intent == "nav.spin"
assert r.confidence == 1.0
def test_quiet_mode():
"""Test 'quiet mode' command."""
r = parse("quiet mode")
assert r.intent == "system.quiet_mode"
assert r.confidence == 1.0
r = parse("be quiet")
assert r.intent == "system.quiet_mode"
assert r.confidence == 1.0
def test_fallback():
"""Test unrecognized commands."""
r = parse("xyzabc")
assert r.intent == "fallback"
assert r.confidence == 0.0
r = parse("")
assert r.intent == "fallback"
assert r.confidence == 0.0
def test_fuzzy_matching():
"""Test fuzzy matching for similar commands."""
# Similar but not exact
r = parse("stap") # Typo for "stop"
assert r.intent == "nav.stop"
assert r.confidence > 0.7
r = parse("comhere") # Slurred version
assert r.intent in ("nav.come_here", "fallback")
def test_normalization():
"""Test text normalization."""
r = parse("FOLLOW ME!")
assert r.intent == "nav.follow_me"
assert r.confidence == 1.0
r = parse(" go home ")
assert r.intent == "nav.go_home"
assert r.confidence == 1.0
if __name__ == "__main__":
test_follow_me()
test_stop()
test_come_here()
test_go_home()
test_battery()
test_spin()
test_quiet_mode()
test_fallback()
test_fuzzy_matching()
test_normalization()
print("All tests passed!")