#!/usr/bin/env python3
"""
sensor_dashboard.py — Termux phone sensor publisher for SaltyBot (Issue #574)

Reads phone IMU, GPS, and battery via termux-api and publishes JSON payloads
to the SaltyBot MQTT broker. Designed to run on Android/Termux as a
persistent background service.

Topics
──────
  saltybot/phone/imu      — accelerometer + gyroscope   @ 5 Hz
  saltybot/phone/gps      — lat/lon/alt/accuracy        @ 1 Hz
  saltybot/phone/battery  — percentage/charging/temp    @ 1 Hz

JSON payloads
─────────────
  IMU:     {"ts": 1710000000.000,
            "accel": {"x": 0.12, "y": -0.05, "z": 9.81},
            "gyro": {"x": 0.01, "y": -0.00, "z": 0.00}}
  GPS:     {"ts": 1710000000.000, "lat": 45.123, "lon": -73.456,
            "alt_m": 12.3, "accuracy_m": 3.5, "speed_ms": 0.0,
            "bearing_deg": 0.0, "provider": "gps"}
  Battery: {"ts": 1710000000.000, "pct": 87, "charging": true,
            "temp_c": 28.5, "health": "GOOD", "plugged": "AC"}

Usage
─────
  python3 phone/sensor_dashboard.py [OPTIONS]

  --broker HOST    MQTT broker IP/hostname  (default: 192.168.1.100)
  --port PORT      MQTT broker port         (default: 1883)
  --imu-hz FLOAT   IMU publish rate Hz      (default: 5.0)
  --gps-hz FLOAT   GPS publish rate Hz      (default: 1.0)
  --bat-hz FLOAT   Battery publish rate Hz  (default: 1.0)
  --qos INT        MQTT QoS level 0/1/2     (default: 0)
  --no-imu         Disable IMU publishing
  --no-gps         Disable GPS publishing
  --no-battery     Disable battery publishing
  --debug          Verbose logging

Dependencies (install in Termux)
─────────────────────────────────
  pkg install termux-api python
  pip install paho-mqtt
"""

import argparse
import json
import logging
import subprocess
import sys
import threading
import time
from typing import Callable, Optional

try:
    import paho.mqtt.client as mqtt
    MQTT_AVAILABLE = True
except ImportError:
    mqtt = None  # main() refuses to start when the library is missing
    MQTT_AVAILABLE = False

# ── MQTT topic roots ──────────────────────────────────────────────────────────

TOPIC_IMU = "saltybot/phone/imu"
TOPIC_GPS = "saltybot/phone/gps"
TOPIC_BATTERY = "saltybot/phone/battery"


# ── termux-api wrappers ───────────────────────────────────────────────────────

def _run_termux(cmd: list[str], timeout: float = 5.0) -> Optional[dict]:
    """
    Run a termux-api command and parse its JSON output.

    Returns the decoded JSON object, or None when the command cannot be
    started, times out, exits non-zero, prints nothing, prints invalid JSON,
    or prints JSON that is not an object (callers rely on dict methods).
    """
    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=timeout,
        )
        if result.returncode != 0 or not result.stdout.strip():
            logging.debug("termux cmd %s returned %d: %s",
                          cmd[0], result.returncode, result.stderr.strip())
            return None
        parsed = json.loads(result.stdout)
    except (subprocess.TimeoutExpired, json.JSONDecodeError, OSError) as e:
        # OSError covers a missing binary (FileNotFoundError) as well as
        # permission problems when spawning the command.
        logging.debug("termux cmd %s error: %s", cmd[0], e)
        return None
    if not isinstance(parsed, dict):
        logging.debug("termux cmd %s returned non-object JSON", cmd[0])
        return None
    return parsed


def _to_float(value, default: float) -> float:
    """Coerce @value to float; return @default for None or non-numeric input."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return default


def _xyz(sample: Optional[dict]) -> Optional[dict]:
    """Turn a sensor sample's first three 'values' into an x/y/z dict, or None."""
    if not sample:
        return None
    values = sample.get("values")
    if not isinstance(values, list) or len(values) < 3:
        return None
    try:
        x, y, z = (float(component) for component in values[:3])
    except (TypeError, ValueError):
        return None
    return {"x": x, "y": y, "z": z}


def read_sensor_once(sensor_name: str) -> Optional[dict]:
    """
    Call termux-sensor for a single reading of @sensor_name.

    termux-sensor -s <name> -n 1 returns one sample then exits.
    Returns the first entry that carries a "values" field, or None.
    """
    raw = _run_termux(["termux-sensor", "-s", sensor_name, "-n", "1"], timeout=4.0)
    if raw is None:
        return None
    # Response shape: {"<sensor description>": {"values": [x, y, z], ...}}
    # The key may be the full sensor description string, so scan the entries
    # instead of looking one up by name.
    for entry in raw.values():
        if isinstance(entry, dict) and "values" in entry:
            return entry
    return None


def read_imu() -> Optional[dict]:
    """
    Read accelerometer and gyroscope in a single call each.

    Returns dict with 'accel' and 'gyro' sub-dicts, or None if both fail.
    When only one of the two sensors yields a usable sample, the other is
    reported as all zeros.
    """
    accel = _xyz(read_sensor_once("accelerometer"))
    gyro = _xyz(read_sensor_once("gyroscope"))

    if accel is None and gyro is None:
        return None

    return {
        "ts": time.time(),
        "accel": accel or {"x": 0.0, "y": 0.0, "z": 0.0},
        "gyro": gyro or {"x": 0.0, "y": 0.0, "z": 0.0},
    }


def _gps_payload(raw: Optional[dict]) -> Optional[dict]:
    """Build the GPS payload from a termux-location reply; None if it has no fix."""
    if raw is None:
        return None
    try:
        lat = float(raw["latitude"])
        lon = float(raw["longitude"])
    except (KeyError, TypeError, ValueError):
        # A reply without coordinates (e.g. an error object) is not a fix;
        # publishing it would place the phone at lat=0, lon=0.
        return None
    return {
        "ts": time.time(),
        "lat": lat,
        "lon": lon,
        "alt_m": _to_float(raw.get("altitude"), 0.0),
        "accuracy_m": _to_float(raw.get("accuracy"), -1.0),
        "speed_ms": _to_float(raw.get("speed"), 0.0),
        "bearing_deg": _to_float(raw.get("bearing"), 0.0),
        "provider": str(raw.get("provider", "unknown")),
    }


def read_gps() -> Optional[dict]:
    """
    Read GPS fix via termux-location.

    Tries the "gps" provider first and falls back to the "network" provider
    when the first attempt yields no usable coordinates (e.g. GPS is cold).
    Returns None when neither provider produces a fix.
    """
    for provider, timeout in (("gps", 10.0), ("network", 6.0)):
        payload = _gps_payload(
            _run_termux(["termux-location", "-p", provider, "-r", "once"],
                        timeout=timeout)
        )
        if payload is not None:
            return payload
    return None


def _battery_payload(raw: dict) -> dict:
    """Build the battery payload from a termux-battery-status reply."""
    status = str(raw.get("status", "UNKNOWN"))
    return {
        "ts": time.time(),
        "pct": int(_to_float(raw.get("percentage"), -1.0)),
        # NOT_CHARGING must not be reported as charging.
        "charging": status not in ("DISCHARGING", "NOT_CHARGING", "UNKNOWN"),
        "temp_c": _to_float(raw.get("temperature"), 0.0),
        "health": str(raw.get("health", "UNKNOWN")),
        "plugged": str(raw.get("plugged", "UNPLUGGED")),
    }


def read_battery() -> Optional[dict]:
    """Read battery status via termux-battery-status. Returns None on error."""
    raw = _run_termux(["termux-battery-status"], timeout=4.0)
    if raw is None:
        return None
    return _battery_payload(raw)


# ── MQTT client with auto-reconnect ───────────────────────────────────────────

class MQTTPublisher:
    """
    Thin paho-mqtt wrapper with:
      - Automatic reconnect on disconnect (delegated to paho's own back-off,
        configured between _RECONNECT_BASE and _RECONNECT_MAX seconds)
      - publish() that drops (does not queue) messages while offline
      - Connection status accessible via .connected property
    """

    _RECONNECT_BASE = 2.0   # seconds
    _RECONNECT_MAX = 60.0

    def __init__(self, broker: str, port: int, qos: int = 0):
        """Create the client and start connecting to @broker:@port in the background."""
        self._broker = broker
        self._port = port
        self._qos = qos
        self._lock = threading.Lock()   # guards _connected across paho's thread
        self._connected = False

        client_kwargs = {"client_id": "saltybot-phone-sensor", "clean_session": True}
        api_versions = getattr(mqtt, "CallbackAPIVersion", None)
        if api_versions is not None:
            # paho-mqtt >= 2.0 takes a leading callback_api_version argument.
            # VERSION1 keeps the (client, userdata, flags, rc) callback
            # signatures implemented below.
            self._client = mqtt.Client(api_versions.VERSION1, **client_kwargs)
        else:
            self._client = mqtt.Client(**client_kwargs)

        self._client.on_connect = self._on_connect
        self._client.on_disconnect = self._on_disconnect
        self._client.reconnect_delay_set(
            min_delay=int(self._RECONNECT_BASE),
            max_delay=int(self._RECONNECT_MAX),
        )
        self._connect()

    def _connect(self) -> None:
        """Kick off a non-blocking connect and start paho's network thread."""
        try:
            self._client.connect_async(self._broker, self._port, keepalive=60)
            self._client.loop_start()
            logging.info("MQTT connecting to %s:%d ...", self._broker, self._port)
        except Exception as e:
            logging.warning("MQTT initial connect error: %s", e)

    def _on_connect(self, client, userdata, flags, rc) -> None:
        """paho callback: mark the publisher connected when rc is 0."""
        if rc == 0:
            with self._lock:
                self._connected = True
            logging.info("MQTT connected to %s:%d", self._broker, self._port)
        else:
            logging.warning("MQTT connect failed rc=%d", rc)

    def _on_disconnect(self, client, userdata, rc) -> None:
        """paho callback: mark the publisher offline; warn if unexpected."""
        with self._lock:
            self._connected = False
        if rc != 0:
            logging.warning("MQTT unexpected disconnect (rc=%d) — paho will retry", rc)

    @property
    def connected(self) -> bool:
        """True while the last connect callback succeeded and no disconnect followed."""
        with self._lock:
            return self._connected

    def publish(self, topic: str, payload: dict) -> bool:
        """
        Serialize @payload to JSON and publish to @topic.

        Returns True on success, False when offline (message is dropped) or
        when serialization/publishing fails.
        """
        if not self.connected:
            logging.debug("MQTT offline — dropping %s", topic)
            return False
        try:
            msg = json.dumps(payload, separators=(",", ":"))
            info = self._client.publish(topic, msg, qos=self._qos, retain=False)
            return info.rc == mqtt.MQTT_ERR_SUCCESS
        except Exception as e:
            logging.warning("MQTT publish error on %s: %s", topic, e)
            return False

    def shutdown(self) -> None:
        """Disconnect from the broker and stop paho's network thread."""
        # Disconnect while the network loop is still running, then stop it.
        self._client.disconnect()
        self._client.loop_stop()
        with self._lock:
            self._connected = False
        logging.info("MQTT disconnected.")


# ── Sensor polling threads ────────────────────────────────────────────────────

class SensorPoller(threading.Thread):
    """
    Polls a sensor function at a fixed rate and publishes to MQTT.

    Runs as a daemon thread so it exits cleanly when the main thread stops.
    """

    def __init__(self, name: str, read_fn: Callable[[], Optional[dict]],
                 topic: str, hz: float, publisher: MQTTPublisher):
        """
        @name       thread name, also used in log lines
        @read_fn    zero-argument callable returning a payload dict or None
        @topic      MQTT topic to publish to
        @hz         polling rate; must be a positive, finite number
        @publisher  MQTTPublisher used for publishing

        Raises ValueError when @hz is not a positive, finite number.
        """
        # The chained comparison is False for NaN, zero, negatives and inf,
        # all of which would either crash the division or spin the loop.
        if not 0.0 < hz < float("inf"):
            raise ValueError(f"hz must be a positive, finite number, got {hz!r}")
        super().__init__(name=name, daemon=True)
        self._read_fn = read_fn
        self._topic = topic
        self._interval = 1.0 / hz
        self._pub = publisher
        self._stop_requested = threading.Event()
        # Stats (written by this thread, read by the status logger)
        self.published = 0
        self.errors = 0

    def run(self) -> None:
        """Poll, publish and pace until stop() is called."""
        logging.info("Poller '%s' started at %.1f Hz → %s",
                     self.name, 1.0 / self._interval, self._topic)
        while not self._stop_requested.is_set():
            t0 = time.monotonic()
            try:
                data = self._read_fn()
                if data is None:
                    self.errors += 1
                    logging.debug("%s: read returned None", self.name)
                elif self._pub.publish(self._topic, data):
                    self.published += 1
                    logging.debug("%s: published %s", self.name, list(data.keys()))
                else:
                    self.errors += 1
            except Exception as e:
                # Keep the poller alive no matter what a single read does.
                self.errors += 1
                logging.warning("%s: read error: %s", self.name, e)
            elapsed = time.monotonic() - t0
            # Waiting on the event (instead of sleeping) lets stop() end the
            # pause immediately.
            self._stop_requested.wait(max(0.0, self._interval - elapsed))

    def stop(self) -> None:
        """Ask the polling loop to exit after the current iteration."""
        self._stop_requested.set()


# ── Status logger ─────────────────────────────────────────────────────────────

def _status_logger(pollers: list[SensorPoller], publisher: MQTTPublisher,
                   stop_event: threading.Event) -> None:
    """Log per-poller stats every 30 s until @stop_event is set."""
    while not stop_event.wait(30.0):
        mqtt_state = "OK" if publisher.connected else "DOWN"
        parts = [f"MQTT={mqtt_state}"]
        parts.extend(f"{p.name}={p.published}ok/{p.errors}err" for p in pollers)
        logging.info("Status — %s", " ".join(parts))


# ── Entry point ───────────────────────────────────────────────────────────────

def _positive_rate(text: str) -> float:
    """argparse type: parse @text as a positive, finite rate in Hz."""
    try:
        value = float(text)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid rate: {text!r}") from None
    if not 0.0 < value < float("inf"):
        raise argparse.ArgumentTypeError(
            f"rate must be a positive, finite number, got {text!r}")
    return value


def main() -> None:
    """Parse CLI options, start the enabled pollers and run until Ctrl-C."""
    parser = argparse.ArgumentParser(
        description="SaltyBot Termux sensor dashboard (Issue #574)"
    )
    parser.add_argument("--broker", default="192.168.1.100",
                        help="MQTT broker IP/hostname (default: 192.168.1.100)")
    parser.add_argument("--port", type=int, default=1883,
                        help="MQTT broker port (default: 1883)")
    parser.add_argument("--imu-hz", type=_positive_rate, default=5.0,
                        help="IMU publish rate Hz (default: 5.0)")
    parser.add_argument("--gps-hz", type=_positive_rate, default=1.0,
                        help="GPS publish rate Hz (default: 1.0)")
    parser.add_argument("--bat-hz", type=_positive_rate, default=1.0,
                        help="Battery publish rate Hz (default: 1.0)")
    parser.add_argument("--qos", type=int, default=0, choices=[0, 1, 2],
                        help="MQTT QoS level (default: 0)")
    parser.add_argument("--no-imu", action="store_true", help="Disable IMU")
    parser.add_argument("--no-gps", action="store_true", help="Disable GPS")
    parser.add_argument("--no-battery", action="store_true", help="Disable battery")
    parser.add_argument("--debug", action="store_true", help="Verbose logging")
    args = parser.parse_args()

    logging.basicConfig(
        level=logging.DEBUG if args.debug else logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
    )

    if not MQTT_AVAILABLE:
        logging.error("paho-mqtt not installed. Run: pip install paho-mqtt")
        sys.exit(1)

    if args.no_imu and args.no_gps and args.no_battery:
        logging.error("All sensors disabled — nothing to do.")
        sys.exit(1)

    # Connect MQTT
    publisher = MQTTPublisher(args.broker, args.port, qos=args.qos)

    # Wait briefly for initial connection before starting pollers
    deadline = time.monotonic() + 10.0
    while not publisher.connected and time.monotonic() < deadline:
        time.sleep(0.2)
    if not publisher.connected:
        logging.warning("MQTT not yet connected — pollers will start anyway and retry.")

    # Build pollers for enabled sensors
    pollers: list[SensorPoller] = []
    if not args.no_imu:
        pollers.append(SensorPoller("imu", read_imu, TOPIC_IMU,
                                    args.imu_hz, publisher))
    if not args.no_gps:
        pollers.append(SensorPoller("gps", read_gps, TOPIC_GPS,
                                    args.gps_hz, publisher))
    if not args.no_battery:
        pollers.append(SensorPoller("battery", read_battery, TOPIC_BATTERY,
                                    args.bat_hz, publisher))

    for p in pollers:
        p.start()

    # Status log thread
    stop_evt = threading.Event()
    status_thread = threading.Thread(
        target=_status_logger, args=(pollers, publisher, stop_evt), daemon=True
    )
    status_thread.start()

    logging.info("Sensor dashboard running — Ctrl-C to stop.")
    try:
        while True:
            time.sleep(1.0)
    except KeyboardInterrupt:
        logging.info("Shutting down...")
    finally:
        stop_evt.set()
        for p in pollers:
            p.stop()
        publisher.shutdown()
        logging.info("Done.")


if __name__ == "__main__":
    main()