Merge pull request 'feat: SIM7600X 4G cellular + GPS (#58)' (#65) from sl-controls/cellular-gps into main
This commit is contained in:
commit
528034fe69
@ -0,0 +1,70 @@
|
|||||||
|
# cellular_params.yaml
|
||||||
|
# Configuration for saltybot_cellular package — SIM7600X 4G HAT.
|
||||||
|
#
|
||||||
|
# Run with:
|
||||||
|
# ros2 launch saltybot_cellular cellular.launch.py
|
||||||
|
#
|
||||||
|
# Hardware: Waveshare SIM7600X 4G HAT (SIMCom SIM7600A-H)
|
||||||
|
# /dev/ttyUSB0 — AT commands
|
||||||
|
# /dev/ttyUSB1 — PPP data (managed by NetworkManager/pppd, not ROS2)
|
||||||
|
# /dev/ttyUSB2 — NMEA GPS output
|
||||||
|
|
||||||
|
# ── GPS driver ─────────────────────────────────────────────────────────────────
|
||||||
|
# The GPS port streams NMEA sentences once AT+CGPS=1 is sent.
|
||||||
|
# If GPS is already running (e.g., from a previous session), set gps_init_enable: false.
|
||||||
|
|
||||||
|
gps_port: /dev/ttyUSB2 # NMEA output port
|
||||||
|
at_port: /dev/ttyUSB0 # AT command port (for CGPS=1 init)
|
||||||
|
baud_rate: 115200
|
||||||
|
publish_rate: 1.0 # Hz — GPS updates at 1 Hz max
|
||||||
|
frame_id: gps # Header frame_id for NavSatFix + TwistStamped
|
||||||
|
|
||||||
|
gps_init_enable: true # Send AT+CGPS=1 on startup
|
||||||
|
gps_init_cmd: AT+CGPS=1 # Command to start GNSS receiver
|
||||||
|
at_init_timeout: 3.0 # Seconds to wait for AT response
|
||||||
|
|
||||||
|
# GPS accuracy (used for NavSatFix covariance computation)
|
||||||
|
# SIM7600A-H spec: ±2.5m CEP (circular error probable) at HDOP=1
|
||||||
|
gps_accuracy: 2.5 # metres CEP at HDOP=1
|
||||||
|
|
||||||
|
# ── Cellular manager ───────────────────────────────────────────────────────────
|
||||||
|
# AT commands are polled to monitor signal quality and registration status.
|
||||||
|
# The data connection itself (4G LTE) is managed by the OS (NetworkManager or pppd).
|
||||||
|
|
||||||
|
poll_rate: 0.2 # Hz (poll every 5 seconds — avoid AT flooding)
|
||||||
|
at_timeout: 2.0 # Seconds per AT command
|
||||||
|
|
||||||
|
# Data connection management
|
||||||
|
# "nmcli": NetworkManager profile (recommended for Orin Nano running Ubuntu)
|
||||||
|
# "pppd": pppd service unit (systemctl restart pppd-cellular)
|
||||||
|
# "none": monitoring only, no auto-reconnect
|
||||||
|
connection_method: nmcli
|
||||||
|
nmcli_profile: saltybot-cellular # nmcli connection profile name
|
||||||
|
apn: "" # APN (set if NetworkManager needs it)
|
||||||
|
reconnect_interval: 30.0 # Seconds between reconnect attempts
|
||||||
|
|
||||||
|
# ── MQTT bridge ────────────────────────────────────────────────────────────────
|
||||||
|
# Relay telemetry to a remote MQTT broker over the cellular link.
|
||||||
|
# For home-lab deployment: run mosquitto on the Jetson or a cloud VM.
|
||||||
|
# For production: use TLS + authentication.
|
||||||
|
|
||||||
|
mqtt_broker: mqtt.saltylab.local # Broker hostname or IP
|
||||||
|
mqtt_port: 1883 # 1883 = plain, 8883 = TLS
|
||||||
|
mqtt_user: "" # Leave empty to disable auth
|
||||||
|
mqtt_password: ""
|
||||||
|
topic_prefix: saltybot # MQTT topic prefix (saltybot/gps/fix, etc.)
|
||||||
|
client_id: saltybot-jetson
|
||||||
|
keepalive: 60 # MQTT keepalive interval (seconds)
|
||||||
|
reconnect_delay: 5.0 # Seconds between MQTT reconnect attempts
|
||||||
|
|
||||||
|
# TLS (set mqtt_port: 8883 and provide CA cert path)
|
||||||
|
tls_enable: false
|
||||||
|
tls_ca_cert: "" # e.g. /etc/ssl/certs/mqtt-ca.crt
|
||||||
|
|
||||||
|
# Per-topic outbound rate limits (Hz). Keeps cellular bandwidth < 50 KB/s.
|
||||||
|
# At 1 Hz per topic × ~100 bytes/msg × 6 topics ≈ 600 bytes/s = 0.6 KB/s
|
||||||
|
imu_rate: 5.0 # Hz — /saltybot/imu → saltybot/imu
|
||||||
|
gps_rate: 1.0 # Hz — /gps/fix, /gps/vel
|
||||||
|
person_rate: 2.0 # Hz — /person/target
|
||||||
|
uwb_rate: 1.0 # Hz — /uwb/ranges
|
||||||
|
cellular_rate: 0.2 # Hz — /cellular/status (once per 5s)
|
||||||
194
jetson/ros2_ws/src/saltybot_cellular/launch/cellular.launch.py
Normal file
194
jetson/ros2_ws/src/saltybot_cellular/launch/cellular.launch.py
Normal file
@ -0,0 +1,194 @@
|
|||||||
|
"""
|
||||||
|
cellular.launch.py — Launch all saltybot_cellular nodes.
|
||||||
|
|
||||||
|
Starts three nodes:
|
||||||
|
gps_driver_node — NMEA GPS → /gps/fix, /gps/vel
|
||||||
|
cellular_manager_node — AT modem monitor → /cellular/status, /cellular/rssi
|
||||||
|
mqtt_bridge_node — ROS2 ↔ MQTT relay over cellular
|
||||||
|
|
||||||
|
Prerequisites:
|
||||||
|
1. SIM7600X HAT connected via USB → /dev/ttyUSB0-2 must exist
|
||||||
|
2. Nano-SIM with active data plan inserted
|
||||||
|
3. NetworkManager profile "saltybot-cellular" configured (for nmcli method):
|
||||||
|
nmcli connection add type gsm ifname '*' con-name saltybot-cellular apn <your-apn>
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
# Defaults (all params from cellular_params.yaml):
|
||||||
|
ros2 launch saltybot_cellular cellular.launch.py
|
||||||
|
|
||||||
|
# GPS only (skip modem + MQTT):
|
||||||
|
ros2 launch saltybot_cellular cellular.launch.py \
|
||||||
|
launch_cellular_manager:=false launch_mqtt_bridge:=false
|
||||||
|
|
||||||
|
# Custom MQTT broker:
|
||||||
|
ros2 launch saltybot_cellular cellular.launch.py mqtt_broker:=192.168.1.100
|
||||||
|
|
||||||
|
# Override params file:
|
||||||
|
ros2 launch saltybot_cellular cellular.launch.py \
|
||||||
|
params_file:=/path/to/my_cellular_params.yaml
|
||||||
|
|
||||||
|
GPS topics:
|
||||||
|
/gps/fix (sensor_msgs/NavSatFix)
|
||||||
|
/gps/vel (geometry_msgs/TwistStamped)
|
||||||
|
|
||||||
|
Cellular topics:
|
||||||
|
/cellular/status (diagnostic_msgs/DiagnosticArray)
|
||||||
|
/cellular/rssi (std_msgs/Int32)
|
||||||
|
/cellular/connected (std_msgs/Bool)
|
||||||
|
|
||||||
|
MQTT relay (ROS2→MQTT):
|
||||||
|
saltybot/imu, saltybot/gps/fix, saltybot/gps/vel, saltybot/uwb/ranges,
|
||||||
|
saltybot/person, saltybot/cellular
|
||||||
|
|
||||||
|
MQTT relay (MQTT→ROS2):
|
||||||
|
saltybot/cmd → /saltybot/cmd, saltybot/estop → /saltybot/estop
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
|
||||||
|
from ament_index_python.packages import get_package_share_directory
|
||||||
|
from launch import LaunchDescription
|
||||||
|
from launch.actions import DeclareLaunchArgument, OpaqueFunction
|
||||||
|
from launch.conditions import IfCondition
|
||||||
|
from launch.substitutions import LaunchConfiguration
|
||||||
|
from launch_ros.actions import Node
|
||||||
|
|
||||||
|
|
||||||
|
def _launch_nodes(context, *args, **kwargs):
|
||||||
|
params_file = LaunchConfiguration("params_file").perform(context)
|
||||||
|
|
||||||
|
def _b(name):
|
||||||
|
return LaunchConfiguration(name).perform(context).lower() == "true"
|
||||||
|
|
||||||
|
def _s(name):
|
||||||
|
return LaunchConfiguration(name).perform(context)
|
||||||
|
|
||||||
|
def _f(name):
|
||||||
|
return float(LaunchConfiguration(name).perform(context))
|
||||||
|
|
||||||
|
def _i(name):
|
||||||
|
return int(LaunchConfiguration(name).perform(context))
|
||||||
|
|
||||||
|
gps_params = {
|
||||||
|
"gps_port": _s("gps_port"),
|
||||||
|
"at_port": _s("at_port"),
|
||||||
|
"baud_rate": _i("baud_rate"),
|
||||||
|
"publish_rate": _f("gps_publish_rate"),
|
||||||
|
"frame_id": _s("frame_id"),
|
||||||
|
"gps_init_enable": _b("gps_init_enable"),
|
||||||
|
"gps_init_cmd": _s("gps_init_cmd"),
|
||||||
|
"gps_accuracy": _f("gps_accuracy"),
|
||||||
|
}
|
||||||
|
|
||||||
|
cellular_params = {
|
||||||
|
"at_port": _s("at_port"),
|
||||||
|
"baud_rate": _i("baud_rate"),
|
||||||
|
"poll_rate": _f("cellular_poll_rate"),
|
||||||
|
"connection_method": _s("connection_method"),
|
||||||
|
"nmcli_profile": _s("nmcli_profile"),
|
||||||
|
"apn": _s("apn"),
|
||||||
|
"reconnect_interval": _f("reconnect_interval"),
|
||||||
|
}
|
||||||
|
|
||||||
|
mqtt_params = {
|
||||||
|
"mqtt_broker": _s("mqtt_broker"),
|
||||||
|
"mqtt_port": _i("mqtt_port"),
|
||||||
|
"mqtt_user": _s("mqtt_user"),
|
||||||
|
"mqtt_password": _s("mqtt_password"),
|
||||||
|
"topic_prefix": _s("topic_prefix"),
|
||||||
|
"tls_enable": _b("tls_enable"),
|
||||||
|
"tls_ca_cert": _s("tls_ca_cert"),
|
||||||
|
"imu_rate": _f("imu_rate"),
|
||||||
|
"gps_rate": _f("gps_rate"),
|
||||||
|
"person_rate": _f("person_rate"),
|
||||||
|
"uwb_rate": _f("uwb_rate"),
|
||||||
|
}
|
||||||
|
|
||||||
|
def _node_params(inline):
|
||||||
|
return [params_file, inline] if params_file else [inline]
|
||||||
|
|
||||||
|
nodes = [
|
||||||
|
Node(
|
||||||
|
package="saltybot_cellular",
|
||||||
|
executable="gps_driver_node",
|
||||||
|
name="gps_driver",
|
||||||
|
output="screen",
|
||||||
|
parameters=_node_params(gps_params),
|
||||||
|
),
|
||||||
|
]
|
||||||
|
|
||||||
|
if _b("launch_cellular_manager"):
|
||||||
|
nodes.append(Node(
|
||||||
|
package="saltybot_cellular",
|
||||||
|
executable="cellular_manager_node",
|
||||||
|
name="cellular_manager",
|
||||||
|
output="screen",
|
||||||
|
parameters=_node_params(cellular_params),
|
||||||
|
))
|
||||||
|
|
||||||
|
if _b("launch_mqtt_bridge"):
|
||||||
|
nodes.append(Node(
|
||||||
|
package="saltybot_cellular",
|
||||||
|
executable="mqtt_bridge_node",
|
||||||
|
name="mqtt_bridge",
|
||||||
|
output="screen",
|
||||||
|
parameters=_node_params(mqtt_params),
|
||||||
|
))
|
||||||
|
|
||||||
|
return nodes
|
||||||
|
|
||||||
|
|
||||||
|
def generate_launch_description():
|
||||||
|
pkg_share = get_package_share_directory("saltybot_cellular")
|
||||||
|
default_params = os.path.join(pkg_share, "config", "cellular_params.yaml")
|
||||||
|
|
||||||
|
return LaunchDescription([
|
||||||
|
DeclareLaunchArgument(
|
||||||
|
"params_file", default_value=default_params,
|
||||||
|
description="Full path to cellular_params.yaml (overrides inline args)"),
|
||||||
|
|
||||||
|
# Node enable/disable
|
||||||
|
DeclareLaunchArgument(
|
||||||
|
"launch_cellular_manager", default_value="true",
|
||||||
|
description="Launch cellular_manager_node"),
|
||||||
|
DeclareLaunchArgument(
|
||||||
|
"launch_mqtt_bridge", default_value="true",
|
||||||
|
description="Launch mqtt_bridge_node"),
|
||||||
|
|
||||||
|
# GPS driver
|
||||||
|
DeclareLaunchArgument("gps_port", default_value="/dev/ttyUSB2"),
|
||||||
|
DeclareLaunchArgument("at_port", default_value="/dev/ttyUSB0"),
|
||||||
|
DeclareLaunchArgument("baud_rate", default_value="115200"),
|
||||||
|
DeclareLaunchArgument("gps_publish_rate", default_value="1.0",
|
||||||
|
description="GPS publish rate (Hz)"),
|
||||||
|
DeclareLaunchArgument("frame_id", default_value="gps"),
|
||||||
|
DeclareLaunchArgument("gps_init_enable", default_value="true",
|
||||||
|
description="Send AT+CGPS=1 on startup"),
|
||||||
|
DeclareLaunchArgument("gps_init_cmd", default_value="AT+CGPS=1"),
|
||||||
|
DeclareLaunchArgument("gps_accuracy", default_value="2.5",
|
||||||
|
description="GPS CEP accuracy at HDOP=1 (m)"),
|
||||||
|
|
||||||
|
# Cellular manager
|
||||||
|
DeclareLaunchArgument("cellular_poll_rate", default_value="0.2",
|
||||||
|
description="AT poll rate (Hz)"),
|
||||||
|
DeclareLaunchArgument("connection_method", default_value="nmcli",
|
||||||
|
description="nmcli | pppd | none"),
|
||||||
|
DeclareLaunchArgument("nmcli_profile", default_value="saltybot-cellular"),
|
||||||
|
DeclareLaunchArgument("apn", default_value=""),
|
||||||
|
DeclareLaunchArgument("reconnect_interval", default_value="30.0"),
|
||||||
|
|
||||||
|
# MQTT bridge
|
||||||
|
DeclareLaunchArgument("mqtt_broker", default_value="mqtt.saltylab.local"),
|
||||||
|
DeclareLaunchArgument("mqtt_port", default_value="1883"),
|
||||||
|
DeclareLaunchArgument("mqtt_user", default_value=""),
|
||||||
|
DeclareLaunchArgument("mqtt_password", default_value=""),
|
||||||
|
DeclareLaunchArgument("topic_prefix", default_value="saltybot"),
|
||||||
|
DeclareLaunchArgument("tls_enable", default_value="false"),
|
||||||
|
DeclareLaunchArgument("tls_ca_cert", default_value=""),
|
||||||
|
DeclareLaunchArgument("imu_rate", default_value="5.0"),
|
||||||
|
DeclareLaunchArgument("gps_rate", default_value="1.0"),
|
||||||
|
DeclareLaunchArgument("person_rate", default_value="2.0"),
|
||||||
|
DeclareLaunchArgument("uwb_rate", default_value="1.0"),
|
||||||
|
|
||||||
|
OpaqueFunction(function=_launch_nodes),
|
||||||
|
])
|
||||||
31
jetson/ros2_ws/src/saltybot_cellular/package.xml
Normal file
31
jetson/ros2_ws/src/saltybot_cellular/package.xml
Normal file
@ -0,0 +1,31 @@
|
|||||||
|
<?xml version="1.0"?>
|
||||||
|
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||||
|
<package format="3">
|
||||||
|
<name>saltybot_cellular</name>
|
||||||
|
<version>0.1.0</version>
|
||||||
|
<description>
|
||||||
|
Waveshare SIM7600X 4G HAT driver for SaltyBot.
|
||||||
|
Provides GPS (NMEA → NavSatFix), cellular modem monitoring (AT commands),
|
||||||
|
and MQTT telemetry relay over 4G LTE.
|
||||||
|
Hardware: SIMCom SIM7600A-H, LTE Cat-4, GPS+GLONASS+BeiDou+Galileo.
|
||||||
|
</description>
|
||||||
|
<maintainer email="sl-controls@saltylab.local">sl-controls</maintainer>
|
||||||
|
<license>MIT</license>
|
||||||
|
|
||||||
|
<depend>rclpy</depend>
|
||||||
|
<depend>sensor_msgs</depend>
|
||||||
|
<depend>geometry_msgs</depend>
|
||||||
|
<depend>std_msgs</depend>
|
||||||
|
<depend>diagnostic_msgs</depend>
|
||||||
|
|
||||||
|
<buildtool_depend>ament_python</buildtool_depend>
|
||||||
|
|
||||||
|
<test_depend>ament_copyright</test_depend>
|
||||||
|
<test_depend>ament_flake8</test_depend>
|
||||||
|
<test_depend>ament_pep257</test_depend>
|
||||||
|
<test_depend>python3-pytest</test_depend>
|
||||||
|
|
||||||
|
<export>
|
||||||
|
<build_type>ament_python</build_type>
|
||||||
|
</export>
|
||||||
|
</package>
|
||||||
@ -0,0 +1,362 @@
|
|||||||
|
"""
|
||||||
|
cellular_manager_node.py — SIM7600X 4G modem status monitor + connection manager.
|
||||||
|
|
||||||
|
Hardware: Waveshare SIM7600X 4G HAT (SIMCom SIM7600A-H)
|
||||||
|
AT command port: /dev/ttyUSB0 (115200 baud)
|
||||||
|
Data connection managed by NetworkManager (nmcli) or pppd
|
||||||
|
|
||||||
|
Publishes:
|
||||||
|
/cellular/status (diagnostic_msgs/DiagnosticArray) — rssi, network, connected
|
||||||
|
/cellular/rssi (std_msgs/Int32) — signal strength dBm (-113..0)
|
||||||
|
/cellular/connected (std_msgs/Bool) — data connection active
|
||||||
|
|
||||||
|
Key AT commands used:
|
||||||
|
AT+CSQ → signal quality (rssi 0-31, ber)
|
||||||
|
AT+CREG? → network registration status
|
||||||
|
AT+COPS? → operator name
|
||||||
|
AT+CGDCONT? → PDP context / APN
|
||||||
|
|
||||||
|
4G data connection management:
|
||||||
|
connection_method = "nmcli": uses `nmcli connection up <profile>`
|
||||||
|
connection_method = "pppd": uses pppd with provided dial script
|
||||||
|
connection_method = "none": monitor only, no auto-connect
|
||||||
|
"""
|
||||||
|
|
||||||
|
import subprocess
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
|
||||||
|
import rclpy
|
||||||
|
from rclpy.node import Node
|
||||||
|
from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus, KeyValue
|
||||||
|
from std_msgs.msg import Bool, Int32
|
||||||
|
|
||||||
|
try:
|
||||||
|
import serial
|
||||||
|
_SERIAL_OK = True
|
||||||
|
except ImportError:
|
||||||
|
_SERIAL_OK = False
|
||||||
|
|
||||||
|
|
||||||
|
# ── Pure AT response parsing functions ────────────────────────────────────────
|
||||||
|
|
||||||
|
CREG_STATUS = {
|
||||||
|
"0": "not_registered",
|
||||||
|
"1": "registered_home",
|
||||||
|
"2": "searching",
|
||||||
|
"3": "denied",
|
||||||
|
"4": "unknown",
|
||||||
|
"5": "roaming",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def parse_csq(response: str) -> dict:
|
||||||
|
"""
|
||||||
|
Parse AT+CSQ response.
|
||||||
|
|
||||||
|
Expected response: '+CSQ: <rssi>,<ber>'
|
||||||
|
rssi: 0-31 → -113 + 2*rssi dBm; 99 = unknown
|
||||||
|
ber: 0-7 (bit error rate); 99 = unknown
|
||||||
|
|
||||||
|
Returns dict with rssi_raw, rssi_dbm, ber, valid.
|
||||||
|
"""
|
||||||
|
for line in response.splitlines():
|
||||||
|
line = line.strip()
|
||||||
|
if not line.startswith("+CSQ:"):
|
||||||
|
continue
|
||||||
|
parts = line[5:].strip().split(",")
|
||||||
|
if len(parts) < 2:
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
rssi_raw = int(parts[0].strip())
|
||||||
|
ber = int(parts[1].strip())
|
||||||
|
except ValueError:
|
||||||
|
continue
|
||||||
|
if rssi_raw == 99:
|
||||||
|
return {"rssi_raw": 99, "rssi_dbm": None, "ber": ber, "valid": False}
|
||||||
|
return {
|
||||||
|
"rssi_raw": rssi_raw,
|
||||||
|
"rssi_dbm": -113 + 2 * rssi_raw,
|
||||||
|
"ber": ber,
|
||||||
|
"valid": True,
|
||||||
|
}
|
||||||
|
return {"rssi_raw": 99, "rssi_dbm": None, "ber": 99, "valid": False}
|
||||||
|
|
||||||
|
|
||||||
|
def parse_creg(response: str) -> dict:
|
||||||
|
"""
|
||||||
|
Parse AT+CREG? response.
|
||||||
|
|
||||||
|
Expected: '+CREG: <n>,<stat>' or '+CREG: <stat>'
|
||||||
|
Returns dict with stat_raw, status_str, registered.
|
||||||
|
"""
|
||||||
|
for line in response.splitlines():
|
||||||
|
line = line.strip()
|
||||||
|
if not line.startswith("+CREG:"):
|
||||||
|
continue
|
||||||
|
payload = line[6:].strip()
|
||||||
|
parts = payload.split(",")
|
||||||
|
# Can be '<n>,<stat>' or just '<stat>'
|
||||||
|
stat_str = parts[-1].strip()
|
||||||
|
stat_str = stat_str.split()[0] # strip trailing chars
|
||||||
|
status = CREG_STATUS.get(stat_str, "unknown")
|
||||||
|
return {
|
||||||
|
"stat_raw": stat_str,
|
||||||
|
"status_str": status,
|
||||||
|
"registered": stat_str in ("1", "5"),
|
||||||
|
}
|
||||||
|
return {"stat_raw": "?", "status_str": "unknown", "registered": False}
|
||||||
|
|
||||||
|
|
||||||
|
def parse_cops(response: str) -> str:
|
||||||
|
"""
|
||||||
|
Parse AT+COPS? response.
|
||||||
|
|
||||||
|
Expected: '+COPS: <mode>,<format>,<oper>[,<AcT>]'
|
||||||
|
Returns operator name string, or '' if not available.
|
||||||
|
"""
|
||||||
|
for line in response.splitlines():
|
||||||
|
line = line.strip()
|
||||||
|
if not line.startswith("+COPS:"):
|
||||||
|
continue
|
||||||
|
parts = line[6:].strip().split(",")
|
||||||
|
if len(parts) >= 3:
|
||||||
|
return parts[2].strip().strip('"')
|
||||||
|
return ""
|
||||||
|
|
||||||
|
|
||||||
|
def rssi_to_quality_label(rssi_dbm) -> str:
|
||||||
|
"""Human-readable signal quality label from RSSI dBm."""
|
||||||
|
if rssi_dbm is None:
|
||||||
|
return "no_signal"
|
||||||
|
if rssi_dbm >= -70:
|
||||||
|
return "excellent"
|
||||||
|
if rssi_dbm >= -85:
|
||||||
|
return "good"
|
||||||
|
if rssi_dbm >= -100:
|
||||||
|
return "fair"
|
||||||
|
return "poor"
|
||||||
|
|
||||||
|
|
||||||
|
# ── AT command helper ─────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _at_command(ser, cmd: str, timeout: float = 2.0) -> str:
|
||||||
|
"""Send AT command, return response string (up to OK/ERROR)."""
|
||||||
|
ser.reset_input_buffer()
|
||||||
|
ser.write((cmd + "\r\n").encode("ascii"))
|
||||||
|
deadline = time.monotonic() + timeout
|
||||||
|
buf = ""
|
||||||
|
while time.monotonic() < deadline:
|
||||||
|
chunk = ser.read(ser.in_waiting or 1).decode("ascii", errors="replace")
|
||||||
|
buf += chunk
|
||||||
|
if "OK\r\n" in buf or "ERROR" in buf:
|
||||||
|
break
|
||||||
|
return buf
|
||||||
|
|
||||||
|
|
||||||
|
# ── ROS2 Node ─────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class CellularManagerNode(Node):
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__("cellular_manager")
|
||||||
|
|
||||||
|
self.declare_parameter("at_port", "/dev/ttyUSB0")
|
||||||
|
self.declare_parameter("baud_rate", 115200)
|
||||||
|
self.declare_parameter("poll_rate", 0.2) # Hz (every 5s)
|
||||||
|
self.declare_parameter("at_timeout", 2.0) # s per AT command
|
||||||
|
self.declare_parameter("connection_method", "nmcli") # nmcli|pppd|none
|
||||||
|
self.declare_parameter("nmcli_profile", "saltybot-cellular")
|
||||||
|
self.declare_parameter("apn", "")
|
||||||
|
self.declare_parameter("reconnect_interval", 30.0) # s between reconnect tries
|
||||||
|
|
||||||
|
p = {k: self.get_parameter(k).value for k in [
|
||||||
|
"at_port", "baud_rate", "poll_rate", "at_timeout",
|
||||||
|
"connection_method", "nmcli_profile", "apn", "reconnect_interval",
|
||||||
|
]}
|
||||||
|
self._p = p
|
||||||
|
|
||||||
|
# State
|
||||||
|
self._csq = {"rssi_raw": 99, "rssi_dbm": None, "ber": 99, "valid": False}
|
||||||
|
self._creg = {"stat_raw": "?", "status_str": "unknown", "registered": False}
|
||||||
|
self._operator = ""
|
||||||
|
self._connected = False
|
||||||
|
self._last_reconnect = 0.0
|
||||||
|
self._lock = threading.Lock()
|
||||||
|
|
||||||
|
# Publishers
|
||||||
|
self._status_pub = self.create_publisher(DiagnosticArray, "/cellular/status", 10)
|
||||||
|
self._rssi_pub = self.create_publisher(Int32, "/cellular/rssi", 10)
|
||||||
|
self._conn_pub = self.create_publisher(Bool, "/cellular/connected", 10)
|
||||||
|
|
||||||
|
if not _SERIAL_OK:
|
||||||
|
self.get_logger().error("pyserial not installed — cellular manager inactive")
|
||||||
|
return
|
||||||
|
|
||||||
|
# Background AT poll thread
|
||||||
|
self._running = True
|
||||||
|
self._poll_thread = threading.Thread(target=self._poll_loop, daemon=True)
|
||||||
|
self._poll_thread.start()
|
||||||
|
|
||||||
|
# Publish timer (same rate as poll)
|
||||||
|
self.create_timer(1.0 / p["poll_rate"], self._publish_cb)
|
||||||
|
|
||||||
|
self.get_logger().info(
|
||||||
|
f"CellularManager ready at_port={p['at_port']} "
|
||||||
|
f"method={p['connection_method']}")
|
||||||
|
|
||||||
|
# ── AT poll loop ──────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _poll_loop(self):
|
||||||
|
interval = 1.0 / self._p["poll_rate"]
|
||||||
|
while self._running:
|
||||||
|
try:
|
||||||
|
with serial.Serial(
|
||||||
|
self._p["at_port"], self._p["baud_rate"],
|
||||||
|
timeout=self._p["at_timeout"] + 0.5) as ser:
|
||||||
|
self.get_logger().info(
|
||||||
|
f"AT port open: {self._p['at_port']}")
|
||||||
|
while self._running:
|
||||||
|
t0 = time.monotonic()
|
||||||
|
self._poll_once(ser)
|
||||||
|
elapsed = time.monotonic() - t0
|
||||||
|
if elapsed < interval:
|
||||||
|
time.sleep(interval - elapsed)
|
||||||
|
except serial.SerialException as exc:
|
||||||
|
if self._running:
|
||||||
|
self.get_logger().warn(
|
||||||
|
f"AT serial error ({exc}) — retrying in 5s")
|
||||||
|
time.sleep(5.0)
|
||||||
|
|
||||||
|
def _poll_once(self, ser):
|
||||||
|
"""Query AT+CSQ, AT+CREG?, AT+COPS? and update state."""
|
||||||
|
try:
|
||||||
|
csq_resp = _at_command(ser, "AT+CSQ", self._p["at_timeout"])
|
||||||
|
creg_resp = _at_command(ser, "AT+CREG?", self._p["at_timeout"])
|
||||||
|
cops_resp = _at_command(ser, "AT+COPS?", self._p["at_timeout"])
|
||||||
|
except Exception as exc:
|
||||||
|
self.get_logger().warn(f"AT poll error: {exc}")
|
||||||
|
return
|
||||||
|
|
||||||
|
csq = parse_csq(csq_resp)
|
||||||
|
creg = parse_creg(creg_resp)
|
||||||
|
operator = parse_cops(cops_resp)
|
||||||
|
connected = self._check_connectivity()
|
||||||
|
|
||||||
|
with self._lock:
|
||||||
|
self._csq = csq
|
||||||
|
self._creg = creg
|
||||||
|
self._operator = operator
|
||||||
|
self._connected = connected
|
||||||
|
|
||||||
|
# Auto-reconnect if data link is down
|
||||||
|
if not connected and self._p["connection_method"] != "none":
|
||||||
|
now = time.monotonic()
|
||||||
|
if now - self._last_reconnect > self._p["reconnect_interval"]:
|
||||||
|
self._last_reconnect = now
|
||||||
|
self._attempt_reconnect()
|
||||||
|
|
||||||
|
# ── Connectivity check ────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _check_connectivity(self) -> bool:
|
||||||
|
"""Return True if cellular data connection is active."""
|
||||||
|
method = self._p["connection_method"]
|
||||||
|
try:
|
||||||
|
if method == "nmcli":
|
||||||
|
result = subprocess.run(
|
||||||
|
["nmcli", "-t", "-f", "GENERAL.STATE",
|
||||||
|
"con", "show", self._p["nmcli_profile"]],
|
||||||
|
capture_output=True, text=True, timeout=5.0)
|
||||||
|
return "activated" in result.stdout.lower()
|
||||||
|
if method == "pppd":
|
||||||
|
result = subprocess.run(
|
||||||
|
["ip", "link", "show", "ppp0"],
|
||||||
|
capture_output=True, text=True, timeout=3.0)
|
||||||
|
return result.returncode == 0 and "UP" in result.stdout
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
return False
|
||||||
|
|
||||||
|
def _attempt_reconnect(self):
|
||||||
|
"""Attempt to bring up cellular data connection."""
|
||||||
|
method = self._p["connection_method"]
|
||||||
|
self.get_logger().info(f"Attempting cellular reconnect ({method})")
|
||||||
|
try:
|
||||||
|
if method == "nmcli":
|
||||||
|
subprocess.run(
|
||||||
|
["nmcli", "connection", "up", self._p["nmcli_profile"]],
|
||||||
|
timeout=30.0, check=False)
|
||||||
|
elif method == "pppd":
|
||||||
|
subprocess.run(
|
||||||
|
["systemctl", "restart", "pppd-cellular"],
|
||||||
|
timeout=15.0, check=False)
|
||||||
|
except Exception as exc:
|
||||||
|
self.get_logger().warn(f"Reconnect failed: {exc}")
|
||||||
|
|
||||||
|
# ── Publish callback ──────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _publish_cb(self):
|
||||||
|
with self._lock:
|
||||||
|
csq = self._csq
|
||||||
|
creg = self._creg
|
||||||
|
operator = self._operator
|
||||||
|
connected = self._connected
|
||||||
|
|
||||||
|
now = self.get_clock().now().to_msg()
|
||||||
|
|
||||||
|
# DiagnosticArray — full status
|
||||||
|
ds = DiagnosticStatus()
|
||||||
|
ds.name = "Cellular"
|
||||||
|
ds.hardware_id = "SIM7600A-H"
|
||||||
|
ds.level = DiagnosticStatus.OK if connected else DiagnosticStatus.WARN
|
||||||
|
ds.message = (
|
||||||
|
f"{'Connected' if connected else 'Disconnected'} | "
|
||||||
|
f"{operator or 'unknown'} | "
|
||||||
|
f"{creg['status_str']} | "
|
||||||
|
f"{rssi_to_quality_label(csq['rssi_dbm'])}"
|
||||||
|
)
|
||||||
|
ds.values = [
|
||||||
|
KeyValue(key="connected", value=str(connected)),
|
||||||
|
KeyValue(key="registration", value=creg["status_str"]),
|
||||||
|
KeyValue(key="operator", value=operator),
|
||||||
|
KeyValue(key="rssi_raw", value=str(csq["rssi_raw"])),
|
||||||
|
KeyValue(key="rssi_dbm", value=str(csq["rssi_dbm"] or "unknown")),
|
||||||
|
KeyValue(key="signal_quality", value=rssi_to_quality_label(csq["rssi_dbm"])),
|
||||||
|
KeyValue(key="ber", value=str(csq["ber"])),
|
||||||
|
]
|
||||||
|
arr = DiagnosticArray()
|
||||||
|
arr.header.stamp = now
|
||||||
|
arr.status = [ds]
|
||||||
|
self._status_pub.publish(arr)
|
||||||
|
|
||||||
|
# /cellular/rssi (Int32, dBm; 0 if unknown)
|
||||||
|
rssi_msg = Int32()
|
||||||
|
rssi_msg.data = csq["rssi_dbm"] if csq["rssi_dbm"] is not None else 0
|
||||||
|
self._rssi_pub.publish(rssi_msg)
|
||||||
|
|
||||||
|
# /cellular/connected (Bool)
|
||||||
|
conn_msg = Bool()
|
||||||
|
conn_msg.data = connected
|
||||||
|
self._conn_pub.publish(conn_msg)
|
||||||
|
|
||||||
|
def destroy_node(self):
|
||||||
|
self._running = False
|
||||||
|
super().destroy_node()
|
||||||
|
|
||||||
|
|
||||||
|
# ── Entry point ───────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def main(args=None):
|
||||||
|
rclpy.init(args=args)
|
||||||
|
node = CellularManagerNode()
|
||||||
|
try:
|
||||||
|
rclpy.spin(node)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
node.destroy_node()
|
||||||
|
rclpy.try_shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
@ -0,0 +1,402 @@
|
|||||||
|
"""
|
||||||
|
gps_driver_node.py — SIM7600X GPS NMEA → ROS2 NavSatFix + TwistStamped.
|
||||||
|
|
||||||
|
Hardware: Waveshare SIM7600X 4G HAT (SIMCom SIM7600A-H)
|
||||||
|
GNSS: GPS + GLONASS + BeiDou + Galileo, ±2.5m CEP
|
||||||
|
GPS serial port: /dev/ttyUSB2 (NMEA output, 115200 baud)
|
||||||
|
AT port: /dev/ttyUSB0 (used to start GNSS receiver: AT+CGPS=1)
|
||||||
|
|
||||||
|
Publishes:
|
||||||
|
/gps/fix (sensor_msgs/NavSatFix) — position with covariance from HDOP
|
||||||
|
/gps/vel (geometry_msgs/TwistStamped) — ground speed + course (from NMEA RMC)
|
||||||
|
/diagnostics — fix status, satellites, HDOP
|
||||||
|
|
||||||
|
Startup sequence:
|
||||||
|
1. Optional: open AT port → send AT+CGPS=1 to start GNSS receiver
|
||||||
|
2. Open GPS serial port → read NMEA sentences in background thread
|
||||||
|
3. Publish latest GGA + RMC data at publish_rate Hz
|
||||||
|
"""
|
||||||
|
|
||||||
|
import math
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
|
||||||
|
import rclpy
|
||||||
|
from rclpy.node import Node
|
||||||
|
from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus, KeyValue
|
||||||
|
from geometry_msgs.msg import TwistStamped
|
||||||
|
from sensor_msgs.msg import NavSatFix, NavSatStatus
|
||||||
|
|
||||||
|
try:
|
||||||
|
import serial
|
||||||
|
_SERIAL_OK = True
|
||||||
|
except ImportError:
|
||||||
|
_SERIAL_OK = False
|
||||||
|
|
||||||
|
|
||||||
|
# ── Pure NMEA parsing functions ───────────────────────────────────────────────
|
||||||
|
# Extracted here for unit tests — no ROS2 imports needed.
|
||||||
|
|
||||||
|
GPS_CEP_ACCURACY = 2.5 # metres at HDOP=1 (SIM7600A-H spec)
|
||||||
|
KNOTS_TO_MS = 0.514444
|
||||||
|
|
||||||
|
|
||||||
|
def nmea_checksum_valid(sentence: str) -> bool:
|
||||||
|
"""Validate NMEA checksum. Sentence must include the leading '$'."""
|
||||||
|
if '*' not in sentence:
|
||||||
|
return False
|
||||||
|
data, checksum_str = sentence[1:].rsplit('*', 1)
|
||||||
|
try:
|
||||||
|
expected = int(checksum_str.strip()[:2], 16)
|
||||||
|
except ValueError:
|
||||||
|
return False
|
||||||
|
computed = 0
|
||||||
|
for ch in data:
|
||||||
|
computed ^= ord(ch)
|
||||||
|
return computed == expected
|
||||||
|
|
||||||
|
|
||||||
|
def nmea_dm_to_degrees(dm_str: str, direction: str) -> float:
|
||||||
|
"""
|
||||||
|
Convert NMEA DDMM.MMMM or DDDMM.MMMM to decimal degrees.
|
||||||
|
|
||||||
|
NMEA format: degrees are everything up to the last two digits before the
|
||||||
|
decimal point (minutes field is always MM.fractional).
|
||||||
|
"""
|
||||||
|
if not dm_str:
|
||||||
|
return None
|
||||||
|
dot = dm_str.index('.')
|
||||||
|
deg = float(dm_str[:dot - 2])
|
||||||
|
mins = float(dm_str[dot - 2:])
|
||||||
|
decimal = deg + mins / 60.0
|
||||||
|
if direction in ('S', 'W'):
|
||||||
|
decimal = -decimal
|
||||||
|
return decimal
|
||||||
|
|
||||||
|
|
||||||
|
def parse_gga(sentence: str) -> dict:
|
||||||
|
"""
|
||||||
|
Parse NMEA GGA sentence (any talker: GP/GN/GL/GA/GB).
|
||||||
|
|
||||||
|
Returns dict with keys:
|
||||||
|
latitude, longitude, altitude, fix_quality, num_satellites, hdop, utc_time
|
||||||
|
Returns None on parse error or no-fix (fix_quality == 0).
|
||||||
|
|
||||||
|
GGA field indices (0-based after splitting by comma):
|
||||||
|
0 Talker+GGA 1 UTC 2 Lat 3 N/S
|
||||||
|
4 Lon 5 E/W 6 Fix 7 NumSV
|
||||||
|
8 HDOP 9 AltMSL 10 M 11 GeoidSep
|
||||||
|
12 M 13 Age 14 StaID*CS
|
||||||
|
"""
|
||||||
|
raw = sentence.strip()
|
||||||
|
if raw.startswith('$'):
|
||||||
|
raw = raw[1:]
|
||||||
|
if '*' in raw:
|
||||||
|
raw = raw[:raw.index('*')]
|
||||||
|
parts = raw.split(',')
|
||||||
|
if len(parts) < 10:
|
||||||
|
return None
|
||||||
|
if not parts[0].endswith('GGA'):
|
||||||
|
return None
|
||||||
|
try:
|
||||||
|
fix_quality = int(parts[6]) if parts[6] else 0
|
||||||
|
except ValueError:
|
||||||
|
return None
|
||||||
|
if fix_quality == 0:
|
||||||
|
return None
|
||||||
|
lat = nmea_dm_to_degrees(parts[2], parts[3])
|
||||||
|
lon = nmea_dm_to_degrees(parts[4], parts[5])
|
||||||
|
if lat is None or lon is None:
|
||||||
|
return None
|
||||||
|
try:
|
||||||
|
num_sats = int(parts[7]) if parts[7] else 0
|
||||||
|
hdop = float(parts[8]) if parts[8] else 99.0
|
||||||
|
alt_msl = float(parts[9]) if parts[9] else 0.0
|
||||||
|
geoid_sep = float(parts[11]) if len(parts) > 11 and parts[11] else 0.0
|
||||||
|
except (ValueError, IndexError):
|
||||||
|
return None
|
||||||
|
return {
|
||||||
|
'latitude': lat,
|
||||||
|
'longitude': lon,
|
||||||
|
'altitude': alt_msl + geoid_sep, # WGS84 ellipsoidal height
|
||||||
|
'fix_quality': fix_quality,
|
||||||
|
'num_satellites': num_sats,
|
||||||
|
'hdop': hdop,
|
||||||
|
'utc_time': parts[1],
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def parse_rmc(sentence: str) -> dict:
|
||||||
|
"""
|
||||||
|
Parse NMEA RMC sentence (any talker).
|
||||||
|
|
||||||
|
Returns dict with keys:
|
||||||
|
speed_mps, course_deg, valid, utc_time
|
||||||
|
Returns None on parse error or void status (status == 'V').
|
||||||
|
|
||||||
|
RMC field indices:
|
||||||
|
0 Talker+RMC 1 UTC 2 Status(A/V) 3 Lat 4 N/S
|
||||||
|
5 Lon 6 E/W 7 SpeedKnots 8 CourseDeg 9 Date
|
||||||
|
10 MagVar 11 MagDir 12 Mode*CS
|
||||||
|
"""
|
||||||
|
raw = sentence.strip()
|
||||||
|
if raw.startswith('$'):
|
||||||
|
raw = raw[1:]
|
||||||
|
if '*' in raw:
|
||||||
|
raw = raw[:raw.index('*')]
|
||||||
|
parts = raw.split(',')
|
||||||
|
if len(parts) < 9:
|
||||||
|
return None
|
||||||
|
if not parts[0].endswith('RMC'):
|
||||||
|
return None
|
||||||
|
if parts[2] != 'A': # 'V' = void / no fix
|
||||||
|
return None
|
||||||
|
try:
|
||||||
|
speed_knots = float(parts[7]) if parts[7] else 0.0
|
||||||
|
course_deg = float(parts[8]) if parts[8] else 0.0
|
||||||
|
except ValueError:
|
||||||
|
return None
|
||||||
|
return {
|
||||||
|
'speed_mps': speed_knots * KNOTS_TO_MS,
|
||||||
|
'course_deg': course_deg,
|
||||||
|
'valid': True,
|
||||||
|
'utc_time': parts[1],
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def hdop_to_covariance(hdop: float, accuracy_m: float = GPS_CEP_ACCURACY) -> list:
|
||||||
|
"""
|
||||||
|
Compute diagonal position covariance matrix (row-major 3×3) from HDOP.
|
||||||
|
|
||||||
|
sigma_h = hdop * accuracy_m (horizontal 1σ ≈ CEP * 1.2)
|
||||||
|
sigma_v = sigma_h * 1.5 (vertical is typically 1.5× worse)
|
||||||
|
"""
|
||||||
|
sigma_h = hdop * accuracy_m
|
||||||
|
sigma_v = sigma_h * 1.5
|
||||||
|
return [
|
||||||
|
sigma_h**2, 0.0, 0.0,
|
||||||
|
0.0, sigma_h**2, 0.0,
|
||||||
|
0.0, 0.0, sigma_v**2,
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def fix_quality_to_status(fix_quality: int) -> int:
|
||||||
|
"""Map NMEA GGA fix quality to NavSatStatus.status."""
|
||||||
|
if fix_quality <= 0:
|
||||||
|
return NavSatStatus.STATUS_NO_FIX # -1
|
||||||
|
if fix_quality == 2:
|
||||||
|
return NavSatStatus.STATUS_SBAS_FIX # 1 (DGPS)
|
||||||
|
if fix_quality in (4, 5):
|
||||||
|
return NavSatStatus.STATUS_GBAS_FIX # 2 (RTK)
|
||||||
|
return NavSatStatus.STATUS_FIX # 0 (standard GPS)
|
||||||
|
|
||||||
|
|
||||||
|
def course_to_velocity_enu(speed_mps: float, course_deg: float) -> tuple:
|
||||||
|
"""
|
||||||
|
Convert scalar ground speed + course-over-ground to ENU velocity components.
|
||||||
|
|
||||||
|
Returns (v_east, v_north).
|
||||||
|
Course: 0° = North, 90° = East (compass convention).
|
||||||
|
"""
|
||||||
|
course_rad = math.radians(course_deg)
|
||||||
|
v_north = speed_mps * math.cos(course_rad)
|
||||||
|
v_east = speed_mps * math.sin(course_rad)
|
||||||
|
return v_east, v_north
|
||||||
|
|
||||||
|
|
||||||
|
# ── ROS2 Node ─────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class GpsDriverNode(Node):
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__("gps_driver")
|
||||||
|
|
||||||
|
self.declare_parameter("gps_port", "/dev/ttyUSB2")
|
||||||
|
self.declare_parameter("at_port", "/dev/ttyUSB0")
|
||||||
|
self.declare_parameter("baud_rate", 115200)
|
||||||
|
self.declare_parameter("publish_rate", 1.0)
|
||||||
|
self.declare_parameter("frame_id", "gps")
|
||||||
|
self.declare_parameter("gps_init_enable", True)
|
||||||
|
self.declare_parameter("gps_init_cmd", "AT+CGPS=1")
|
||||||
|
self.declare_parameter("at_init_timeout", 3.0)
|
||||||
|
self.declare_parameter("gps_accuracy", GPS_CEP_ACCURACY)
|
||||||
|
|
||||||
|
p = {
|
||||||
|
"gps_port": self.get_parameter("gps_port").value,
|
||||||
|
"at_port": self.get_parameter("at_port").value,
|
||||||
|
"baud_rate": self.get_parameter("baud_rate").value,
|
||||||
|
"publish_rate": self.get_parameter("publish_rate").value,
|
||||||
|
"frame_id": self.get_parameter("frame_id").value,
|
||||||
|
"gps_init_enable": self.get_parameter("gps_init_enable").value,
|
||||||
|
"gps_init_cmd": self.get_parameter("gps_init_cmd").value,
|
||||||
|
"at_init_timeout": self.get_parameter("at_init_timeout").value,
|
||||||
|
"gps_accuracy": self.get_parameter("gps_accuracy").value,
|
||||||
|
}
|
||||||
|
self._p = p
|
||||||
|
|
||||||
|
# Publishers
|
||||||
|
self._fix_pub = self.create_publisher(NavSatFix, "/gps/fix", 10)
|
||||||
|
self._vel_pub = self.create_publisher(TwistStamped, "/gps/vel", 10)
|
||||||
|
self._diag_pub = self.create_publisher(DiagnosticArray, "/diagnostics", 10)
|
||||||
|
|
||||||
|
# Latest parsed data (thread-safe via GIL for simple assignment)
|
||||||
|
self._gga = None # dict from parse_gga
|
||||||
|
self._rmc = None # dict from parse_rmc
|
||||||
|
self._ser = None
|
||||||
|
self._running = False
|
||||||
|
|
||||||
|
if not _SERIAL_OK:
|
||||||
|
self.get_logger().error("pyserial not installed — GPS node inactive")
|
||||||
|
return
|
||||||
|
|
||||||
|
# Optional: send AT+CGPS=1 to start GNSS
|
||||||
|
if p["gps_init_enable"]:
|
||||||
|
self._send_at_init()
|
||||||
|
|
||||||
|
# Start background serial reader
|
||||||
|
self._running = True
|
||||||
|
self._reader_thread = threading.Thread(
|
||||||
|
target=self._reader_loop, daemon=True)
|
||||||
|
self._reader_thread.start()
|
||||||
|
|
||||||
|
# Publish timer
|
||||||
|
self.create_timer(1.0 / p["publish_rate"], self._publish_cb)
|
||||||
|
|
||||||
|
self.get_logger().info(
|
||||||
|
f"GpsDriver ready port={p['gps_port']} rate={p['publish_rate']}Hz")
|
||||||
|
|
||||||
|
# ── AT init ───────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _send_at_init(self):
|
||||||
|
"""Open AT port, send GPS-start command, close."""
|
||||||
|
try:
|
||||||
|
with serial.Serial(
|
||||||
|
self._p["at_port"], self._p["baud_rate"],
|
||||||
|
timeout=self._p["at_init_timeout"]) as at:
|
||||||
|
cmd = (self._p["gps_init_cmd"] + "\r\n").encode("ascii")
|
||||||
|
at.write(cmd)
|
||||||
|
resp = at.read_until(b"OK\r\n", size=256)
|
||||||
|
if b"OK" in resp:
|
||||||
|
self.get_logger().info("GPS init OK (AT+CGPS=1)")
|
||||||
|
else:
|
||||||
|
self.get_logger().warn(
|
||||||
|
f"GPS init: unexpected response: {resp!r}")
|
||||||
|
except serial.SerialException as exc:
|
||||||
|
self.get_logger().warn(
|
||||||
|
f"GPS AT init failed ({exc}) — assuming GPS already running")
|
||||||
|
|
||||||
|
# ── Background NMEA reader ─────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _reader_loop(self):
|
||||||
|
while self._running:
|
||||||
|
try:
|
||||||
|
with serial.Serial(
|
||||||
|
self._p["gps_port"], self._p["baud_rate"],
|
||||||
|
timeout=1.0) as ser:
|
||||||
|
self._ser = ser
|
||||||
|
self.get_logger().info(
|
||||||
|
f"GPS serial open: {self._p['gps_port']}")
|
||||||
|
while self._running:
|
||||||
|
line = ser.readline().decode("ascii", errors="replace").strip()
|
||||||
|
if not line:
|
||||||
|
continue
|
||||||
|
if not nmea_checksum_valid("$" + line.lstrip("$")):
|
||||||
|
continue
|
||||||
|
gga = parse_gga(line)
|
||||||
|
if gga:
|
||||||
|
self._gga = gga
|
||||||
|
rmc = parse_rmc(line)
|
||||||
|
if rmc:
|
||||||
|
self._rmc = rmc
|
||||||
|
except serial.SerialException as exc:
|
||||||
|
self._ser = None
|
||||||
|
if self._running:
|
||||||
|
self.get_logger().warn(
|
||||||
|
f"GPS serial error ({exc}) — retrying in 3s")
|
||||||
|
time.sleep(3.0)
|
||||||
|
except Exception as exc:
|
||||||
|
self.get_logger().error(f"GPS reader exception: {exc}")
|
||||||
|
time.sleep(3.0)
|
||||||
|
|
||||||
|
# ── Publish callback ──────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _publish_cb(self):
|
||||||
|
now = self.get_clock().now().to_msg()
|
||||||
|
gga = self._gga
|
||||||
|
rmc = self._rmc
|
||||||
|
|
||||||
|
# NavSatFix
|
||||||
|
fix_msg = NavSatFix()
|
||||||
|
fix_msg.header.stamp = now
|
||||||
|
fix_msg.header.frame_id = self._p["frame_id"]
|
||||||
|
if gga:
|
||||||
|
fix_msg.status.service = (NavSatStatus.SERVICE_GPS |
|
||||||
|
NavSatStatus.SERVICE_GLONASS)
|
||||||
|
fix_msg.status.status = fix_quality_to_status(gga["fix_quality"])
|
||||||
|
fix_msg.latitude = gga["latitude"]
|
||||||
|
fix_msg.longitude = gga["longitude"]
|
||||||
|
fix_msg.altitude = gga["altitude"]
|
||||||
|
fix_msg.position_covariance = hdop_to_covariance(
|
||||||
|
gga["hdop"], self._p["gps_accuracy"])
|
||||||
|
fix_msg.position_covariance_type = NavSatFix.COVARIANCE_TYPE_APPROXIMATED
|
||||||
|
else:
|
||||||
|
fix_msg.status.status = NavSatStatus.STATUS_NO_FIX
|
||||||
|
fix_msg.position_covariance_type = NavSatFix.COVARIANCE_TYPE_UNKNOWN
|
||||||
|
self._fix_pub.publish(fix_msg)
|
||||||
|
|
||||||
|
# TwistStamped (ground velocity)
|
||||||
|
if rmc:
|
||||||
|
vel_msg = TwistStamped()
|
||||||
|
vel_msg.header.stamp = now
|
||||||
|
vel_msg.header.frame_id = "odom" # ENU velocity in odom frame
|
||||||
|
v_east, v_north = course_to_velocity_enu(
|
||||||
|
rmc["speed_mps"], rmc["course_deg"])
|
||||||
|
vel_msg.twist.linear.x = v_east # East
|
||||||
|
vel_msg.twist.linear.y = v_north # North
|
||||||
|
vel_msg.twist.angular.z = math.radians(rmc["course_deg"]) # heading
|
||||||
|
self._vel_pub.publish(vel_msg)
|
||||||
|
|
||||||
|
# Diagnostics
|
||||||
|
diag_arr = DiagnosticArray()
|
||||||
|
diag_arr.header.stamp = now
|
||||||
|
ds = DiagnosticStatus()
|
||||||
|
ds.name = "GPS"
|
||||||
|
ds.hardware_id = "SIM7600A-H"
|
||||||
|
if gga:
|
||||||
|
ds.level = DiagnosticStatus.OK
|
||||||
|
ds.message = f"Fix quality {gga['fix_quality']}, {gga['num_satellites']} sats"
|
||||||
|
ds.values = [
|
||||||
|
KeyValue(key="fix_quality", value=str(gga["fix_quality"])),
|
||||||
|
KeyValue(key="satellites", value=str(gga["num_satellites"])),
|
||||||
|
KeyValue(key="hdop", value=f"{gga['hdop']:.1f}"),
|
||||||
|
KeyValue(key="latitude", value=f"{gga['latitude']:.6f}"),
|
||||||
|
KeyValue(key="longitude", value=f"{gga['longitude']:.6f}"),
|
||||||
|
KeyValue(key="altitude_m", value=f"{gga['altitude']:.1f}"),
|
||||||
|
]
|
||||||
|
else:
|
||||||
|
ds.level = DiagnosticStatus.WARN
|
||||||
|
ds.message = "No GPS fix"
|
||||||
|
diag_arr.status = [ds]
|
||||||
|
self._diag_pub.publish(diag_arr)
|
||||||
|
|
||||||
|
def destroy_node(self):
|
||||||
|
self._running = False
|
||||||
|
super().destroy_node()
|
||||||
|
|
||||||
|
|
||||||
|
# ── Entry point ───────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def main(args=None):
|
||||||
|
rclpy.init(args=args)
|
||||||
|
node = GpsDriverNode()
|
||||||
|
try:
|
||||||
|
rclpy.spin(node)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
node.destroy_node()
|
||||||
|
rclpy.try_shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
@ -0,0 +1,317 @@
|
|||||||
|
"""
|
||||||
|
mqtt_bridge_node.py — ROS2 ↔ MQTT bridge for cellular telemetry relay.
|
||||||
|
|
||||||
|
Forwards key ROS2 topics to a remote MQTT broker over the cellular link,
|
||||||
|
and receives remote commands from MQTT, republishing them on ROS2.
|
||||||
|
|
||||||
|
Bandwidth budget: < 50 KB/s total over cellular.
|
||||||
|
At 1 Hz all topics: ~1–2 KB/s (well within budget).
|
||||||
|
|
||||||
|
ROS2 → MQTT (telemetry, QoS 0):
|
||||||
|
/saltybot/imu → <prefix>/imu (sensor_msgs/Imu)
|
||||||
|
/gps/fix → <prefix>/gps/fix (sensor_msgs/NavSatFix)
|
||||||
|
/gps/vel → <prefix>/gps/vel (geometry_msgs/TwistStamped)
|
||||||
|
/uwb/ranges → <prefix>/uwb/ranges (std_msgs/String JSON)
|
||||||
|
/person/target → <prefix>/person (geometry_msgs/PoseStamped)
|
||||||
|
/cellular/status → <prefix>/cellular (diagnostic_msgs pass-through)
|
||||||
|
|
||||||
|
MQTT → ROS2 (commands, QoS 1):
|
||||||
|
<prefix>/cmd → /saltybot/cmd (std_msgs/String)
|
||||||
|
<prefix>/estop → /saltybot/estop (std_msgs/Bool)
|
||||||
|
|
||||||
|
MQTT topic naming:
|
||||||
|
prefix default: "saltybot"
|
||||||
|
e.g. saltybot/gps/fix
|
||||||
|
|
||||||
|
Bandwidth throttling:
|
||||||
|
Each topic has a configurable max publish rate.
|
||||||
|
Default telemetry at 1 Hz, /saltybot/imu at 5 Hz (still manageable).
|
||||||
|
|
||||||
|
TLS support:
|
||||||
|
Set tls_enable: true and provide tls_ca_cert path for encrypted connection.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import math
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
|
||||||
|
import rclpy
|
||||||
|
from rclpy.node import Node
|
||||||
|
from diagnostic_msgs.msg import DiagnosticArray
|
||||||
|
from geometry_msgs.msg import PoseStamped, TwistStamped
|
||||||
|
from sensor_msgs.msg import Imu, NavSatFix
|
||||||
|
from std_msgs.msg import Bool, String
|
||||||
|
|
||||||
|
try:
|
||||||
|
import paho.mqtt.client as mqtt
|
||||||
|
_MQTT_OK = True
|
||||||
|
except ImportError:
|
||||||
|
_MQTT_OK = False
|
||||||
|
|
||||||
|
|
||||||
|
# ── Pure serialisation functions ──────────────────────────────────────────────
|
||||||
|
# Extracted for unit tests — no ROS2 runtime needed.
|
||||||
|
|
||||||
|
def imu_to_dict(msg) -> dict:
|
||||||
|
"""Serialize sensor_msgs/Imu to bandwidth-minimal dict."""
|
||||||
|
return {
|
||||||
|
"ax": round(msg.linear_acceleration.x, 3),
|
||||||
|
"ay": round(msg.linear_acceleration.y, 3),
|
||||||
|
"az": round(msg.linear_acceleration.z, 3),
|
||||||
|
"gx": round(msg.angular_velocity.x, 4),
|
||||||
|
"gy": round(msg.angular_velocity.y, 4),
|
||||||
|
"gz": round(msg.angular_velocity.z, 4),
|
||||||
|
"t": msg.header.stamp.sec,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def navsatfix_to_dict(msg) -> dict:
|
||||||
|
"""Serialize sensor_msgs/NavSatFix to dict."""
|
||||||
|
return {
|
||||||
|
"lat": round(msg.latitude, 7),
|
||||||
|
"lon": round(msg.longitude, 7),
|
||||||
|
"alt": round(msg.altitude, 2),
|
||||||
|
"stat": int(msg.status.status),
|
||||||
|
"t": msg.header.stamp.sec,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def gps_vel_to_dict(msg) -> dict:
|
||||||
|
"""Serialize geometry_msgs/TwistStamped (GPS velocity) to dict."""
|
||||||
|
speed = math.sqrt(
|
||||||
|
msg.twist.linear.x ** 2 + msg.twist.linear.y ** 2)
|
||||||
|
heading_deg = math.degrees(msg.twist.angular.z) % 360.0
|
||||||
|
return {
|
||||||
|
"spd": round(speed, 2),
|
||||||
|
"hdg": round(heading_deg, 1),
|
||||||
|
"t": msg.header.stamp.sec,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def pose_to_dict(msg) -> dict:
|
||||||
|
"""Serialize geometry_msgs/PoseStamped to dict."""
|
||||||
|
return {
|
||||||
|
"x": round(msg.pose.position.x, 3),
|
||||||
|
"y": round(msg.pose.position.y, 3),
|
||||||
|
"z": round(msg.pose.position.z, 3),
|
||||||
|
"t": msg.header.stamp.sec,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
# ── Rate limiter helper ───────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class _RateLimiter:
|
||||||
|
"""True if enough time has elapsed since last allowed call."""
|
||||||
|
def __init__(self, rate_hz: float):
|
||||||
|
self._interval = 1.0 / rate_hz if rate_hz > 0 else 0.0
|
||||||
|
self._last = 0.0
|
||||||
|
|
||||||
|
def allow(self) -> bool:
|
||||||
|
now = time.monotonic()
|
||||||
|
if now - self._last >= self._interval:
|
||||||
|
self._last = now
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
# ── ROS2 Node ─────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class MqttBridgeNode(Node):
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__("mqtt_bridge")
|
||||||
|
|
||||||
|
# ── Parameters ────────────────────────────────────────────────────────
|
||||||
|
self.declare_parameter("mqtt_broker", "mqtt.saltylab.local")
|
||||||
|
self.declare_parameter("mqtt_port", 1883)
|
||||||
|
self.declare_parameter("mqtt_user", "")
|
||||||
|
self.declare_parameter("mqtt_password", "")
|
||||||
|
self.declare_parameter("topic_prefix", "saltybot")
|
||||||
|
self.declare_parameter("client_id", "saltybot-jetson")
|
||||||
|
self.declare_parameter("tls_enable", False)
|
||||||
|
self.declare_parameter("tls_ca_cert", "")
|
||||||
|
self.declare_parameter("keepalive", 60)
|
||||||
|
self.declare_parameter("reconnect_delay", 5.0)
|
||||||
|
# Per-topic rate limits (Hz, 0 = unlimited but still bounded by ROS2 topic rate)
|
||||||
|
self.declare_parameter("imu_rate", 5.0)
|
||||||
|
self.declare_parameter("gps_rate", 1.0)
|
||||||
|
self.declare_parameter("person_rate", 2.0)
|
||||||
|
self.declare_parameter("uwb_rate", 1.0)
|
||||||
|
self.declare_parameter("cellular_rate", 0.2)
|
||||||
|
|
||||||
|
p = {k: self.get_parameter(k).value for k in [
|
||||||
|
"mqtt_broker", "mqtt_port", "mqtt_user", "mqtt_password",
|
||||||
|
"topic_prefix", "client_id", "tls_enable", "tls_ca_cert",
|
||||||
|
"keepalive", "reconnect_delay",
|
||||||
|
"imu_rate", "gps_rate", "person_rate", "uwb_rate", "cellular_rate",
|
||||||
|
]}
|
||||||
|
self._p = p
|
||||||
|
self._prefix = p["topic_prefix"].rstrip("/")
|
||||||
|
self._client = None
|
||||||
|
self._connected = False
|
||||||
|
self._lock = threading.Lock()
|
||||||
|
|
||||||
|
# Rate limiters per outbound topic
|
||||||
|
self._rl = {
|
||||||
|
"imu": _RateLimiter(p["imu_rate"]),
|
||||||
|
"gps": _RateLimiter(p["gps_rate"]),
|
||||||
|
"person": _RateLimiter(p["person_rate"]),
|
||||||
|
"uwb": _RateLimiter(p["uwb_rate"]),
|
||||||
|
"cellular": _RateLimiter(p["cellular_rate"]),
|
||||||
|
}
|
||||||
|
|
||||||
|
if not _MQTT_OK:
|
||||||
|
self.get_logger().error(
|
||||||
|
"paho-mqtt not installed (pip install paho-mqtt) — bridge inactive")
|
||||||
|
return
|
||||||
|
|
||||||
|
# ── MQTT client setup ──────────────────────────────────────────────────
|
||||||
|
self._client = mqtt.Client(client_id=p["client_id"])
|
||||||
|
if p["mqtt_user"]:
|
||||||
|
self._client.username_pw_set(p["mqtt_user"], p["mqtt_password"])
|
||||||
|
if p["tls_enable"] and p["tls_ca_cert"]:
|
||||||
|
self._client.tls_set(ca_certs=p["tls_ca_cert"])
|
||||||
|
self._client.on_connect = self._on_connect
|
||||||
|
self._client.on_disconnect = self._on_disconnect
|
||||||
|
self._client.on_message = self._on_mqtt_message
|
||||||
|
|
||||||
|
self._client.reconnect_delay_set(
|
||||||
|
min_delay=1, max_delay=int(p["reconnect_delay"]))
|
||||||
|
|
||||||
|
# Start MQTT loop in background thread
|
||||||
|
self._client.connect_async(p["mqtt_broker"], p["mqtt_port"], p["keepalive"])
|
||||||
|
self._client.loop_start()
|
||||||
|
|
||||||
|
# ── ROS2 subscriptions (telemetry out) ─────────────────────────────────
|
||||||
|
self.create_subscription(Imu, "/saltybot/imu", self._imu_cb, 10)
|
||||||
|
self.create_subscription(NavSatFix, "/gps/fix", self._gps_fix_cb, 10)
|
||||||
|
self.create_subscription(TwistStamped, "/gps/vel", self._gps_vel_cb, 10)
|
||||||
|
self.create_subscription(String, "/uwb/ranges", self._uwb_cb, 10)
|
||||||
|
self.create_subscription(PoseStamped, "/person/target", self._person_cb, 10)
|
||||||
|
self.create_subscription(DiagnosticArray, "/cellular/status", self._cellular_cb, 5)
|
||||||
|
|
||||||
|
# ── ROS2 publishers (commands in) ──────────────────────────────────────
|
||||||
|
self._cmd_pub = self.create_publisher(String, "/saltybot/cmd", 10)
|
||||||
|
self._estop_pub = self.create_publisher(Bool, "/saltybot/estop", 10)
|
||||||
|
|
||||||
|
self.get_logger().info(
|
||||||
|
f"MqttBridge ready broker={p['mqtt_broker']}:{p['mqtt_port']} "
|
||||||
|
f"prefix={self._prefix}")
|
||||||
|
|
||||||
|
# ── MQTT callbacks ────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _on_connect(self, client, userdata, flags, rc):
|
||||||
|
if rc == 0:
|
||||||
|
self._connected = True
|
||||||
|
self.get_logger().info(
|
||||||
|
f"MQTT connected to {self._p['mqtt_broker']}")
|
||||||
|
# Subscribe to inbound command topics (QoS 1)
|
||||||
|
client.subscribe(f"{self._prefix}/cmd", qos=1)
|
||||||
|
client.subscribe(f"{self._prefix}/estop", qos=1)
|
||||||
|
else:
|
||||||
|
self.get_logger().warn(f"MQTT connect failed rc={rc}")
|
||||||
|
|
||||||
|
def _on_disconnect(self, client, userdata, rc):
|
||||||
|
self._connected = False
|
||||||
|
if rc != 0:
|
||||||
|
self.get_logger().warn(f"MQTT disconnected unexpectedly (rc={rc}), reconnecting")
|
||||||
|
|
||||||
|
def _on_mqtt_message(self, client, userdata, msg):
|
||||||
|
"""Route inbound MQTT commands to ROS2 topics."""
|
||||||
|
try:
|
||||||
|
topic = msg.topic
|
||||||
|
payload = msg.payload.decode("utf-8", errors="replace")
|
||||||
|
suffix = topic[len(self._prefix):].lstrip("/")
|
||||||
|
|
||||||
|
if suffix == "cmd":
|
||||||
|
ros_msg = String()
|
||||||
|
ros_msg.data = payload
|
||||||
|
self._cmd_pub.publish(ros_msg)
|
||||||
|
|
||||||
|
elif suffix == "estop":
|
||||||
|
try:
|
||||||
|
data = json.loads(payload)
|
||||||
|
val = bool(data.get("estop", False))
|
||||||
|
except (json.JSONDecodeError, AttributeError):
|
||||||
|
val = payload.strip().lower() in ("true", "1", "yes")
|
||||||
|
ros_msg = Bool()
|
||||||
|
ros_msg.data = val
|
||||||
|
self._estop_pub.publish(ros_msg)
|
||||||
|
if val:
|
||||||
|
self.get_logger().warn("ESTOP received via MQTT")
|
||||||
|
|
||||||
|
except Exception as exc:
|
||||||
|
self.get_logger().error(f"MQTT message error: {exc}")
|
||||||
|
|
||||||
|
# ── Publish helper ────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _mqtt_publish(self, subtopic: str, payload: dict, qos: int = 0):
|
||||||
|
"""Serialize payload to JSON and publish to MQTT (QoS 0 by default)."""
|
||||||
|
if not self._connected or self._client is None:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
self._client.publish(
|
||||||
|
f"{self._prefix}/{subtopic}",
|
||||||
|
json.dumps(payload, separators=(",", ":")),
|
||||||
|
qos=qos,
|
||||||
|
)
|
||||||
|
except Exception as exc:
|
||||||
|
self.get_logger().warn(f"MQTT publish error: {exc}")
|
||||||
|
|
||||||
|
# ── ROS2 topic callbacks (telemetry out) ──────────────────────────────────
|
||||||
|
|
||||||
|
def _imu_cb(self, msg):
|
||||||
|
if self._rl["imu"].allow():
|
||||||
|
self._mqtt_publish("imu", imu_to_dict(msg))
|
||||||
|
|
||||||
|
def _gps_fix_cb(self, msg):
|
||||||
|
if self._rl["gps"].allow():
|
||||||
|
self._mqtt_publish("gps/fix", navsatfix_to_dict(msg))
|
||||||
|
|
||||||
|
def _gps_vel_cb(self, msg):
|
||||||
|
if self._rl["gps"].allow():
|
||||||
|
self._mqtt_publish("gps/vel", gps_vel_to_dict(msg))
|
||||||
|
|
||||||
|
def _uwb_cb(self, msg):
|
||||||
|
if self._rl["uwb"].allow():
|
||||||
|
try:
|
||||||
|
data = json.loads(msg.data)
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
data = {"raw": msg.data}
|
||||||
|
self._mqtt_publish("uwb/ranges", data)
|
||||||
|
|
||||||
|
def _person_cb(self, msg):
|
||||||
|
if self._rl["person"].allow():
|
||||||
|
self._mqtt_publish("person", pose_to_dict(msg))
|
||||||
|
|
||||||
|
def _cellular_cb(self, msg):
|
||||||
|
if self._rl["cellular"].allow() and msg.status:
|
||||||
|
ds = msg.status[0]
|
||||||
|
d = {kv.key: kv.value for kv in ds.values}
|
||||||
|
d["level"] = ds.level
|
||||||
|
self._mqtt_publish("cellular", d)
|
||||||
|
|
||||||
|
def destroy_node(self):
|
||||||
|
if self._client:
|
||||||
|
self._client.loop_stop()
|
||||||
|
self._client.disconnect()
|
||||||
|
super().destroy_node()
|
||||||
|
|
||||||
|
|
||||||
|
# ── Entry point ───────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def main(args=None):
|
||||||
|
rclpy.init(args=args)
|
||||||
|
node = MqttBridgeNode()
|
||||||
|
try:
|
||||||
|
rclpy.spin(node)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
node.destroy_node()
|
||||||
|
rclpy.try_shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
4
jetson/ros2_ws/src/saltybot_cellular/setup.cfg
Normal file
4
jetson/ros2_ws/src/saltybot_cellular/setup.cfg
Normal file
@ -0,0 +1,4 @@
|
|||||||
|
[develop]
|
||||||
|
script_dir=$base/lib/saltybot_cellular
|
||||||
|
[install]
|
||||||
|
install_scripts=$base/lib/saltybot_cellular
|
||||||
32
jetson/ros2_ws/src/saltybot_cellular/setup.py
Normal file
32
jetson/ros2_ws/src/saltybot_cellular/setup.py
Normal file
@ -0,0 +1,32 @@
|
|||||||
|
from setuptools import setup
|
||||||
|
|
||||||
|
package_name = "saltybot_cellular"
|
||||||
|
|
||||||
|
setup(
|
||||||
|
name=package_name,
|
||||||
|
version="0.1.0",
|
||||||
|
packages=[package_name],
|
||||||
|
data_files=[
|
||||||
|
("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
|
||||||
|
(f"share/{package_name}", ["package.xml"]),
|
||||||
|
(f"share/{package_name}/launch", ["launch/cellular.launch.py"]),
|
||||||
|
(f"share/{package_name}/config", ["config/cellular_params.yaml"]),
|
||||||
|
],
|
||||||
|
install_requires=["setuptools", "pyserial"],
|
||||||
|
zip_safe=True,
|
||||||
|
maintainer="sl-controls",
|
||||||
|
maintainer_email="sl-controls@saltylab.local",
|
||||||
|
description="SIM7600X 4G HAT driver: GPS NavSatFix + cellular modem + MQTT bridge",
|
||||||
|
license="MIT",
|
||||||
|
tests_require=["pytest"],
|
||||||
|
entry_points={
|
||||||
|
"console_scripts": [
|
||||||
|
# NMEA GPS → /gps/fix, /gps/vel
|
||||||
|
"gps_driver_node = saltybot_cellular.gps_driver_node:main",
|
||||||
|
# AT modem monitor → /cellular/status, /cellular/rssi, /cellular/connected
|
||||||
|
"cellular_manager_node = saltybot_cellular.cellular_manager_node:main",
|
||||||
|
# ROS2 ↔ MQTT telemetry relay
|
||||||
|
"mqtt_bridge_node = saltybot_cellular.mqtt_bridge_node:main",
|
||||||
|
],
|
||||||
|
},
|
||||||
|
)
|
||||||
514
jetson/ros2_ws/src/saltybot_cellular/test/test_cellular.py
Normal file
514
jetson/ros2_ws/src/saltybot_cellular/test/test_cellular.py
Normal file
@ -0,0 +1,514 @@
|
|||||||
|
"""
|
||||||
|
Unit tests for saltybot_cellular node logic.
|
||||||
|
No ROS2 runtime required — tests pure functions mirrored from the nodes.
|
||||||
|
Run with: pytest jetson/ros2_ws/src/saltybot_cellular/test/test_cellular.py
|
||||||
|
"""
|
||||||
|
|
||||||
|
import math
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
|
||||||
|
# ── Pure function mirrors — GPS ───────────────────────────────────────────────
|
||||||
|
|
||||||
|
KNOTS_TO_MS = 0.514444
|
||||||
|
GPS_ACCURACY = 2.5
|
||||||
|
|
||||||
|
|
||||||
|
def _nmea_checksum_valid(sentence):
|
||||||
|
if '*' not in sentence:
|
||||||
|
return False
|
||||||
|
data, cs_str = sentence[1:].rsplit('*', 1)
|
||||||
|
try:
|
||||||
|
expected = int(cs_str.strip()[:2], 16)
|
||||||
|
except ValueError:
|
||||||
|
return False
|
||||||
|
computed = 0
|
||||||
|
for ch in data:
|
||||||
|
computed ^= ord(ch)
|
||||||
|
return computed == expected
|
||||||
|
|
||||||
|
|
||||||
|
def _nmea_dm_to_degrees(dm_str, direction):
|
||||||
|
if not dm_str:
|
||||||
|
return None
|
||||||
|
dot = dm_str.index('.')
|
||||||
|
deg = float(dm_str[:dot - 2])
|
||||||
|
mins = float(dm_str[dot - 2:])
|
||||||
|
dec = deg + mins / 60.0
|
||||||
|
if direction in ('S', 'W'):
|
||||||
|
dec = -dec
|
||||||
|
return dec
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_gga(sentence):
|
||||||
|
raw = sentence.strip()
|
||||||
|
if raw.startswith('$'):
|
||||||
|
raw = raw[1:]
|
||||||
|
if '*' in raw:
|
||||||
|
raw = raw[:raw.index('*')]
|
||||||
|
parts = raw.split(',')
|
||||||
|
if len(parts) < 10 or not parts[0].endswith('GGA'):
|
||||||
|
return None
|
||||||
|
try:
|
||||||
|
fix_quality = int(parts[6]) if parts[6] else 0
|
||||||
|
except ValueError:
|
||||||
|
return None
|
||||||
|
if fix_quality == 0:
|
||||||
|
return None
|
||||||
|
lat = _nmea_dm_to_degrees(parts[2], parts[3])
|
||||||
|
lon = _nmea_dm_to_degrees(parts[4], parts[5])
|
||||||
|
if lat is None or lon is None:
|
||||||
|
return None
|
||||||
|
try:
|
||||||
|
num_sats = int(parts[7]) if parts[7] else 0
|
||||||
|
hdop = float(parts[8]) if parts[8] else 99.0
|
||||||
|
alt_msl = float(parts[9]) if parts[9] else 0.0
|
||||||
|
geoid_sep = float(parts[11]) if len(parts) > 11 and parts[11] else 0.0
|
||||||
|
except (ValueError, IndexError):
|
||||||
|
return None
|
||||||
|
return {
|
||||||
|
'latitude': lat,
|
||||||
|
'longitude': lon,
|
||||||
|
'altitude': alt_msl + geoid_sep,
|
||||||
|
'fix_quality': fix_quality,
|
||||||
|
'num_satellites': num_sats,
|
||||||
|
'hdop': hdop,
|
||||||
|
'utc_time': parts[1],
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_rmc(sentence):
|
||||||
|
raw = sentence.strip()
|
||||||
|
if raw.startswith('$'):
|
||||||
|
raw = raw[1:]
|
||||||
|
if '*' in raw:
|
||||||
|
raw = raw[:raw.index('*')]
|
||||||
|
parts = raw.split(',')
|
||||||
|
if len(parts) < 9 or not parts[0].endswith('RMC'):
|
||||||
|
return None
|
||||||
|
if parts[2] != 'A':
|
||||||
|
return None
|
||||||
|
try:
|
||||||
|
speed_mps = float(parts[7]) * KNOTS_TO_MS if parts[7] else 0.0
|
||||||
|
course_deg = float(parts[8]) if parts[8] else 0.0
|
||||||
|
except ValueError:
|
||||||
|
return None
|
||||||
|
return {'speed_mps': speed_mps, 'course_deg': course_deg, 'valid': True}
|
||||||
|
|
||||||
|
|
||||||
|
def _hdop_to_covariance(hdop, accuracy=GPS_ACCURACY):
|
||||||
|
sh = hdop * accuracy
|
||||||
|
sv = sh * 1.5
|
||||||
|
return [sh**2, 0, 0, 0, sh**2, 0, 0, 0, sv**2]
|
||||||
|
|
||||||
|
|
||||||
|
def _course_to_enu(speed_mps, course_deg):
|
||||||
|
r = math.radians(course_deg)
|
||||||
|
return speed_mps * math.sin(r), speed_mps * math.cos(r) # vE, vN
|
||||||
|
|
||||||
|
|
||||||
|
# ── Pure function mirrors — AT parsing ───────────────────────────────────────
|
||||||
|
|
||||||
|
_CREG_STATUS = {
|
||||||
|
"0": "not_registered", "1": "registered_home",
|
||||||
|
"2": "searching", "3": "denied",
|
||||||
|
"4": "unknown", "5": "roaming",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_csq(response):
|
||||||
|
for line in response.splitlines():
|
||||||
|
line = line.strip()
|
||||||
|
if not line.startswith("+CSQ:"):
|
||||||
|
continue
|
||||||
|
parts = line[5:].strip().split(",")
|
||||||
|
if len(parts) < 2:
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
rssi_raw = int(parts[0].strip())
|
||||||
|
ber = int(parts[1].strip())
|
||||||
|
except ValueError:
|
||||||
|
continue
|
||||||
|
if rssi_raw == 99:
|
||||||
|
return {"rssi_raw": 99, "rssi_dbm": None, "ber": ber, "valid": False}
|
||||||
|
return {"rssi_raw": rssi_raw, "rssi_dbm": -113 + 2 * rssi_raw,
|
||||||
|
"ber": ber, "valid": True}
|
||||||
|
return {"rssi_raw": 99, "rssi_dbm": None, "ber": 99, "valid": False}
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_creg(response):
|
||||||
|
for line in response.splitlines():
|
||||||
|
line = line.strip()
|
||||||
|
if not line.startswith("+CREG:"):
|
||||||
|
continue
|
||||||
|
parts = line[6:].strip().split(",")
|
||||||
|
stat_str = parts[-1].strip().split()[0]
|
||||||
|
return {"stat_raw": stat_str,
|
||||||
|
"status_str": _CREG_STATUS.get(stat_str, "unknown"),
|
||||||
|
"registered": stat_str in ("1", "5")}
|
||||||
|
return {"stat_raw": "?", "status_str": "unknown", "registered": False}
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_cops(response):
|
||||||
|
for line in response.splitlines():
|
||||||
|
if line.strip().startswith("+COPS:"):
|
||||||
|
parts = line.strip()[6:].strip().split(",")
|
||||||
|
if len(parts) >= 3:
|
||||||
|
return parts[2].strip().strip('"')
|
||||||
|
return ""
|
||||||
|
|
||||||
|
|
||||||
|
def _rssi_to_label(rssi_dbm):
|
||||||
|
if rssi_dbm is None:
|
||||||
|
return "no_signal"
|
||||||
|
if rssi_dbm >= -70:
|
||||||
|
return "excellent"
|
||||||
|
if rssi_dbm >= -85:
|
||||||
|
return "good"
|
||||||
|
if rssi_dbm >= -100:
|
||||||
|
return "fair"
|
||||||
|
return "poor"
|
||||||
|
|
||||||
|
|
||||||
|
# ── Pure function mirrors — MQTT serialisation ────────────────────────────────
|
||||||
|
|
||||||
|
def _imu_to_dict(ax, ay, az, gx, gy, gz, sec=0):
|
||||||
|
class Acc:
|
||||||
|
pass
|
||||||
|
class Msg:
|
||||||
|
pass
|
||||||
|
m = Msg()
|
||||||
|
m.linear_acceleration = Acc()
|
||||||
|
m.angular_velocity = Acc()
|
||||||
|
m.header = Acc()
|
||||||
|
m.header.stamp = Acc()
|
||||||
|
m.linear_acceleration.x = ax
|
||||||
|
m.linear_acceleration.y = ay
|
||||||
|
m.linear_acceleration.z = az
|
||||||
|
m.angular_velocity.x = gx
|
||||||
|
m.angular_velocity.y = gy
|
||||||
|
m.angular_velocity.z = gz
|
||||||
|
m.header.stamp.sec = sec
|
||||||
|
return {
|
||||||
|
"ax": round(ax, 3), "ay": round(ay, 3), "az": round(az, 3),
|
||||||
|
"gx": round(gx, 4), "gy": round(gy, 4), "gz": round(gz, 4),
|
||||||
|
"t": sec,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
# ── NMEA checksum tests ───────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestNmeaChecksum:
|
||||||
|
def test_valid_gpgga(self):
|
||||||
|
# Valid GGA from standard NMEA example
|
||||||
|
s = "$GPGGA,123519,4807.038,N,01131.000,E,1,08,0.9,545.4,M,46.9,M,,*47"
|
||||||
|
assert _nmea_checksum_valid(s) is True
|
||||||
|
|
||||||
|
def test_valid_gnrmc(self):
|
||||||
|
s = "$GNRMC,123519.00,A,4807.0380,N,01131.0000,E,0.100,0.00,010101,,,A*7F"
|
||||||
|
assert _nmea_checksum_valid(s) is True
|
||||||
|
|
||||||
|
def test_invalid_checksum(self):
|
||||||
|
s = "$GPGGA,123519,4807.038,N,01131.000,E,1,08,0.9,545.4,M,46.9,M,,*FF"
|
||||||
|
assert _nmea_checksum_valid(s) is False
|
||||||
|
|
||||||
|
def test_no_checksum(self):
|
||||||
|
assert _nmea_checksum_valid("$GPGGA,123519,4807.038") is False
|
||||||
|
|
||||||
|
def test_empty_string(self):
|
||||||
|
assert _nmea_checksum_valid("") is False
|
||||||
|
|
||||||
|
|
||||||
|
# ── NMEA DM conversion tests ──────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestNmeaDm:
|
||||||
|
def test_north_hemisphere(self):
|
||||||
|
# 4807.038N = 48° 7.038' = 48.1173°
|
||||||
|
assert _nmea_dm_to_degrees("4807.038", "N") == pytest.approx(48.1173, abs=1e-4)
|
||||||
|
|
||||||
|
def test_south_hemisphere_negative(self):
|
||||||
|
result = _nmea_dm_to_degrees("4807.038", "S")
|
||||||
|
assert result == pytest.approx(-48.1173, abs=1e-4)
|
||||||
|
|
||||||
|
def test_east_longitude(self):
|
||||||
|
# 01131.000E = 11° 31.000' = 11.5167°
|
||||||
|
assert _nmea_dm_to_degrees("01131.000", "E") == pytest.approx(11.5167, abs=1e-4)
|
||||||
|
|
||||||
|
def test_west_longitude_negative(self):
|
||||||
|
result = _nmea_dm_to_degrees("07937.600", "W")
|
||||||
|
assert result < 0
|
||||||
|
|
||||||
|
def test_zero(self):
|
||||||
|
assert _nmea_dm_to_degrees("0000.000", "N") == pytest.approx(0.0)
|
||||||
|
|
||||||
|
def test_empty_string_none(self):
|
||||||
|
assert _nmea_dm_to_degrees("", "N") is None
|
||||||
|
|
||||||
|
|
||||||
|
# ── GGA parsing tests ─────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
_GGA_VALID = "$GPGGA,123519,4807.038,N,01131.000,E,1,08,0.9,545.4,M,46.9,M,,*47"
|
||||||
|
_GGA_DGPS = "$GPGGA,123519,4807.038,N,01131.000,E,2,08,0.9,545.4,M,46.9,M,,*44"
|
||||||
|
_GGA_NOFIX = "$GPGGA,123519,,,,,0,00,99.9,,M,,M,,*71"
|
||||||
|
_GGA_GN = "$GNGGA,123519,4807.038,N,01131.000,E,1,12,0.7,545.4,M,46.9,M,,*4B"
|
||||||
|
|
||||||
|
|
||||||
|
class TestParseGga:
|
||||||
|
def test_valid_parse(self):
|
||||||
|
r = _parse_gga(_GGA_VALID)
|
||||||
|
assert r is not None
|
||||||
|
assert r["latitude"] == pytest.approx(48.1173, abs=1e-4)
|
||||||
|
assert r["longitude"] == pytest.approx(11.5167, abs=1e-4)
|
||||||
|
assert r["fix_quality"] == 1
|
||||||
|
assert r["num_satellites"] == 8
|
||||||
|
assert r["hdop"] == pytest.approx(0.9)
|
||||||
|
|
||||||
|
def test_altitude_is_msl_plus_geoid(self):
|
||||||
|
r = _parse_gga(_GGA_VALID)
|
||||||
|
# 545.4 MSL + 46.9 geoid = 592.3m ellipsoidal
|
||||||
|
assert r["altitude"] == pytest.approx(592.3, abs=0.01)
|
||||||
|
|
||||||
|
def test_no_fix_returns_none(self):
|
||||||
|
assert _parse_gga(_GGA_NOFIX) is None
|
||||||
|
|
||||||
|
def test_dgps_fix_quality_2(self):
|
||||||
|
r = _parse_gga(_GGA_DGPS)
|
||||||
|
assert r is not None
|
||||||
|
assert r["fix_quality"] == 2
|
||||||
|
|
||||||
|
def test_gn_talker_id_accepted(self):
|
||||||
|
r = _parse_gga(_GGA_GN)
|
||||||
|
assert r is not None
|
||||||
|
assert r["num_satellites"] == 12
|
||||||
|
|
||||||
|
def test_non_gga_returns_none(self):
|
||||||
|
assert _parse_gga("$GPGSV,3,1,12,...*XX") is None
|
||||||
|
|
||||||
|
def test_short_sentence_returns_none(self):
|
||||||
|
assert _parse_gga("$GPGGA,123519") is None
|
||||||
|
|
||||||
|
def test_empty_returns_none(self):
|
||||||
|
assert _parse_gga("") is None
|
||||||
|
|
||||||
|
def test_utc_time_field(self):
|
||||||
|
r = _parse_gga(_GGA_VALID)
|
||||||
|
assert r["utc_time"] == "123519"
|
||||||
|
|
||||||
|
|
||||||
|
# ── RMC parsing tests ─────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
_RMC_VALID = "$GNRMC,123519.00,A,4807.038,N,01131.000,E,1.234,181.5,010101,,,A*XX"
|
||||||
|
_RMC_VOID = "$GNRMC,123519.00,V,,,,,,,010101,,,N*XX"
|
||||||
|
|
||||||
|
|
||||||
|
class TestParseRmc:
|
||||||
|
def test_valid_speed(self):
|
||||||
|
r = _parse_rmc(_RMC_VALID)
|
||||||
|
assert r is not None
|
||||||
|
assert r["speed_mps"] == pytest.approx(1.234 * KNOTS_TO_MS, rel=1e-3)
|
||||||
|
|
||||||
|
def test_valid_course(self):
|
||||||
|
r = _parse_rmc(_RMC_VALID)
|
||||||
|
assert r["course_deg"] == pytest.approx(181.5, abs=0.01)
|
||||||
|
|
||||||
|
def test_void_returns_none(self):
|
||||||
|
assert _parse_rmc(_RMC_VOID) is None
|
||||||
|
|
||||||
|
def test_non_rmc_returns_none(self):
|
||||||
|
assert _parse_rmc(_GGA_VALID) is None
|
||||||
|
|
||||||
|
def test_zero_speed(self):
|
||||||
|
rmc = "$GNRMC,000000.00,A,4807.038,N,01131.000,E,0.000,0.0,010101,,,A*XX"
|
||||||
|
r = _parse_rmc(rmc)
|
||||||
|
assert r is not None
|
||||||
|
assert r["speed_mps"] == pytest.approx(0.0)
|
||||||
|
|
||||||
|
|
||||||
|
# ── HDOP covariance tests ─────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestHdopCovariance:
|
||||||
|
def test_hdop1_gives_accuracy_squared(self):
|
||||||
|
cov = _hdop_to_covariance(1.0)
|
||||||
|
assert cov[0] == pytest.approx(GPS_ACCURACY ** 2) # sigma_h^2
|
||||||
|
assert cov[4] == pytest.approx(GPS_ACCURACY ** 2)
|
||||||
|
assert cov[8] == pytest.approx((GPS_ACCURACY * 1.5) ** 2)
|
||||||
|
|
||||||
|
def test_hdop2_doubles_sigma(self):
|
||||||
|
cov1 = _hdop_to_covariance(1.0)
|
||||||
|
cov2 = _hdop_to_covariance(2.0)
|
||||||
|
# sigma doubles → variance quadruples
|
||||||
|
assert cov2[0] == pytest.approx(4.0 * cov1[0])
|
||||||
|
|
||||||
|
def test_off_diagonal_zero(self):
|
||||||
|
cov = _hdop_to_covariance(1.0)
|
||||||
|
assert cov[1] == pytest.approx(0.0)
|
||||||
|
assert cov[2] == pytest.approx(0.0)
|
||||||
|
assert cov[3] == pytest.approx(0.0)
|
||||||
|
|
||||||
|
def test_vertical_worse_than_horizontal(self):
|
||||||
|
cov = _hdop_to_covariance(1.0)
|
||||||
|
assert cov[8] > cov[0] # sigma_v > sigma_h
|
||||||
|
|
||||||
|
|
||||||
|
# ── Course → ENU velocity tests ───────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestCourseToEnu:
|
||||||
|
def test_north_course_zero_easting(self):
|
||||||
|
vE, vN = _course_to_enu(1.0, 0.0) # 0° = due North
|
||||||
|
assert vE == pytest.approx(0.0, abs=1e-9)
|
||||||
|
assert vN == pytest.approx(1.0)
|
||||||
|
|
||||||
|
def test_east_course_zero_northing(self):
|
||||||
|
vE, vN = _course_to_enu(1.0, 90.0) # 90° = due East
|
||||||
|
assert vE == pytest.approx(1.0)
|
||||||
|
assert vN == pytest.approx(0.0, abs=1e-9)
|
||||||
|
|
||||||
|
def test_south_course_negative_northing(self):
|
||||||
|
vE, vN = _course_to_enu(1.0, 180.0) # 180° = South
|
||||||
|
assert vN == pytest.approx(-1.0)
|
||||||
|
|
||||||
|
def test_speed_preserved(self):
|
||||||
|
speed = 3.5
|
||||||
|
vE, vN = _course_to_enu(speed, 45.0)
|
||||||
|
assert math.sqrt(vE**2 + vN**2) == pytest.approx(speed)
|
||||||
|
|
||||||
|
def test_zero_speed(self):
|
||||||
|
vE, vN = _course_to_enu(0.0, 90.0)
|
||||||
|
assert vE == pytest.approx(0.0)
|
||||||
|
assert vN == pytest.approx(0.0)
|
||||||
|
|
||||||
|
|
||||||
|
# ── AT+CSQ parsing tests ──────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestParseCsq:
|
||||||
|
def test_nominal_rssi(self):
|
||||||
|
r = _parse_csq("+CSQ: 18,0\r\nOK\r\n")
|
||||||
|
assert r["valid"] is True
|
||||||
|
assert r["rssi_raw"] == 18
|
||||||
|
assert r["rssi_dbm"] == -113 + 2 * 18 # = -77 dBm
|
||||||
|
assert r["ber"] == 0
|
||||||
|
|
||||||
|
def test_unknown_rssi_99(self):
|
||||||
|
r = _parse_csq("+CSQ: 99,99\r\nOK\r\n")
|
||||||
|
assert r["valid"] is False
|
||||||
|
assert r["rssi_dbm"] is None
|
||||||
|
|
||||||
|
def test_min_rssi_0(self):
|
||||||
|
r = _parse_csq("+CSQ: 0,0\r\nOK\r\n")
|
||||||
|
assert r["rssi_dbm"] == -113
|
||||||
|
|
||||||
|
def test_max_rssi_31(self):
|
||||||
|
r = _parse_csq("+CSQ: 31,0\r\nOK\r\n")
|
||||||
|
assert r["rssi_dbm"] == -113 + 62 # = -51 dBm
|
||||||
|
|
||||||
|
def test_empty_response(self):
|
||||||
|
r = _parse_csq("ERROR\r\n")
|
||||||
|
assert r["valid"] is False
|
||||||
|
|
||||||
|
def test_csq_without_space(self):
|
||||||
|
# Some modems omit the space after +CSQ:
|
||||||
|
r = _parse_csq("+CSQ:20,0\r\nOK\r\n")
|
||||||
|
assert r["valid"] is True
|
||||||
|
assert r["rssi_raw"] == 20
|
||||||
|
|
||||||
|
|
||||||
|
# ── AT+CREG? parsing tests ─────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestParseCreg:
|
||||||
|
def test_registered_home(self):
|
||||||
|
r = _parse_creg("+CREG: 0,1\r\nOK\r\n")
|
||||||
|
assert r["registered"] is True
|
||||||
|
assert r["status_str"] == "registered_home"
|
||||||
|
|
||||||
|
def test_roaming(self):
|
||||||
|
r = _parse_creg("+CREG: 0,5\r\nOK\r\n")
|
||||||
|
assert r["registered"] is True
|
||||||
|
assert r["status_str"] == "roaming"
|
||||||
|
|
||||||
|
def test_not_registered(self):
|
||||||
|
r = _parse_creg("+CREG: 0,0\r\nOK\r\n")
|
||||||
|
assert r["registered"] is False
|
||||||
|
|
||||||
|
def test_searching(self):
|
||||||
|
r = _parse_creg("+CREG: 0,2\r\nOK\r\n")
|
||||||
|
assert r["registered"] is False
|
||||||
|
assert r["status_str"] == "searching"
|
||||||
|
|
||||||
|
def test_stat_only_format(self):
|
||||||
|
r = _parse_creg("+CREG: 1\r\nOK\r\n")
|
||||||
|
assert r["registered"] is True
|
||||||
|
|
||||||
|
def test_empty_response(self):
|
||||||
|
r = _parse_creg("ERROR\r\n")
|
||||||
|
assert r["registered"] is False
|
||||||
|
|
||||||
|
|
||||||
|
# ── AT+COPS? parsing tests ─────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestParseCops:
|
||||||
|
def test_operator_name(self):
|
||||||
|
r = _parse_cops('+COPS: 0,0,"Fido",7\r\nOK\r\n')
|
||||||
|
assert r == "Fido"
|
||||||
|
|
||||||
|
def test_operator_with_spaces(self):
|
||||||
|
r = _parse_cops('+COPS: 0,0,"Rogers Wireless",7\r\nOK\r\n')
|
||||||
|
assert r == "Rogers Wireless"
|
||||||
|
|
||||||
|
def test_no_operator(self):
|
||||||
|
r = _parse_cops("+COPS: 0\r\nOK\r\n")
|
||||||
|
assert r == ""
|
||||||
|
|
||||||
|
def test_error_response(self):
|
||||||
|
r = _parse_cops("ERROR\r\n")
|
||||||
|
assert r == ""
|
||||||
|
|
||||||
|
|
||||||
|
# ── Signal quality label tests ────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestRssiLabel:
|
||||||
|
def test_excellent(self):
|
||||||
|
assert _rssi_to_label(-65) == "excellent"
|
||||||
|
|
||||||
|
def test_good(self):
|
||||||
|
assert _rssi_to_label(-80) == "good"
|
||||||
|
|
||||||
|
def test_fair(self):
|
||||||
|
assert _rssi_to_label(-95) == "fair"
|
||||||
|
|
||||||
|
def test_poor(self):
|
||||||
|
assert _rssi_to_label(-105) == "poor"
|
||||||
|
|
||||||
|
def test_none_is_no_signal(self):
|
||||||
|
assert _rssi_to_label(None) == "no_signal"
|
||||||
|
|
||||||
|
def test_boundary_good_excellent(self):
|
||||||
|
assert _rssi_to_label(-70) == "excellent" # -70 >= -70 → excellent
|
||||||
|
|
||||||
|
def test_boundary_fair_good(self):
|
||||||
|
assert _rssi_to_label(-85) == "good" # -85 >= -85 → good
|
||||||
|
|
||||||
|
|
||||||
|
# ── MQTT serialisation tests ──────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestMqttSerialisation:
|
||||||
|
def test_imu_dict_keys(self):
|
||||||
|
d = _imu_to_dict(9.81, 0.1, -0.2, 0.01, -0.02, 0.0)
|
||||||
|
assert set(d.keys()) == {"ax", "ay", "az", "gx", "gy", "gz", "t"}
|
||||||
|
|
||||||
|
def test_imu_precision(self):
|
||||||
|
d = _imu_to_dict(9.81234, 0.0, 0.0, 0.0, 0.0, 0.0)
|
||||||
|
assert d["ax"] == pytest.approx(9.812, abs=1e-3)
|
||||||
|
|
||||||
|
def test_gps_fix_dict(self):
|
||||||
|
# Simple inline navsatfix
|
||||||
|
class S: pass
|
||||||
|
m = S(); m.status = S(); m.header = S(); m.header.stamp = S()
|
||||||
|
m.latitude = 48.1173; m.longitude = 11.5167; m.altitude = 592.3
|
||||||
|
m.status.status = 0; m.header.stamp.sec = 1234
|
||||||
|
d = {
|
||||||
|
"lat": round(m.latitude, 7), "lon": round(m.longitude, 7),
|
||||||
|
"alt": round(m.altitude, 2), "stat": int(m.status.status),
|
||||||
|
"t": m.header.stamp.sec,
|
||||||
|
}
|
||||||
|
assert d["lat"] == pytest.approx(48.1173, abs=1e-4)
|
||||||
|
assert d["alt"] == pytest.approx(592.3)
|
||||||
Loading…
x
Reference in New Issue
Block a user