#!/usr/bin/env python3
"""
waypoint_logger.py — GPS waypoint logger and route planner for SaltyBot (Issue #617)

Interactive CLI for recording, managing, and publishing GPS waypoints from an
Android/Termux phone. Uses termux-location for GPS fixes (same provider as
sensor_dashboard.py) and paho-mqtt to publish routes to the broker.

Features
────────
  • Record GPS waypoint at current position (with name and optional tags)
  • List all saved waypoints with distance/bearing from previous
  • Delete a waypoint by index
  • Clear all waypoints in the active route
  • Publish the route as a Nav2-compatible waypoint list to MQTT
  • Subscribe to saltybot/phone/gps for live GPS if sensor_dashboard.py
    is already running (avoids double GPS call)
  • Load/save route to JSON file (persistent between sessions)

MQTT topics
───────────
  Publish:   saltybot/phone/route
  Subscribe: saltybot/phone/gps   (live GPS from sensor_dashboard.py)

Route JSON published to MQTT
────────────────────────────
  {
    "route_name": "patrol_loop",
    "ts": 1710000000.0,
    "waypoints": [
      {
        "index": 0,
        "name": "start",
        "tags": ["patrol", "home"],
        "lat": 45.123,
        "lon": -73.456,
        "alt_m": 12.3,
        "accuracy_m": 2.1,
        "recorded_at": 1710000000.0
      },
      ...
    ]
  }

Nav2 PoseStamped list format (also included under "nav2_poses" key):
  Each entry has frame_id, position {x,y,z}, orientation {x,y,z,w}
  using a flat-earth approximation relative to the first waypoint.

Usage
─────
  python3 phone/waypoint_logger.py [OPTIONS]

  --broker HOST     MQTT broker IP (default: 192.168.1.100)
  --port PORT       MQTT port (default: 1883)
  --file PATH       Route JSON file (default: ~/saltybot_route.json)
  --route NAME      Route name (default: route)
  --provider PROV   GPS provider: gps|network|passive (default: gps)
  --live-gps        Subscribe to sensor_dashboard GPS instead of polling
  --no-mqtt         Disable MQTT (log-only mode)
  --debug           Verbose logging

Dependencies (Termux)
─────────────────────
  pkg install termux-api python
  pip install paho-mqtt
"""

import argparse
import json
import logging
import math
import os
import subprocess
import sys
import threading
import time
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from typing import Optional

try:
    import paho.mqtt.client as mqtt
    MQTT_AVAILABLE = True
except ImportError:
    MQTT_AVAILABLE = False

# ── Constants ─────────────────────────────────────────────────────────────────

TOPIC_ROUTE = "saltybot/phone/route"
TOPIC_GPS_LIVE = "saltybot/phone/gps"

EARTH_RADIUS_M = 6_371_000.0  # mean Earth radius in metres

DEFAULT_FILE = os.path.expanduser("~/saltybot_route.json")


# ── Data model ────────────────────────────────────────────────────────────────

@dataclass
class Waypoint:
    """One recorded GPS position in a route."""

    index: int
    name: str
    lat: float
    lon: float
    alt_m: float = 0.0
    accuracy_m: float = -1.0  # -1.0 is the "unknown accuracy" default used throughout
    tags: list[str] = field(default_factory=list)
    recorded_at: float = field(default_factory=time.time)  # epoch seconds

    def to_dict(self) -> dict:
        """Return the waypoint as a plain JSON-serialisable dict."""
        return asdict(self)

    @staticmethod
    def from_dict(d: dict) -> "Waypoint":
        """
        Build a Waypoint from a dict as produced by to_dict().

        "lat" and "lon" are required (KeyError if missing); every other key
        falls back to a default. Values are coerced with int/float/str, so a
        non-numeric value raises ValueError or TypeError.
        """
        return Waypoint(
            index=int(d.get("index", 0)),
            name=str(d.get("name", "")),
            lat=float(d["lat"]),
            lon=float(d["lon"]),
            alt_m=float(d.get("alt_m", 0.0)),
            accuracy_m=float(d.get("accuracy_m", -1.0)),
            # `or []` also covers an explicit null in the JSON file.
            tags=list(d.get("tags") or []),
            recorded_at=float(d.get("recorded_at", 0.0)),
        )


# ── Geo maths ─────────────────────────────────────────────────────────────────

def haversine_m(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance between two WGS84 points (metres)."""
    φ1, φ2 = math.radians(lat1), math.radians(lat2)
    Δφ = math.radians(lat2 - lat1)
    Δλ = math.radians(lon2 - lon1)
    a = math.sin(Δφ / 2) ** 2 + math.cos(φ1) * math.cos(φ2) * math.sin(Δλ / 2) ** 2
    # Floating-point rounding can push `a` marginally outside [0, 1] for
    # (near-)antipodal points, which would make sqrt/asin raise ValueError.
    a = min(1.0, max(0.0, a))
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


def bearing_deg(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Initial bearing from point-1 to point-2 (degrees, 0=N, 90=E)."""
    φ1, φ2 = math.radians(lat1), math.radians(lat2)
    Δλ = math.radians(lon2 - lon1)
    east = math.sin(Δλ) * math.cos(φ2)
    north = math.cos(φ1) * math.sin(φ2) - math.sin(φ1) * math.cos(φ2) * math.cos(Δλ)
    # atan2(east, north) measures clockwise from north; normalise to [0, 360).
    return (math.degrees(math.atan2(east, north)) + 360) % 360


def _compass(deg: float) -> str:
    """Convert bearing degrees to 8-point compass label."""
    dirs = ["N", "NE", "E", "SE", "S", "SW", "W", "NW"]
    return dirs[round(deg / 45) % 8]


def flat_earth_xy(origin_lat: float, origin_lon: float,
                  lat: float, lon: float) -> tuple[float, float]:
    """
    Simple flat-earth approximation — metres east/north from origin.
    Accurate within ~1% for distances < 100 km.

    The east component is scaled by the cosine of the mean of the two
    latitudes. Returns (x_east, y_north).
    """
    mean_lat_rad = math.radians((origin_lat + lat) / 2.0)
    x = math.radians(lon - origin_lon) * EARTH_RADIUS_M * math.cos(mean_lat_rad)
    y = math.radians(lat - origin_lat) * EARTH_RADIUS_M
    return x, y


# ── GPS acquisition ───────────────────────────────────────────────────────────

def get_gps_fix(provider: str = "gps", timeout: float = 12.0) -> Optional[dict]:
    """
    Acquire a GPS fix via termux-location.

    Falls back to the 'network' provider (6 s timeout) if the 'gps' provider
    yields no fix. Returns a dict with lat, lon, alt_m, accuracy_m and
    provider, or None on failure. Never raises for a missing binary, a
    timeout, or malformed output.
    """
    def _try(prov: str, t: float) -> Optional[dict]:
        """Run termux-location once with provider `prov`; None on any failure."""
        try:
            r = subprocess.run(
                ["termux-location", "-p", prov, "-r", "once"],
                capture_output=True, text=True, timeout=t,
            )
        except (subprocess.TimeoutExpired, OSError):
            # OSError covers FileNotFoundError (termux-api not installed)
            # as well as permission problems launching the binary.
            return None
        if r.returncode != 0 or not r.stdout.strip():
            return None
        try:
            raw = json.loads(r.stdout)
            lat = float(raw.get("latitude", 0.0))
            lon = float(raw.get("longitude", 0.0))
            fix = {
                "lat": lat,
                "lon": lon,
                "alt_m": float(raw.get("altitude", 0.0)),
                "accuracy_m": float(raw.get("accuracy", -1.0)),
                "provider": prov,
            }
        except (ValueError, TypeError, AttributeError):
            # Invalid JSON, null/non-numeric fields, or a non-object payload.
            return None
        # (0, 0) is what the defaults above produce when the fields are absent.
        if lat == 0.0 and lon == 0.0:
            return None
        return fix

    fix = _try(provider, timeout)
    if fix is None and provider == "gps":
        logging.debug("GPS timeout — retrying with network provider")
        fix = _try("network", 6.0)
    return fix


# ── Live GPS subscriber (optional, uses sensor_dashboard.py stream) ──────────

class LiveGPS:
    """
    Subscribe to saltybot/phone/gps on MQTT and cache the latest fix.
    Used when sensor_dashboard.py is already running to avoid double GPS calls.
    """

    def __init__(self, broker: str, port: int):
        self._lock = threading.Lock()      # guards _latest (written from the paho thread)
        self._latest: Optional[dict] = None
        self._client = None
        if not MQTT_AVAILABLE:
            return
        self._client = mqtt.Client(client_id="wp-live-gps", clean_session=True)
        self._client.on_connect = self._on_connect
        self._client.on_message = self._on_msg
        try:
            self._client.connect(broker, port, keepalive=30)
            self._client.loop_start()
        except Exception as e:
            # Best-effort: without a connection get() simply keeps returning None.
            logging.warning("LiveGPS connect failed: %s", e)

    def _on_connect(self, client, userdata, flags, rc) -> None:
        """Subscribe to the live GPS topic once the broker accepts the connection."""
        if rc == 0:
            client.subscribe(TOPIC_GPS_LIVE)

    def _on_msg(self, client, userdata, message) -> None:
        """Cache the payload if it decodes to a JSON object; ignore anything else."""
        try:
            data = json.loads(message.payload)
        except ValueError:
            # JSONDecodeError and UnicodeDecodeError are both ValueErrors.
            return
        if not isinstance(data, dict):
            return
        with self._lock:
            self._latest = data

    def get(self, max_age_s: float = 3.0) -> Optional[dict]:
        """
        Return the cached fix as a dict (lat, lon, alt_m, accuracy_m, provider),
        or None if there is no message yet, its "ts" is older than `max_age_s`
        seconds, or it carries no usable lat/lon.
        """
        with self._lock:
            d = self._latest
        if d is None:
            return None
        try:
            ts = float(d.get("ts", 0.0))
            lat = float(d["lat"])
            lon = float(d["lon"])
            alt_m = float(d.get("alt_m", 0.0))
            accuracy_m = float(d.get("accuracy_m", -1.0))
        except (KeyError, TypeError, ValueError):
            # A message without numeric coordinates must not become a waypoint.
            return None
        if time.time() - ts > max_age_s:
            return None
        # Same "no fix" convention as get_gps_fix().
        if lat == 0.0 and lon == 0.0:
            return None
        return {
            "lat": lat,
            "lon": lon,
            "alt_m": alt_m,
            "accuracy_m": accuracy_m,
            "provider": d.get("provider", "live"),
        }

    def stop(self) -> None:
        """Stop the network loop and disconnect (no-op if no client was created)."""
        if self._client is not None:
            self._client.loop_stop()
            self._client.disconnect()


# ── Route ─────────────────────────────────────────────────────────────────────

class Route:
    """Ordered list of waypoints with persistence and Nav2 export."""

    def __init__(self, name: str, path: str):
        self.name = name
        self._path = path
        self._waypoints: list[Waypoint] = []
        self._load()

    @property
    def path(self) -> str:
        """Path of the JSON file backing this route."""
        return self._path

    # ── Persistence ───────────────────────────────────────────────────────────

    def _load(self) -> None:
        """
        Load name and waypoints from the route file if it exists.

        An unreadable or malformed file is logged and ignored, leaving the
        route exactly as constructed (given name, no waypoints).
        """
        if not os.path.exists(self._path):
            return
        try:
            with open(self._path, "r", encoding="utf-8") as f:
                data = json.load(f)
            # Parse into locals first so a failure cannot leave a half-loaded route.
            name = data.get("route_name", self.name)
            waypoints = [Waypoint.from_dict(w) for w in data.get("waypoints", [])]
        except (OSError, ValueError, KeyError, TypeError, AttributeError) as e:
            logging.warning("Could not load route file: %s", e)
            return
        self.name = name
        self._waypoints = waypoints
        # Re-index to keep indices contiguous after deletes
        for i, wp in enumerate(self._waypoints):
            wp.index = i
        logging.info("Loaded %d waypoints from %s",
                     len(self._waypoints), self._path)

    def save(self) -> None:
        """Write the route (see to_dict) to the route file as indented JSON."""
        with open(self._path, "w", encoding="utf-8") as f:
            json.dump(self.to_dict(), f, indent=2)
        logging.debug("Saved route to %s", self._path)

    # ── Waypoint operations ───────────────────────────────────────────────────

    def add(self, name: str, lat: float, lon: float,
            alt_m: float, accuracy_m: float, tags: list[str]) -> Waypoint:
        """Append a waypoint, save the route, and return the new waypoint."""
        wp = Waypoint(
            index=len(self._waypoints),
            name=name,
            lat=lat,
            lon=lon,
            alt_m=alt_m,
            accuracy_m=accuracy_m,
            tags=tags,
        )
        self._waypoints.append(wp)
        self.save()
        return wp

    def delete(self, index: int) -> bool:
        """Remove the waypoint at `index`, re-index, and save. False if out of range."""
        if not (0 <= index < len(self._waypoints)):
            return False
        self._waypoints.pop(index)
        for i, wp in enumerate(self._waypoints):
            wp.index = i
        self.save()
        return True

    def clear(self) -> int:
        """Remove all waypoints, save, and return how many were removed."""
        n = len(self._waypoints)
        self._waypoints.clear()
        self.save()
        return n

    def __len__(self) -> int:
        return len(self._waypoints)

    def __iter__(self):
        return iter(self._waypoints)

    def get(self, index: int) -> Optional[Waypoint]:
        """Return the waypoint at `index`, or None if out of range."""
        if 0 <= index < len(self._waypoints):
            return self._waypoints[index]
        return None

    def length_m(self) -> float:
        """Sum of great-circle distances between consecutive waypoints (metres)."""
        wps = self._waypoints
        return sum(
            haversine_m(a.lat, a.lon, b.lat, b.lon)
            for a, b in zip(wps, wps[1:])
        )

    # ── Serialisation ─────────────────────────────────────────────────────────

    def to_dict(self) -> dict:
        """Return the route as the JSON structure that is saved and published."""
        return {
            "route_name": self.name,
            "ts": time.time(),
            "waypoints": [wp.to_dict() for wp in self._waypoints],
            "nav2_poses": self._nav2_poses(),
        }

    def _nav2_poses(self) -> list[dict]:
        """
        Flat-earth Nav2 PoseStamped list (x east, y north) relative to the
        first waypoint. Yaw points toward the next waypoint; the last
        waypoint keeps the heading of the final leg (previous → last).
        A single-waypoint route gets yaw 0.
        """
        if not self._waypoints:
            return []
        origin = self._waypoints[0]
        # Project every waypoint once; yaw needs the neighbours' coordinates too.
        xy = [flat_earth_xy(origin.lat, origin.lon, wp.lat, wp.lon)
              for wp in self._waypoints]
        last = len(xy) - 1
        poses = []
        for i, wp in enumerate(self._waypoints):
            x, y = xy[i]
            if i < last:
                nx, ny = xy[i + 1]
                yaw = math.atan2(ny - y, nx - x)
            elif i > 0:
                px, py = xy[i - 1]
                yaw = math.atan2(y - py, x - px)
            else:
                yaw = 0.0
            poses.append({
                "frame_id": "map",
                "position": {"x": round(x, 3), "y": round(y, 3), "z": round(wp.alt_m, 3)},
                # Quaternion for a pure rotation of `yaw` about the z axis.
                "orientation": {
                    "x": 0.0, "y": 0.0,
                    "z": round(math.sin(yaw / 2), 6),
                    "w": round(math.cos(yaw / 2), 6),
                },
                "waypoint_name": wp.name,
            })
        return poses


# ── MQTT publisher ────────────────────────────────────────────────────────────

class MQTTClient:
    """Simple synchronous paho wrapper for route publishing."""

    def __init__(self, broker: str, port: int):
        self._broker = broker
        self._port = port
        self._client = None
        self._connected = False   # updated from the paho callbacks below
        if not MQTT_AVAILABLE:
            return
        self._client = mqtt.Client(client_id="saltybot-waypoint-logger", clean_session=True)
        self._client.on_connect = self._on_connect
        self._client.on_disconnect = self._on_disconnect
        self._client.reconnect_delay_set(min_delay=3, max_delay=30)
        try:
            self._client.connect_async(broker, port, keepalive=60)
            self._client.loop_start()
        except Exception as e:
            logging.warning("MQTT connect failed: %s", e)

    def _on_connect(self, c, u, f, rc) -> None:
        self._connected = (rc == 0)
        if rc == 0:
            logging.debug("MQTT connected")

    def _on_disconnect(self, c, u, rc) -> None:
        self._connected = False

    def publish(self, topic: str, payload: dict, qos: int = 1) -> bool:
        """
        Publish `payload` as compact JSON with the retain flag set.

        Waits up to 5 s for the connection. Returns False if there is no
        client or no connection; otherwise True when paho's publish call
        reports rc 0.
        """
        if self._client is None:
            return False
        # Wait briefly for connection
        deadline = time.monotonic() + 5.0
        while not self._connected and time.monotonic() < deadline:
            time.sleep(0.1)
        if not self._connected:
            return False
        info = self._client.publish(
            topic, json.dumps(payload, separators=(",", ":")),
            qos=qos, retain=True
        )
        return info.rc == 0

    def stop(self) -> None:
        """Stop the network loop and disconnect (no-op if no client was created)."""
        if self._client:
            self._client.loop_stop()
            self._client.disconnect()


# ── CLI helpers ───────────────────────────────────────────────────────────────

def _fmt_ts(epoch: float) -> str:
    """Format epoch seconds as 'YYYY-MM-DD HH:MM:SS UTC'."""
    return datetime.fromtimestamp(epoch, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")


def _fmt_dist(metres: float) -> str:
    """Format a distance in metres below 1000 m, otherwise in kilometres."""
    if metres < 1000:
        return f"{metres:.1f} m"
    return f"{metres / 1000:.3f} km"


def _print_waypoints(route: Route) -> None:
    """Print a table of waypoints with distance/compass heading from the previous one."""
    if not len(route):
        print(" (no waypoints)")
        return
    print(f"\n {'#':>3} {'Name':<18} {'Lat':>10} {'Lon':>11} {'Alt':>7} {'Acc':>7} {'Tags'}")
    print(" " + "─" * 75)
    prev = None
    for wp in route:
        dist_str = ""
        if prev is not None:
            d = haversine_m(prev.lat, prev.lon, wp.lat, wp.lon)
            b = bearing_deg(prev.lat, prev.lon, wp.lat, wp.lon)
            dist_str = f" ↑ {_fmt_dist(d)} {_compass(b)}"
        tags = ",".join(wp.tags) if wp.tags else "—"
        print(f" {wp.index:>3} {wp.name:<18} {wp.lat:>10.6f} {wp.lon:>11.6f}"
              f" {wp.alt_m:>6.1f}m "
              f"{wp.accuracy_m:>5.1f}m {tags}{dist_str}")
        prev = wp
    print()


def _acquire_fix(args, live_gps: Optional[LiveGPS]) -> Optional[dict]:
    """Get a GPS fix — from live MQTT stream if available, else termux-location."""
    if live_gps is not None:
        fix = live_gps.get(max_age_s=3.0)
        if fix is not None:
            return fix
        print(" (live GPS stale — falling back to termux-location)")

    print(" Acquiring GPS fix...", end="", flush=True)
    fix = get_gps_fix(provider=args.provider)
    if fix:
        print(f" {fix['lat']:.6f}, {fix['lon']:.6f} ±{fix['accuracy_m']:.1f}m")
    else:
        print(" FAILED")
    return fix


# ── Interactive menu ──────────────────────────────────────────────────────────

def _menu_record(route: Route, args, live_gps: Optional[LiveGPS]) -> None:
    """Acquire a fix, prompt for name and tags, and append the waypoint."""
    fix = _acquire_fix(args, live_gps)
    if fix is None:
        print(" Could not get GPS fix.")
        return

    default_name = f"wp{len(route)}"
    name = input(f" Waypoint name [{default_name}]: ").strip() or default_name
    tags_raw = input(" Tags (comma-separated, or Enter to skip): ").strip()
    tags = [t.strip() for t in tags_raw.split(",") if t.strip()] if tags_raw else []

    wp = route.add(name, fix["lat"], fix["lon"], fix["alt_m"], fix["accuracy_m"], tags)
    print(f" ✓ Recorded #{wp.index}: {wp.name} ({wp.lat:.6f}, {wp.lon:.6f})")


def _menu_list(route: Route) -> None:
    """Print the waypoint table and, for two or more waypoints, the total distance."""
    print(f"\n Route: '{route.name}' ({len(route)} waypoints)")
    _print_waypoints(route)
    if len(route) >= 2:
        print(f" Total route distance: {_fmt_dist(route.length_m())}")


def _menu_delete(route: Route) -> None:
    """Prompt for an index and delete that waypoint after confirmation."""
    if not len(route):
        print(" No waypoints to delete.")
        return
    _print_waypoints(route)
    raw = input(" Enter index to delete (or Enter to cancel): ").strip()
    if not raw:
        return
    try:
        idx = int(raw)
    except ValueError:
        print(" Invalid index.")
        return
    wp = route.get(idx)
    if wp is None:
        print(f" No waypoint at index {idx}.")
        return
    confirm = input(f" Delete '{wp.name}'? [y/N]: ").strip().lower()
    if confirm == "y":
        route.delete(idx)
        print(f" ✓ Deleted '{wp.name}'.")
    else:
        print(" Cancelled.")


def _menu_publish(route: Route, mqtt_client: Optional[MQTTClient]) -> None:
    """Publish the route to MQTT, or print the payload when MQTT is disabled."""
    if not len(route):
        print(" No waypoints to publish.")
        return

    payload = route.to_dict()
    print(f"\n Publishing {len(route)} waypoints to {TOPIC_ROUTE}...")

    if mqtt_client is None:
        print(" (MQTT disabled — printing payload)")
        print(json.dumps(payload, indent=2))
        return

    ok = mqtt_client.publish(TOPIC_ROUTE, payload, qos=1)
    if ok:
        print(f" ✓ Published to {TOPIC_ROUTE} (retained=true)")
    else:
        print(" ✗ MQTT publish failed — check broker connection.")

    # Also print Nav2 summary
    poses = payload.get("nav2_poses", [])
    if poses:
        print(f"\n Nav2 poses ({len(poses)}):")
        for p in poses:
            pos = p["position"]
            print(f" {p['waypoint_name']:<18} x={pos['x']:>8.2f}m y={pos['y']:>8.2f}m")


def _menu_clear(route: Route) -> None:
    """Clear the whole route after confirmation."""
    if not len(route):
        print(" Route is already empty.")
        return
    confirm = input(f" Clear all {len(route)} waypoints? [y/N]: ").strip().lower()
    if confirm == "y":
        n = route.clear()
        print(f" ✓ Cleared {n} waypoints.")
    else:
        print(" Cancelled.")


def _menu_rename(route: Route) -> None:
    """Prompt for a new route name; an empty answer keeps the current one."""
    new_name = input(f" New route name [{route.name}]: ").strip()
    if new_name:
        route.name = new_name
        route.save()
        print(f" ✓ Route renamed to '{route.name}'.")


def _menu_info(route: Route) -> None:
    """Print file path, name, waypoint count, first/last waypoint and total distance."""
    print(f"\n Route file: {route.path}")
    print(f" Route name: {route.name}")
    print(f" Waypoints: {len(route)}")
    if len(route):
        wps = list(route)
        print(f" First wp: {wps[0].name} ({_fmt_ts(wps[0].recorded_at)})")
        print(f" Last wp: {wps[-1].name} ({_fmt_ts(wps[-1].recorded_at)})")
    if len(route) >= 2:
        print(f" Total dist: {_fmt_dist(route.length_m())}")
    print()


# ── Main loop ─────────────────────────────────────────────────────────────────

MENU = """
┌─ SaltyBot Waypoint Logger ──────────────────────────┐
│  r  Record waypoint at current GPS position         │
│  l  List all waypoints                              │
│  d  Delete a waypoint                               │
│  p  Publish route to MQTT                           │
│  c  Clear all waypoints                             │
│  n  Rename route                                    │
│  i  Route info                                      │
│  q  Quit                                            │
└─────────────────────────────────────────────────────┘
"""


def main() -> None:
    """Parse CLI options, set up the route/MQTT/live-GPS objects, and run the menu loop."""
    parser = argparse.ArgumentParser(
        description="SaltyBot GPS waypoint logger (Issue #617)"
    )
    parser.add_argument("--broker", default="192.168.1.100",
                        help="MQTT broker IP (default: 192.168.1.100)")
    parser.add_argument("--port", type=int, default=1883,
                        help="MQTT port (default: 1883)")
    parser.add_argument("--file", default=DEFAULT_FILE,
                        help=f"Route JSON file (default: {DEFAULT_FILE})")
    parser.add_argument("--route", default="route",
                        help="Route name (default: route)")
    parser.add_argument("--provider", default="gps",
                        choices=["gps", "network", "passive"],
                        help="GPS provider (default: gps)")
    parser.add_argument("--live-gps", action="store_true",
                        help="Subscribe to saltybot/phone/gps for live GPS")
    parser.add_argument("--no-mqtt", action="store_true",
                        help="Disable MQTT (log-only mode)")
    parser.add_argument("--debug", action="store_true",
                        help="Verbose logging")
    args = parser.parse_args()

    logging.basicConfig(
        level=logging.DEBUG if args.debug else logging.WARNING,
        format="%(asctime)s [%(levelname)s] %(message)s",
    )

    # Load or create route
    route = Route(args.route, args.file)

    # MQTT client
    mqtt_client: Optional[MQTTClient] = None
    if not args.no_mqtt:
        if MQTT_AVAILABLE:
            mqtt_client = MQTTClient(args.broker, args.port)
        else:
            print("Warning: paho-mqtt not installed — MQTT disabled. Run: pip install paho-mqtt")

    # Optional live GPS subscription
    live_gps: Optional[LiveGPS] = None
    if args.live_gps:
        if MQTT_AVAILABLE:
            live_gps = LiveGPS(args.broker, args.port)
            print(f" Subscribed to live GPS on {TOPIC_GPS_LIVE}")
        else:
            # Tell the user why the flag has no effect instead of ignoring it.
            print("Warning: --live-gps requires paho-mqtt — using termux-location instead.")

    print(MENU)
    print(f" Route: '{route.name}' | {len(route)} waypoints loaded | file: {args.file}")
    print()

    try:
        while True:
            try:
                choice = input(" > ").strip().lower()
            except EOFError:
                break

            if choice in ("q", "quit", "exit"):
                break
            elif choice in ("r", "record"):
                _menu_record(route, args, live_gps)
            elif choice in ("l", "list"):
                _menu_list(route)
            elif choice in ("d", "delete"):
                _menu_delete(route)
            elif choice in ("p", "publish"):
                _menu_publish(route, mqtt_client)
            elif choice in ("c", "clear"):
                _menu_clear(route)
            elif choice in ("n", "rename"):
                _menu_rename(route)
            elif choice in ("i", "info"):
                _menu_info(route)
            elif choice == "":
                pass
            else:
                print(" Unknown command. Type r/l/d/p/c/n/i/q.")

    except KeyboardInterrupt:
        print("\n Interrupted.")
    finally:
        if live_gps:
            live_gps.stop()
        if mqtt_client:
            mqtt_client.stop()
        print(" Bye.")


if __name__ == "__main__":
    main()