feat: Termux sensor dashboard (Issue #574) #578

Merged
sl-jetson merged 1 commits from sl-android/issue-574-sensor-dashboard into main 2026-03-14 12:13:52 -04:00

404
phone/sensor_dashboard.py Normal file
View File

@ -0,0 +1,404 @@
#!/usr/bin/env python3
"""
sensor_dashboard.py Termux phone sensor publisher for SaltyBot (Issue #574)
Reads phone IMU, GPS, and battery via termux-api and publishes JSON payloads
to the SaltyBot MQTT broker. Designed to run on Android/Termux as a
persistent background service.
Topics
saltybot/phone/imu accelerometer + gyroscope @ 5 Hz
saltybot/phone/gps lat/lon/alt/accuracy @ 1 Hz
saltybot/phone/battery percentage/charging/temp @ 1 Hz
JSON payloads
IMU:
{"ts": 1710000000.000,
"accel": {"x": 0.12, "y": -0.05, "z": 9.81},
"gyro": {"x": 0.01, "y": -0.00, "z": 0.00}}
GPS:
{"ts": 1710000000.000,
"lat": 45.123, "lon": -73.456, "alt_m": 12.3,
"accuracy_m": 3.5, "speed_ms": 0.0, "bearing_deg": 0.0,
"provider": "gps"}
Battery:
{"ts": 1710000000.000,
"pct": 87, "charging": true, "temp_c": 28.5,
"health": "GOOD", "plugged": "AC"}
Usage
python3 phone/sensor_dashboard.py [OPTIONS]
--broker HOST MQTT broker IP/hostname (default: 192.168.1.100)
--port PORT MQTT broker port (default: 1883)
--imu-hz FLOAT IMU publish rate Hz (default: 5.0)
--gps-hz FLOAT GPS publish rate Hz (default: 1.0)
--bat-hz FLOAT Battery publish rate Hz (default: 1.0)
--qos INT MQTT QoS level 0/1/2 (default: 0)
--no-imu Disable IMU publishing
--no-gps Disable GPS publishing
--no-battery Disable battery publishing
--debug Verbose logging
Dependencies (install in Termux)
pkg install termux-api python
pip install paho-mqtt
"""
import argparse
import json
import logging
import subprocess
import sys
import threading
import time
from typing import Optional
try:
import paho.mqtt.client as mqtt
MQTT_AVAILABLE = True
except ImportError:
MQTT_AVAILABLE = False
# ── MQTT topic roots ──────────────────────────────────────────────────────────
TOPIC_IMU = "saltybot/phone/imu"
TOPIC_GPS = "saltybot/phone/gps"
TOPIC_BATTERY = "saltybot/phone/battery"
# ── termux-api wrappers ───────────────────────────────────────────────────────
def _run_termux(cmd: list[str], timeout: float = 5.0) -> Optional[dict]:
"""Run a termux-api command and parse its JSON output. Returns None on error."""
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=timeout,
)
if result.returncode != 0 or not result.stdout.strip():
logging.debug("termux cmd %s returned %d: %s",
cmd[0], result.returncode, result.stderr.strip())
return None
return json.loads(result.stdout)
except (subprocess.TimeoutExpired, json.JSONDecodeError, FileNotFoundError) as e:
logging.debug("termux cmd %s error: %s", cmd[0], e)
return None
def read_sensor_once(sensor_name: str) -> Optional[dict]:
"""
Call termux-sensor for a single reading of @sensor_name.
termux-sensor -s <name> -n 1 returns one sample then exits.
"""
raw = _run_termux(["termux-sensor", "-s", sensor_name, "-n", "1"], timeout=4.0)
if raw is None:
return None
# Response shape: {"<sensor_name>": {"values": [x, y, z], ...}}
# Key may be the full sensor description string; iterate to find it.
for key, val in raw.items():
if isinstance(val, dict) and "values" in val:
return val
return None
def read_imu() -> Optional[dict]:
"""
Read accelerometer and gyroscope in a single call each.
Returns dict with 'accel' and 'gyro' sub-dicts, or None if both fail.
"""
accel_data = read_sensor_once("accelerometer")
gyro_data = read_sensor_once("gyroscope")
accel = None
if accel_data and isinstance(accel_data.get("values"), list):
v = accel_data["values"]
if len(v) >= 3:
accel = {"x": float(v[0]), "y": float(v[1]), "z": float(v[2])}
gyro = None
if gyro_data and isinstance(gyro_data.get("values"), list):
v = gyro_data["values"]
if len(v) >= 3:
gyro = {"x": float(v[0]), "y": float(v[1]), "z": float(v[2])}
if accel is None and gyro is None:
return None
return {
"ts": time.time(),
"accel": accel or {"x": 0.0, "y": 0.0, "z": 0.0},
"gyro": gyro or {"x": 0.0, "y": 0.0, "z": 0.0},
}
def read_gps() -> Optional[dict]:
"""Read GPS fix via termux-location."""
raw = _run_termux(["termux-location", "-p", "gps", "-r", "once"], timeout=10.0)
if raw is None:
# Fallback: try network provider if GPS cold
raw = _run_termux(
["termux-location", "-p", "network", "-r", "once"], timeout=6.0
)
if raw is None:
return None
return {
"ts": time.time(),
"lat": float(raw.get("latitude", 0.0)),
"lon": float(raw.get("longitude", 0.0)),
"alt_m": float(raw.get("altitude", 0.0)),
"accuracy_m": float(raw.get("accuracy", -1.0)),
"speed_ms": float(raw.get("speed", 0.0)),
"bearing_deg": float(raw.get("bearing", 0.0)),
"provider": str(raw.get("provider", "unknown")),
}
def read_battery() -> Optional[dict]:
"""Read battery status via termux-battery-status."""
raw = _run_termux(["termux-battery-status"], timeout=4.0)
if raw is None:
return None
return {
"ts": time.time(),
"pct": int(raw.get("percentage", -1)),
"charging": raw.get("status", "DISCHARGING") not in ("DISCHARGING", "UNKNOWN"),
"temp_c": float(raw.get("temperature", 0.0)),
"health": str(raw.get("health", "UNKNOWN")),
"plugged": str(raw.get("plugged", "UNPLUGGED")),
}
# ── MQTT client with auto-reconnect ───────────────────────────────────────────
class MQTTPublisher:
"""
Thin paho-mqtt wrapper with:
- Automatic reconnect on disconnect (exponential back-off, max 60 s)
- Thread-safe publish() queues messages if offline
- Connection status accessible via .connected property
"""
_RECONNECT_BASE = 2.0 # seconds
_RECONNECT_MAX = 60.0
def __init__(self, broker: str, port: int, qos: int = 0):
self._broker = broker
self._port = port
self._qos = qos
self._lock = threading.Lock()
self._connected = False
self._reconnect_delay = self._RECONNECT_BASE
self._client = mqtt.Client(client_id="saltybot-phone-sensor", clean_session=True)
self._client.on_connect = self._on_connect
self._client.on_disconnect = self._on_disconnect
self._client.reconnect_delay_set(
min_delay=int(self._RECONNECT_BASE),
max_delay=int(self._RECONNECT_MAX),
)
self._connect()
def _connect(self) -> None:
try:
self._client.connect_async(self._broker, self._port, keepalive=60)
self._client.loop_start()
logging.info("MQTT connecting to %s:%d ...", self._broker, self._port)
except Exception as e:
logging.warning("MQTT initial connect error: %s", e)
def _on_connect(self, client, userdata, flags, rc) -> None:
if rc == 0:
with self._lock:
self._connected = True
self._reconnect_delay = self._RECONNECT_BASE
logging.info("MQTT connected to %s:%d", self._broker, self._port)
else:
logging.warning("MQTT connect failed rc=%d", rc)
def _on_disconnect(self, client, userdata, rc) -> None:
with self._lock:
self._connected = False
if rc != 0:
logging.warning("MQTT unexpected disconnect (rc=%d) — paho will retry", rc)
@property
def connected(self) -> bool:
with self._lock:
return self._connected
def publish(self, topic: str, payload: dict) -> bool:
"""Serialize @payload to JSON and publish to @topic. Returns True on success."""
if not self.connected:
logging.debug("MQTT offline — dropping %s", topic)
return False
try:
msg = json.dumps(payload, separators=(",", ":"))
info = self._client.publish(topic, msg, qos=self._qos, retain=False)
return info.rc == mqtt.MQTT_ERR_SUCCESS
except Exception as e:
logging.warning("MQTT publish error on %s: %s", topic, e)
return False
def shutdown(self) -> None:
self._client.loop_stop()
self._client.disconnect()
logging.info("MQTT disconnected.")
# ── Sensor polling threads ────────────────────────────────────────────────────
class SensorPoller(threading.Thread):
"""
Polls a sensor function at a fixed rate and publishes to MQTT.
Runs as a daemon thread so it exits cleanly when the main thread stops.
"""
def __init__(self, name: str, read_fn, topic: str,
hz: float, publisher: MQTTPublisher):
super().__init__(name=name, daemon=True)
self._read_fn = read_fn
self._topic = topic
self._interval = 1.0 / hz
self._pub = publisher
self._running = False
# Stats
self.published = 0
self.errors = 0
def run(self) -> None:
self._running = True
logging.info("Poller '%s' started at %.1f Hz → %s",
self.name, 1.0 / self._interval, self._topic)
while self._running:
t0 = time.monotonic()
try:
data = self._read_fn()
if data is not None:
if self._pub.publish(self._topic, data):
self.published += 1
logging.debug("%s: published %s", self.name,
list(data.keys()))
else:
self.errors += 1
else:
self.errors += 1
logging.debug("%s: read returned None", self.name)
except Exception as e:
self.errors += 1
logging.warning("%s: read error: %s", self.name, e)
elapsed = time.monotonic() - t0
sleep_s = max(0.0, self._interval - elapsed)
time.sleep(sleep_s)
def stop(self) -> None:
self._running = False
# ── Status logger ─────────────────────────────────────────────────────────────
def _status_logger(pollers: list[SensorPoller], publisher: MQTTPublisher,
stop_event: threading.Event) -> None:
"""Log per-poller stats every 30 s."""
while not stop_event.wait(30.0):
parts = [f"MQTT={'OK' if publisher.connected else 'DOWN'}"]
for p in pollers:
parts.append(f"{p.name}={p.published}ok/{p.errors}err")
logging.info("Status — %s", " ".join(parts))
# ── Entry point ───────────────────────────────────────────────────────────────
def main() -> None:
parser = argparse.ArgumentParser(
description="SaltyBot Termux sensor dashboard (Issue #574)"
)
parser.add_argument("--broker", default="192.168.1.100",
help="MQTT broker IP/hostname (default: 192.168.1.100)")
parser.add_argument("--port", type=int, default=1883,
help="MQTT broker port (default: 1883)")
parser.add_argument("--imu-hz", type=float, default=5.0,
help="IMU publish rate Hz (default: 5.0)")
parser.add_argument("--gps-hz", type=float, default=1.0,
help="GPS publish rate Hz (default: 1.0)")
parser.add_argument("--bat-hz", type=float, default=1.0,
help="Battery publish rate Hz (default: 1.0)")
parser.add_argument("--qos", type=int, default=0, choices=[0, 1, 2],
help="MQTT QoS level (default: 0)")
parser.add_argument("--no-imu", action="store_true", help="Disable IMU")
parser.add_argument("--no-gps", action="store_true", help="Disable GPS")
parser.add_argument("--no-battery", action="store_true", help="Disable battery")
parser.add_argument("--debug", action="store_true", help="Verbose logging")
args = parser.parse_args()
logging.basicConfig(
level=logging.DEBUG if args.debug else logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
)
if not MQTT_AVAILABLE:
logging.error("paho-mqtt not installed. Run: pip install paho-mqtt")
sys.exit(1)
if args.no_imu and args.no_gps and args.no_battery:
logging.error("All sensors disabled — nothing to do.")
sys.exit(1)
# Connect MQTT
publisher = MQTTPublisher(args.broker, args.port, qos=args.qos)
# Wait briefly for initial connection before starting pollers
deadline = time.monotonic() + 10.0
while not publisher.connected and time.monotonic() < deadline:
time.sleep(0.2)
if not publisher.connected:
logging.warning("MQTT not yet connected — pollers will start anyway and retry.")
# Build pollers for enabled sensors
pollers: list[SensorPoller] = []
if not args.no_imu:
pollers.append(SensorPoller("imu", read_imu, TOPIC_IMU, args.imu_hz, publisher))
if not args.no_gps:
pollers.append(SensorPoller("gps", read_gps, TOPIC_GPS, args.gps_hz, publisher))
if not args.no_battery:
pollers.append(SensorPoller("battery", read_battery, TOPIC_BATTERY, args.bat_hz, publisher))
for p in pollers:
p.start()
# Status log thread
stop_evt = threading.Event()
status_thread = threading.Thread(
target=_status_logger, args=(pollers, publisher, stop_evt), daemon=True
)
status_thread.start()
logging.info("Sensor dashboard running — Ctrl-C to stop.")
try:
while True:
time.sleep(1.0)
except KeyboardInterrupt:
logging.info("Shutting down...")
finally:
stop_evt.set()
for p in pollers:
p.stop()
publisher.shutdown()
logging.info("Done.")
if __name__ == "__main__":
main()