Merge pull request 'feat: Phone voice command interface (Issue #553)' (#554) from sl-android/issue-553-voice-command into main

This commit is contained in:
sl-jetson 2026-03-14 11:36:27 -04:00
commit 80e3b23aec

448
phone/voice_commander.py Normal file
View File

@ -0,0 +1,448 @@
#!/usr/bin/env python3
"""
voice_commander.py — Phone-based voice command interface for SaltyBot (Issue #553)

Runs on Android/Termux. Listens for the wake word 'Hey Salty', transcribes speech
with a local OpenAI Whisper model, parses robot commands, and publishes them to the
ROS2 topic /saltybot/voice/cmd — natively through rclpy when it is importable,
otherwise through a rosbridge WebSocket to the Jetson Orin. Confirms commands via
termux-tts-speak.

Supported commands:
    go forward / go back / go left / go right
    stop / halt
    follow me
    go home
    look at me

Usage:
    python3 phone/voice_commander.py [OPTIONS]

Options:
    --host HOST         Jetson IP or hostname (default: 192.168.1.100)
    --port PORT         rosbridge WebSocket port (default: 9090)
    --model MODEL       Whisper model size: tiny/base/small (default: base)
    --threshold FLOAT   Wake word match threshold 0.0-1.0 (default: 0.6)
    --record-sec FLOAT  Seconds to record after wake word (default: 3.0)
    --no-tts            Disable TTS confirmation
    --debug             Verbose logging
"""
import argparse
import json
import logging
import os
import subprocess
import sys
import tempfile
import threading
import time
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import Optional
# ── Optional ROS2 ────────────────────────────────────────────────────────────
try:
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
ROS2_AVAILABLE = True
except ImportError:
ROS2_AVAILABLE = False
# ── Whisper (local STT) ───────────────────────────────────────────────────────
try:
import whisper
WHISPER_AVAILABLE = True
except ImportError:
WHISPER_AVAILABLE = False
# ── WebSocket client ──────────────────────────────────────────────────────────
try:
import websocket # websocket-client
WS_AVAILABLE = True
except ImportError:
WS_AVAILABLE = False
# ── Constants ────────────────────────────────────────────────────────────────
# Phrase that arms command capture. Kept lowercase because transcripts are
# lowercased before being compared against it.
WAKE_WORD = "hey salty"
# Topic that both publisher backends (rclpy and rosbridge WebSocket) write to.
VOICE_CMD_TOPIC = "/saltybot/voice/cmd"
SAMPLE_RATE = 16000  # Hz required by Whisper; passed to the recorder as its sample rate
WAKE_RECORD_SEC = 1.5  # short clip to check for wake word
CMD_RECORD_SEC = 3.0  # command clip length after wake word; default for --record-sec
RECONNECT_DELAY = 5.0  # seconds between WS reconnects
# NOTE(review): RECONNECT_DELAY and CHUNK_BYTES are not referenced anywhere in
# the visible code — confirm whether they are still needed.
CHUNK_BYTES = 4096
class Command(Enum):
    """Robot commands recognised by the voice interface.

    Each member's value is the string placed in the ``command`` field of the
    JSON payload built by the publisher backends.
    """

    GO_FORWARD = "go_forward"
    GO_BACK = "go_back"
    GO_LEFT = "go_left"
    GO_RIGHT = "go_right"
    STOP = "stop"
    FOLLOW_ME = "follow_me"
    GO_HOME = "go_home"
    LOOK_AT_ME = "look_at_me"
    UNKNOWN = "unknown"  # returned when no trigger phrase matches


# ── Command parsing ───────────────────────────────────────────────────────────
# Each entry: (list_of_trigger_phrases, Command)
# Rows are tried top to bottom; the first row with a matching phrase wins.
COMMAND_TABLE = [
    (["go forward", "move forward", "forward", "ahead", "go straight"], Command.GO_FORWARD),
    (["go back", "go backward", "move back", "reverse", "back up"], Command.GO_BACK),
    (["go left", "turn left", "move left", "left"], Command.GO_LEFT),
    (["go right", "turn right", "move right", "right"], Command.GO_RIGHT),
    (["stop", "halt", "freeze", "stay", "stand by"], Command.STOP),
    (["follow me", "come here", "come with me", "follow"], Command.FOLLOW_ME),
    (["go home", "return home", "return to base", "dock"], Command.GO_HOME),
    (["look at me", "face me", "look here", "turn to me"], Command.LOOK_AT_ME),
]

# Spoken confirmation for every Command member (including UNKNOWN).
TTS_CONFIRMATIONS = {
    Command.GO_FORWARD: "Going forward",
    Command.GO_BACK: "Going back",
    Command.GO_LEFT: "Turning left",
    Command.GO_RIGHT: "Turning right",
    Command.STOP: "Stopping",
    Command.FOLLOW_ME: "Following you",
    Command.GO_HOME: "Heading home",
    Command.LOOK_AT_ME: "Looking at you",
    Command.UNKNOWN: "Sorry, I didn't understand that",
}


def _as_words(text: str) -> str:
    """Lowercase *text* and reduce it to single-space-separated alphanumeric words."""
    spaced = "".join(ch if ch.isalnum() else " " for ch in text.lower())
    return " ".join(spaced.split())


def parse_command(text: str) -> Command:
    """Map transcribed speech to a Command.

    Trigger phrases are matched as whole words against a punctuation-free,
    lowercased copy of *text*, so "right" no longer fires inside "bright" and
    "halt" no longer fires inside "asphalt". Rows of COMMAND_TABLE are tried in
    order and the first row containing a matching phrase wins.

    Args:
        text: Raw transcript (any case, may contain punctuation).

    Returns:
        The first matching Command, or Command.UNKNOWN when nothing matches.
    """
    # Pad with spaces so every phrase comparison is anchored on word boundaries.
    haystack = f" {_as_words(text)} "
    # NOTE(review): table order decides conflicts, so "stop going forward"
    # resolves to GO_FORWARD — consider giving STOP priority as a fail-safe.
    for phrases, cmd in COMMAND_TABLE:
        for phrase in phrases:
            if f" {_as_words(phrase)} " in haystack:
                return cmd
    return Command.UNKNOWN
def contains_wake_word(text: str, threshold: float = 0.6) -> bool:
    """Check whether transcribed text contains the wake word.

    The transcript is lowercased and stripped of punctuation first, because
    speech-to-text output such as "Hey, Salty!" would otherwise match neither
    the exact phrase nor its individual tokens.

    Args:
        text: Raw transcript.
        threshold: Minimum fraction (0.0-1.0) of wake-word tokens that must
            appear in the transcript for the fuzzy fallback to accept it.

    Returns:
        True when the exact wake phrase appears as whole words, or when the
        token overlap reaches *threshold*. A transcript with no words at all
        never matches.
    """
    spaced = "".join(ch if ch.isalnum() else " " for ch in text.lower())
    words = spaced.split()
    if not words:
        return False

    # Exact phrase, anchored on word boundaries ("they salty" must not match).
    if f" {WAKE_WORD} " in f" {' '.join(words)} ":
        return True

    # Simple token overlap fallback
    wake_tokens = set(WAKE_WORD.split())
    overlap = len(wake_tokens & set(words)) / len(wake_tokens)
    return overlap >= threshold
# ── Audio capture via termux-microphone-record ────────────────────────────────
def record_audio(duration_sec: float, output_path: str) -> bool:
    """Record a mono AAC clip with termux-microphone-record.

    The Termux CLI starts the recording in the background and returns, so this
    function starts it, waits for the requested duration, then explicitly
    stops it so the file is finalised before it is read.

    Args:
        duration_sec: Clip length in seconds; fractional values are honoured
            by stopping the recorder after this long.
        output_path: Destination file. Any existing file is removed first so a
            clip from a previous call can never be mistaken for a new one.

    Returns:
        True when a non-empty file exists at *output_path* afterwards, False
        on any recorder failure.
    """
    import math  # local import: only this function needs it

    target = Path(output_path)

    # The recorder will not overwrite an existing file, and a leftover file
    # would make the existence check below pass on stale audio.
    try:
        target.unlink()
    except FileNotFoundError:
        pass
    except OSError as e:
        logging.debug("Could not remove stale recording %s: %s", output_path, e)
        return False

    # "-l" takes whole seconds and 0 means "unlimited", so round up and never
    # pass 0. It only acts as a safety cap; the real length is enforced below.
    limit_sec = max(1, math.ceil(duration_sec))
    start_cmd = [
        "termux-microphone-record",
        "-l", str(limit_sec),      # hard cap in whole seconds
        "-r", str(SAMPLE_RATE),    # sample rate
        "-c", "1",                 # mono
        "-e", "aac",               # encoding (aac is reliable on Android)
        "-f", output_path,
    ]
    stop_cmd = ["termux-microphone-record", "-q"]

    try:
        began = time.monotonic()
        subprocess.run(
            start_cmd,
            check=True,
            timeout=duration_sec + 5,
            capture_output=True,
        )
        try:
            # Wait out whatever part of the clip has not elapsed yet (works
            # whether the start command returned immediately or blocked).
            remaining = duration_sec - (time.monotonic() - began)
            if remaining > 0:
                time.sleep(remaining)
        finally:
            # Always stop, even on Ctrl-C, so no recording is left running.
            subprocess.run(stop_cmd, check=False, timeout=5, capture_output=True)
    except (subprocess.CalledProcessError, subprocess.TimeoutExpired, FileNotFoundError) as e:
        logging.debug("termux-microphone-record error: %s", e)
        return False

    try:
        return target.stat().st_size > 0
    except OSError:
        return False
def tts_speak(text: str) -> None:
    """Hand *text* to termux-tts-speak without waiting for it to finish.

    The child process is started with its output discarded. If the
    termux-tts-speak executable cannot be found, the request is dropped and
    only a debug message is logged.
    """
    argv = ["termux-tts-speak", text]
    sink = subprocess.DEVNULL
    try:
        subprocess.Popen(argv, stdout=sink, stderr=sink)
    except FileNotFoundError:
        logging.debug("termux-tts-speak not available")
# ── Whisper STT ───────────────────────────────────────────────────────────────
class WhisperSTT:
    """Minimal adapter around a locally loaded Whisper model."""

    def __init__(self, model_size: str = "base"):
        """Load the Whisper model named *model_size*.

        Raises:
            RuntimeError: if the ``whisper`` package could not be imported.
        """
        if not WHISPER_AVAILABLE:
            raise RuntimeError(
                "openai-whisper not installed. Run: pip install openai-whisper"
            )

        logging.info("Loading Whisper model '%s'...", model_size)
        self.model = whisper.load_model(model_size)
        logging.info("Whisper model loaded.")

    def transcribe(self, audio_path: str) -> str:
        """Transcribe the audio file at *audio_path* as English.

        Returns:
            The transcript with surrounding whitespace removed (case is left
            as the model produced it), or an empty string if transcription
            raised for any reason.
        """
        try:
            outcome = self.model.transcribe(audio_path, language="en", fp16=False)
            transcript = outcome.get("text", "").strip()
            logging.debug("Whisper transcription: '%s'", transcript)
        except Exception as err:
            # Best effort: a failed clip is treated the same as silence.
            logging.warning("Whisper transcription failed: %s", err)
            return ""
        return transcript
# ── Publisher backends ────────────────────────────────────────────────────────
class ROS2Publisher:
    """Native rclpy backend: sends each command as a std_msgs/String on VOICE_CMD_TOPIC."""

    def __init__(self):
        """Initialise rclpy, create the node and publisher, and spin in a daemon thread."""
        rclpy.init()
        node = Node("voice_commander")
        self._node = node
        self._pub = node.create_publisher(String, VOICE_CMD_TOPIC, 10)

        spinner = threading.Thread(target=rclpy.spin, args=(node,), daemon=True)
        self._spin_thread = spinner
        spinner.start()

    def publish(self, cmd: Command, raw_text: str) -> None:
        """Publish *cmd* together with the raw transcript and a timestamp as JSON."""
        body = {"command": cmd.value, "raw": raw_text, "ts": time.time()}
        payload = json.dumps(body)

        msg = String()
        msg.data = payload
        self._pub.publish(msg)
        logging.info("ROS2 published: %s", payload)

    def shutdown(self) -> None:
        """Destroy the node and shut rclpy down."""
        self._node.destroy_node()
        rclpy.shutdown()
class WebSocketPublisher:
"""Publish voice commands over rosbridge WebSocket protocol."""
def __init__(self, host: str, port: int):
self.uri = f"ws://{host}:{port}"
self._ws: Optional[websocket.WebSocket] = None
self._lock = threading.Lock()
self._connect()
def _connect(self) -> None:
try:
ws = websocket.WebSocket()
ws.connect(self.uri, timeout=5)
with self._lock:
self._ws = ws
logging.info("WebSocket connected to %s", self.uri)
except Exception as e:
logging.warning("WebSocket connect failed (%s): %s", self.uri, e)
self._ws = None
def _ensure_connected(self) -> bool:
if self._ws is not None:
return True
self._connect()
return self._ws is not None
def publish(self, cmd: Command, raw_text: str) -> None:
payload = json.dumps({"command": cmd.value, "raw": raw_text, "ts": time.time()})
# rosbridge advertise + publish message
advertise_msg = json.dumps({
"op": "advertise",
"topic": VOICE_CMD_TOPIC,
"type": "std_msgs/String",
})
publish_msg = json.dumps({
"op": "publish",
"topic": VOICE_CMD_TOPIC,
"msg": {"data": payload},
})
with self._lock:
if not self._ensure_connected():
logging.error("Cannot publish — WebSocket not connected.")
return
try:
self._ws.send(advertise_msg)
self._ws.send(publish_msg)
logging.info("WS published: %s", payload)
except Exception as e:
logging.warning("WebSocket send failed: %s", e)
self._ws = None
def shutdown(self) -> None:
with self._lock:
if self._ws:
try:
self._ws.close()
except Exception:
pass
self._ws = None
# ── Main listener loop ────────────────────────────────────────────────────────
@dataclass
class VoiceCommanderConfig:
    """Runtime settings for VoiceCommander; defaults mirror the CLI defaults in main()."""

    host: str = "192.168.1.100"  # Jetson IP/hostname, used by the WebSocket backend
    port: int = 9090  # rosbridge WebSocket port
    model: str = "base"  # Whisper model size handed to WhisperSTT
    wake_threshold: float = 0.6  # passed to contains_wake_word as its threshold
    record_sec: float = CMD_RECORD_SEC  # length of the command clip recorded after the wake word
    no_tts: bool = False  # True disables every tts_speak call made by VoiceCommander
    debug: bool = False  # mirrors --debug; the log level itself is configured in main()
class VoiceCommander:
"""
Main voice command loop.
State machine:
IDLE record short clip check for wake word
WAKE_DETECTED record command clip transcribe parse publish confirm
"""
def __init__(self, config: VoiceCommanderConfig):
self.cfg = config
self._running = False
# STT
self._stt = WhisperSTT(model_size=config.model)
# Publisher
if ROS2_AVAILABLE:
logging.info("Using ROS2 publisher backend.")
self._pub: ROS2Publisher | WebSocketPublisher = ROS2Publisher()
elif WS_AVAILABLE:
logging.info("Using WebSocket publisher backend (%s:%d).", config.host, config.port)
self._pub = WebSocketPublisher(config.host, config.port)
else:
raise RuntimeError(
"No publisher backend available. "
"Install rclpy (ROS2) or websocket-client: pip install websocket-client"
)
# ── lifecycle ──────────────────────────────────────────────────────────────
def start(self) -> None:
self._running = True
logging.info("Voice commander started. Listening for '%s'...", WAKE_WORD)
if not self.cfg.no_tts:
tts_speak("Hey Salty is ready")
try:
self._listen_loop()
except KeyboardInterrupt:
logging.info("Interrupted.")
finally:
self.stop()
def stop(self) -> None:
self._running = False
self._pub.shutdown()
logging.info("Voice commander stopped.")
# ── main loop ─────────────────────────────────────────────────────────────
def _listen_loop(self) -> None:
"""Continuously poll for wake word then capture command."""
with tempfile.TemporaryDirectory() as tmpdir:
wake_audio = os.path.join(tmpdir, "wake.aac")
cmd_audio = os.path.join(tmpdir, "cmd.aac")
while self._running:
# 1. Record short clip for wake word detection
logging.debug("Recording %.1fs for wake word...", WAKE_RECORD_SEC)
if not record_audio(WAKE_RECORD_SEC, wake_audio):
logging.debug("Wake clip recording failed, retrying.")
time.sleep(0.5)
continue
wake_text = self._stt.transcribe(wake_audio)
if not wake_text:
continue
if not contains_wake_word(wake_text, self.cfg.wake_threshold):
logging.debug("No wake word in: '%s'", wake_text)
continue
# 2. Wake word detected — acknowledge and record command
logging.info("Wake word detected! Recording command...")
if not self.cfg.no_tts:
tts_speak("Yes?")
if not record_audio(self.cfg.record_sec, cmd_audio):
logging.warning("Command clip recording failed.")
continue
cmd_text = self._stt.transcribe(cmd_audio)
if not cmd_text:
logging.info("No speech detected after wake word.")
continue
# 3. Parse and dispatch
cmd = parse_command(cmd_text)
logging.info("Parsed command: %s (from '%s')", cmd.value, cmd_text)
self._pub.publish(cmd, cmd_text)
# 4. TTS confirmation
if not self.cfg.no_tts:
tts_speak(TTS_CONFIRMATIONS[cmd])
# ── Entry point ───────────────────────────────────────────────────────────────
def _build_arg_parser() -> argparse.ArgumentParser:
    """Assemble the command-line parser for the voice commander."""
    cli = argparse.ArgumentParser(
        description="SaltyBot phone voice command interface (Issue #553)"
    )
    cli.add_argument(
        "--host",
        default="192.168.1.100",
        help="Jetson IP/hostname (default: 192.168.1.100)",
    )
    cli.add_argument(
        "--port",
        type=int,
        default=9090,
        help="rosbridge WebSocket port (default: 9090)",
    )
    cli.add_argument(
        "--model",
        default="base",
        choices=["tiny", "base", "small"],
        help="Whisper model size (default: base)",
    )
    cli.add_argument(
        "--threshold",
        type=float,
        default=0.6,
        help="Wake word match threshold 0.0-1.0 (default: 0.6)",
    )
    cli.add_argument(
        "--record-sec",
        type=float,
        default=CMD_RECORD_SEC,
        help=f"Seconds to record command (default: {CMD_RECORD_SEC})",
    )
    cli.add_argument("--no-tts", action="store_true", help="Disable TTS confirmation")
    cli.add_argument("--debug", action="store_true", help="Verbose logging")
    return cli


def main() -> None:
    """Parse CLI options, configure logging, and run the voice commander.

    Exits with status 1 when the Whisper package is not installed.
    """
    args = _build_arg_parser().parse_args()

    log_level = logging.DEBUG if args.debug else logging.INFO
    logging.basicConfig(
        level=log_level,
        format="%(asctime)s [%(levelname)s] %(message)s",
    )

    # Fail fast with a readable message instead of a traceback from WhisperSTT.
    if not WHISPER_AVAILABLE:
        logging.error("openai-whisper not installed. Run: pip install openai-whisper")
        sys.exit(1)

    settings = VoiceCommanderConfig(
        host=args.host,
        port=args.port,
        model=args.model,
        wake_threshold=args.threshold,
        record_sec=args.record_sec,
        no_tts=args.no_tts,
        debug=args.debug,
    )
    commander = VoiceCommander(settings)
    commander.start()


if __name__ == "__main__":
    main()