diff --git a/phone/sensor_dashboard.py b/phone/sensor_dashboard.py new file mode 100644 index 0000000..c01bfa4 --- /dev/null +++ b/phone/sensor_dashboard.py @@ -0,0 +1,404 @@ +#!/usr/bin/env python3 +""" +sensor_dashboard.py — Termux phone sensor publisher for SaltyBot (Issue #574) + +Reads phone IMU, GPS, and battery via termux-api and publishes JSON payloads +to the SaltyBot MQTT broker. Designed to run on Android/Termux as a +persistent background service. + +Topics +────── + saltybot/phone/imu — accelerometer + gyroscope @ 5 Hz + saltybot/phone/gps — lat/lon/alt/accuracy @ 1 Hz + saltybot/phone/battery — percentage/charging/temp @ 1 Hz + +JSON payloads +───────────── + IMU: + {"ts": 1710000000.000, + "accel": {"x": 0.12, "y": -0.05, "z": 9.81}, + "gyro": {"x": 0.01, "y": -0.00, "z": 0.00}} + + GPS: + {"ts": 1710000000.000, + "lat": 45.123, "lon": -73.456, "alt_m": 12.3, + "accuracy_m": 3.5, "speed_ms": 0.0, "bearing_deg": 0.0, + "provider": "gps"} + + Battery: + {"ts": 1710000000.000, + "pct": 87, "charging": true, "temp_c": 28.5, + "health": "GOOD", "plugged": "AC"} + +Usage +───── + python3 phone/sensor_dashboard.py [OPTIONS] + + --broker HOST MQTT broker IP/hostname (default: 192.168.1.100) + --port PORT MQTT broker port (default: 1883) + --imu-hz FLOAT IMU publish rate Hz (default: 5.0) + --gps-hz FLOAT GPS publish rate Hz (default: 1.0) + --bat-hz FLOAT Battery publish rate Hz (default: 1.0) + --qos INT MQTT QoS level 0/1/2 (default: 0) + --no-imu Disable IMU publishing + --no-gps Disable GPS publishing + --no-battery Disable battery publishing + --debug Verbose logging + +Dependencies (install in Termux) +───────────────────────────────── + pkg install termux-api python + pip install paho-mqtt +""" + +import argparse +import json +import logging +import subprocess +import sys +import threading +import time +from typing import Optional + +try: + import paho.mqtt.client as mqtt + MQTT_AVAILABLE = True +except ImportError: + MQTT_AVAILABLE = False + +# ── MQTT topic roots ────────────────────────────────────────────────────────── + +TOPIC_IMU = "saltybot/phone/imu" +TOPIC_GPS = "saltybot/phone/gps" +TOPIC_BATTERY = "saltybot/phone/battery" + +# ── termux-api wrappers ─────────────────────────────────────────────────────── + +def _run_termux(cmd: list[str], timeout: float = 5.0) -> Optional[dict]: + """Run a termux-api command and parse its JSON output. Returns None on error.""" + try: + result = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=timeout, + ) + if result.returncode != 0 or not result.stdout.strip(): + logging.debug("termux cmd %s returned %d: %s", + cmd[0], result.returncode, result.stderr.strip()) + return None + return json.loads(result.stdout) + except (subprocess.TimeoutExpired, json.JSONDecodeError, FileNotFoundError) as e: + logging.debug("termux cmd %s error: %s", cmd[0], e) + return None + + +def read_sensor_once(sensor_name: str) -> Optional[dict]: + """ + Call termux-sensor for a single reading of @sensor_name. + + termux-sensor -s -n 1 returns one sample then exits. + """ + raw = _run_termux(["termux-sensor", "-s", sensor_name, "-n", "1"], timeout=4.0) + if raw is None: + return None + # Response shape: {"": {"values": [x, y, z], ...}} + # Key may be the full sensor description string; iterate to find it. + for key, val in raw.items(): + if isinstance(val, dict) and "values" in val: + return val + return None + + +def read_imu() -> Optional[dict]: + """ + Read accelerometer and gyroscope in a single call each. + Returns dict with 'accel' and 'gyro' sub-dicts, or None if both fail. + """ + accel_data = read_sensor_once("accelerometer") + gyro_data = read_sensor_once("gyroscope") + + accel = None + if accel_data and isinstance(accel_data.get("values"), list): + v = accel_data["values"] + if len(v) >= 3: + accel = {"x": float(v[0]), "y": float(v[1]), "z": float(v[2])} + + gyro = None + if gyro_data and isinstance(gyro_data.get("values"), list): + v = gyro_data["values"] + if len(v) >= 3: + gyro = {"x": float(v[0]), "y": float(v[1]), "z": float(v[2])} + + if accel is None and gyro is None: + return None + + return { + "ts": time.time(), + "accel": accel or {"x": 0.0, "y": 0.0, "z": 0.0}, + "gyro": gyro or {"x": 0.0, "y": 0.0, "z": 0.0}, + } + + +def read_gps() -> Optional[dict]: + """Read GPS fix via termux-location.""" + raw = _run_termux(["termux-location", "-p", "gps", "-r", "once"], timeout=10.0) + if raw is None: + # Fallback: try network provider if GPS cold + raw = _run_termux( + ["termux-location", "-p", "network", "-r", "once"], timeout=6.0 + ) + if raw is None: + return None + + return { + "ts": time.time(), + "lat": float(raw.get("latitude", 0.0)), + "lon": float(raw.get("longitude", 0.0)), + "alt_m": float(raw.get("altitude", 0.0)), + "accuracy_m": float(raw.get("accuracy", -1.0)), + "speed_ms": float(raw.get("speed", 0.0)), + "bearing_deg": float(raw.get("bearing", 0.0)), + "provider": str(raw.get("provider", "unknown")), + } + + +def read_battery() -> Optional[dict]: + """Read battery status via termux-battery-status.""" + raw = _run_termux(["termux-battery-status"], timeout=4.0) + if raw is None: + return None + + return { + "ts": time.time(), + "pct": int(raw.get("percentage", -1)), + "charging": raw.get("status", "DISCHARGING") not in ("DISCHARGING", "UNKNOWN"), + "temp_c": float(raw.get("temperature", 0.0)), + "health": str(raw.get("health", "UNKNOWN")), + "plugged": str(raw.get("plugged", "UNPLUGGED")), + } + + +# ── MQTT client with auto-reconnect ─────────────────────────────────────────── + +class MQTTPublisher: + """ + Thin paho-mqtt wrapper with: + - Automatic reconnect on disconnect (exponential back-off, max 60 s) + - Thread-safe publish() — queues messages if offline + - Connection status accessible via .connected property + """ + + _RECONNECT_BASE = 2.0 # seconds + _RECONNECT_MAX = 60.0 + + def __init__(self, broker: str, port: int, qos: int = 0): + self._broker = broker + self._port = port + self._qos = qos + self._lock = threading.Lock() + self._connected = False + self._reconnect_delay = self._RECONNECT_BASE + + self._client = mqtt.Client(client_id="saltybot-phone-sensor", clean_session=True) + self._client.on_connect = self._on_connect + self._client.on_disconnect = self._on_disconnect + self._client.reconnect_delay_set( + min_delay=int(self._RECONNECT_BASE), + max_delay=int(self._RECONNECT_MAX), + ) + + self._connect() + + def _connect(self) -> None: + try: + self._client.connect_async(self._broker, self._port, keepalive=60) + self._client.loop_start() + logging.info("MQTT connecting to %s:%d ...", self._broker, self._port) + except Exception as e: + logging.warning("MQTT initial connect error: %s", e) + + def _on_connect(self, client, userdata, flags, rc) -> None: + if rc == 0: + with self._lock: + self._connected = True + self._reconnect_delay = self._RECONNECT_BASE + logging.info("MQTT connected to %s:%d", self._broker, self._port) + else: + logging.warning("MQTT connect failed rc=%d", rc) + + def _on_disconnect(self, client, userdata, rc) -> None: + with self._lock: + self._connected = False + if rc != 0: + logging.warning("MQTT unexpected disconnect (rc=%d) — paho will retry", rc) + + @property + def connected(self) -> bool: + with self._lock: + return self._connected + + def publish(self, topic: str, payload: dict) -> bool: + """Serialize @payload to JSON and publish to @topic. Returns True on success.""" + if not self.connected: + logging.debug("MQTT offline — dropping %s", topic) + return False + try: + msg = json.dumps(payload, separators=(",", ":")) + info = self._client.publish(topic, msg, qos=self._qos, retain=False) + return info.rc == mqtt.MQTT_ERR_SUCCESS + except Exception as e: + logging.warning("MQTT publish error on %s: %s", topic, e) + return False + + def shutdown(self) -> None: + self._client.loop_stop() + self._client.disconnect() + logging.info("MQTT disconnected.") + + +# ── Sensor polling threads ──────────────────────────────────────────────────── + +class SensorPoller(threading.Thread): + """ + Polls a sensor function at a fixed rate and publishes to MQTT. + Runs as a daemon thread so it exits cleanly when the main thread stops. + """ + + def __init__(self, name: str, read_fn, topic: str, + hz: float, publisher: MQTTPublisher): + super().__init__(name=name, daemon=True) + self._read_fn = read_fn + self._topic = topic + self._interval = 1.0 / hz + self._pub = publisher + self._running = False + + # Stats + self.published = 0 + self.errors = 0 + + def run(self) -> None: + self._running = True + logging.info("Poller '%s' started at %.1f Hz → %s", + self.name, 1.0 / self._interval, self._topic) + while self._running: + t0 = time.monotonic() + try: + data = self._read_fn() + if data is not None: + if self._pub.publish(self._topic, data): + self.published += 1 + logging.debug("%s: published %s", self.name, + list(data.keys())) + else: + self.errors += 1 + else: + self.errors += 1 + logging.debug("%s: read returned None", self.name) + except Exception as e: + self.errors += 1 + logging.warning("%s: read error: %s", self.name, e) + + elapsed = time.monotonic() - t0 + sleep_s = max(0.0, self._interval - elapsed) + time.sleep(sleep_s) + + def stop(self) -> None: + self._running = False + + +# ── Status logger ───────────────────────────────────────────────────────────── + +def _status_logger(pollers: list[SensorPoller], publisher: MQTTPublisher, + stop_event: threading.Event) -> None: + """Log per-poller stats every 30 s.""" + while not stop_event.wait(30.0): + parts = [f"MQTT={'OK' if publisher.connected else 'DOWN'}"] + for p in pollers: + parts.append(f"{p.name}={p.published}ok/{p.errors}err") + logging.info("Status — %s", " ".join(parts)) + + +# ── Entry point ─────────────────────────────────────────────────────────────── + +def main() -> None: + parser = argparse.ArgumentParser( + description="SaltyBot Termux sensor dashboard (Issue #574)" + ) + parser.add_argument("--broker", default="192.168.1.100", + help="MQTT broker IP/hostname (default: 192.168.1.100)") + parser.add_argument("--port", type=int, default=1883, + help="MQTT broker port (default: 1883)") + parser.add_argument("--imu-hz", type=float, default=5.0, + help="IMU publish rate Hz (default: 5.0)") + parser.add_argument("--gps-hz", type=float, default=1.0, + help="GPS publish rate Hz (default: 1.0)") + parser.add_argument("--bat-hz", type=float, default=1.0, + help="Battery publish rate Hz (default: 1.0)") + parser.add_argument("--qos", type=int, default=0, choices=[0, 1, 2], + help="MQTT QoS level (default: 0)") + parser.add_argument("--no-imu", action="store_true", help="Disable IMU") + parser.add_argument("--no-gps", action="store_true", help="Disable GPS") + parser.add_argument("--no-battery", action="store_true", help="Disable battery") + parser.add_argument("--debug", action="store_true", help="Verbose logging") + args = parser.parse_args() + + logging.basicConfig( + level=logging.DEBUG if args.debug else logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s", + ) + + if not MQTT_AVAILABLE: + logging.error("paho-mqtt not installed. Run: pip install paho-mqtt") + sys.exit(1) + + if args.no_imu and args.no_gps and args.no_battery: + logging.error("All sensors disabled — nothing to do.") + sys.exit(1) + + # Connect MQTT + publisher = MQTTPublisher(args.broker, args.port, qos=args.qos) + + # Wait briefly for initial connection before starting pollers + deadline = time.monotonic() + 10.0 + while not publisher.connected and time.monotonic() < deadline: + time.sleep(0.2) + if not publisher.connected: + logging.warning("MQTT not yet connected — pollers will start anyway and retry.") + + # Build pollers for enabled sensors + pollers: list[SensorPoller] = [] + if not args.no_imu: + pollers.append(SensorPoller("imu", read_imu, TOPIC_IMU, args.imu_hz, publisher)) + if not args.no_gps: + pollers.append(SensorPoller("gps", read_gps, TOPIC_GPS, args.gps_hz, publisher)) + if not args.no_battery: + pollers.append(SensorPoller("battery", read_battery, TOPIC_BATTERY, args.bat_hz, publisher)) + + for p in pollers: + p.start() + + # Status log thread + stop_evt = threading.Event() + status_thread = threading.Thread( + target=_status_logger, args=(pollers, publisher, stop_evt), daemon=True + ) + status_thread.start() + + logging.info("Sensor dashboard running — Ctrl-C to stop.") + try: + while True: + time.sleep(1.0) + except KeyboardInterrupt: + logging.info("Shutting down...") + finally: + stop_evt.set() + for p in pollers: + p.stop() + publisher.shutdown() + logging.info("Done.") + + +if __name__ == "__main__": + main()