feat: Add Termux voice command interface (Issue #633)

phone/voice_cmd.py — listens via termux-speech-to-text, parses commands
(go forward/back, turn left/right, stop, e-stop, go to waypoint, speed
up/down, status) and publishes structured JSON to saltybot/phone/voice_cmd.
TTS confirmation via termux-tts-speak. Manual text fallback via --text flag.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
sl-android 2026-03-15 14:35:27 -04:00
parent 0e8758e9e1
commit 26bf4ab8d3

398
phone/voice_cmd.py Normal file
View File

@ -0,0 +1,398 @@
#!/usr/bin/env python3
"""
voice_cmd.py Termux voice command interface for SaltyBot (Issue #633)
Listens for voice commands via termux-speech-to-text (Android built-in STT),
parses them into structured JSON payloads, and publishes to MQTT.
Provides TTS confirmation via termux-tts-speak.
Falls back to manual text input when STT is unavailable.
MQTT Topic
saltybot/phone/voice_cmd published on each recognised command
JSON Payload
{
"ts": 1710000000.000, # Unix timestamp
"cmd": "go_forward", # command enum (see COMMANDS below)
"param": null, # optional parameter (e.g. waypoint name)
"raw": "go forward", # original recognised text
"source": "stt" # "stt" | "text"
}
Commands
go_forward, go_back, turn_left, turn_right, stop, estop,
go_waypoint (param = waypoint name), speed_up, speed_down, status
Usage
python3 phone/voice_cmd.py [OPTIONS]
--broker HOST MQTT broker IP/hostname (default: 192.168.1.100)
--port PORT MQTT broker port (default: 1883)
--qos INT MQTT QoS level 0/1/2 (default: 0)
--text Manual text fallback mode (no STT)
--no-tts Disable TTS confirmation
--timeout SECS STT listen timeout in seconds (default: 10)
--debug Verbose logging
Dependencies (Termux)
pkg install termux-api python
pip install paho-mqtt
"""
import argparse
import json
import logging
import subprocess
import sys
import threading
import time
from typing import Optional, Tuple
try:
import paho.mqtt.client as mqtt
MQTT_AVAILABLE = True
except ImportError:
MQTT_AVAILABLE = False
# ── MQTT topic ────────────────────────────────────────────────────────────────
TOPIC_VOICE_CMD = "saltybot/phone/voice_cmd"
# ── Command table ─────────────────────────────────────────────────────────────
#
# Each entry: (command_id, [phrase_fragments, ...])
# Phrases are matched as case-insensitive substrings of the recognised text.
# First match wins; order matters for disambiguation.
_CMD_TABLE = [
# E-stop must be checked before plain "stop"
("estop", ["emergency stop", "e stop", "estop", "abort"]),
("stop", ["stop", "halt", "freeze", "cancel"]),
("go_forward", ["go forward", "move forward", "forward", "ahead", "advance"]),
("go_back", ["go back", "move back", "backward", "reverse", "back up"]),
("turn_left", ["turn left", "rotate left", "left"]),
("turn_right", ["turn right", "rotate right", "right"]),
("go_waypoint", ["go to waypoint", "go to", "navigate to", "waypoint"]),
("speed_up", ["speed up", "faster", "increase speed"]),
("speed_down", ["slow down", "slower", "decrease speed", "decelerate"]),
("status", ["status", "report", "where are you", "how are you"]),
]
# TTS responses per command
_TTS_RESPONSES = {
"go_forward": "Going forward",
"go_back": "Going back",
"turn_left": "Turning left",
"turn_right": "Turning right",
"stop": "Stopping",
"estop": "Emergency stop!",
"go_waypoint": "Navigating to waypoint",
"speed_up": "Speeding up",
"speed_down": "Slowing down",
"status": "Requesting status",
}
# ── termux-api helpers ────────────────────────────────────────────────────────
def stt_listen(timeout: float = 10.0) -> Optional[str]:
"""
Call termux-speech-to-text and return the recognised text string, or None.
termux-speech-to-text returns JSON:
{"partial": "...", "text": "final text"}
or on failure returns empty / error output.
"""
try:
result = subprocess.run(
["termux-speech-to-text"],
capture_output=True,
text=True,
timeout=timeout + 5.0, # extra buffer for app round-trip
)
stdout = result.stdout.strip()
if not stdout:
logging.debug("STT: empty response (rc=%d)", result.returncode)
return None
# termux-speech-to-text may return bare text or JSON
try:
data = json.loads(stdout)
# Prefer "text" (final); fall back to "partial"
text = data.get("text") or data.get("partial") or ""
text = text.strip()
except (json.JSONDecodeError, AttributeError):
# Some versions return plain text directly
text = stdout
if not text:
logging.debug("STT: no text in response: %r", stdout)
return None
logging.info("STT recognised: %r", text)
return text
except subprocess.TimeoutExpired:
logging.warning("STT: timed out after %.1f s", timeout + 5.0)
return None
except FileNotFoundError:
logging.error("STT: termux-speech-to-text not found — install termux-api")
return None
except Exception as e:
logging.warning("STT error: %s", e)
return None
def tts_speak(text: str) -> None:
"""Speak @text via termux-tts-speak (fire-and-forget)."""
try:
subprocess.Popen(
["termux-tts-speak", text],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
except FileNotFoundError:
logging.debug("TTS: termux-tts-speak not found")
except Exception as e:
logging.debug("TTS error: %s", e)
# ── Command parser ────────────────────────────────────────────────────────────
def parse_command(text: str) -> Tuple[Optional[str], Optional[str]]:
"""
Match @text against the command table.
Returns (command_id, param) where param is non-None only for go_waypoint.
Returns (None, None) if no command matched.
"""
lower = text.lower().strip()
for cmd_id, phrases in _CMD_TABLE:
for phrase in phrases:
if phrase in lower:
param = None
if cmd_id == "go_waypoint":
# Extract waypoint name: text after "waypoint" / "go to" / "navigate to"
for marker in ("waypoint", "navigate to", "go to"):
idx = lower.find(marker)
if idx != -1:
remainder = text[idx + len(marker):].strip()
if remainder:
param = remainder
break
return cmd_id, param
return None, None
# ── MQTT publisher (same pattern as sensor_dashboard.py) ─────────────────────
class MQTTPublisher:
"""Thin paho-mqtt wrapper with auto-reconnect."""
_RECONNECT_BASE = 2.0
_RECONNECT_MAX = 60.0
def __init__(self, broker: str, port: int, qos: int = 0):
self._broker = broker
self._port = port
self._qos = qos
self._lock = threading.Lock()
self._connected = False
self._client = mqtt.Client(client_id="saltybot-voice-cmd", clean_session=True)
self._client.on_connect = self._on_connect
self._client.on_disconnect = self._on_disconnect
self._client.reconnect_delay_set(
min_delay=int(self._RECONNECT_BASE),
max_delay=int(self._RECONNECT_MAX),
)
self._connect()
def _connect(self) -> None:
try:
self._client.connect_async(self._broker, self._port, keepalive=60)
self._client.loop_start()
logging.info("MQTT connecting to %s:%d", self._broker, self._port)
except Exception as e:
logging.warning("MQTT connect error: %s", e)
def _on_connect(self, client, userdata, flags, rc) -> None:
if rc == 0:
with self._lock:
self._connected = True
logging.info("MQTT connected to %s:%d", self._broker, self._port)
else:
logging.warning("MQTT connect failed rc=%d", rc)
def _on_disconnect(self, client, userdata, rc) -> None:
with self._lock:
self._connected = False
if rc != 0:
logging.warning("MQTT disconnected (rc=%d) — paho will retry", rc)
@property
def connected(self) -> bool:
with self._lock:
return self._connected
def publish(self, topic: str, payload: dict) -> bool:
if not self.connected:
logging.debug("MQTT offline — dropping %s", topic)
return False
try:
msg = json.dumps(payload, separators=(",", ":"))
info = self._client.publish(topic, msg, qos=self._qos, retain=False)
return info.rc == mqtt.MQTT_ERR_SUCCESS
except Exception as e:
logging.warning("MQTT publish error: %s", e)
return False
def shutdown(self) -> None:
self._client.loop_stop()
self._client.disconnect()
logging.info("MQTT disconnected.")
# ── Main listen loop ──────────────────────────────────────────────────────────
def _handle_text(raw: str, source: str, publisher: MQTTPublisher,
no_tts: bool) -> bool:
"""
Parse @raw, publish command, speak confirmation.
Returns True if a command was recognised and published.
"""
cmd, param = parse_command(raw)
if cmd is None:
logging.info("No command matched for: %r", raw)
if not no_tts:
tts_speak("Sorry, I didn't understand that")
return False
payload = {
"ts": time.time(),
"cmd": cmd,
"param": param,
"raw": raw,
"source": source,
}
ok = publisher.publish(TOPIC_VOICE_CMD, payload)
log_msg = "Published %s (param=%r) — MQTT %s" % (cmd, param, "OK" if ok else "FAIL")
logging.info(log_msg)
if not no_tts:
response = _TTS_RESPONSES.get(cmd, cmd.replace("_", " "))
if param:
response = f"{response}: {param}"
tts_speak(response)
return ok
def run_stt_loop(publisher: MQTTPublisher, args: argparse.Namespace) -> None:
"""Continuous STT listen → parse → publish loop."""
logging.info("Voice command loop started (STT mode). Say a command.")
if not args.no_tts:
tts_speak("Voice commands ready")
consecutive_failures = 0
while True:
logging.info("Listening…")
text = stt_listen(timeout=args.timeout)
if text is None:
consecutive_failures += 1
logging.warning("STT failed (%d consecutive)", consecutive_failures)
if consecutive_failures >= 3:
logging.warning("STT unavailable — switch to --text mode if needed")
if not args.no_tts:
tts_speak("Speech recognition unavailable")
time.sleep(5.0)
consecutive_failures = 0
continue
consecutive_failures = 0
_handle_text(text, "stt", publisher, args.no_tts)
def run_text_loop(publisher: MQTTPublisher, args: argparse.Namespace) -> None:
"""Manual text input loop (--text / --no-stt mode)."""
logging.info("Text input mode. Type a command (Ctrl-C or 'quit' to exit).")
print("\nAvailable commands: go forward, go back, turn left, turn right, "
"stop, emergency stop, go to waypoint <name>, speed up, slow down, status")
print("Type 'quit' to exit.\n")
while True:
try:
raw = input("Command> ").strip()
except EOFError:
break
if not raw:
continue
if raw.lower() in ("quit", "exit", "q"):
break
_handle_text(raw, "text", publisher, args.no_tts)
# ── Entry point ───────────────────────────────────────────────────────────────
def main() -> None:
parser = argparse.ArgumentParser(
description="SaltyBot Termux voice command interface (Issue #633)"
)
parser.add_argument("--broker", default="192.168.1.100",
help="MQTT broker IP/hostname (default: 192.168.1.100)")
parser.add_argument("--port", type=int, default=1883,
help="MQTT broker port (default: 1883)")
parser.add_argument("--qos", type=int, default=0, choices=[0, 1, 2],
help="MQTT QoS level (default: 0)")
parser.add_argument("--text", action="store_true",
help="Manual text fallback mode (no STT)")
parser.add_argument("--no-tts", action="store_true",
help="Disable TTS confirmation")
parser.add_argument("--timeout", type=float, default=10.0,
help="STT listen timeout in seconds (default: 10)")
parser.add_argument("--debug", action="store_true",
help="Verbose logging")
args = parser.parse_args()
logging.basicConfig(
level=logging.DEBUG if args.debug else logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
)
if not MQTT_AVAILABLE:
logging.error("paho-mqtt not installed. Run: pip install paho-mqtt")
sys.exit(1)
publisher = MQTTPublisher(args.broker, args.port, qos=args.qos)
# Wait for initial MQTT connection
deadline = time.monotonic() + 10.0
while not publisher.connected and time.monotonic() < deadline:
time.sleep(0.2)
if not publisher.connected:
logging.warning("MQTT not connected — commands will be dropped until connected")
try:
if args.text:
run_text_loop(publisher, args)
else:
run_stt_loop(publisher, args)
except KeyboardInterrupt:
logging.info("Shutting down…")
finally:
publisher.shutdown()
logging.info("Done.")
if __name__ == "__main__":
main()