#!/usr/bin/env python3
"""
voice_cmd.py — Termux voice command interface for SaltyBot (Issue #633)

Listens for voice commands via termux-speech-to-text (Android built-in STT),
parses them into structured JSON payloads, and publishes to MQTT.
Provides TTS confirmation via termux-tts-speak.
Manual text input is available via --text when STT is unavailable.

MQTT Topic
──────────
  saltybot/phone/voice_cmd   — published on each recognised command

JSON Payload
────────────
  {
    "ts":     1710000000.000,   # Unix timestamp
    "cmd":    "go_forward",     # command enum (see COMMANDS below)
    "param":  null,             # optional parameter (e.g. waypoint name)
    "raw":    "go forward",     # original recognised text
    "source": "stt"             # "stt" | "text"
  }

Commands
────────
  go_forward, go_back, turn_left, turn_right, stop, estop,
  go_waypoint (param = waypoint name), speed_up, speed_down, status

Usage
─────
  python3 phone/voice_cmd.py [OPTIONS]

  --broker HOST     MQTT broker IP/hostname (default: 192.168.1.100)
  --port PORT       MQTT broker port (default: 1883)
  --qos INT         MQTT QoS level 0/1/2 (default: 0)
  --text            Manual text fallback mode (no STT)
  --no-tts          Disable TTS confirmation
  --timeout SECS    STT listen timeout in seconds (default: 10)
  --debug           Verbose logging

Dependencies (Termux)
─────────────────────
  pkg install termux-api python
  pip install paho-mqtt
"""

import argparse
import json
import logging
import re
import subprocess
import sys
import threading
import time
from typing import Optional, Tuple

try:
    import paho.mqtt.client as mqtt
    MQTT_AVAILABLE = True
except ImportError:
    MQTT_AVAILABLE = False

# ── MQTT topic ────────────────────────────────────────────────────────────────

TOPIC_VOICE_CMD = "saltybot/phone/voice_cmd"

# ── Command table ─────────────────────────────────────────────────────────────
#
# Each entry: (command_id, [phrase_fragments, ...])
# Phrases are matched case-insensitively and must START on a word boundary, so
# "please stop" is not mistaken for "e stop" and "bright" is not "right".
# Trailing inflections ("forwards", "backwards") are still accepted.
# First match wins; order matters for disambiguation.

_CMD_TABLE = [
    # E-stop must be checked before plain "stop"
    ("estop",       ["emergency stop", "e stop", "e-stop", "estop", "abort"]),
    ("stop",        ["stop", "halt", "freeze", "cancel"]),
    ("go_forward",  ["go forward", "move forward", "forward", "ahead", "advance"]),
    ("go_back",     ["go back", "move back", "backward", "reverse", "back up"]),
    ("turn_left",   ["turn left", "rotate left", "left"]),
    ("turn_right",  ["turn right", "rotate right", "right"]),
    ("go_waypoint", ["go to waypoint", "go to", "navigate to", "waypoint"]),
    ("speed_up",    ["speed up", "faster", "increase speed"]),
    ("speed_down",  ["slow down", "slower", "decrease speed", "decelerate"]),
    ("status",      ["status", "report", "where are you", "how are you"]),
]

# One compiled alternation per command, built once at import time.
_CMD_PATTERNS = [
    (
        cmd_id,
        re.compile(
            r"\b(?:" + "|".join(re.escape(p) for p in phrases) + r")",
            re.IGNORECASE,
        ),
    )
    for cmd_id, phrases in _CMD_TABLE
]

# Markers after which a waypoint name is expected, in priority order.
_WAYPOINT_MARKERS = [
    re.compile(r"\b" + re.escape(marker), re.IGNORECASE)
    for marker in ("waypoint", "navigate to", "go to")
]

# TTS responses per command
_TTS_RESPONSES = {
    "go_forward":  "Going forward",
    "go_back":     "Going back",
    "turn_left":   "Turning left",
    "turn_right":  "Turning right",
    "stop":        "Stopping",
    "estop":       "Emergency stop!",
    "go_waypoint": "Navigating to waypoint",
    "speed_up":    "Speeding up",
    "speed_down":  "Slowing down",
    "status":      "Requesting status",
}


# ── termux-api helpers ────────────────────────────────────────────────────────

def _parse_stt_output(stdout: str) -> str:
    """
    Extract the recognised text from non-empty termux-speech-to-text output.

    The tool may emit JSON ({"partial": "...", "text": "final text"}) or bare
    text.  Returns the stripped text, or "" when a JSON object carries no
    usable string.
    """
    try:
        data = json.loads(stdout)
    except json.JSONDecodeError:
        # Some versions return plain text directly
        return stdout
    if not isinstance(data, dict):
        # Valid JSON but not an object (e.g. a bare number or quoted word):
        # treat the output as the spoken text itself.
        return stdout
    # Prefer "text" (final); fall back to "partial"
    for key in ("text", "partial"):
        value = data.get(key)
        if isinstance(value, str) and value.strip():
            return value.strip()
    return ""


def stt_listen(timeout: float = 10.0) -> Optional[str]:
    """
    Call termux-speech-to-text and return the recognised text string, or None.

    @timeout bounds the subprocess (plus a 5 s buffer for the app round-trip).
    None is returned on empty output, timeout, a missing termux-api binary, or
    any other error; each case is logged.
    """
    deadline = timeout + 5.0  # extra buffer for app round-trip
    try:
        result = subprocess.run(
            ["termux-speech-to-text"],
            capture_output=True,
            text=True,
            timeout=deadline,
        )
    except subprocess.TimeoutExpired:
        logging.warning("STT: timed out after %.1f s", deadline)
        return None
    except FileNotFoundError:
        logging.error("STT: termux-speech-to-text not found — install termux-api")
        return None
    except Exception as e:
        logging.warning("STT error: %s", e)
        return None

    stdout = result.stdout.strip()
    if not stdout:
        logging.debug("STT: empty response (rc=%d)", result.returncode)
        return None

    text = _parse_stt_output(stdout)
    if not text:
        logging.debug("STT: no text in response: %r", stdout)
        return None

    logging.info("STT recognised: %r", text)
    return text


def tts_speak(text: str) -> None:
    """Speak @text via termux-tts-speak (fire-and-forget, errors only logged)."""
    try:
        subprocess.Popen(
            ["termux-tts-speak", text],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except FileNotFoundError:
        logging.debug("TTS: termux-tts-speak not found")
    except Exception as e:
        logging.debug("TTS error: %s", e)


# ── Command parser ────────────────────────────────────────────────────────────

def _extract_waypoint(spoken: str) -> Optional[str]:
    """Return the text following the first waypoint marker found, or None."""
    for marker in _WAYPOINT_MARKERS:
        found = marker.search(spoken)
        if found:
            # Slice the same string that was searched so the offsets line up.
            return spoken[found.end():].strip() or None
    return None


def parse_command(text: str) -> Tuple[Optional[str], Optional[str]]:
    """
    Match @text against the command table.

    Returns (command_id, param) where param is non-None only for go_waypoint
    (the original-case text following the waypoint marker).
    Returns (None, None) if no command matched.
    """
    spoken = text.strip()
    if not spoken:
        return None, None

    for cmd_id, pattern in _CMD_PATTERNS:
        if pattern.search(spoken) is None:
            continue
        param = _extract_waypoint(spoken) if cmd_id == "go_waypoint" else None
        return cmd_id, param

    return None, None


# ── MQTT publisher (same pattern as sensor_dashboard.py) ─────────────────────

class MQTTPublisher:
    """Thin paho-mqtt wrapper with auto-reconnect."""

    _RECONNECT_BASE = 2.0
    _RECONNECT_MAX = 60.0

    def __init__(self, broker: str, port: int, qos: int = 0):
        self._broker = broker
        self._port = port
        self._qos = qos
        self._lock = threading.Lock()   # guards _connected across threads
        self._connected = False

        self._client = self._new_client()
        self._client.on_connect = self._on_connect
        self._client.on_disconnect = self._on_disconnect
        self._client.reconnect_delay_set(
            min_delay=int(self._RECONNECT_BASE),
            max_delay=int(self._RECONNECT_MAX),
        )
        self._connect()

    @staticmethod
    def _new_client():
        """
        Build the paho client on both paho-mqtt 1.x and 2.x.

        paho-mqtt 2.x expects a callback API version; VERSION1 keeps the
        (client, userdata, flags, rc) / (client, userdata, rc) callback
        signatures used by this class.
        """
        options = {"client_id": "saltybot-voice-cmd", "clean_session": True}
        api_versions = getattr(mqtt, "CallbackAPIVersion", None)
        if api_versions is not None:
            return mqtt.Client(api_versions.VERSION1, **options)
        return mqtt.Client(**options)

    def _connect(self) -> None:
        """Start a non-blocking connect and the background network loop."""
        try:
            self._client.connect_async(self._broker, self._port, keepalive=60)
            self._client.loop_start()
            logging.info("MQTT connecting to %s:%d …", self._broker, self._port)
        except Exception as e:
            logging.warning("MQTT connect error: %s", e)

    def _on_connect(self, client, userdata, flags, rc) -> None:
        if rc == 0:
            with self._lock:
                self._connected = True
            logging.info("MQTT connected to %s:%d", self._broker, self._port)
        else:
            logging.warning("MQTT connect failed rc=%d", rc)

    def _on_disconnect(self, client, userdata, rc) -> None:
        with self._lock:
            self._connected = False
        if rc != 0:
            logging.warning("MQTT disconnected (rc=%d) — paho will retry", rc)

    @property
    def connected(self) -> bool:
        """True while the broker connection is up."""
        with self._lock:
            return self._connected

    def publish(self, topic: str, payload: dict) -> bool:
        """
        Publish @payload as compact JSON on @topic.

        Returns True when paho accepted the message; False when offline (the
        message is dropped, not queued) or when publishing raised.
        """
        if not self.connected:
            logging.debug("MQTT offline — dropping %s", topic)
            return False
        try:
            msg = json.dumps(payload, separators=(",", ":"))
            info = self._client.publish(topic, msg, qos=self._qos, retain=False)
            return info.rc == mqtt.MQTT_ERR_SUCCESS
        except Exception as e:
            logging.warning("MQTT publish error: %s", e)
            return False

    def shutdown(self) -> None:
        """Stop the network loop and disconnect from the broker."""
        self._client.loop_stop()
        self._client.disconnect()
        logging.info("MQTT disconnected.")


# ── Main listen loop ──────────────────────────────────────────────────────────

def _handle_text(raw: str, source: str, publisher: MQTTPublisher,
                 no_tts: bool) -> bool:
    """
    Parse @raw, publish command, speak confirmation.

    @source is copied into the payload ("stt" or "text").
    Returns True if a command was recognised and published.
    """
    cmd, param = parse_command(raw)
    if cmd is None:
        logging.info("No command matched for: %r", raw)
        if not no_tts:
            tts_speak("Sorry, I didn't understand that")
        return False

    payload = {
        "ts":     time.time(),
        "cmd":    cmd,
        "param":  param,
        "raw":    raw,
        "source": source,
    }
    ok = publisher.publish(TOPIC_VOICE_CMD, payload)
    logging.info("Published %s (param=%r) — MQTT %s",
                 cmd, param, "OK" if ok else "FAIL")

    if not no_tts:
        if ok:
            response = _TTS_RESPONSES.get(cmd, cmd.replace("_", " "))
            if param:
                response = f"{response}: {param}"
        else:
            # Do not confirm an action the robot never received.
            response = "Command not sent"
        tts_speak(response)

    return ok


def run_stt_loop(publisher: MQTTPublisher, args: argparse.Namespace) -> None:
    """Continuous STT listen → parse → publish loop (runs until interrupted)."""
    logging.info("Voice command loop started (STT mode). Say a command.")
    if not args.no_tts:
        tts_speak("Voice commands ready")

    consecutive_failures = 0

    while True:
        logging.info("Listening…")
        text = stt_listen(timeout=args.timeout)

        if text is None:
            consecutive_failures += 1
            logging.warning("STT failed (%d consecutive)", consecutive_failures)
            if consecutive_failures >= 3:
                # Back off instead of hammering a broken/unavailable STT service.
                logging.warning("STT unavailable — switch to --text mode if needed")
                if not args.no_tts:
                    tts_speak("Speech recognition unavailable")
                time.sleep(5.0)
                consecutive_failures = 0
            continue

        consecutive_failures = 0
        _handle_text(text, "stt", publisher, args.no_tts)


def run_text_loop(publisher: MQTTPublisher, args: argparse.Namespace) -> None:
    """Manual text input loop (--text mode); ends on EOF or quit/exit/q."""
    logging.info("Text input mode. Type a command (Ctrl-C or 'quit' to exit).")
    print("\nAvailable commands: go forward, go back, turn left, turn right, "
          "stop, emergency stop, go to waypoint <name>, speed up, slow down, status")
    print("Type 'quit' to exit.\n")

    while True:
        try:
            raw = input("Command> ").strip()
        except EOFError:
            break

        if not raw:
            continue
        if raw.lower() in ("quit", "exit", "q"):
            break

        _handle_text(raw, "text", publisher, args.no_tts)


# ── Entry point ───────────────────────────────────────────────────────────────

def main() -> None:
    """Parse CLI options, connect to MQTT and run the selected input loop."""
    parser = argparse.ArgumentParser(
        description="SaltyBot Termux voice command interface (Issue #633)"
    )
    parser.add_argument("--broker", default="192.168.1.100",
                        help="MQTT broker IP/hostname (default: 192.168.1.100)")
    parser.add_argument("--port", type=int, default=1883,
                        help="MQTT broker port (default: 1883)")
    parser.add_argument("--qos", type=int, default=0, choices=[0, 1, 2],
                        help="MQTT QoS level (default: 0)")
    parser.add_argument("--text", action="store_true",
                        help="Manual text fallback mode (no STT)")
    parser.add_argument("--no-tts", action="store_true",
                        help="Disable TTS confirmation")
    parser.add_argument("--timeout", type=float, default=10.0,
                        help="STT listen timeout in seconds (default: 10)")
    parser.add_argument("--debug", action="store_true",
                        help="Verbose logging")
    args = parser.parse_args()

    logging.basicConfig(
        level=logging.DEBUG if args.debug else logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
    )

    if not MQTT_AVAILABLE:
        logging.error("paho-mqtt not installed. Run: pip install paho-mqtt")
        sys.exit(1)

    publisher = MQTTPublisher(args.broker, args.port, qos=args.qos)

    # Wait for initial MQTT connection
    deadline = time.monotonic() + 10.0
    while not publisher.connected and time.monotonic() < deadline:
        time.sleep(0.2)

    if not publisher.connected:
        logging.warning("MQTT not connected — commands will be dropped until connected")

    try:
        if args.text:
            run_text_loop(publisher, args)
        else:
            run_stt_loop(publisher, args)
    except KeyboardInterrupt:
        logging.info("Shutting down…")
    finally:
        publisher.shutdown()
        logging.info("Done.")


if __name__ == "__main__":
    main()