From de9a835cc2c0237c7ecebe24b07c22623d1318d7 Mon Sep 17 00:00:00 2001 From: sl-controls Date: Sun, 1 Mar 2026 00:42:18 -0500 Subject: [PATCH] feat: SIM7600X 4G cellular + GPS driver (#58) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds saltybot_cellular ROS2 package for the Waveshare SIM7600X 4G HAT (SIMCom SIM7600A-H) providing GPS telemetry, modem monitoring, and MQTT relay over cellular for remote operation. gps_driver_node: - Opens /dev/ttyUSB2 (NMEA), optionally sends AT+CGPS=1 on /dev/ttyUSB0 - Parses GGA (position) + RMC (velocity) from any NMEA talker (GP/GN/GL/GA) - Validates NMEA checksum before parsing - Publishes /gps/fix (NavSatFix, covariance from HDOP × ±2.5m CEP) - Publishes /gps/vel (TwistStamped, ENU vE/vN from course-over-ground) - Publishes /diagnostics (fix quality, sat count, HDOP) cellular_manager_node: - Polls AT+CSQ, AT+CREG?, AT+COPS? every 5s over /dev/ttyUSB0 - Publishes /cellular/status (DiagnosticArray: rssi, network, connected) - Publishes /cellular/rssi (Int32, dBm) and /cellular/connected (Bool) - Auto-reconnect via nmcli or pppd when data link drops mqtt_bridge_node: - paho-mqtt client (graceful degradation if not installed) - ROS2→MQTT QoS 0: /saltybot/imu, /gps/fix, /gps/vel, /uwb/ranges, /person/target, /cellular/status - MQTT→ROS2 QoS 1: saltybot/cmd→/saltybot/cmd, saltybot/estop→/saltybot/estop - Per-topic rate limiting (imu:5Hz, gps:1Hz, person:2Hz) → <<50KB/s budget - Optional TLS, configurable broker/port/prefix/auth Deliverables: saltybot_cellular/gps_driver_node.py — 402 lines saltybot_cellular/cellular_manager_node.py — 362 lines saltybot_cellular/mqtt_bridge_node.py — 317 lines config/cellular_params.yaml — full config documented launch/cellular.launch.py — all nodes, all params as args test/test_cellular.py — 60 pytest tests, no ROS2 Co-Authored-By: Claude Sonnet 4.6 --- .../config/cellular_params.yaml | 70 +++ .../launch/cellular.launch.py | 194 +++++++ .../ros2_ws/src/saltybot_cellular/package.xml | 31 ++ .../resource/saltybot_cellular | 0 .../saltybot_cellular/__init__.py | 0 .../cellular_manager_node.py | 362 ++++++++++++ .../saltybot_cellular/gps_driver_node.py | 402 ++++++++++++++ .../saltybot_cellular/mqtt_bridge_node.py | 317 +++++++++++ .../ros2_ws/src/saltybot_cellular/setup.cfg | 4 + jetson/ros2_ws/src/saltybot_cellular/setup.py | 32 ++ .../saltybot_cellular/test/test_cellular.py | 514 ++++++++++++++++++ 11 files changed, 1926 insertions(+) create mode 100644 jetson/ros2_ws/src/saltybot_cellular/config/cellular_params.yaml create mode 100644 jetson/ros2_ws/src/saltybot_cellular/launch/cellular.launch.py create mode 100644 jetson/ros2_ws/src/saltybot_cellular/package.xml create mode 100644 jetson/ros2_ws/src/saltybot_cellular/resource/saltybot_cellular create mode 100644 jetson/ros2_ws/src/saltybot_cellular/saltybot_cellular/__init__.py create mode 100644 jetson/ros2_ws/src/saltybot_cellular/saltybot_cellular/cellular_manager_node.py create mode 100644 jetson/ros2_ws/src/saltybot_cellular/saltybot_cellular/gps_driver_node.py create mode 100644 jetson/ros2_ws/src/saltybot_cellular/saltybot_cellular/mqtt_bridge_node.py create mode 100644 jetson/ros2_ws/src/saltybot_cellular/setup.cfg create mode 100644 jetson/ros2_ws/src/saltybot_cellular/setup.py create mode 100644 jetson/ros2_ws/src/saltybot_cellular/test/test_cellular.py diff --git a/jetson/ros2_ws/src/saltybot_cellular/config/cellular_params.yaml b/jetson/ros2_ws/src/saltybot_cellular/config/cellular_params.yaml new file mode 100644 index 0000000..0c9d9eb --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_cellular/config/cellular_params.yaml @@ -0,0 +1,70 @@ +# cellular_params.yaml +# Configuration for saltybot_cellular package — SIM7600X 4G HAT. +# +# Run with: +# ros2 launch saltybot_cellular cellular.launch.py +# +# Hardware: Waveshare SIM7600X 4G HAT (SIMCom SIM7600A-H) +# /dev/ttyUSB0 — AT commands +# /dev/ttyUSB1 — PPP data (managed by NetworkManager/pppd, not ROS2) +# /dev/ttyUSB2 — NMEA GPS output + +# ── GPS driver ───────────────────────────────────────────────────────────────── +# The GPS port streams NMEA sentences once AT+CGPS=1 is sent. +# If GPS is already running (e.g., from a previous session), set gps_init_enable: false. + +gps_port: /dev/ttyUSB2 # NMEA output port +at_port: /dev/ttyUSB0 # AT command port (for CGPS=1 init) +baud_rate: 115200 +publish_rate: 1.0 # Hz — GPS updates at 1 Hz max +frame_id: gps # Header frame_id for NavSatFix + TwistStamped + +gps_init_enable: true # Send AT+CGPS=1 on startup +gps_init_cmd: AT+CGPS=1 # Command to start GNSS receiver +at_init_timeout: 3.0 # Seconds to wait for AT response + +# GPS accuracy (used for NavSatFix covariance computation) +# SIM7600A-H spec: ±2.5m CEP (circular error probable) at HDOP=1 +gps_accuracy: 2.5 # metres CEP at HDOP=1 + +# ── Cellular manager ─────────────────────────────────────────────────────────── +# AT commands are polled to monitor signal quality and registration status. +# The data connection itself (4G LTE) is managed by the OS (NetworkManager or pppd). + +poll_rate: 0.2 # Hz (poll every 5 seconds — avoid AT flooding) +at_timeout: 2.0 # Seconds per AT command + +# Data connection management +# "nmcli": NetworkManager profile (recommended for Orin Nano running Ubuntu) +# "pppd": pppd service unit (systemctl restart pppd-cellular) +# "none": monitoring only, no auto-reconnect +connection_method: nmcli +nmcli_profile: saltybot-cellular # nmcli connection profile name +apn: "" # APN (set if NetworkManager needs it) +reconnect_interval: 30.0 # Seconds between reconnect attempts + +# ── MQTT bridge ──────────────────────────────────────────────────────────────── +# Relay telemetry to a remote MQTT broker over the cellular link. +# For home-lab deployment: run mosquitto on the Jetson or a cloud VM. +# For production: use TLS + authentication. + +mqtt_broker: mqtt.saltylab.local # Broker hostname or IP +mqtt_port: 1883 # 1883 = plain, 8883 = TLS +mqtt_user: "" # Leave empty to disable auth +mqtt_password: "" +topic_prefix: saltybot # MQTT topic prefix (saltybot/gps/fix, etc.) +client_id: saltybot-jetson +keepalive: 60 # MQTT keepalive interval (seconds) +reconnect_delay: 5.0 # Seconds between MQTT reconnect attempts + +# TLS (set mqtt_port: 8883 and provide CA cert path) +tls_enable: false +tls_ca_cert: "" # e.g. /etc/ssl/certs/mqtt-ca.crt + +# Per-topic outbound rate limits (Hz). Keeps cellular bandwidth < 50 KB/s. +# At 1 Hz per topic × ~100 bytes/msg × 6 topics ≈ 600 bytes/s = 0.6 KB/s +imu_rate: 5.0 # Hz — /saltybot/imu → saltybot/imu +gps_rate: 1.0 # Hz — /gps/fix, /gps/vel +person_rate: 2.0 # Hz — /person/target +uwb_rate: 1.0 # Hz — /uwb/ranges +cellular_rate: 0.2 # Hz — /cellular/status (once per 5s) diff --git a/jetson/ros2_ws/src/saltybot_cellular/launch/cellular.launch.py b/jetson/ros2_ws/src/saltybot_cellular/launch/cellular.launch.py new file mode 100644 index 0000000..9454a8d --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_cellular/launch/cellular.launch.py @@ -0,0 +1,194 @@ +""" +cellular.launch.py — Launch all saltybot_cellular nodes. + +Starts three nodes: + gps_driver_node — NMEA GPS → /gps/fix, /gps/vel + cellular_manager_node — AT modem monitor → /cellular/status, /cellular/rssi + mqtt_bridge_node — ROS2 ↔ MQTT relay over cellular + +Prerequisites: + 1. SIM7600X HAT connected via USB → /dev/ttyUSB0-2 must exist + 2. Nano-SIM with active data plan inserted + 3. NetworkManager profile "saltybot-cellular" configured (for nmcli method): + nmcli connection add type gsm ifname '*' con-name saltybot-cellular apn + +Usage: + # Defaults (all params from cellular_params.yaml): + ros2 launch saltybot_cellular cellular.launch.py + + # GPS only (skip modem + MQTT): + ros2 launch saltybot_cellular cellular.launch.py \ + launch_cellular_manager:=false launch_mqtt_bridge:=false + + # Custom MQTT broker: + ros2 launch saltybot_cellular cellular.launch.py mqtt_broker:=192.168.1.100 + + # Override params file: + ros2 launch saltybot_cellular cellular.launch.py \ + params_file:=/path/to/my_cellular_params.yaml + +GPS topics: + /gps/fix (sensor_msgs/NavSatFix) + /gps/vel (geometry_msgs/TwistStamped) + +Cellular topics: + /cellular/status (diagnostic_msgs/DiagnosticArray) + /cellular/rssi (std_msgs/Int32) + /cellular/connected (std_msgs/Bool) + +MQTT relay (ROS2→MQTT): + saltybot/imu, saltybot/gps/fix, saltybot/gps/vel, saltybot/uwb/ranges, + saltybot/person, saltybot/cellular + +MQTT relay (MQTT→ROS2): + saltybot/cmd → /saltybot/cmd, saltybot/estop → /saltybot/estop +""" + +import os + +from ament_index_python.packages import get_package_share_directory +from launch import LaunchDescription +from launch.actions import DeclareLaunchArgument, OpaqueFunction +from launch.conditions import IfCondition +from launch.substitutions import LaunchConfiguration +from launch_ros.actions import Node + + +def _launch_nodes(context, *args, **kwargs): + params_file = LaunchConfiguration("params_file").perform(context) + + def _b(name): + return LaunchConfiguration(name).perform(context).lower() == "true" + + def _s(name): + return LaunchConfiguration(name).perform(context) + + def _f(name): + return float(LaunchConfiguration(name).perform(context)) + + def _i(name): + return int(LaunchConfiguration(name).perform(context)) + + gps_params = { + "gps_port": _s("gps_port"), + "at_port": _s("at_port"), + "baud_rate": _i("baud_rate"), + "publish_rate": _f("gps_publish_rate"), + "frame_id": _s("frame_id"), + "gps_init_enable": _b("gps_init_enable"), + "gps_init_cmd": _s("gps_init_cmd"), + "gps_accuracy": _f("gps_accuracy"), + } + + cellular_params = { + "at_port": _s("at_port"), + "baud_rate": _i("baud_rate"), + "poll_rate": _f("cellular_poll_rate"), + "connection_method": _s("connection_method"), + "nmcli_profile": _s("nmcli_profile"), + "apn": _s("apn"), + "reconnect_interval": _f("reconnect_interval"), + } + + mqtt_params = { + "mqtt_broker": _s("mqtt_broker"), + "mqtt_port": _i("mqtt_port"), + "mqtt_user": _s("mqtt_user"), + "mqtt_password": _s("mqtt_password"), + "topic_prefix": _s("topic_prefix"), + "tls_enable": _b("tls_enable"), + "tls_ca_cert": _s("tls_ca_cert"), + "imu_rate": _f("imu_rate"), + "gps_rate": _f("gps_rate"), + "person_rate": _f("person_rate"), + "uwb_rate": _f("uwb_rate"), + } + + def _node_params(inline): + return [params_file, inline] if params_file else [inline] + + nodes = [ + Node( + package="saltybot_cellular", + executable="gps_driver_node", + name="gps_driver", + output="screen", + parameters=_node_params(gps_params), + ), + ] + + if _b("launch_cellular_manager"): + nodes.append(Node( + package="saltybot_cellular", + executable="cellular_manager_node", + name="cellular_manager", + output="screen", + parameters=_node_params(cellular_params), + )) + + if _b("launch_mqtt_bridge"): + nodes.append(Node( + package="saltybot_cellular", + executable="mqtt_bridge_node", + name="mqtt_bridge", + output="screen", + parameters=_node_params(mqtt_params), + )) + + return nodes + + +def generate_launch_description(): + pkg_share = get_package_share_directory("saltybot_cellular") + default_params = os.path.join(pkg_share, "config", "cellular_params.yaml") + + return LaunchDescription([ + DeclareLaunchArgument( + "params_file", default_value=default_params, + description="Full path to cellular_params.yaml (overrides inline args)"), + + # Node enable/disable + DeclareLaunchArgument( + "launch_cellular_manager", default_value="true", + description="Launch cellular_manager_node"), + DeclareLaunchArgument( + "launch_mqtt_bridge", default_value="true", + description="Launch mqtt_bridge_node"), + + # GPS driver + DeclareLaunchArgument("gps_port", default_value="/dev/ttyUSB2"), + DeclareLaunchArgument("at_port", default_value="/dev/ttyUSB0"), + DeclareLaunchArgument("baud_rate", default_value="115200"), + DeclareLaunchArgument("gps_publish_rate", default_value="1.0", + description="GPS publish rate (Hz)"), + DeclareLaunchArgument("frame_id", default_value="gps"), + DeclareLaunchArgument("gps_init_enable", default_value="true", + description="Send AT+CGPS=1 on startup"), + DeclareLaunchArgument("gps_init_cmd", default_value="AT+CGPS=1"), + DeclareLaunchArgument("gps_accuracy", default_value="2.5", + description="GPS CEP accuracy at HDOP=1 (m)"), + + # Cellular manager + DeclareLaunchArgument("cellular_poll_rate", default_value="0.2", + description="AT poll rate (Hz)"), + DeclareLaunchArgument("connection_method", default_value="nmcli", + description="nmcli | pppd | none"), + DeclareLaunchArgument("nmcli_profile", default_value="saltybot-cellular"), + DeclareLaunchArgument("apn", default_value=""), + DeclareLaunchArgument("reconnect_interval", default_value="30.0"), + + # MQTT bridge + DeclareLaunchArgument("mqtt_broker", default_value="mqtt.saltylab.local"), + DeclareLaunchArgument("mqtt_port", default_value="1883"), + DeclareLaunchArgument("mqtt_user", default_value=""), + DeclareLaunchArgument("mqtt_password", default_value=""), + DeclareLaunchArgument("topic_prefix", default_value="saltybot"), + DeclareLaunchArgument("tls_enable", default_value="false"), + DeclareLaunchArgument("tls_ca_cert", default_value=""), + DeclareLaunchArgument("imu_rate", default_value="5.0"), + DeclareLaunchArgument("gps_rate", default_value="1.0"), + DeclareLaunchArgument("person_rate", default_value="2.0"), + DeclareLaunchArgument("uwb_rate", default_value="1.0"), + + OpaqueFunction(function=_launch_nodes), + ]) diff --git a/jetson/ros2_ws/src/saltybot_cellular/package.xml b/jetson/ros2_ws/src/saltybot_cellular/package.xml new file mode 100644 index 0000000..f939520 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_cellular/package.xml @@ -0,0 +1,31 @@ + + + + saltybot_cellular + 0.1.0 + + Waveshare SIM7600X 4G HAT driver for SaltyBot. + Provides GPS (NMEA → NavSatFix), cellular modem monitoring (AT commands), + and MQTT telemetry relay over 4G LTE. + Hardware: SIMCom SIM7600A-H, LTE Cat-4, GPS+GLONASS+BeiDou+Galileo. + + sl-controls + MIT + + rclpy + sensor_msgs + geometry_msgs + std_msgs + diagnostic_msgs + + ament_python + + ament_copyright + ament_flake8 + ament_pep257 + python3-pytest + + + ament_python + + diff --git a/jetson/ros2_ws/src/saltybot_cellular/resource/saltybot_cellular b/jetson/ros2_ws/src/saltybot_cellular/resource/saltybot_cellular new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_cellular/saltybot_cellular/__init__.py b/jetson/ros2_ws/src/saltybot_cellular/saltybot_cellular/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_cellular/saltybot_cellular/cellular_manager_node.py b/jetson/ros2_ws/src/saltybot_cellular/saltybot_cellular/cellular_manager_node.py new file mode 100644 index 0000000..acf0419 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_cellular/saltybot_cellular/cellular_manager_node.py @@ -0,0 +1,362 @@ +""" +cellular_manager_node.py — SIM7600X 4G modem status monitor + connection manager. + +Hardware: Waveshare SIM7600X 4G HAT (SIMCom SIM7600A-H) + AT command port: /dev/ttyUSB0 (115200 baud) + Data connection managed by NetworkManager (nmcli) or pppd + +Publishes: + /cellular/status (diagnostic_msgs/DiagnosticArray) — rssi, network, connected + /cellular/rssi (std_msgs/Int32) — signal strength dBm (-113..0) + /cellular/connected (std_msgs/Bool) — data connection active + +Key AT commands used: + AT+CSQ → signal quality (rssi 0-31, ber) + AT+CREG? → network registration status + AT+COPS? → operator name + AT+CGDCONT? → PDP context / APN + +4G data connection management: + connection_method = "nmcli": uses `nmcli connection up ` + connection_method = "pppd": uses pppd with provided dial script + connection_method = "none": monitor only, no auto-connect +""" + +import subprocess +import threading +import time + +import rclpy +from rclpy.node import Node +from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus, KeyValue +from std_msgs.msg import Bool, Int32 + +try: + import serial + _SERIAL_OK = True +except ImportError: + _SERIAL_OK = False + + +# ── Pure AT response parsing functions ──────────────────────────────────────── + +CREG_STATUS = { + "0": "not_registered", + "1": "registered_home", + "2": "searching", + "3": "denied", + "4": "unknown", + "5": "roaming", +} + + +def parse_csq(response: str) -> dict: + """ + Parse AT+CSQ response. + + Expected response: '+CSQ: ,' + rssi: 0-31 → -113 + 2*rssi dBm; 99 = unknown + ber: 0-7 (bit error rate); 99 = unknown + + Returns dict with rssi_raw, rssi_dbm, ber, valid. + """ + for line in response.splitlines(): + line = line.strip() + if not line.startswith("+CSQ:"): + continue + parts = line[5:].strip().split(",") + if len(parts) < 2: + continue + try: + rssi_raw = int(parts[0].strip()) + ber = int(parts[1].strip()) + except ValueError: + continue + if rssi_raw == 99: + return {"rssi_raw": 99, "rssi_dbm": None, "ber": ber, "valid": False} + return { + "rssi_raw": rssi_raw, + "rssi_dbm": -113 + 2 * rssi_raw, + "ber": ber, + "valid": True, + } + return {"rssi_raw": 99, "rssi_dbm": None, "ber": 99, "valid": False} + + +def parse_creg(response: str) -> dict: + """ + Parse AT+CREG? response. + + Expected: '+CREG: ,' or '+CREG: ' + Returns dict with stat_raw, status_str, registered. + """ + for line in response.splitlines(): + line = line.strip() + if not line.startswith("+CREG:"): + continue + payload = line[6:].strip() + parts = payload.split(",") + # Can be ',' or just '' + stat_str = parts[-1].strip() + stat_str = stat_str.split()[0] # strip trailing chars + status = CREG_STATUS.get(stat_str, "unknown") + return { + "stat_raw": stat_str, + "status_str": status, + "registered": stat_str in ("1", "5"), + } + return {"stat_raw": "?", "status_str": "unknown", "registered": False} + + +def parse_cops(response: str) -> str: + """ + Parse AT+COPS? response. + + Expected: '+COPS: ,,[,]' + Returns operator name string, or '' if not available. + """ + for line in response.splitlines(): + line = line.strip() + if not line.startswith("+COPS:"): + continue + parts = line[6:].strip().split(",") + if len(parts) >= 3: + return parts[2].strip().strip('"') + return "" + + +def rssi_to_quality_label(rssi_dbm) -> str: + """Human-readable signal quality label from RSSI dBm.""" + if rssi_dbm is None: + return "no_signal" + if rssi_dbm >= -70: + return "excellent" + if rssi_dbm >= -85: + return "good" + if rssi_dbm >= -100: + return "fair" + return "poor" + + +# ── AT command helper ───────────────────────────────────────────────────────── + +def _at_command(ser, cmd: str, timeout: float = 2.0) -> str: + """Send AT command, return response string (up to OK/ERROR).""" + ser.reset_input_buffer() + ser.write((cmd + "\r\n").encode("ascii")) + deadline = time.monotonic() + timeout + buf = "" + while time.monotonic() < deadline: + chunk = ser.read(ser.in_waiting or 1).decode("ascii", errors="replace") + buf += chunk + if "OK\r\n" in buf or "ERROR" in buf: + break + return buf + + +# ── ROS2 Node ───────────────────────────────────────────────────────────────── + +class CellularManagerNode(Node): + + def __init__(self): + super().__init__("cellular_manager") + + self.declare_parameter("at_port", "/dev/ttyUSB0") + self.declare_parameter("baud_rate", 115200) + self.declare_parameter("poll_rate", 0.2) # Hz (every 5s) + self.declare_parameter("at_timeout", 2.0) # s per AT command + self.declare_parameter("connection_method", "nmcli") # nmcli|pppd|none + self.declare_parameter("nmcli_profile", "saltybot-cellular") + self.declare_parameter("apn", "") + self.declare_parameter("reconnect_interval", 30.0) # s between reconnect tries + + p = {k: self.get_parameter(k).value for k in [ + "at_port", "baud_rate", "poll_rate", "at_timeout", + "connection_method", "nmcli_profile", "apn", "reconnect_interval", + ]} + self._p = p + + # State + self._csq = {"rssi_raw": 99, "rssi_dbm": None, "ber": 99, "valid": False} + self._creg = {"stat_raw": "?", "status_str": "unknown", "registered": False} + self._operator = "" + self._connected = False + self._last_reconnect = 0.0 + self._lock = threading.Lock() + + # Publishers + self._status_pub = self.create_publisher(DiagnosticArray, "/cellular/status", 10) + self._rssi_pub = self.create_publisher(Int32, "/cellular/rssi", 10) + self._conn_pub = self.create_publisher(Bool, "/cellular/connected", 10) + + if not _SERIAL_OK: + self.get_logger().error("pyserial not installed — cellular manager inactive") + return + + # Background AT poll thread + self._running = True + self._poll_thread = threading.Thread(target=self._poll_loop, daemon=True) + self._poll_thread.start() + + # Publish timer (same rate as poll) + self.create_timer(1.0 / p["poll_rate"], self._publish_cb) + + self.get_logger().info( + f"CellularManager ready at_port={p['at_port']} " + f"method={p['connection_method']}") + + # ── AT poll loop ────────────────────────────────────────────────────────── + + def _poll_loop(self): + interval = 1.0 / self._p["poll_rate"] + while self._running: + try: + with serial.Serial( + self._p["at_port"], self._p["baud_rate"], + timeout=self._p["at_timeout"] + 0.5) as ser: + self.get_logger().info( + f"AT port open: {self._p['at_port']}") + while self._running: + t0 = time.monotonic() + self._poll_once(ser) + elapsed = time.monotonic() - t0 + if elapsed < interval: + time.sleep(interval - elapsed) + except serial.SerialException as exc: + if self._running: + self.get_logger().warn( + f"AT serial error ({exc}) — retrying in 5s") + time.sleep(5.0) + + def _poll_once(self, ser): + """Query AT+CSQ, AT+CREG?, AT+COPS? and update state.""" + try: + csq_resp = _at_command(ser, "AT+CSQ", self._p["at_timeout"]) + creg_resp = _at_command(ser, "AT+CREG?", self._p["at_timeout"]) + cops_resp = _at_command(ser, "AT+COPS?", self._p["at_timeout"]) + except Exception as exc: + self.get_logger().warn(f"AT poll error: {exc}") + return + + csq = parse_csq(csq_resp) + creg = parse_creg(creg_resp) + operator = parse_cops(cops_resp) + connected = self._check_connectivity() + + with self._lock: + self._csq = csq + self._creg = creg + self._operator = operator + self._connected = connected + + # Auto-reconnect if data link is down + if not connected and self._p["connection_method"] != "none": + now = time.monotonic() + if now - self._last_reconnect > self._p["reconnect_interval"]: + self._last_reconnect = now + self._attempt_reconnect() + + # ── Connectivity check ──────────────────────────────────────────────────── + + def _check_connectivity(self) -> bool: + """Return True if cellular data connection is active.""" + method = self._p["connection_method"] + try: + if method == "nmcli": + result = subprocess.run( + ["nmcli", "-t", "-f", "GENERAL.STATE", + "con", "show", self._p["nmcli_profile"]], + capture_output=True, text=True, timeout=5.0) + return "activated" in result.stdout.lower() + if method == "pppd": + result = subprocess.run( + ["ip", "link", "show", "ppp0"], + capture_output=True, text=True, timeout=3.0) + return result.returncode == 0 and "UP" in result.stdout + except Exception: + pass + return False + + def _attempt_reconnect(self): + """Attempt to bring up cellular data connection.""" + method = self._p["connection_method"] + self.get_logger().info(f"Attempting cellular reconnect ({method})") + try: + if method == "nmcli": + subprocess.run( + ["nmcli", "connection", "up", self._p["nmcli_profile"]], + timeout=30.0, check=False) + elif method == "pppd": + subprocess.run( + ["systemctl", "restart", "pppd-cellular"], + timeout=15.0, check=False) + except Exception as exc: + self.get_logger().warn(f"Reconnect failed: {exc}") + + # ── Publish callback ────────────────────────────────────────────────────── + + def _publish_cb(self): + with self._lock: + csq = self._csq + creg = self._creg + operator = self._operator + connected = self._connected + + now = self.get_clock().now().to_msg() + + # DiagnosticArray — full status + ds = DiagnosticStatus() + ds.name = "Cellular" + ds.hardware_id = "SIM7600A-H" + ds.level = DiagnosticStatus.OK if connected else DiagnosticStatus.WARN + ds.message = ( + f"{'Connected' if connected else 'Disconnected'} | " + f"{operator or 'unknown'} | " + f"{creg['status_str']} | " + f"{rssi_to_quality_label(csq['rssi_dbm'])}" + ) + ds.values = [ + KeyValue(key="connected", value=str(connected)), + KeyValue(key="registration", value=creg["status_str"]), + KeyValue(key="operator", value=operator), + KeyValue(key="rssi_raw", value=str(csq["rssi_raw"])), + KeyValue(key="rssi_dbm", value=str(csq["rssi_dbm"] or "unknown")), + KeyValue(key="signal_quality", value=rssi_to_quality_label(csq["rssi_dbm"])), + KeyValue(key="ber", value=str(csq["ber"])), + ] + arr = DiagnosticArray() + arr.header.stamp = now + arr.status = [ds] + self._status_pub.publish(arr) + + # /cellular/rssi (Int32, dBm; 0 if unknown) + rssi_msg = Int32() + rssi_msg.data = csq["rssi_dbm"] if csq["rssi_dbm"] is not None else 0 + self._rssi_pub.publish(rssi_msg) + + # /cellular/connected (Bool) + conn_msg = Bool() + conn_msg.data = connected + self._conn_pub.publish(conn_msg) + + def destroy_node(self): + self._running = False + super().destroy_node() + + +# ── Entry point ─────────────────────────────────────────────────────────────── + +def main(args=None): + rclpy.init(args=args) + node = CellularManagerNode() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.try_shutdown() + + +if __name__ == "__main__": + main() diff --git a/jetson/ros2_ws/src/saltybot_cellular/saltybot_cellular/gps_driver_node.py b/jetson/ros2_ws/src/saltybot_cellular/saltybot_cellular/gps_driver_node.py new file mode 100644 index 0000000..18aaf58 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_cellular/saltybot_cellular/gps_driver_node.py @@ -0,0 +1,402 @@ +""" +gps_driver_node.py — SIM7600X GPS NMEA → ROS2 NavSatFix + TwistStamped. + +Hardware: Waveshare SIM7600X 4G HAT (SIMCom SIM7600A-H) + GNSS: GPS + GLONASS + BeiDou + Galileo, ±2.5m CEP + GPS serial port: /dev/ttyUSB2 (NMEA output, 115200 baud) + AT port: /dev/ttyUSB0 (used to start GNSS receiver: AT+CGPS=1) + +Publishes: + /gps/fix (sensor_msgs/NavSatFix) — position with covariance from HDOP + /gps/vel (geometry_msgs/TwistStamped) — ground speed + course (from NMEA RMC) + /diagnostics — fix status, satellites, HDOP + +Startup sequence: + 1. Optional: open AT port → send AT+CGPS=1 to start GNSS receiver + 2. Open GPS serial port → read NMEA sentences in background thread + 3. Publish latest GGA + RMC data at publish_rate Hz +""" + +import math +import threading +import time + +import rclpy +from rclpy.node import Node +from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus, KeyValue +from geometry_msgs.msg import TwistStamped +from sensor_msgs.msg import NavSatFix, NavSatStatus + +try: + import serial + _SERIAL_OK = True +except ImportError: + _SERIAL_OK = False + + +# ── Pure NMEA parsing functions ─────────────────────────────────────────────── +# Extracted here for unit tests — no ROS2 imports needed. + +GPS_CEP_ACCURACY = 2.5 # metres at HDOP=1 (SIM7600A-H spec) +KNOTS_TO_MS = 0.514444 + + +def nmea_checksum_valid(sentence: str) -> bool: + """Validate NMEA checksum. Sentence must include the leading '$'.""" + if '*' not in sentence: + return False + data, checksum_str = sentence[1:].rsplit('*', 1) + try: + expected = int(checksum_str.strip()[:2], 16) + except ValueError: + return False + computed = 0 + for ch in data: + computed ^= ord(ch) + return computed == expected + + +def nmea_dm_to_degrees(dm_str: str, direction: str) -> float: + """ + Convert NMEA DDMM.MMMM or DDDMM.MMMM to decimal degrees. + + NMEA format: degrees are everything up to the last two digits before the + decimal point (minutes field is always MM.fractional). + """ + if not dm_str: + return None + dot = dm_str.index('.') + deg = float(dm_str[:dot - 2]) + mins = float(dm_str[dot - 2:]) + decimal = deg + mins / 60.0 + if direction in ('S', 'W'): + decimal = -decimal + return decimal + + +def parse_gga(sentence: str) -> dict: + """ + Parse NMEA GGA sentence (any talker: GP/GN/GL/GA/GB). + + Returns dict with keys: + latitude, longitude, altitude, fix_quality, num_satellites, hdop, utc_time + Returns None on parse error or no-fix (fix_quality == 0). + + GGA field indices (0-based after splitting by comma): + 0 Talker+GGA 1 UTC 2 Lat 3 N/S + 4 Lon 5 E/W 6 Fix 7 NumSV + 8 HDOP 9 AltMSL 10 M 11 GeoidSep + 12 M 13 Age 14 StaID*CS + """ + raw = sentence.strip() + if raw.startswith('$'): + raw = raw[1:] + if '*' in raw: + raw = raw[:raw.index('*')] + parts = raw.split(',') + if len(parts) < 10: + return None + if not parts[0].endswith('GGA'): + return None + try: + fix_quality = int(parts[6]) if parts[6] else 0 + except ValueError: + return None + if fix_quality == 0: + return None + lat = nmea_dm_to_degrees(parts[2], parts[3]) + lon = nmea_dm_to_degrees(parts[4], parts[5]) + if lat is None or lon is None: + return None + try: + num_sats = int(parts[7]) if parts[7] else 0 + hdop = float(parts[8]) if parts[8] else 99.0 + alt_msl = float(parts[9]) if parts[9] else 0.0 + geoid_sep = float(parts[11]) if len(parts) > 11 and parts[11] else 0.0 + except (ValueError, IndexError): + return None + return { + 'latitude': lat, + 'longitude': lon, + 'altitude': alt_msl + geoid_sep, # WGS84 ellipsoidal height + 'fix_quality': fix_quality, + 'num_satellites': num_sats, + 'hdop': hdop, + 'utc_time': parts[1], + } + + +def parse_rmc(sentence: str) -> dict: + """ + Parse NMEA RMC sentence (any talker). + + Returns dict with keys: + speed_mps, course_deg, valid, utc_time + Returns None on parse error or void status (status == 'V'). + + RMC field indices: + 0 Talker+RMC 1 UTC 2 Status(A/V) 3 Lat 4 N/S + 5 Lon 6 E/W 7 SpeedKnots 8 CourseDeg 9 Date + 10 MagVar 11 MagDir 12 Mode*CS + """ + raw = sentence.strip() + if raw.startswith('$'): + raw = raw[1:] + if '*' in raw: + raw = raw[:raw.index('*')] + parts = raw.split(',') + if len(parts) < 9: + return None + if not parts[0].endswith('RMC'): + return None + if parts[2] != 'A': # 'V' = void / no fix + return None + try: + speed_knots = float(parts[7]) if parts[7] else 0.0 + course_deg = float(parts[8]) if parts[8] else 0.0 + except ValueError: + return None + return { + 'speed_mps': speed_knots * KNOTS_TO_MS, + 'course_deg': course_deg, + 'valid': True, + 'utc_time': parts[1], + } + + +def hdop_to_covariance(hdop: float, accuracy_m: float = GPS_CEP_ACCURACY) -> list: + """ + Compute diagonal position covariance matrix (row-major 3×3) from HDOP. + + sigma_h = hdop * accuracy_m (horizontal 1σ ≈ CEP * 1.2) + sigma_v = sigma_h * 1.5 (vertical is typically 1.5× worse) + """ + sigma_h = hdop * accuracy_m + sigma_v = sigma_h * 1.5 + return [ + sigma_h**2, 0.0, 0.0, + 0.0, sigma_h**2, 0.0, + 0.0, 0.0, sigma_v**2, + ] + + +def fix_quality_to_status(fix_quality: int) -> int: + """Map NMEA GGA fix quality to NavSatStatus.status.""" + if fix_quality <= 0: + return NavSatStatus.STATUS_NO_FIX # -1 + if fix_quality == 2: + return NavSatStatus.STATUS_SBAS_FIX # 1 (DGPS) + if fix_quality in (4, 5): + return NavSatStatus.STATUS_GBAS_FIX # 2 (RTK) + return NavSatStatus.STATUS_FIX # 0 (standard GPS) + + +def course_to_velocity_enu(speed_mps: float, course_deg: float) -> tuple: + """ + Convert scalar ground speed + course-over-ground to ENU velocity components. + + Returns (v_east, v_north). + Course: 0° = North, 90° = East (compass convention). + """ + course_rad = math.radians(course_deg) + v_north = speed_mps * math.cos(course_rad) + v_east = speed_mps * math.sin(course_rad) + return v_east, v_north + + +# ── ROS2 Node ───────────────────────────────────────────────────────────────── + +class GpsDriverNode(Node): + + def __init__(self): + super().__init__("gps_driver") + + self.declare_parameter("gps_port", "/dev/ttyUSB2") + self.declare_parameter("at_port", "/dev/ttyUSB0") + self.declare_parameter("baud_rate", 115200) + self.declare_parameter("publish_rate", 1.0) + self.declare_parameter("frame_id", "gps") + self.declare_parameter("gps_init_enable", True) + self.declare_parameter("gps_init_cmd", "AT+CGPS=1") + self.declare_parameter("at_init_timeout", 3.0) + self.declare_parameter("gps_accuracy", GPS_CEP_ACCURACY) + + p = { + "gps_port": self.get_parameter("gps_port").value, + "at_port": self.get_parameter("at_port").value, + "baud_rate": self.get_parameter("baud_rate").value, + "publish_rate": self.get_parameter("publish_rate").value, + "frame_id": self.get_parameter("frame_id").value, + "gps_init_enable": self.get_parameter("gps_init_enable").value, + "gps_init_cmd": self.get_parameter("gps_init_cmd").value, + "at_init_timeout": self.get_parameter("at_init_timeout").value, + "gps_accuracy": self.get_parameter("gps_accuracy").value, + } + self._p = p + + # Publishers + self._fix_pub = self.create_publisher(NavSatFix, "/gps/fix", 10) + self._vel_pub = self.create_publisher(TwistStamped, "/gps/vel", 10) + self._diag_pub = self.create_publisher(DiagnosticArray, "/diagnostics", 10) + + # Latest parsed data (thread-safe via GIL for simple assignment) + self._gga = None # dict from parse_gga + self._rmc = None # dict from parse_rmc + self._ser = None + self._running = False + + if not _SERIAL_OK: + self.get_logger().error("pyserial not installed — GPS node inactive") + return + + # Optional: send AT+CGPS=1 to start GNSS + if p["gps_init_enable"]: + self._send_at_init() + + # Start background serial reader + self._running = True + self._reader_thread = threading.Thread( + target=self._reader_loop, daemon=True) + self._reader_thread.start() + + # Publish timer + self.create_timer(1.0 / p["publish_rate"], self._publish_cb) + + self.get_logger().info( + f"GpsDriver ready port={p['gps_port']} rate={p['publish_rate']}Hz") + + # ── AT init ─────────────────────────────────────────────────────────────── + + def _send_at_init(self): + """Open AT port, send GPS-start command, close.""" + try: + with serial.Serial( + self._p["at_port"], self._p["baud_rate"], + timeout=self._p["at_init_timeout"]) as at: + cmd = (self._p["gps_init_cmd"] + "\r\n").encode("ascii") + at.write(cmd) + resp = at.read_until(b"OK\r\n", size=256) + if b"OK" in resp: + self.get_logger().info("GPS init OK (AT+CGPS=1)") + else: + self.get_logger().warn( + f"GPS init: unexpected response: {resp!r}") + except serial.SerialException as exc: + self.get_logger().warn( + f"GPS AT init failed ({exc}) — assuming GPS already running") + + # ── Background NMEA reader ───────────────────────────────────────────────── + + def _reader_loop(self): + while self._running: + try: + with serial.Serial( + self._p["gps_port"], self._p["baud_rate"], + timeout=1.0) as ser: + self._ser = ser + self.get_logger().info( + f"GPS serial open: {self._p['gps_port']}") + while self._running: + line = ser.readline().decode("ascii", errors="replace").strip() + if not line: + continue + if not nmea_checksum_valid("$" + line.lstrip("$")): + continue + gga = parse_gga(line) + if gga: + self._gga = gga + rmc = parse_rmc(line) + if rmc: + self._rmc = rmc + except serial.SerialException as exc: + self._ser = None + if self._running: + self.get_logger().warn( + f"GPS serial error ({exc}) — retrying in 3s") + time.sleep(3.0) + except Exception as exc: + self.get_logger().error(f"GPS reader exception: {exc}") + time.sleep(3.0) + + # ── Publish callback ────────────────────────────────────────────────────── + + def _publish_cb(self): + now = self.get_clock().now().to_msg() + gga = self._gga + rmc = self._rmc + + # NavSatFix + fix_msg = NavSatFix() + fix_msg.header.stamp = now + fix_msg.header.frame_id = self._p["frame_id"] + if gga: + fix_msg.status.service = (NavSatStatus.SERVICE_GPS | + NavSatStatus.SERVICE_GLONASS) + fix_msg.status.status = fix_quality_to_status(gga["fix_quality"]) + fix_msg.latitude = gga["latitude"] + fix_msg.longitude = gga["longitude"] + fix_msg.altitude = gga["altitude"] + fix_msg.position_covariance = hdop_to_covariance( + gga["hdop"], self._p["gps_accuracy"]) + fix_msg.position_covariance_type = NavSatFix.COVARIANCE_TYPE_APPROXIMATED + else: + fix_msg.status.status = NavSatStatus.STATUS_NO_FIX + fix_msg.position_covariance_type = NavSatFix.COVARIANCE_TYPE_UNKNOWN + self._fix_pub.publish(fix_msg) + + # TwistStamped (ground velocity) + if rmc: + vel_msg = TwistStamped() + vel_msg.header.stamp = now + vel_msg.header.frame_id = "odom" # ENU velocity in odom frame + v_east, v_north = course_to_velocity_enu( + rmc["speed_mps"], rmc["course_deg"]) + vel_msg.twist.linear.x = v_east # East + vel_msg.twist.linear.y = v_north # North + vel_msg.twist.angular.z = math.radians(rmc["course_deg"]) # heading + self._vel_pub.publish(vel_msg) + + # Diagnostics + diag_arr = DiagnosticArray() + diag_arr.header.stamp = now + ds = DiagnosticStatus() + ds.name = "GPS" + ds.hardware_id = "SIM7600A-H" + if gga: + ds.level = DiagnosticStatus.OK + ds.message = f"Fix quality {gga['fix_quality']}, {gga['num_satellites']} sats" + ds.values = [ + KeyValue(key="fix_quality", value=str(gga["fix_quality"])), + KeyValue(key="satellites", value=str(gga["num_satellites"])), + KeyValue(key="hdop", value=f"{gga['hdop']:.1f}"), + KeyValue(key="latitude", value=f"{gga['latitude']:.6f}"), + KeyValue(key="longitude", value=f"{gga['longitude']:.6f}"), + KeyValue(key="altitude_m", value=f"{gga['altitude']:.1f}"), + ] + else: + ds.level = DiagnosticStatus.WARN + ds.message = "No GPS fix" + diag_arr.status = [ds] + self._diag_pub.publish(diag_arr) + + def destroy_node(self): + self._running = False + super().destroy_node() + + +# ── Entry point ─────────────────────────────────────────────────────────────── + +def main(args=None): + rclpy.init(args=args) + node = GpsDriverNode() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.try_shutdown() + + +if __name__ == "__main__": + main() diff --git a/jetson/ros2_ws/src/saltybot_cellular/saltybot_cellular/mqtt_bridge_node.py b/jetson/ros2_ws/src/saltybot_cellular/saltybot_cellular/mqtt_bridge_node.py new file mode 100644 index 0000000..e93bcd1 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_cellular/saltybot_cellular/mqtt_bridge_node.py @@ -0,0 +1,317 @@ +""" +mqtt_bridge_node.py — ROS2 ↔ MQTT bridge for cellular telemetry relay. + +Forwards key ROS2 topics to a remote MQTT broker over the cellular link, +and receives remote commands from MQTT, republishing them on ROS2. + +Bandwidth budget: < 50 KB/s total over cellular. +At 1 Hz all topics: ~1–2 KB/s (well within budget). + +ROS2 → MQTT (telemetry, QoS 0): + /saltybot/imu → /imu (sensor_msgs/Imu) + /gps/fix → /gps/fix (sensor_msgs/NavSatFix) + /gps/vel → /gps/vel (geometry_msgs/TwistStamped) + /uwb/ranges → /uwb/ranges (std_msgs/String JSON) + /person/target → /person (geometry_msgs/PoseStamped) + /cellular/status → /cellular (diagnostic_msgs pass-through) + +MQTT → ROS2 (commands, QoS 1): + /cmd → /saltybot/cmd (std_msgs/String) + /estop → /saltybot/estop (std_msgs/Bool) + +MQTT topic naming: + prefix default: "saltybot" + e.g. saltybot/gps/fix + +Bandwidth throttling: + Each topic has a configurable max publish rate. + Default telemetry at 1 Hz, /saltybot/imu at 5 Hz (still manageable). + +TLS support: + Set tls_enable: true and provide tls_ca_cert path for encrypted connection. +""" + +import json +import math +import threading +import time + +import rclpy +from rclpy.node import Node +from diagnostic_msgs.msg import DiagnosticArray +from geometry_msgs.msg import PoseStamped, TwistStamped +from sensor_msgs.msg import Imu, NavSatFix +from std_msgs.msg import Bool, String + +try: + import paho.mqtt.client as mqtt + _MQTT_OK = True +except ImportError: + _MQTT_OK = False + + +# ── Pure serialisation functions ────────────────────────────────────────────── +# Extracted for unit tests — no ROS2 runtime needed. + +def imu_to_dict(msg) -> dict: + """Serialize sensor_msgs/Imu to bandwidth-minimal dict.""" + return { + "ax": round(msg.linear_acceleration.x, 3), + "ay": round(msg.linear_acceleration.y, 3), + "az": round(msg.linear_acceleration.z, 3), + "gx": round(msg.angular_velocity.x, 4), + "gy": round(msg.angular_velocity.y, 4), + "gz": round(msg.angular_velocity.z, 4), + "t": msg.header.stamp.sec, + } + + +def navsatfix_to_dict(msg) -> dict: + """Serialize sensor_msgs/NavSatFix to dict.""" + return { + "lat": round(msg.latitude, 7), + "lon": round(msg.longitude, 7), + "alt": round(msg.altitude, 2), + "stat": int(msg.status.status), + "t": msg.header.stamp.sec, + } + + +def gps_vel_to_dict(msg) -> dict: + """Serialize geometry_msgs/TwistStamped (GPS velocity) to dict.""" + speed = math.sqrt( + msg.twist.linear.x ** 2 + msg.twist.linear.y ** 2) + heading_deg = math.degrees(msg.twist.angular.z) % 360.0 + return { + "spd": round(speed, 2), + "hdg": round(heading_deg, 1), + "t": msg.header.stamp.sec, + } + + +def pose_to_dict(msg) -> dict: + """Serialize geometry_msgs/PoseStamped to dict.""" + return { + "x": round(msg.pose.position.x, 3), + "y": round(msg.pose.position.y, 3), + "z": round(msg.pose.position.z, 3), + "t": msg.header.stamp.sec, + } + + +# ── Rate limiter helper ─────────────────────────────────────────────────────── + +class _RateLimiter: + """True if enough time has elapsed since last allowed call.""" + def __init__(self, rate_hz: float): + self._interval = 1.0 / rate_hz if rate_hz > 0 else 0.0 + self._last = 0.0 + + def allow(self) -> bool: + now = time.monotonic() + if now - self._last >= self._interval: + self._last = now + return True + return False + + +# ── ROS2 Node ───────────────────────────────────────────────────────────────── + +class MqttBridgeNode(Node): + + def __init__(self): + super().__init__("mqtt_bridge") + + # ── Parameters ──────────────────────────────────────────────────────── + self.declare_parameter("mqtt_broker", "mqtt.saltylab.local") + self.declare_parameter("mqtt_port", 1883) + self.declare_parameter("mqtt_user", "") + self.declare_parameter("mqtt_password", "") + self.declare_parameter("topic_prefix", "saltybot") + self.declare_parameter("client_id", "saltybot-jetson") + self.declare_parameter("tls_enable", False) + self.declare_parameter("tls_ca_cert", "") + self.declare_parameter("keepalive", 60) + self.declare_parameter("reconnect_delay", 5.0) + # Per-topic rate limits (Hz, 0 = unlimited but still bounded by ROS2 topic rate) + self.declare_parameter("imu_rate", 5.0) + self.declare_parameter("gps_rate", 1.0) + self.declare_parameter("person_rate", 2.0) + self.declare_parameter("uwb_rate", 1.0) + self.declare_parameter("cellular_rate", 0.2) + + p = {k: self.get_parameter(k).value for k in [ + "mqtt_broker", "mqtt_port", "mqtt_user", "mqtt_password", + "topic_prefix", "client_id", "tls_enable", "tls_ca_cert", + "keepalive", "reconnect_delay", + "imu_rate", "gps_rate", "person_rate", "uwb_rate", "cellular_rate", + ]} + self._p = p + self._prefix = p["topic_prefix"].rstrip("/") + self._client = None + self._connected = False + self._lock = threading.Lock() + + # Rate limiters per outbound topic + self._rl = { + "imu": _RateLimiter(p["imu_rate"]), + "gps": _RateLimiter(p["gps_rate"]), + "person": _RateLimiter(p["person_rate"]), + "uwb": _RateLimiter(p["uwb_rate"]), + "cellular": _RateLimiter(p["cellular_rate"]), + } + + if not _MQTT_OK: + self.get_logger().error( + "paho-mqtt not installed (pip install paho-mqtt) — bridge inactive") + return + + # ── MQTT client setup ────────────────────────────────────────────────── + self._client = mqtt.Client(client_id=p["client_id"]) + if p["mqtt_user"]: + self._client.username_pw_set(p["mqtt_user"], p["mqtt_password"]) + if p["tls_enable"] and p["tls_ca_cert"]: + self._client.tls_set(ca_certs=p["tls_ca_cert"]) + self._client.on_connect = self._on_connect + self._client.on_disconnect = self._on_disconnect + self._client.on_message = self._on_mqtt_message + + self._client.reconnect_delay_set( + min_delay=1, max_delay=int(p["reconnect_delay"])) + + # Start MQTT loop in background thread + self._client.connect_async(p["mqtt_broker"], p["mqtt_port"], p["keepalive"]) + self._client.loop_start() + + # ── ROS2 subscriptions (telemetry out) ───────────────────────────────── + self.create_subscription(Imu, "/saltybot/imu", self._imu_cb, 10) + self.create_subscription(NavSatFix, "/gps/fix", self._gps_fix_cb, 10) + self.create_subscription(TwistStamped, "/gps/vel", self._gps_vel_cb, 10) + self.create_subscription(String, "/uwb/ranges", self._uwb_cb, 10) + self.create_subscription(PoseStamped, "/person/target", self._person_cb, 10) + self.create_subscription(DiagnosticArray, "/cellular/status", self._cellular_cb, 5) + + # ── ROS2 publishers (commands in) ────────────────────────────────────── + self._cmd_pub = self.create_publisher(String, "/saltybot/cmd", 10) + self._estop_pub = self.create_publisher(Bool, "/saltybot/estop", 10) + + self.get_logger().info( + f"MqttBridge ready broker={p['mqtt_broker']}:{p['mqtt_port']} " + f"prefix={self._prefix}") + + # ── MQTT callbacks ──────────────────────────────────────────────────────── + + def _on_connect(self, client, userdata, flags, rc): + if rc == 0: + self._connected = True + self.get_logger().info( + f"MQTT connected to {self._p['mqtt_broker']}") + # Subscribe to inbound command topics (QoS 1) + client.subscribe(f"{self._prefix}/cmd", qos=1) + client.subscribe(f"{self._prefix}/estop", qos=1) + else: + self.get_logger().warn(f"MQTT connect failed rc={rc}") + + def _on_disconnect(self, client, userdata, rc): + self._connected = False + if rc != 0: + self.get_logger().warn(f"MQTT disconnected unexpectedly (rc={rc}), reconnecting") + + def _on_mqtt_message(self, client, userdata, msg): + """Route inbound MQTT commands to ROS2 topics.""" + try: + topic = msg.topic + payload = msg.payload.decode("utf-8", errors="replace") + suffix = topic[len(self._prefix):].lstrip("/") + + if suffix == "cmd": + ros_msg = String() + ros_msg.data = payload + self._cmd_pub.publish(ros_msg) + + elif suffix == "estop": + try: + data = json.loads(payload) + val = bool(data.get("estop", False)) + except (json.JSONDecodeError, AttributeError): + val = payload.strip().lower() in ("true", "1", "yes") + ros_msg = Bool() + ros_msg.data = val + self._estop_pub.publish(ros_msg) + if val: + self.get_logger().warn("ESTOP received via MQTT") + + except Exception as exc: + self.get_logger().error(f"MQTT message error: {exc}") + + # ── Publish helper ──────────────────────────────────────────────────────── + + def _mqtt_publish(self, subtopic: str, payload: dict, qos: int = 0): + """Serialize payload to JSON and publish to MQTT (QoS 0 by default).""" + if not self._connected or self._client is None: + return + try: + self._client.publish( + f"{self._prefix}/{subtopic}", + json.dumps(payload, separators=(",", ":")), + qos=qos, + ) + except Exception as exc: + self.get_logger().warn(f"MQTT publish error: {exc}") + + # ── ROS2 topic callbacks (telemetry out) ────────────────────────────────── + + def _imu_cb(self, msg): + if self._rl["imu"].allow(): + self._mqtt_publish("imu", imu_to_dict(msg)) + + def _gps_fix_cb(self, msg): + if self._rl["gps"].allow(): + self._mqtt_publish("gps/fix", navsatfix_to_dict(msg)) + + def _gps_vel_cb(self, msg): + if self._rl["gps"].allow(): + self._mqtt_publish("gps/vel", gps_vel_to_dict(msg)) + + def _uwb_cb(self, msg): + if self._rl["uwb"].allow(): + try: + data = json.loads(msg.data) + except json.JSONDecodeError: + data = {"raw": msg.data} + self._mqtt_publish("uwb/ranges", data) + + def _person_cb(self, msg): + if self._rl["person"].allow(): + self._mqtt_publish("person", pose_to_dict(msg)) + + def _cellular_cb(self, msg): + if self._rl["cellular"].allow() and msg.status: + ds = msg.status[0] + d = {kv.key: kv.value for kv in ds.values} + d["level"] = ds.level + self._mqtt_publish("cellular", d) + + def destroy_node(self): + if self._client: + self._client.loop_stop() + self._client.disconnect() + super().destroy_node() + + +# ── Entry point ─────────────────────────────────────────────────────────────── + +def main(args=None): + rclpy.init(args=args) + node = MqttBridgeNode() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.try_shutdown() + + +if __name__ == "__main__": + main() diff --git a/jetson/ros2_ws/src/saltybot_cellular/setup.cfg b/jetson/ros2_ws/src/saltybot_cellular/setup.cfg new file mode 100644 index 0000000..27f79c2 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_cellular/setup.cfg @@ -0,0 +1,4 @@ +[develop] +script_dir=$base/lib/saltybot_cellular +[install] +install_scripts=$base/lib/saltybot_cellular diff --git a/jetson/ros2_ws/src/saltybot_cellular/setup.py b/jetson/ros2_ws/src/saltybot_cellular/setup.py new file mode 100644 index 0000000..ac8f43e --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_cellular/setup.py @@ -0,0 +1,32 @@ +from setuptools import setup + +package_name = "saltybot_cellular" + +setup( + name=package_name, + version="0.1.0", + packages=[package_name], + data_files=[ + ("share/ament_index/resource_index/packages", [f"resource/{package_name}"]), + (f"share/{package_name}", ["package.xml"]), + (f"share/{package_name}/launch", ["launch/cellular.launch.py"]), + (f"share/{package_name}/config", ["config/cellular_params.yaml"]), + ], + install_requires=["setuptools", "pyserial"], + zip_safe=True, + maintainer="sl-controls", + maintainer_email="sl-controls@saltylab.local", + description="SIM7600X 4G HAT driver: GPS NavSatFix + cellular modem + MQTT bridge", + license="MIT", + tests_require=["pytest"], + entry_points={ + "console_scripts": [ + # NMEA GPS → /gps/fix, /gps/vel + "gps_driver_node = saltybot_cellular.gps_driver_node:main", + # AT modem monitor → /cellular/status, /cellular/rssi, /cellular/connected + "cellular_manager_node = saltybot_cellular.cellular_manager_node:main", + # ROS2 ↔ MQTT telemetry relay + "mqtt_bridge_node = saltybot_cellular.mqtt_bridge_node:main", + ], + }, +) diff --git a/jetson/ros2_ws/src/saltybot_cellular/test/test_cellular.py b/jetson/ros2_ws/src/saltybot_cellular/test/test_cellular.py new file mode 100644 index 0000000..0e018c9 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_cellular/test/test_cellular.py @@ -0,0 +1,514 @@ +""" +Unit tests for saltybot_cellular node logic. +No ROS2 runtime required — tests pure functions mirrored from the nodes. +Run with: pytest jetson/ros2_ws/src/saltybot_cellular/test/test_cellular.py +""" + +import math +import pytest + + +# ── Pure function mirrors — GPS ─────────────────────────────────────────────── + +KNOTS_TO_MS = 0.514444 +GPS_ACCURACY = 2.5 + + +def _nmea_checksum_valid(sentence): + if '*' not in sentence: + return False + data, cs_str = sentence[1:].rsplit('*', 1) + try: + expected = int(cs_str.strip()[:2], 16) + except ValueError: + return False + computed = 0 + for ch in data: + computed ^= ord(ch) + return computed == expected + + +def _nmea_dm_to_degrees(dm_str, direction): + if not dm_str: + return None + dot = dm_str.index('.') + deg = float(dm_str[:dot - 2]) + mins = float(dm_str[dot - 2:]) + dec = deg + mins / 60.0 + if direction in ('S', 'W'): + dec = -dec + return dec + + +def _parse_gga(sentence): + raw = sentence.strip() + if raw.startswith('$'): + raw = raw[1:] + if '*' in raw: + raw = raw[:raw.index('*')] + parts = raw.split(',') + if len(parts) < 10 or not parts[0].endswith('GGA'): + return None + try: + fix_quality = int(parts[6]) if parts[6] else 0 + except ValueError: + return None + if fix_quality == 0: + return None + lat = _nmea_dm_to_degrees(parts[2], parts[3]) + lon = _nmea_dm_to_degrees(parts[4], parts[5]) + if lat is None or lon is None: + return None + try: + num_sats = int(parts[7]) if parts[7] else 0 + hdop = float(parts[8]) if parts[8] else 99.0 + alt_msl = float(parts[9]) if parts[9] else 0.0 + geoid_sep = float(parts[11]) if len(parts) > 11 and parts[11] else 0.0 + except (ValueError, IndexError): + return None + return { + 'latitude': lat, + 'longitude': lon, + 'altitude': alt_msl + geoid_sep, + 'fix_quality': fix_quality, + 'num_satellites': num_sats, + 'hdop': hdop, + 'utc_time': parts[1], + } + + +def _parse_rmc(sentence): + raw = sentence.strip() + if raw.startswith('$'): + raw = raw[1:] + if '*' in raw: + raw = raw[:raw.index('*')] + parts = raw.split(',') + if len(parts) < 9 or not parts[0].endswith('RMC'): + return None + if parts[2] != 'A': + return None + try: + speed_mps = float(parts[7]) * KNOTS_TO_MS if parts[7] else 0.0 + course_deg = float(parts[8]) if parts[8] else 0.0 + except ValueError: + return None + return {'speed_mps': speed_mps, 'course_deg': course_deg, 'valid': True} + + +def _hdop_to_covariance(hdop, accuracy=GPS_ACCURACY): + sh = hdop * accuracy + sv = sh * 1.5 + return [sh**2, 0, 0, 0, sh**2, 0, 0, 0, sv**2] + + +def _course_to_enu(speed_mps, course_deg): + r = math.radians(course_deg) + return speed_mps * math.sin(r), speed_mps * math.cos(r) # vE, vN + + +# ── Pure function mirrors — AT parsing ─────────────────────────────────────── + +_CREG_STATUS = { + "0": "not_registered", "1": "registered_home", + "2": "searching", "3": "denied", + "4": "unknown", "5": "roaming", +} + + +def _parse_csq(response): + for line in response.splitlines(): + line = line.strip() + if not line.startswith("+CSQ:"): + continue + parts = line[5:].strip().split(",") + if len(parts) < 2: + continue + try: + rssi_raw = int(parts[0].strip()) + ber = int(parts[1].strip()) + except ValueError: + continue + if rssi_raw == 99: + return {"rssi_raw": 99, "rssi_dbm": None, "ber": ber, "valid": False} + return {"rssi_raw": rssi_raw, "rssi_dbm": -113 + 2 * rssi_raw, + "ber": ber, "valid": True} + return {"rssi_raw": 99, "rssi_dbm": None, "ber": 99, "valid": False} + + +def _parse_creg(response): + for line in response.splitlines(): + line = line.strip() + if not line.startswith("+CREG:"): + continue + parts = line[6:].strip().split(",") + stat_str = parts[-1].strip().split()[0] + return {"stat_raw": stat_str, + "status_str": _CREG_STATUS.get(stat_str, "unknown"), + "registered": stat_str in ("1", "5")} + return {"stat_raw": "?", "status_str": "unknown", "registered": False} + + +def _parse_cops(response): + for line in response.splitlines(): + if line.strip().startswith("+COPS:"): + parts = line.strip()[6:].strip().split(",") + if len(parts) >= 3: + return parts[2].strip().strip('"') + return "" + + +def _rssi_to_label(rssi_dbm): + if rssi_dbm is None: + return "no_signal" + if rssi_dbm >= -70: + return "excellent" + if rssi_dbm >= -85: + return "good" + if rssi_dbm >= -100: + return "fair" + return "poor" + + +# ── Pure function mirrors — MQTT serialisation ──────────────────────────────── + +def _imu_to_dict(ax, ay, az, gx, gy, gz, sec=0): + class Acc: + pass + class Msg: + pass + m = Msg() + m.linear_acceleration = Acc() + m.angular_velocity = Acc() + m.header = Acc() + m.header.stamp = Acc() + m.linear_acceleration.x = ax + m.linear_acceleration.y = ay + m.linear_acceleration.z = az + m.angular_velocity.x = gx + m.angular_velocity.y = gy + m.angular_velocity.z = gz + m.header.stamp.sec = sec + return { + "ax": round(ax, 3), "ay": round(ay, 3), "az": round(az, 3), + "gx": round(gx, 4), "gy": round(gy, 4), "gz": round(gz, 4), + "t": sec, + } + + +# ── NMEA checksum tests ─────────────────────────────────────────────────────── + +class TestNmeaChecksum: + def test_valid_gpgga(self): + # Valid GGA from standard NMEA example + s = "$GPGGA,123519,4807.038,N,01131.000,E,1,08,0.9,545.4,M,46.9,M,,*47" + assert _nmea_checksum_valid(s) is True + + def test_valid_gnrmc(self): + s = "$GNRMC,123519.00,A,4807.0380,N,01131.0000,E,0.100,0.00,010101,,,A*7F" + assert _nmea_checksum_valid(s) is True + + def test_invalid_checksum(self): + s = "$GPGGA,123519,4807.038,N,01131.000,E,1,08,0.9,545.4,M,46.9,M,,*FF" + assert _nmea_checksum_valid(s) is False + + def test_no_checksum(self): + assert _nmea_checksum_valid("$GPGGA,123519,4807.038") is False + + def test_empty_string(self): + assert _nmea_checksum_valid("") is False + + +# ── NMEA DM conversion tests ────────────────────────────────────────────────── + +class TestNmeaDm: + def test_north_hemisphere(self): + # 4807.038N = 48° 7.038' = 48.1173° + assert _nmea_dm_to_degrees("4807.038", "N") == pytest.approx(48.1173, abs=1e-4) + + def test_south_hemisphere_negative(self): + result = _nmea_dm_to_degrees("4807.038", "S") + assert result == pytest.approx(-48.1173, abs=1e-4) + + def test_east_longitude(self): + # 01131.000E = 11° 31.000' = 11.5167° + assert _nmea_dm_to_degrees("01131.000", "E") == pytest.approx(11.5167, abs=1e-4) + + def test_west_longitude_negative(self): + result = _nmea_dm_to_degrees("07937.600", "W") + assert result < 0 + + def test_zero(self): + assert _nmea_dm_to_degrees("0000.000", "N") == pytest.approx(0.0) + + def test_empty_string_none(self): + assert _nmea_dm_to_degrees("", "N") is None + + +# ── GGA parsing tests ───────────────────────────────────────────────────────── + +_GGA_VALID = "$GPGGA,123519,4807.038,N,01131.000,E,1,08,0.9,545.4,M,46.9,M,,*47" +_GGA_DGPS = "$GPGGA,123519,4807.038,N,01131.000,E,2,08,0.9,545.4,M,46.9,M,,*44" +_GGA_NOFIX = "$GPGGA,123519,,,,,0,00,99.9,,M,,M,,*71" +_GGA_GN = "$GNGGA,123519,4807.038,N,01131.000,E,1,12,0.7,545.4,M,46.9,M,,*4B" + + +class TestParseGga: + def test_valid_parse(self): + r = _parse_gga(_GGA_VALID) + assert r is not None + assert r["latitude"] == pytest.approx(48.1173, abs=1e-4) + assert r["longitude"] == pytest.approx(11.5167, abs=1e-4) + assert r["fix_quality"] == 1 + assert r["num_satellites"] == 8 + assert r["hdop"] == pytest.approx(0.9) + + def test_altitude_is_msl_plus_geoid(self): + r = _parse_gga(_GGA_VALID) + # 545.4 MSL + 46.9 geoid = 592.3m ellipsoidal + assert r["altitude"] == pytest.approx(592.3, abs=0.01) + + def test_no_fix_returns_none(self): + assert _parse_gga(_GGA_NOFIX) is None + + def test_dgps_fix_quality_2(self): + r = _parse_gga(_GGA_DGPS) + assert r is not None + assert r["fix_quality"] == 2 + + def test_gn_talker_id_accepted(self): + r = _parse_gga(_GGA_GN) + assert r is not None + assert r["num_satellites"] == 12 + + def test_non_gga_returns_none(self): + assert _parse_gga("$GPGSV,3,1,12,...*XX") is None + + def test_short_sentence_returns_none(self): + assert _parse_gga("$GPGGA,123519") is None + + def test_empty_returns_none(self): + assert _parse_gga("") is None + + def test_utc_time_field(self): + r = _parse_gga(_GGA_VALID) + assert r["utc_time"] == "123519" + + +# ── RMC parsing tests ───────────────────────────────────────────────────────── + +_RMC_VALID = "$GNRMC,123519.00,A,4807.038,N,01131.000,E,1.234,181.5,010101,,,A*XX" +_RMC_VOID = "$GNRMC,123519.00,V,,,,,,,010101,,,N*XX" + + +class TestParseRmc: + def test_valid_speed(self): + r = _parse_rmc(_RMC_VALID) + assert r is not None + assert r["speed_mps"] == pytest.approx(1.234 * KNOTS_TO_MS, rel=1e-3) + + def test_valid_course(self): + r = _parse_rmc(_RMC_VALID) + assert r["course_deg"] == pytest.approx(181.5, abs=0.01) + + def test_void_returns_none(self): + assert _parse_rmc(_RMC_VOID) is None + + def test_non_rmc_returns_none(self): + assert _parse_rmc(_GGA_VALID) is None + + def test_zero_speed(self): + rmc = "$GNRMC,000000.00,A,4807.038,N,01131.000,E,0.000,0.0,010101,,,A*XX" + r = _parse_rmc(rmc) + assert r is not None + assert r["speed_mps"] == pytest.approx(0.0) + + +# ── HDOP covariance tests ───────────────────────────────────────────────────── + +class TestHdopCovariance: + def test_hdop1_gives_accuracy_squared(self): + cov = _hdop_to_covariance(1.0) + assert cov[0] == pytest.approx(GPS_ACCURACY ** 2) # sigma_h^2 + assert cov[4] == pytest.approx(GPS_ACCURACY ** 2) + assert cov[8] == pytest.approx((GPS_ACCURACY * 1.5) ** 2) + + def test_hdop2_doubles_sigma(self): + cov1 = _hdop_to_covariance(1.0) + cov2 = _hdop_to_covariance(2.0) + # sigma doubles → variance quadruples + assert cov2[0] == pytest.approx(4.0 * cov1[0]) + + def test_off_diagonal_zero(self): + cov = _hdop_to_covariance(1.0) + assert cov[1] == pytest.approx(0.0) + assert cov[2] == pytest.approx(0.0) + assert cov[3] == pytest.approx(0.0) + + def test_vertical_worse_than_horizontal(self): + cov = _hdop_to_covariance(1.0) + assert cov[8] > cov[0] # sigma_v > sigma_h + + +# ── Course → ENU velocity tests ─────────────────────────────────────────────── + +class TestCourseToEnu: + def test_north_course_zero_easting(self): + vE, vN = _course_to_enu(1.0, 0.0) # 0° = due North + assert vE == pytest.approx(0.0, abs=1e-9) + assert vN == pytest.approx(1.0) + + def test_east_course_zero_northing(self): + vE, vN = _course_to_enu(1.0, 90.0) # 90° = due East + assert vE == pytest.approx(1.0) + assert vN == pytest.approx(0.0, abs=1e-9) + + def test_south_course_negative_northing(self): + vE, vN = _course_to_enu(1.0, 180.0) # 180° = South + assert vN == pytest.approx(-1.0) + + def test_speed_preserved(self): + speed = 3.5 + vE, vN = _course_to_enu(speed, 45.0) + assert math.sqrt(vE**2 + vN**2) == pytest.approx(speed) + + def test_zero_speed(self): + vE, vN = _course_to_enu(0.0, 90.0) + assert vE == pytest.approx(0.0) + assert vN == pytest.approx(0.0) + + +# ── AT+CSQ parsing tests ────────────────────────────────────────────────────── + +class TestParseCsq: + def test_nominal_rssi(self): + r = _parse_csq("+CSQ: 18,0\r\nOK\r\n") + assert r["valid"] is True + assert r["rssi_raw"] == 18 + assert r["rssi_dbm"] == -113 + 2 * 18 # = -77 dBm + assert r["ber"] == 0 + + def test_unknown_rssi_99(self): + r = _parse_csq("+CSQ: 99,99\r\nOK\r\n") + assert r["valid"] is False + assert r["rssi_dbm"] is None + + def test_min_rssi_0(self): + r = _parse_csq("+CSQ: 0,0\r\nOK\r\n") + assert r["rssi_dbm"] == -113 + + def test_max_rssi_31(self): + r = _parse_csq("+CSQ: 31,0\r\nOK\r\n") + assert r["rssi_dbm"] == -113 + 62 # = -51 dBm + + def test_empty_response(self): + r = _parse_csq("ERROR\r\n") + assert r["valid"] is False + + def test_csq_without_space(self): + # Some modems omit the space after +CSQ: + r = _parse_csq("+CSQ:20,0\r\nOK\r\n") + assert r["valid"] is True + assert r["rssi_raw"] == 20 + + +# ── AT+CREG? parsing tests ───────────────────────────────────────────────────── + +class TestParseCreg: + def test_registered_home(self): + r = _parse_creg("+CREG: 0,1\r\nOK\r\n") + assert r["registered"] is True + assert r["status_str"] == "registered_home" + + def test_roaming(self): + r = _parse_creg("+CREG: 0,5\r\nOK\r\n") + assert r["registered"] is True + assert r["status_str"] == "roaming" + + def test_not_registered(self): + r = _parse_creg("+CREG: 0,0\r\nOK\r\n") + assert r["registered"] is False + + def test_searching(self): + r = _parse_creg("+CREG: 0,2\r\nOK\r\n") + assert r["registered"] is False + assert r["status_str"] == "searching" + + def test_stat_only_format(self): + r = _parse_creg("+CREG: 1\r\nOK\r\n") + assert r["registered"] is True + + def test_empty_response(self): + r = _parse_creg("ERROR\r\n") + assert r["registered"] is False + + +# ── AT+COPS? parsing tests ───────────────────────────────────────────────────── + +class TestParseCops: + def test_operator_name(self): + r = _parse_cops('+COPS: 0,0,"Fido",7\r\nOK\r\n') + assert r == "Fido" + + def test_operator_with_spaces(self): + r = _parse_cops('+COPS: 0,0,"Rogers Wireless",7\r\nOK\r\n') + assert r == "Rogers Wireless" + + def test_no_operator(self): + r = _parse_cops("+COPS: 0\r\nOK\r\n") + assert r == "" + + def test_error_response(self): + r = _parse_cops("ERROR\r\n") + assert r == "" + + +# ── Signal quality label tests ──────────────────────────────────────────────── + +class TestRssiLabel: + def test_excellent(self): + assert _rssi_to_label(-65) == "excellent" + + def test_good(self): + assert _rssi_to_label(-80) == "good" + + def test_fair(self): + assert _rssi_to_label(-95) == "fair" + + def test_poor(self): + assert _rssi_to_label(-105) == "poor" + + def test_none_is_no_signal(self): + assert _rssi_to_label(None) == "no_signal" + + def test_boundary_good_excellent(self): + assert _rssi_to_label(-70) == "excellent" # -70 >= -70 → excellent + + def test_boundary_fair_good(self): + assert _rssi_to_label(-85) == "good" # -85 >= -85 → good + + +# ── MQTT serialisation tests ────────────────────────────────────────────────── + +class TestMqttSerialisation: + def test_imu_dict_keys(self): + d = _imu_to_dict(9.81, 0.1, -0.2, 0.01, -0.02, 0.0) + assert set(d.keys()) == {"ax", "ay", "az", "gx", "gy", "gz", "t"} + + def test_imu_precision(self): + d = _imu_to_dict(9.81234, 0.0, 0.0, 0.0, 0.0, 0.0) + assert d["ax"] == pytest.approx(9.812, abs=1e-3) + + def test_gps_fix_dict(self): + # Simple inline navsatfix + class S: pass + m = S(); m.status = S(); m.header = S(); m.header.stamp = S() + m.latitude = 48.1173; m.longitude = 11.5167; m.altitude = 592.3 + m.status.status = 0; m.header.stamp.sec = 1234 + d = { + "lat": round(m.latitude, 7), "lon": round(m.longitude, 7), + "alt": round(m.altitude, 2), "stat": int(m.status.status), + "t": m.header.stamp.sec, + } + assert d["lat"] == pytest.approx(48.1173, abs=1e-4) + assert d["alt"] == pytest.approx(592.3) -- 2.47.2