Merge pull request 'feat: GPS waypoint logger (Issue #617)' (#620) from sl-android/issue-617-waypoint-logger into main

This commit is contained in:
sl-jetson 2026-03-15 11:02:58 -04:00
commit 38df5b4ebb

705
phone/waypoint_logger.py Normal file
View File

@ -0,0 +1,705 @@
#!/usr/bin/env python3
"""
waypoint_logger.py GPS waypoint logger and route planner for SaltyBot (Issue #617)
Interactive CLI for recording, managing, and publishing GPS waypoints from
an Android/Termux phone. Uses termux-location for GPS fixes (same provider
as sensor_dashboard.py) and paho-mqtt to publish routes to the broker.
Features
Record GPS waypoint at current position (with name and optional tags)
List all saved waypoints with distance/bearing from previous
Delete a waypoint by index
Clear all waypoints in the active route
Publish the route as a Nav2-compatible waypoint list to MQTT
Subscribe to saltybot/phone/gps for live GPS if sensor_dashboard.py
is already running (avoids double GPS call)
Load/save route to JSON file (persistent between sessions)
MQTT topics
Publish: saltybot/phone/route
Subscribe: saltybot/phone/gps (live GPS from sensor_dashboard.py)
Route JSON published to MQTT
{
"route_name": "patrol_loop",
"ts": 1710000000.0,
"waypoints": [
{
"index": 0,
"name": "start",
"tags": ["patrol", "home"],
"lat": 45.123, "lon": -73.456, "alt_m": 12.3,
"accuracy_m": 2.1,
"recorded_at": 1710000000.0
}, ...
]
}
Nav2 PoseStamped list format (also included under "nav2_poses" key):
Each entry has frame_id, position {x,y,z}, orientation {x,y,z,w}
using a flat-earth approximation relative to the first waypoint.
Usage
python3 phone/waypoint_logger.py [OPTIONS]
--broker HOST MQTT broker IP (default: 192.168.1.100)
--port PORT MQTT port (default: 1883)
--file PATH Route JSON file (default: ~/saltybot_route.json)
--route NAME Route name (default: route)
--provider PROV GPS provider: gps|network|passive (default: gps)
--live-gps Subscribe to sensor_dashboard GPS instead of polling
--no-mqtt Disable MQTT (log-only mode)
--debug Verbose logging
Dependencies (Termux)
pkg install termux-api python
pip install paho-mqtt
"""
import argparse
import json
import logging
import math
import os
import subprocess
import sys
import threading
import time
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from typing import Optional
try:
import paho.mqtt.client as mqtt
MQTT_AVAILABLE = True
except ImportError:
MQTT_AVAILABLE = False
# ── Constants ─────────────────────────────────────────────────────────────────
TOPIC_ROUTE = "saltybot/phone/route"
TOPIC_GPS_LIVE = "saltybot/phone/gps"
EARTH_RADIUS_M = 6_371_000.0 # mean Earth radius in metres
DEFAULT_FILE = os.path.expanduser("~/saltybot_route.json")
# ── Data model ────────────────────────────────────────────────────────────────
@dataclass
class Waypoint:
index: int
name: str
lat: float
lon: float
alt_m: float = 0.0
accuracy_m: float = -1.0
tags: list[str] = field(default_factory=list)
recorded_at: float = field(default_factory=time.time)
def to_dict(self) -> dict:
return asdict(self)
@staticmethod
def from_dict(d: dict) -> "Waypoint":
return Waypoint(
index = int(d.get("index", 0)),
name = str(d.get("name", "")),
lat = float(d["lat"]),
lon = float(d["lon"]),
alt_m = float(d.get("alt_m", 0.0)),
accuracy_m = float(d.get("accuracy_m", -1.0)),
tags = list(d.get("tags", [])),
recorded_at = float(d.get("recorded_at", 0.0)),
)
# ── Geo maths ─────────────────────────────────────────────────────────────────
def haversine_m(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
"""Great-circle distance between two WGS84 points (metres)."""
φ1, φ2 = math.radians(lat1), math.radians(lat2)
Δφ = math.radians(lat2 - lat1)
Δλ = math.radians(lon2 - lon1)
a = math.sin(Δφ / 2) ** 2 + math.cos(φ1) * math.cos(φ2) * math.sin(Δλ / 2) ** 2
return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))
def bearing_deg(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
"""Initial bearing from point-1 to point-2 (degrees, 0=N, 90=E)."""
φ1, φ2 = math.radians(lat1), math.radians(lat2)
Δλ = math.radians(lon2 - lon1)
x = math.sin(Δλ) * math.cos(φ2)
y = math.cos(φ1) * math.sin(φ2) - math.sin(φ1) * math.cos(φ2) * math.cos(Δλ)
return (math.degrees(math.atan2(x, y)) + 360) % 360
def _compass(deg: float) -> str:
"""Convert bearing degrees to 8-point compass label."""
dirs = ["N", "NE", "E", "SE", "S", "SW", "W", "NW"]
return dirs[round(deg / 45) % 8]
def flat_earth_xy(origin_lat: float, origin_lon: float,
lat: float, lon: float) -> tuple[float, float]:
"""
Simple flat-earth approximation metres east/north from origin.
Accurate within ~1% for distances < 100 km.
"""
lat_rad = math.radians((origin_lat + lat) / 2.0)
x = math.radians(lon - origin_lon) * EARTH_RADIUS_M * math.cos(lat_rad)
y = math.radians(lat - origin_lat) * EARTH_RADIUS_M
return x, y
# ── GPS acquisition ───────────────────────────────────────────────────────────
def get_gps_fix(provider: str = "gps", timeout: float = 12.0) -> Optional[dict]:
"""
Acquire a GPS fix via termux-location.
Falls back to 'network' provider if 'gps' times out.
Returns a dict with lat, lon, alt_m, accuracy_m or None on failure.
"""
def _try(prov: str, t: float) -> Optional[dict]:
try:
r = subprocess.run(
["termux-location", "-p", prov, "-r", "once"],
capture_output=True, text=True, timeout=t,
)
if r.returncode != 0 or not r.stdout.strip():
return None
raw = json.loads(r.stdout)
lat = float(raw.get("latitude", 0.0))
lon = float(raw.get("longitude", 0.0))
if lat == 0.0 and lon == 0.0:
return None
return {
"lat": lat,
"lon": lon,
"alt_m": float(raw.get("altitude", 0.0)),
"accuracy_m": float(raw.get("accuracy", -1.0)),
"provider": prov,
}
except (subprocess.TimeoutExpired, json.JSONDecodeError,
FileNotFoundError, KeyError):
return None
fix = _try(provider, timeout)
if fix is None and provider == "gps":
logging.debug("GPS timeout — retrying with network provider")
fix = _try("network", 6.0)
return fix
# ── Live GPS subscriber (optional, uses sensor_dashboard.py stream) ──────────
class LiveGPS:
"""
Subscribe to saltybot/phone/gps on MQTT and cache the latest fix.
Used when sensor_dashboard.py is already running to avoid double GPS calls.
"""
def __init__(self, broker: str, port: int):
self._lock = threading.Lock()
self._latest: Optional[dict] = None
if not MQTT_AVAILABLE:
return
self._client = mqtt.Client(client_id="wp-live-gps", clean_session=True)
self._client.on_connect = lambda c, u, f, rc: c.subscribe(TOPIC_GPS_LIVE)
self._client.on_message = self._on_msg
try:
self._client.connect(broker, port, keepalive=30)
self._client.loop_start()
except Exception as e:
logging.warning("LiveGPS connect failed: %s", e)
def _on_msg(self, client, userdata, message) -> None:
try:
data = json.loads(message.payload)
with self._lock:
self._latest = data
except json.JSONDecodeError:
pass
def get(self, max_age_s: float = 3.0) -> Optional[dict]:
with self._lock:
d = self._latest
if d is None:
return None
if time.time() - d.get("ts", 0.0) > max_age_s:
return None
return {
"lat": d.get("lat", 0.0),
"lon": d.get("lon", 0.0),
"alt_m": d.get("alt_m", 0.0),
"accuracy_m": d.get("accuracy_m", -1.0),
"provider": d.get("provider", "live"),
}
def stop(self) -> None:
if MQTT_AVAILABLE and hasattr(self, "_client"):
self._client.loop_stop()
self._client.disconnect()
# ── Route ─────────────────────────────────────────────────────────────────────
class Route:
"""Ordered list of waypoints with persistence and Nav2 export."""
def __init__(self, name: str, path: str):
self.name = name
self._path = path
self._waypoints: list[Waypoint] = []
self._load()
# ── Persistence ───────────────────────────────────────────────────────────
def _load(self) -> None:
if not os.path.exists(self._path):
return
try:
with open(self._path, "r") as f:
data = json.load(f)
self.name = data.get("route_name", self.name)
self._waypoints = [Waypoint.from_dict(w) for w in data.get("waypoints", [])]
# Re-index to keep indices contiguous after deletes
for i, wp in enumerate(self._waypoints):
wp.index = i
logging.info("Loaded %d waypoints from %s", len(self._waypoints), self._path)
except (json.JSONDecodeError, KeyError, ValueError) as e:
logging.warning("Could not load route file: %s", e)
def save(self) -> None:
with open(self._path, "w") as f:
json.dump(self.to_dict(), f, indent=2)
logging.debug("Saved route to %s", self._path)
# ── Waypoint operations ───────────────────────────────────────────────────
def add(self, name: str, lat: float, lon: float, alt_m: float,
accuracy_m: float, tags: list[str]) -> Waypoint:
wp = Waypoint(
index = len(self._waypoints),
name = name,
lat = lat,
lon = lon,
alt_m = alt_m,
accuracy_m = accuracy_m,
tags = tags,
)
self._waypoints.append(wp)
self.save()
return wp
def delete(self, index: int) -> bool:
if not (0 <= index < len(self._waypoints)):
return False
self._waypoints.pop(index)
for i, wp in enumerate(self._waypoints):
wp.index = i
self.save()
return True
def clear(self) -> int:
n = len(self._waypoints)
self._waypoints.clear()
self.save()
return n
def __len__(self) -> int:
return len(self._waypoints)
def __iter__(self):
return iter(self._waypoints)
def get(self, index: int) -> Optional[Waypoint]:
if 0 <= index < len(self._waypoints):
return self._waypoints[index]
return None
# ── Serialisation ─────────────────────────────────────────────────────────
def to_dict(self) -> dict:
return {
"route_name": self.name,
"ts": time.time(),
"waypoints": [wp.to_dict() for wp in self._waypoints],
"nav2_poses": self._nav2_poses(),
}
def _nav2_poses(self) -> list[dict]:
"""
Flat-earth Nav2 PoseStamped list (x east, y north) relative to
the first waypoint. Yaw faces the next waypoint; last faces prev.
"""
if not self._waypoints:
return []
origin = self._waypoints[0]
poses = []
for i, wp in enumerate(self._waypoints):
x, y = flat_earth_xy(origin.lat, origin.lon, wp.lat, wp.lon)
# Compute yaw: face toward next waypoint (or prev for last)
if i + 1 < len(self._waypoints):
nxt = self._waypoints[i + 1]
nx, ny = flat_earth_xy(origin.lat, origin.lon, nxt.lat, nxt.lon)
yaw = math.atan2(ny - y, nx - x)
elif i > 0:
prv = self._waypoints[i - 1]
px, py = flat_earth_xy(origin.lat, origin.lon, prv.lat, prv.lon)
yaw = math.atan2(y - py, x - px)
else:
yaw = 0.0
poses.append({
"frame_id": "map",
"position": {"x": round(x, 3), "y": round(y, 3), "z": round(wp.alt_m, 3)},
"orientation": {
"x": 0.0, "y": 0.0,
"z": round(math.sin(yaw / 2), 6),
"w": round(math.cos(yaw / 2), 6),
},
"waypoint_name": wp.name,
})
return poses
# ── MQTT publisher ────────────────────────────────────────────────────────────
class MQTTClient:
"""Simple synchronous paho wrapper for route publishing."""
def __init__(self, broker: str, port: int):
self._broker = broker
self._port = port
self._client = None
self._connected = False
if not MQTT_AVAILABLE:
return
self._client = mqtt.Client(client_id="saltybot-waypoint-logger", clean_session=True)
self._client.on_connect = self._on_connect
self._client.on_disconnect = self._on_disconnect
self._client.reconnect_delay_set(min_delay=3, max_delay=30)
try:
self._client.connect_async(broker, port, keepalive=60)
self._client.loop_start()
except Exception as e:
logging.warning("MQTT connect failed: %s", e)
def _on_connect(self, c, u, f, rc) -> None:
self._connected = (rc == 0)
if rc == 0:
logging.debug("MQTT connected")
def _on_disconnect(self, c, u, rc) -> None:
self._connected = False
def publish(self, topic: str, payload: dict, qos: int = 1) -> bool:
if self._client is None:
return False
# Wait briefly for connection
deadline = time.monotonic() + 5.0
while not self._connected and time.monotonic() < deadline:
time.sleep(0.1)
if not self._connected:
return False
info = self._client.publish(
topic, json.dumps(payload, separators=(",", ":")), qos=qos, retain=True
)
return info.rc == 0
def stop(self) -> None:
if self._client:
self._client.loop_stop()
self._client.disconnect()
# ── CLI helpers ───────────────────────────────────────────────────────────────
def _fmt_ts(epoch: float) -> str:
return datetime.fromtimestamp(epoch, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
def _fmt_dist(metres: float) -> str:
if metres < 1000:
return f"{metres:.1f} m"
return f"{metres / 1000:.3f} km"
def _print_waypoints(route: Route) -> None:
if not len(route):
print(" (no waypoints)")
return
print(f"\n {'#':>3} {'Name':<18} {'Lat':>10} {'Lon':>11} {'Alt':>7} {'Acc':>7} {'Tags'}")
print(" " + "" * 75)
prev = None
for wp in route:
dist_str = ""
brg_str = ""
if prev is not None:
d = haversine_m(prev.lat, prev.lon, wp.lat, wp.lon)
b = bearing_deg(prev.lat, prev.lon, wp.lat, wp.lon)
dist_str = f"{_fmt_dist(d)} {_compass(b)}"
tags = ",".join(wp.tags) if wp.tags else ""
print(f" {wp.index:>3} {wp.name:<18} {wp.lat:>10.6f} {wp.lon:>11.6f}"
f" {wp.alt_m:>6.1f}m "
f"{wp.accuracy_m:>5.1f}m {tags}{dist_str}")
prev = wp
print()
def _acquire_fix(args, live_gps: Optional[LiveGPS]) -> Optional[dict]:
"""Get a GPS fix — from live MQTT stream if available, else termux-location."""
if live_gps is not None:
fix = live_gps.get(max_age_s=3.0)
if fix is not None:
return fix
print(" (live GPS stale — falling back to termux-location)")
print(" Acquiring GPS fix...", end="", flush=True)
fix = get_gps_fix(provider=args.provider)
if fix:
print(f" {fix['lat']:.6f}, {fix['lon']:.6f} ±{fix['accuracy_m']:.1f}m")
else:
print(" FAILED")
return fix
# ── Interactive menu ──────────────────────────────────────────────────────────
def _menu_record(route: Route, args, live_gps: Optional[LiveGPS]) -> None:
fix = _acquire_fix(args, live_gps)
if fix is None:
print(" Could not get GPS fix.")
return
default_name = f"wp{len(route)}"
name = input(f" Waypoint name [{default_name}]: ").strip() or default_name
tags_raw = input(" Tags (comma-separated, or Enter to skip): ").strip()
tags = [t.strip() for t in tags_raw.split(",") if t.strip()] if tags_raw else []
wp = route.add(name, fix["lat"], fix["lon"], fix["alt_m"], fix["accuracy_m"], tags)
print(f" ✓ Recorded #{wp.index}: {wp.name} ({wp.lat:.6f}, {wp.lon:.6f})")
def _menu_list(route: Route) -> None:
print(f"\n Route: '{route.name}' ({len(route)} waypoints)")
_print_waypoints(route)
if len(route) >= 2:
wps = list(route)
total = sum(
haversine_m(wps[i].lat, wps[i].lon, wps[i+1].lat, wps[i+1].lon)
for i in range(len(wps) - 1)
)
print(f" Total route distance: {_fmt_dist(total)}")
def _menu_delete(route: Route) -> None:
if not len(route):
print(" No waypoints to delete.")
return
_print_waypoints(route)
raw = input(" Enter index to delete (or Enter to cancel): ").strip()
if not raw:
return
try:
idx = int(raw)
except ValueError:
print(" Invalid index.")
return
wp = route.get(idx)
if wp is None:
print(f" No waypoint at index {idx}.")
return
confirm = input(f" Delete '{wp.name}'? [y/N]: ").strip().lower()
if confirm == "y":
route.delete(idx)
print(f" ✓ Deleted '{wp.name}'.")
else:
print(" Cancelled.")
def _menu_publish(route: Route, mqtt_client: Optional[MQTTClient]) -> None:
if not len(route):
print(" No waypoints to publish.")
return
payload = route.to_dict()
print(f"\n Publishing {len(route)} waypoints to {TOPIC_ROUTE}...")
if mqtt_client is None:
print(" (MQTT disabled — printing payload)")
print(json.dumps(payload, indent=2))
return
ok = mqtt_client.publish(TOPIC_ROUTE, payload, qos=1)
if ok:
print(f" ✓ Published to {TOPIC_ROUTE} (retained=true)")
else:
print(" ✗ MQTT publish failed — check broker connection.")
# Also print Nav2 summary
poses = payload.get("nav2_poses", [])
if poses:
print(f"\n Nav2 poses ({len(poses)}):")
for p in poses:
pos = p["position"]
print(f" {p['waypoint_name']:<18} x={pos['x']:>8.2f}m y={pos['y']:>8.2f}m")
def _menu_clear(route: Route) -> None:
if not len(route):
print(" Route is already empty.")
return
confirm = input(f" Clear all {len(route)} waypoints? [y/N]: ").strip().lower()
if confirm == "y":
n = route.clear()
print(f" ✓ Cleared {n} waypoints.")
else:
print(" Cancelled.")
def _menu_rename(route: Route) -> None:
new_name = input(f" New route name [{route.name}]: ").strip()
if new_name:
route.name = new_name
route.save()
print(f" ✓ Route renamed to '{route.name}'.")
def _menu_info(route: Route) -> None:
print(f"\n Route file: {route._path}")
print(f" Route name: {route.name}")
print(f" Waypoints: {len(route)}")
if len(route):
wps = list(route)
print(f" First wp: {wps[0].name} ({_fmt_ts(wps[0].recorded_at)})")
print(f" Last wp: {wps[-1].name} ({_fmt_ts(wps[-1].recorded_at)})")
if len(route) >= 2:
total = sum(
haversine_m(wps[i].lat, wps[i].lon, wps[i+1].lat, wps[i+1].lon)
for i in range(len(wps) - 1)
)
print(f" Total dist: {_fmt_dist(total)}")
print()
# ── Main loop ─────────────────────────────────────────────────────────────────
MENU = """
SaltyBot Waypoint Logger
r Record waypoint at current GPS position
l List all waypoints
d Delete a waypoint
p Publish route to MQTT
c Clear all waypoints
n Rename route
i Route info
q Quit
"""
def main() -> None:
parser = argparse.ArgumentParser(
description="SaltyBot GPS waypoint logger (Issue #617)"
)
parser.add_argument("--broker", default="192.168.1.100",
help="MQTT broker IP (default: 192.168.1.100)")
parser.add_argument("--port", type=int, default=1883,
help="MQTT port (default: 1883)")
parser.add_argument("--file", default=DEFAULT_FILE,
help=f"Route JSON file (default: {DEFAULT_FILE})")
parser.add_argument("--route", default="route",
help="Route name (default: route)")
parser.add_argument("--provider", default="gps",
choices=["gps", "network", "passive"],
help="GPS provider (default: gps)")
parser.add_argument("--live-gps", action="store_true",
help="Subscribe to saltybot/phone/gps for live GPS")
parser.add_argument("--no-mqtt", action="store_true",
help="Disable MQTT (log-only mode)")
parser.add_argument("--debug", action="store_true",
help="Verbose logging")
args = parser.parse_args()
logging.basicConfig(
level=logging.DEBUG if args.debug else logging.WARNING,
format="%(asctime)s [%(levelname)s] %(message)s",
)
# Load or create route
route = Route(args.route, args.file)
# MQTT client
mqtt_client: Optional[MQTTClient] = None
if not args.no_mqtt:
if MQTT_AVAILABLE:
mqtt_client = MQTTClient(args.broker, args.port)
else:
print("Warning: paho-mqtt not installed — MQTT disabled. Run: pip install paho-mqtt")
# Optional live GPS subscription
live_gps: Optional[LiveGPS] = None
if args.live_gps and MQTT_AVAILABLE:
live_gps = LiveGPS(args.broker, args.port)
print(f" Subscribed to live GPS on {TOPIC_GPS_LIVE}")
print(MENU)
print(f" Route: '{route.name}' | {len(route)} waypoints loaded | file: {args.file}")
print()
try:
while True:
try:
choice = input(" > ").strip().lower()
except EOFError:
break
if choice in ("q", "quit", "exit"):
break
elif choice in ("r", "record"):
_menu_record(route, args, live_gps)
elif choice in ("l", "list"):
_menu_list(route)
elif choice in ("d", "delete"):
_menu_delete(route)
elif choice in ("p", "publish"):
_menu_publish(route, mqtt_client)
elif choice in ("c", "clear"):
_menu_clear(route)
elif choice in ("n", "rename"):
_menu_rename(route)
elif choice in ("i", "info"):
_menu_info(route)
elif choice == "":
pass
else:
print(" Unknown command. Type r/l/d/p/c/n/i/q.")
except KeyboardInterrupt:
print("\n Interrupted.")
finally:
if live_gps:
live_gps.stop()
if mqtt_client:
mqtt_client.stop()
print(" Bye.")
if __name__ == "__main__":
main()