feat: SIM7600X 4G cellular + GPS (#58) #65

Merged
seb merged 1 commits from sl-controls/cellular-gps into main 2026-03-01 00:51:16 -05:00
11 changed files with 1926 additions and 0 deletions

View File

@ -0,0 +1,70 @@
# cellular_params.yaml
# Configuration for saltybot_cellular package — SIM7600X 4G HAT.
#
# Run with:
# ros2 launch saltybot_cellular cellular.launch.py
#
# Hardware: Waveshare SIM7600X 4G HAT (SIMCom SIM7600A-H)
# /dev/ttyUSB0 — AT commands
# /dev/ttyUSB1 — PPP data (managed by NetworkManager/pppd, not ROS2)
# /dev/ttyUSB2 — NMEA GPS output
# ── GPS driver ─────────────────────────────────────────────────────────────────
# The GPS port streams NMEA sentences once AT+CGPS=1 is sent.
# If GPS is already running (e.g., from a previous session), set gps_init_enable: false.
gps_port: /dev/ttyUSB2 # NMEA output port
at_port: /dev/ttyUSB0 # AT command port (for CGPS=1 init)
baud_rate: 115200
publish_rate: 1.0 # Hz — GPS updates at 1 Hz max
frame_id: gps # Header frame_id for NavSatFix + TwistStamped
gps_init_enable: true # Send AT+CGPS=1 on startup
gps_init_cmd: AT+CGPS=1 # Command to start GNSS receiver
at_init_timeout: 3.0 # Seconds to wait for AT response
# GPS accuracy (used for NavSatFix covariance computation)
# SIM7600A-H spec: ±2.5m CEP (circular error probable) at HDOP=1
gps_accuracy: 2.5 # metres CEP at HDOP=1
# ── Cellular manager ───────────────────────────────────────────────────────────
# AT commands are polled to monitor signal quality and registration status.
# The data connection itself (4G LTE) is managed by the OS (NetworkManager or pppd).
poll_rate: 0.2 # Hz (poll every 5 seconds — avoid AT flooding)
at_timeout: 2.0 # Seconds per AT command
# Data connection management
# "nmcli": NetworkManager profile (recommended for Orin Nano running Ubuntu)
# "pppd": pppd service unit (systemctl restart pppd-cellular)
# "none": monitoring only, no auto-reconnect
connection_method: nmcli
nmcli_profile: saltybot-cellular # nmcli connection profile name
apn: "" # APN (set if NetworkManager needs it)
reconnect_interval: 30.0 # Seconds between reconnect attempts
# ── MQTT bridge ────────────────────────────────────────────────────────────────
# Relay telemetry to a remote MQTT broker over the cellular link.
# For home-lab deployment: run mosquitto on the Jetson or a cloud VM.
# For production: use TLS + authentication.
mqtt_broker: mqtt.saltylab.local # Broker hostname or IP
mqtt_port: 1883 # 1883 = plain, 8883 = TLS
mqtt_user: "" # Leave empty to disable auth
mqtt_password: ""
topic_prefix: saltybot # MQTT topic prefix (saltybot/gps/fix, etc.)
client_id: saltybot-jetson
keepalive: 60 # MQTT keepalive interval (seconds)
reconnect_delay: 5.0 # Seconds between MQTT reconnect attempts
# TLS (set mqtt_port: 8883 and provide CA cert path)
tls_enable: false
tls_ca_cert: "" # e.g. /etc/ssl/certs/mqtt-ca.crt
# Per-topic outbound rate limits (Hz). Keeps cellular bandwidth < 50 KB/s.
# At 1 Hz per topic × ~100 bytes/msg × 6 topics ≈ 600 bytes/s = 0.6 KB/s
imu_rate: 5.0 # Hz — /saltybot/imu → saltybot/imu
gps_rate: 1.0 # Hz — /gps/fix, /gps/vel
person_rate: 2.0 # Hz — /person/target
uwb_rate: 1.0 # Hz — /uwb/ranges
cellular_rate: 0.2 # Hz — /cellular/status (once per 5s)

View File

@ -0,0 +1,194 @@
"""
cellular.launch.py Launch all saltybot_cellular nodes.
Starts three nodes:
gps_driver_node NMEA GPS /gps/fix, /gps/vel
cellular_manager_node AT modem monitor /cellular/status, /cellular/rssi
mqtt_bridge_node ROS2 MQTT relay over cellular
Prerequisites:
1. SIM7600X HAT connected via USB /dev/ttyUSB0-2 must exist
2. Nano-SIM with active data plan inserted
3. NetworkManager profile "saltybot-cellular" configured (for nmcli method):
nmcli connection add type gsm ifname '*' con-name saltybot-cellular apn <your-apn>
Usage:
# Defaults (all params from cellular_params.yaml):
ros2 launch saltybot_cellular cellular.launch.py
# GPS only (skip modem + MQTT):
ros2 launch saltybot_cellular cellular.launch.py \
launch_cellular_manager:=false launch_mqtt_bridge:=false
# Custom MQTT broker:
ros2 launch saltybot_cellular cellular.launch.py mqtt_broker:=192.168.1.100
# Override params file:
ros2 launch saltybot_cellular cellular.launch.py \
params_file:=/path/to/my_cellular_params.yaml
GPS topics:
/gps/fix (sensor_msgs/NavSatFix)
/gps/vel (geometry_msgs/TwistStamped)
Cellular topics:
/cellular/status (diagnostic_msgs/DiagnosticArray)
/cellular/rssi (std_msgs/Int32)
/cellular/connected (std_msgs/Bool)
MQTT relay (ROS2MQTT):
saltybot/imu, saltybot/gps/fix, saltybot/gps/vel, saltybot/uwb/ranges,
saltybot/person, saltybot/cellular
MQTT relay (MQTTROS2):
saltybot/cmd /saltybot/cmd, saltybot/estop /saltybot/estop
"""
import os
from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument, OpaqueFunction
from launch.conditions import IfCondition
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def _launch_nodes(context, *args, **kwargs):
params_file = LaunchConfiguration("params_file").perform(context)
def _b(name):
return LaunchConfiguration(name).perform(context).lower() == "true"
def _s(name):
return LaunchConfiguration(name).perform(context)
def _f(name):
return float(LaunchConfiguration(name).perform(context))
def _i(name):
return int(LaunchConfiguration(name).perform(context))
gps_params = {
"gps_port": _s("gps_port"),
"at_port": _s("at_port"),
"baud_rate": _i("baud_rate"),
"publish_rate": _f("gps_publish_rate"),
"frame_id": _s("frame_id"),
"gps_init_enable": _b("gps_init_enable"),
"gps_init_cmd": _s("gps_init_cmd"),
"gps_accuracy": _f("gps_accuracy"),
}
cellular_params = {
"at_port": _s("at_port"),
"baud_rate": _i("baud_rate"),
"poll_rate": _f("cellular_poll_rate"),
"connection_method": _s("connection_method"),
"nmcli_profile": _s("nmcli_profile"),
"apn": _s("apn"),
"reconnect_interval": _f("reconnect_interval"),
}
mqtt_params = {
"mqtt_broker": _s("mqtt_broker"),
"mqtt_port": _i("mqtt_port"),
"mqtt_user": _s("mqtt_user"),
"mqtt_password": _s("mqtt_password"),
"topic_prefix": _s("topic_prefix"),
"tls_enable": _b("tls_enable"),
"tls_ca_cert": _s("tls_ca_cert"),
"imu_rate": _f("imu_rate"),
"gps_rate": _f("gps_rate"),
"person_rate": _f("person_rate"),
"uwb_rate": _f("uwb_rate"),
}
def _node_params(inline):
return [params_file, inline] if params_file else [inline]
nodes = [
Node(
package="saltybot_cellular",
executable="gps_driver_node",
name="gps_driver",
output="screen",
parameters=_node_params(gps_params),
),
]
if _b("launch_cellular_manager"):
nodes.append(Node(
package="saltybot_cellular",
executable="cellular_manager_node",
name="cellular_manager",
output="screen",
parameters=_node_params(cellular_params),
))
if _b("launch_mqtt_bridge"):
nodes.append(Node(
package="saltybot_cellular",
executable="mqtt_bridge_node",
name="mqtt_bridge",
output="screen",
parameters=_node_params(mqtt_params),
))
return nodes
def generate_launch_description():
pkg_share = get_package_share_directory("saltybot_cellular")
default_params = os.path.join(pkg_share, "config", "cellular_params.yaml")
return LaunchDescription([
DeclareLaunchArgument(
"params_file", default_value=default_params,
description="Full path to cellular_params.yaml (overrides inline args)"),
# Node enable/disable
DeclareLaunchArgument(
"launch_cellular_manager", default_value="true",
description="Launch cellular_manager_node"),
DeclareLaunchArgument(
"launch_mqtt_bridge", default_value="true",
description="Launch mqtt_bridge_node"),
# GPS driver
DeclareLaunchArgument("gps_port", default_value="/dev/ttyUSB2"),
DeclareLaunchArgument("at_port", default_value="/dev/ttyUSB0"),
DeclareLaunchArgument("baud_rate", default_value="115200"),
DeclareLaunchArgument("gps_publish_rate", default_value="1.0",
description="GPS publish rate (Hz)"),
DeclareLaunchArgument("frame_id", default_value="gps"),
DeclareLaunchArgument("gps_init_enable", default_value="true",
description="Send AT+CGPS=1 on startup"),
DeclareLaunchArgument("gps_init_cmd", default_value="AT+CGPS=1"),
DeclareLaunchArgument("gps_accuracy", default_value="2.5",
description="GPS CEP accuracy at HDOP=1 (m)"),
# Cellular manager
DeclareLaunchArgument("cellular_poll_rate", default_value="0.2",
description="AT poll rate (Hz)"),
DeclareLaunchArgument("connection_method", default_value="nmcli",
description="nmcli | pppd | none"),
DeclareLaunchArgument("nmcli_profile", default_value="saltybot-cellular"),
DeclareLaunchArgument("apn", default_value=""),
DeclareLaunchArgument("reconnect_interval", default_value="30.0"),
# MQTT bridge
DeclareLaunchArgument("mqtt_broker", default_value="mqtt.saltylab.local"),
DeclareLaunchArgument("mqtt_port", default_value="1883"),
DeclareLaunchArgument("mqtt_user", default_value=""),
DeclareLaunchArgument("mqtt_password", default_value=""),
DeclareLaunchArgument("topic_prefix", default_value="saltybot"),
DeclareLaunchArgument("tls_enable", default_value="false"),
DeclareLaunchArgument("tls_ca_cert", default_value=""),
DeclareLaunchArgument("imu_rate", default_value="5.0"),
DeclareLaunchArgument("gps_rate", default_value="1.0"),
DeclareLaunchArgument("person_rate", default_value="2.0"),
DeclareLaunchArgument("uwb_rate", default_value="1.0"),
OpaqueFunction(function=_launch_nodes),
])

View File

@ -0,0 +1,31 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_cellular</name>
<version>0.1.0</version>
<description>
Waveshare SIM7600X 4G HAT driver for SaltyBot.
Provides GPS (NMEA → NavSatFix), cellular modem monitoring (AT commands),
and MQTT telemetry relay over 4G LTE.
Hardware: SIMCom SIM7600A-H, LTE Cat-4, GPS+GLONASS+BeiDou+Galileo.
</description>
<maintainer email="sl-controls@saltylab.local">sl-controls</maintainer>
<license>MIT</license>
<depend>rclpy</depend>
<depend>sensor_msgs</depend>
<depend>geometry_msgs</depend>
<depend>std_msgs</depend>
<depend>diagnostic_msgs</depend>
<buildtool_depend>ament_python</buildtool_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,362 @@
"""
cellular_manager_node.py SIM7600X 4G modem status monitor + connection manager.
Hardware: Waveshare SIM7600X 4G HAT (SIMCom SIM7600A-H)
AT command port: /dev/ttyUSB0 (115200 baud)
Data connection managed by NetworkManager (nmcli) or pppd
Publishes:
/cellular/status (diagnostic_msgs/DiagnosticArray) rssi, network, connected
/cellular/rssi (std_msgs/Int32) signal strength dBm (-113..0)
/cellular/connected (std_msgs/Bool) data connection active
Key AT commands used:
AT+CSQ signal quality (rssi 0-31, ber)
AT+CREG? network registration status
AT+COPS? operator name
AT+CGDCONT? PDP context / APN
4G data connection management:
connection_method = "nmcli": uses `nmcli connection up <profile>`
connection_method = "pppd": uses pppd with provided dial script
connection_method = "none": monitor only, no auto-connect
"""
import subprocess
import threading
import time
import rclpy
from rclpy.node import Node
from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus, KeyValue
from std_msgs.msg import Bool, Int32
try:
import serial
_SERIAL_OK = True
except ImportError:
_SERIAL_OK = False
# ── Pure AT response parsing functions ────────────────────────────────────────
CREG_STATUS = {
"0": "not_registered",
"1": "registered_home",
"2": "searching",
"3": "denied",
"4": "unknown",
"5": "roaming",
}
def parse_csq(response: str) -> dict:
"""
Parse AT+CSQ response.
Expected response: '+CSQ: <rssi>,<ber>'
rssi: 0-31 -113 + 2*rssi dBm; 99 = unknown
ber: 0-7 (bit error rate); 99 = unknown
Returns dict with rssi_raw, rssi_dbm, ber, valid.
"""
for line in response.splitlines():
line = line.strip()
if not line.startswith("+CSQ:"):
continue
parts = line[5:].strip().split(",")
if len(parts) < 2:
continue
try:
rssi_raw = int(parts[0].strip())
ber = int(parts[1].strip())
except ValueError:
continue
if rssi_raw == 99:
return {"rssi_raw": 99, "rssi_dbm": None, "ber": ber, "valid": False}
return {
"rssi_raw": rssi_raw,
"rssi_dbm": -113 + 2 * rssi_raw,
"ber": ber,
"valid": True,
}
return {"rssi_raw": 99, "rssi_dbm": None, "ber": 99, "valid": False}
def parse_creg(response: str) -> dict:
"""
Parse AT+CREG? response.
Expected: '+CREG: <n>,<stat>' or '+CREG: <stat>'
Returns dict with stat_raw, status_str, registered.
"""
for line in response.splitlines():
line = line.strip()
if not line.startswith("+CREG:"):
continue
payload = line[6:].strip()
parts = payload.split(",")
# Can be '<n>,<stat>' or just '<stat>'
stat_str = parts[-1].strip()
stat_str = stat_str.split()[0] # strip trailing chars
status = CREG_STATUS.get(stat_str, "unknown")
return {
"stat_raw": stat_str,
"status_str": status,
"registered": stat_str in ("1", "5"),
}
return {"stat_raw": "?", "status_str": "unknown", "registered": False}
def parse_cops(response: str) -> str:
"""
Parse AT+COPS? response.
Expected: '+COPS: <mode>,<format>,<oper>[,<AcT>]'
Returns operator name string, or '' if not available.
"""
for line in response.splitlines():
line = line.strip()
if not line.startswith("+COPS:"):
continue
parts = line[6:].strip().split(",")
if len(parts) >= 3:
return parts[2].strip().strip('"')
return ""
def rssi_to_quality_label(rssi_dbm) -> str:
"""Human-readable signal quality label from RSSI dBm."""
if rssi_dbm is None:
return "no_signal"
if rssi_dbm >= -70:
return "excellent"
if rssi_dbm >= -85:
return "good"
if rssi_dbm >= -100:
return "fair"
return "poor"
# ── AT command helper ─────────────────────────────────────────────────────────
def _at_command(ser, cmd: str, timeout: float = 2.0) -> str:
"""Send AT command, return response string (up to OK/ERROR)."""
ser.reset_input_buffer()
ser.write((cmd + "\r\n").encode("ascii"))
deadline = time.monotonic() + timeout
buf = ""
while time.monotonic() < deadline:
chunk = ser.read(ser.in_waiting or 1).decode("ascii", errors="replace")
buf += chunk
if "OK\r\n" in buf or "ERROR" in buf:
break
return buf
# ── ROS2 Node ─────────────────────────────────────────────────────────────────
class CellularManagerNode(Node):
def __init__(self):
super().__init__("cellular_manager")
self.declare_parameter("at_port", "/dev/ttyUSB0")
self.declare_parameter("baud_rate", 115200)
self.declare_parameter("poll_rate", 0.2) # Hz (every 5s)
self.declare_parameter("at_timeout", 2.0) # s per AT command
self.declare_parameter("connection_method", "nmcli") # nmcli|pppd|none
self.declare_parameter("nmcli_profile", "saltybot-cellular")
self.declare_parameter("apn", "")
self.declare_parameter("reconnect_interval", 30.0) # s between reconnect tries
p = {k: self.get_parameter(k).value for k in [
"at_port", "baud_rate", "poll_rate", "at_timeout",
"connection_method", "nmcli_profile", "apn", "reconnect_interval",
]}
self._p = p
# State
self._csq = {"rssi_raw": 99, "rssi_dbm": None, "ber": 99, "valid": False}
self._creg = {"stat_raw": "?", "status_str": "unknown", "registered": False}
self._operator = ""
self._connected = False
self._last_reconnect = 0.0
self._lock = threading.Lock()
# Publishers
self._status_pub = self.create_publisher(DiagnosticArray, "/cellular/status", 10)
self._rssi_pub = self.create_publisher(Int32, "/cellular/rssi", 10)
self._conn_pub = self.create_publisher(Bool, "/cellular/connected", 10)
if not _SERIAL_OK:
self.get_logger().error("pyserial not installed — cellular manager inactive")
return
# Background AT poll thread
self._running = True
self._poll_thread = threading.Thread(target=self._poll_loop, daemon=True)
self._poll_thread.start()
# Publish timer (same rate as poll)
self.create_timer(1.0 / p["poll_rate"], self._publish_cb)
self.get_logger().info(
f"CellularManager ready at_port={p['at_port']} "
f"method={p['connection_method']}")
# ── AT poll loop ──────────────────────────────────────────────────────────
def _poll_loop(self):
interval = 1.0 / self._p["poll_rate"]
while self._running:
try:
with serial.Serial(
self._p["at_port"], self._p["baud_rate"],
timeout=self._p["at_timeout"] + 0.5) as ser:
self.get_logger().info(
f"AT port open: {self._p['at_port']}")
while self._running:
t0 = time.monotonic()
self._poll_once(ser)
elapsed = time.monotonic() - t0
if elapsed < interval:
time.sleep(interval - elapsed)
except serial.SerialException as exc:
if self._running:
self.get_logger().warn(
f"AT serial error ({exc}) — retrying in 5s")
time.sleep(5.0)
def _poll_once(self, ser):
"""Query AT+CSQ, AT+CREG?, AT+COPS? and update state."""
try:
csq_resp = _at_command(ser, "AT+CSQ", self._p["at_timeout"])
creg_resp = _at_command(ser, "AT+CREG?", self._p["at_timeout"])
cops_resp = _at_command(ser, "AT+COPS?", self._p["at_timeout"])
except Exception as exc:
self.get_logger().warn(f"AT poll error: {exc}")
return
csq = parse_csq(csq_resp)
creg = parse_creg(creg_resp)
operator = parse_cops(cops_resp)
connected = self._check_connectivity()
with self._lock:
self._csq = csq
self._creg = creg
self._operator = operator
self._connected = connected
# Auto-reconnect if data link is down
if not connected and self._p["connection_method"] != "none":
now = time.monotonic()
if now - self._last_reconnect > self._p["reconnect_interval"]:
self._last_reconnect = now
self._attempt_reconnect()
# ── Connectivity check ────────────────────────────────────────────────────
def _check_connectivity(self) -> bool:
"""Return True if cellular data connection is active."""
method = self._p["connection_method"]
try:
if method == "nmcli":
result = subprocess.run(
["nmcli", "-t", "-f", "GENERAL.STATE",
"con", "show", self._p["nmcli_profile"]],
capture_output=True, text=True, timeout=5.0)
return "activated" in result.stdout.lower()
if method == "pppd":
result = subprocess.run(
["ip", "link", "show", "ppp0"],
capture_output=True, text=True, timeout=3.0)
return result.returncode == 0 and "UP" in result.stdout
except Exception:
pass
return False
def _attempt_reconnect(self):
"""Attempt to bring up cellular data connection."""
method = self._p["connection_method"]
self.get_logger().info(f"Attempting cellular reconnect ({method})")
try:
if method == "nmcli":
subprocess.run(
["nmcli", "connection", "up", self._p["nmcli_profile"]],
timeout=30.0, check=False)
elif method == "pppd":
subprocess.run(
["systemctl", "restart", "pppd-cellular"],
timeout=15.0, check=False)
except Exception as exc:
self.get_logger().warn(f"Reconnect failed: {exc}")
# ── Publish callback ──────────────────────────────────────────────────────
def _publish_cb(self):
with self._lock:
csq = self._csq
creg = self._creg
operator = self._operator
connected = self._connected
now = self.get_clock().now().to_msg()
# DiagnosticArray — full status
ds = DiagnosticStatus()
ds.name = "Cellular"
ds.hardware_id = "SIM7600A-H"
ds.level = DiagnosticStatus.OK if connected else DiagnosticStatus.WARN
ds.message = (
f"{'Connected' if connected else 'Disconnected'} | "
f"{operator or 'unknown'} | "
f"{creg['status_str']} | "
f"{rssi_to_quality_label(csq['rssi_dbm'])}"
)
ds.values = [
KeyValue(key="connected", value=str(connected)),
KeyValue(key="registration", value=creg["status_str"]),
KeyValue(key="operator", value=operator),
KeyValue(key="rssi_raw", value=str(csq["rssi_raw"])),
KeyValue(key="rssi_dbm", value=str(csq["rssi_dbm"] or "unknown")),
KeyValue(key="signal_quality", value=rssi_to_quality_label(csq["rssi_dbm"])),
KeyValue(key="ber", value=str(csq["ber"])),
]
arr = DiagnosticArray()
arr.header.stamp = now
arr.status = [ds]
self._status_pub.publish(arr)
# /cellular/rssi (Int32, dBm; 0 if unknown)
rssi_msg = Int32()
rssi_msg.data = csq["rssi_dbm"] if csq["rssi_dbm"] is not None else 0
self._rssi_pub.publish(rssi_msg)
# /cellular/connected (Bool)
conn_msg = Bool()
conn_msg.data = connected
self._conn_pub.publish(conn_msg)
def destroy_node(self):
self._running = False
super().destroy_node()
# ── Entry point ───────────────────────────────────────────────────────────────
def main(args=None):
rclpy.init(args=args)
node = CellularManagerNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.try_shutdown()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,402 @@
"""
gps_driver_node.py SIM7600X GPS NMEA ROS2 NavSatFix + TwistStamped.
Hardware: Waveshare SIM7600X 4G HAT (SIMCom SIM7600A-H)
GNSS: GPS + GLONASS + BeiDou + Galileo, ±2.5m CEP
GPS serial port: /dev/ttyUSB2 (NMEA output, 115200 baud)
AT port: /dev/ttyUSB0 (used to start GNSS receiver: AT+CGPS=1)
Publishes:
/gps/fix (sensor_msgs/NavSatFix) position with covariance from HDOP
/gps/vel (geometry_msgs/TwistStamped) ground speed + course (from NMEA RMC)
/diagnostics fix status, satellites, HDOP
Startup sequence:
1. Optional: open AT port send AT+CGPS=1 to start GNSS receiver
2. Open GPS serial port read NMEA sentences in background thread
3. Publish latest GGA + RMC data at publish_rate Hz
"""
import math
import threading
import time
import rclpy
from rclpy.node import Node
from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus, KeyValue
from geometry_msgs.msg import TwistStamped
from sensor_msgs.msg import NavSatFix, NavSatStatus
try:
import serial
_SERIAL_OK = True
except ImportError:
_SERIAL_OK = False
# ── Pure NMEA parsing functions ───────────────────────────────────────────────
# Extracted here for unit tests — no ROS2 imports needed.
GPS_CEP_ACCURACY = 2.5 # metres at HDOP=1 (SIM7600A-H spec)
KNOTS_TO_MS = 0.514444
def nmea_checksum_valid(sentence: str) -> bool:
"""Validate NMEA checksum. Sentence must include the leading '$'."""
if '*' not in sentence:
return False
data, checksum_str = sentence[1:].rsplit('*', 1)
try:
expected = int(checksum_str.strip()[:2], 16)
except ValueError:
return False
computed = 0
for ch in data:
computed ^= ord(ch)
return computed == expected
def nmea_dm_to_degrees(dm_str: str, direction: str) -> float:
"""
Convert NMEA DDMM.MMMM or DDDMM.MMMM to decimal degrees.
NMEA format: degrees are everything up to the last two digits before the
decimal point (minutes field is always MM.fractional).
"""
if not dm_str:
return None
dot = dm_str.index('.')
deg = float(dm_str[:dot - 2])
mins = float(dm_str[dot - 2:])
decimal = deg + mins / 60.0
if direction in ('S', 'W'):
decimal = -decimal
return decimal
def parse_gga(sentence: str) -> dict:
"""
Parse NMEA GGA sentence (any talker: GP/GN/GL/GA/GB).
Returns dict with keys:
latitude, longitude, altitude, fix_quality, num_satellites, hdop, utc_time
Returns None on parse error or no-fix (fix_quality == 0).
GGA field indices (0-based after splitting by comma):
0 Talker+GGA 1 UTC 2 Lat 3 N/S
4 Lon 5 E/W 6 Fix 7 NumSV
8 HDOP 9 AltMSL 10 M 11 GeoidSep
12 M 13 Age 14 StaID*CS
"""
raw = sentence.strip()
if raw.startswith('$'):
raw = raw[1:]
if '*' in raw:
raw = raw[:raw.index('*')]
parts = raw.split(',')
if len(parts) < 10:
return None
if not parts[0].endswith('GGA'):
return None
try:
fix_quality = int(parts[6]) if parts[6] else 0
except ValueError:
return None
if fix_quality == 0:
return None
lat = nmea_dm_to_degrees(parts[2], parts[3])
lon = nmea_dm_to_degrees(parts[4], parts[5])
if lat is None or lon is None:
return None
try:
num_sats = int(parts[7]) if parts[7] else 0
hdop = float(parts[8]) if parts[8] else 99.0
alt_msl = float(parts[9]) if parts[9] else 0.0
geoid_sep = float(parts[11]) if len(parts) > 11 and parts[11] else 0.0
except (ValueError, IndexError):
return None
return {
'latitude': lat,
'longitude': lon,
'altitude': alt_msl + geoid_sep, # WGS84 ellipsoidal height
'fix_quality': fix_quality,
'num_satellites': num_sats,
'hdop': hdop,
'utc_time': parts[1],
}
def parse_rmc(sentence: str) -> dict:
"""
Parse NMEA RMC sentence (any talker).
Returns dict with keys:
speed_mps, course_deg, valid, utc_time
Returns None on parse error or void status (status == 'V').
RMC field indices:
0 Talker+RMC 1 UTC 2 Status(A/V) 3 Lat 4 N/S
5 Lon 6 E/W 7 SpeedKnots 8 CourseDeg 9 Date
10 MagVar 11 MagDir 12 Mode*CS
"""
raw = sentence.strip()
if raw.startswith('$'):
raw = raw[1:]
if '*' in raw:
raw = raw[:raw.index('*')]
parts = raw.split(',')
if len(parts) < 9:
return None
if not parts[0].endswith('RMC'):
return None
if parts[2] != 'A': # 'V' = void / no fix
return None
try:
speed_knots = float(parts[7]) if parts[7] else 0.0
course_deg = float(parts[8]) if parts[8] else 0.0
except ValueError:
return None
return {
'speed_mps': speed_knots * KNOTS_TO_MS,
'course_deg': course_deg,
'valid': True,
'utc_time': parts[1],
}
def hdop_to_covariance(hdop: float, accuracy_m: float = GPS_CEP_ACCURACY) -> list:
"""
Compute diagonal position covariance matrix (row-major 3×3) from HDOP.
sigma_h = hdop * accuracy_m (horizontal 1σ CEP * 1.2)
sigma_v = sigma_h * 1.5 (vertical is typically 1.5× worse)
"""
sigma_h = hdop * accuracy_m
sigma_v = sigma_h * 1.5
return [
sigma_h**2, 0.0, 0.0,
0.0, sigma_h**2, 0.0,
0.0, 0.0, sigma_v**2,
]
def fix_quality_to_status(fix_quality: int) -> int:
"""Map NMEA GGA fix quality to NavSatStatus.status."""
if fix_quality <= 0:
return NavSatStatus.STATUS_NO_FIX # -1
if fix_quality == 2:
return NavSatStatus.STATUS_SBAS_FIX # 1 (DGPS)
if fix_quality in (4, 5):
return NavSatStatus.STATUS_GBAS_FIX # 2 (RTK)
return NavSatStatus.STATUS_FIX # 0 (standard GPS)
def course_to_velocity_enu(speed_mps: float, course_deg: float) -> tuple:
"""
Convert scalar ground speed + course-over-ground to ENU velocity components.
Returns (v_east, v_north).
Course: 0° = North, 90° = East (compass convention).
"""
course_rad = math.radians(course_deg)
v_north = speed_mps * math.cos(course_rad)
v_east = speed_mps * math.sin(course_rad)
return v_east, v_north
# ── ROS2 Node ─────────────────────────────────────────────────────────────────
class GpsDriverNode(Node):
def __init__(self):
super().__init__("gps_driver")
self.declare_parameter("gps_port", "/dev/ttyUSB2")
self.declare_parameter("at_port", "/dev/ttyUSB0")
self.declare_parameter("baud_rate", 115200)
self.declare_parameter("publish_rate", 1.0)
self.declare_parameter("frame_id", "gps")
self.declare_parameter("gps_init_enable", True)
self.declare_parameter("gps_init_cmd", "AT+CGPS=1")
self.declare_parameter("at_init_timeout", 3.0)
self.declare_parameter("gps_accuracy", GPS_CEP_ACCURACY)
p = {
"gps_port": self.get_parameter("gps_port").value,
"at_port": self.get_parameter("at_port").value,
"baud_rate": self.get_parameter("baud_rate").value,
"publish_rate": self.get_parameter("publish_rate").value,
"frame_id": self.get_parameter("frame_id").value,
"gps_init_enable": self.get_parameter("gps_init_enable").value,
"gps_init_cmd": self.get_parameter("gps_init_cmd").value,
"at_init_timeout": self.get_parameter("at_init_timeout").value,
"gps_accuracy": self.get_parameter("gps_accuracy").value,
}
self._p = p
# Publishers
self._fix_pub = self.create_publisher(NavSatFix, "/gps/fix", 10)
self._vel_pub = self.create_publisher(TwistStamped, "/gps/vel", 10)
self._diag_pub = self.create_publisher(DiagnosticArray, "/diagnostics", 10)
# Latest parsed data (thread-safe via GIL for simple assignment)
self._gga = None # dict from parse_gga
self._rmc = None # dict from parse_rmc
self._ser = None
self._running = False
if not _SERIAL_OK:
self.get_logger().error("pyserial not installed — GPS node inactive")
return
# Optional: send AT+CGPS=1 to start GNSS
if p["gps_init_enable"]:
self._send_at_init()
# Start background serial reader
self._running = True
self._reader_thread = threading.Thread(
target=self._reader_loop, daemon=True)
self._reader_thread.start()
# Publish timer
self.create_timer(1.0 / p["publish_rate"], self._publish_cb)
self.get_logger().info(
f"GpsDriver ready port={p['gps_port']} rate={p['publish_rate']}Hz")
# ── AT init ───────────────────────────────────────────────────────────────
def _send_at_init(self):
"""Open AT port, send GPS-start command, close."""
try:
with serial.Serial(
self._p["at_port"], self._p["baud_rate"],
timeout=self._p["at_init_timeout"]) as at:
cmd = (self._p["gps_init_cmd"] + "\r\n").encode("ascii")
at.write(cmd)
resp = at.read_until(b"OK\r\n", size=256)
if b"OK" in resp:
self.get_logger().info("GPS init OK (AT+CGPS=1)")
else:
self.get_logger().warn(
f"GPS init: unexpected response: {resp!r}")
except serial.SerialException as exc:
self.get_logger().warn(
f"GPS AT init failed ({exc}) — assuming GPS already running")
# ── Background NMEA reader ─────────────────────────────────────────────────
def _reader_loop(self):
while self._running:
try:
with serial.Serial(
self._p["gps_port"], self._p["baud_rate"],
timeout=1.0) as ser:
self._ser = ser
self.get_logger().info(
f"GPS serial open: {self._p['gps_port']}")
while self._running:
line = ser.readline().decode("ascii", errors="replace").strip()
if not line:
continue
if not nmea_checksum_valid("$" + line.lstrip("$")):
continue
gga = parse_gga(line)
if gga:
self._gga = gga
rmc = parse_rmc(line)
if rmc:
self._rmc = rmc
except serial.SerialException as exc:
self._ser = None
if self._running:
self.get_logger().warn(
f"GPS serial error ({exc}) — retrying in 3s")
time.sleep(3.0)
except Exception as exc:
self.get_logger().error(f"GPS reader exception: {exc}")
time.sleep(3.0)
# ── Publish callback ──────────────────────────────────────────────────────
def _publish_cb(self):
now = self.get_clock().now().to_msg()
gga = self._gga
rmc = self._rmc
# NavSatFix
fix_msg = NavSatFix()
fix_msg.header.stamp = now
fix_msg.header.frame_id = self._p["frame_id"]
if gga:
fix_msg.status.service = (NavSatStatus.SERVICE_GPS |
NavSatStatus.SERVICE_GLONASS)
fix_msg.status.status = fix_quality_to_status(gga["fix_quality"])
fix_msg.latitude = gga["latitude"]
fix_msg.longitude = gga["longitude"]
fix_msg.altitude = gga["altitude"]
fix_msg.position_covariance = hdop_to_covariance(
gga["hdop"], self._p["gps_accuracy"])
fix_msg.position_covariance_type = NavSatFix.COVARIANCE_TYPE_APPROXIMATED
else:
fix_msg.status.status = NavSatStatus.STATUS_NO_FIX
fix_msg.position_covariance_type = NavSatFix.COVARIANCE_TYPE_UNKNOWN
self._fix_pub.publish(fix_msg)
# TwistStamped (ground velocity)
if rmc:
vel_msg = TwistStamped()
vel_msg.header.stamp = now
vel_msg.header.frame_id = "odom" # ENU velocity in odom frame
v_east, v_north = course_to_velocity_enu(
rmc["speed_mps"], rmc["course_deg"])
vel_msg.twist.linear.x = v_east # East
vel_msg.twist.linear.y = v_north # North
vel_msg.twist.angular.z = math.radians(rmc["course_deg"]) # heading
self._vel_pub.publish(vel_msg)
# Diagnostics
diag_arr = DiagnosticArray()
diag_arr.header.stamp = now
ds = DiagnosticStatus()
ds.name = "GPS"
ds.hardware_id = "SIM7600A-H"
if gga:
ds.level = DiagnosticStatus.OK
ds.message = f"Fix quality {gga['fix_quality']}, {gga['num_satellites']} sats"
ds.values = [
KeyValue(key="fix_quality", value=str(gga["fix_quality"])),
KeyValue(key="satellites", value=str(gga["num_satellites"])),
KeyValue(key="hdop", value=f"{gga['hdop']:.1f}"),
KeyValue(key="latitude", value=f"{gga['latitude']:.6f}"),
KeyValue(key="longitude", value=f"{gga['longitude']:.6f}"),
KeyValue(key="altitude_m", value=f"{gga['altitude']:.1f}"),
]
else:
ds.level = DiagnosticStatus.WARN
ds.message = "No GPS fix"
diag_arr.status = [ds]
self._diag_pub.publish(diag_arr)
def destroy_node(self):
self._running = False
super().destroy_node()
# ── Entry point ───────────────────────────────────────────────────────────────
def main(args=None):
rclpy.init(args=args)
node = GpsDriverNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.try_shutdown()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,317 @@
"""
mqtt_bridge_node.py ROS2 MQTT bridge for cellular telemetry relay.
Forwards key ROS2 topics to a remote MQTT broker over the cellular link,
and receives remote commands from MQTT, republishing them on ROS2.
Bandwidth budget: < 50 KB/s total over cellular.
At 1 Hz all topics: ~12 KB/s (well within budget).
ROS2 MQTT (telemetry, QoS 0):
/saltybot/imu <prefix>/imu (sensor_msgs/Imu)
/gps/fix <prefix>/gps/fix (sensor_msgs/NavSatFix)
/gps/vel <prefix>/gps/vel (geometry_msgs/TwistStamped)
/uwb/ranges <prefix>/uwb/ranges (std_msgs/String JSON)
/person/target <prefix>/person (geometry_msgs/PoseStamped)
/cellular/status <prefix>/cellular (diagnostic_msgs pass-through)
MQTT ROS2 (commands, QoS 1):
<prefix>/cmd /saltybot/cmd (std_msgs/String)
<prefix>/estop /saltybot/estop (std_msgs/Bool)
MQTT topic naming:
prefix default: "saltybot"
e.g. saltybot/gps/fix
Bandwidth throttling:
Each topic has a configurable max publish rate.
Default telemetry at 1 Hz, /saltybot/imu at 5 Hz (still manageable).
TLS support:
Set tls_enable: true and provide tls_ca_cert path for encrypted connection.
"""
import json
import math
import threading
import time
import rclpy
from rclpy.node import Node
from diagnostic_msgs.msg import DiagnosticArray
from geometry_msgs.msg import PoseStamped, TwistStamped
from sensor_msgs.msg import Imu, NavSatFix
from std_msgs.msg import Bool, String
try:
import paho.mqtt.client as mqtt
_MQTT_OK = True
except ImportError:
_MQTT_OK = False
# ── Pure serialisation functions ──────────────────────────────────────────────
# Extracted for unit tests — no ROS2 runtime needed.
def imu_to_dict(msg) -> dict:
"""Serialize sensor_msgs/Imu to bandwidth-minimal dict."""
return {
"ax": round(msg.linear_acceleration.x, 3),
"ay": round(msg.linear_acceleration.y, 3),
"az": round(msg.linear_acceleration.z, 3),
"gx": round(msg.angular_velocity.x, 4),
"gy": round(msg.angular_velocity.y, 4),
"gz": round(msg.angular_velocity.z, 4),
"t": msg.header.stamp.sec,
}
def navsatfix_to_dict(msg) -> dict:
"""Serialize sensor_msgs/NavSatFix to dict."""
return {
"lat": round(msg.latitude, 7),
"lon": round(msg.longitude, 7),
"alt": round(msg.altitude, 2),
"stat": int(msg.status.status),
"t": msg.header.stamp.sec,
}
def gps_vel_to_dict(msg) -> dict:
"""Serialize geometry_msgs/TwistStamped (GPS velocity) to dict."""
speed = math.sqrt(
msg.twist.linear.x ** 2 + msg.twist.linear.y ** 2)
heading_deg = math.degrees(msg.twist.angular.z) % 360.0
return {
"spd": round(speed, 2),
"hdg": round(heading_deg, 1),
"t": msg.header.stamp.sec,
}
def pose_to_dict(msg) -> dict:
"""Serialize geometry_msgs/PoseStamped to dict."""
return {
"x": round(msg.pose.position.x, 3),
"y": round(msg.pose.position.y, 3),
"z": round(msg.pose.position.z, 3),
"t": msg.header.stamp.sec,
}
# ── Rate limiter helper ───────────────────────────────────────────────────────
class _RateLimiter:
"""True if enough time has elapsed since last allowed call."""
def __init__(self, rate_hz: float):
self._interval = 1.0 / rate_hz if rate_hz > 0 else 0.0
self._last = 0.0
def allow(self) -> bool:
now = time.monotonic()
if now - self._last >= self._interval:
self._last = now
return True
return False
# ── ROS2 Node ─────────────────────────────────────────────────────────────────
class MqttBridgeNode(Node):
def __init__(self):
super().__init__("mqtt_bridge")
# ── Parameters ────────────────────────────────────────────────────────
self.declare_parameter("mqtt_broker", "mqtt.saltylab.local")
self.declare_parameter("mqtt_port", 1883)
self.declare_parameter("mqtt_user", "")
self.declare_parameter("mqtt_password", "")
self.declare_parameter("topic_prefix", "saltybot")
self.declare_parameter("client_id", "saltybot-jetson")
self.declare_parameter("tls_enable", False)
self.declare_parameter("tls_ca_cert", "")
self.declare_parameter("keepalive", 60)
self.declare_parameter("reconnect_delay", 5.0)
# Per-topic rate limits (Hz, 0 = unlimited but still bounded by ROS2 topic rate)
self.declare_parameter("imu_rate", 5.0)
self.declare_parameter("gps_rate", 1.0)
self.declare_parameter("person_rate", 2.0)
self.declare_parameter("uwb_rate", 1.0)
self.declare_parameter("cellular_rate", 0.2)
p = {k: self.get_parameter(k).value for k in [
"mqtt_broker", "mqtt_port", "mqtt_user", "mqtt_password",
"topic_prefix", "client_id", "tls_enable", "tls_ca_cert",
"keepalive", "reconnect_delay",
"imu_rate", "gps_rate", "person_rate", "uwb_rate", "cellular_rate",
]}
self._p = p
self._prefix = p["topic_prefix"].rstrip("/")
self._client = None
self._connected = False
self._lock = threading.Lock()
# Rate limiters per outbound topic
self._rl = {
"imu": _RateLimiter(p["imu_rate"]),
"gps": _RateLimiter(p["gps_rate"]),
"person": _RateLimiter(p["person_rate"]),
"uwb": _RateLimiter(p["uwb_rate"]),
"cellular": _RateLimiter(p["cellular_rate"]),
}
if not _MQTT_OK:
self.get_logger().error(
"paho-mqtt not installed (pip install paho-mqtt) — bridge inactive")
return
# ── MQTT client setup ──────────────────────────────────────────────────
self._client = mqtt.Client(client_id=p["client_id"])
if p["mqtt_user"]:
self._client.username_pw_set(p["mqtt_user"], p["mqtt_password"])
if p["tls_enable"] and p["tls_ca_cert"]:
self._client.tls_set(ca_certs=p["tls_ca_cert"])
self._client.on_connect = self._on_connect
self._client.on_disconnect = self._on_disconnect
self._client.on_message = self._on_mqtt_message
self._client.reconnect_delay_set(
min_delay=1, max_delay=int(p["reconnect_delay"]))
# Start MQTT loop in background thread
self._client.connect_async(p["mqtt_broker"], p["mqtt_port"], p["keepalive"])
self._client.loop_start()
# ── ROS2 subscriptions (telemetry out) ─────────────────────────────────
self.create_subscription(Imu, "/saltybot/imu", self._imu_cb, 10)
self.create_subscription(NavSatFix, "/gps/fix", self._gps_fix_cb, 10)
self.create_subscription(TwistStamped, "/gps/vel", self._gps_vel_cb, 10)
self.create_subscription(String, "/uwb/ranges", self._uwb_cb, 10)
self.create_subscription(PoseStamped, "/person/target", self._person_cb, 10)
self.create_subscription(DiagnosticArray, "/cellular/status", self._cellular_cb, 5)
# ── ROS2 publishers (commands in) ──────────────────────────────────────
self._cmd_pub = self.create_publisher(String, "/saltybot/cmd", 10)
self._estop_pub = self.create_publisher(Bool, "/saltybot/estop", 10)
self.get_logger().info(
f"MqttBridge ready broker={p['mqtt_broker']}:{p['mqtt_port']} "
f"prefix={self._prefix}")
# ── MQTT callbacks ────────────────────────────────────────────────────────
def _on_connect(self, client, userdata, flags, rc):
if rc == 0:
self._connected = True
self.get_logger().info(
f"MQTT connected to {self._p['mqtt_broker']}")
# Subscribe to inbound command topics (QoS 1)
client.subscribe(f"{self._prefix}/cmd", qos=1)
client.subscribe(f"{self._prefix}/estop", qos=1)
else:
self.get_logger().warn(f"MQTT connect failed rc={rc}")
def _on_disconnect(self, client, userdata, rc):
self._connected = False
if rc != 0:
self.get_logger().warn(f"MQTT disconnected unexpectedly (rc={rc}), reconnecting")
def _on_mqtt_message(self, client, userdata, msg):
"""Route inbound MQTT commands to ROS2 topics."""
try:
topic = msg.topic
payload = msg.payload.decode("utf-8", errors="replace")
suffix = topic[len(self._prefix):].lstrip("/")
if suffix == "cmd":
ros_msg = String()
ros_msg.data = payload
self._cmd_pub.publish(ros_msg)
elif suffix == "estop":
try:
data = json.loads(payload)
val = bool(data.get("estop", False))
except (json.JSONDecodeError, AttributeError):
val = payload.strip().lower() in ("true", "1", "yes")
ros_msg = Bool()
ros_msg.data = val
self._estop_pub.publish(ros_msg)
if val:
self.get_logger().warn("ESTOP received via MQTT")
except Exception as exc:
self.get_logger().error(f"MQTT message error: {exc}")
# ── Publish helper ────────────────────────────────────────────────────────
def _mqtt_publish(self, subtopic: str, payload: dict, qos: int = 0):
"""Serialize payload to JSON and publish to MQTT (QoS 0 by default)."""
if not self._connected or self._client is None:
return
try:
self._client.publish(
f"{self._prefix}/{subtopic}",
json.dumps(payload, separators=(",", ":")),
qos=qos,
)
except Exception as exc:
self.get_logger().warn(f"MQTT publish error: {exc}")
# ── ROS2 topic callbacks (telemetry out) ──────────────────────────────────
def _imu_cb(self, msg):
if self._rl["imu"].allow():
self._mqtt_publish("imu", imu_to_dict(msg))
def _gps_fix_cb(self, msg):
if self._rl["gps"].allow():
self._mqtt_publish("gps/fix", navsatfix_to_dict(msg))
def _gps_vel_cb(self, msg):
if self._rl["gps"].allow():
self._mqtt_publish("gps/vel", gps_vel_to_dict(msg))
def _uwb_cb(self, msg):
if self._rl["uwb"].allow():
try:
data = json.loads(msg.data)
except json.JSONDecodeError:
data = {"raw": msg.data}
self._mqtt_publish("uwb/ranges", data)
def _person_cb(self, msg):
if self._rl["person"].allow():
self._mqtt_publish("person", pose_to_dict(msg))
def _cellular_cb(self, msg):
if self._rl["cellular"].allow() and msg.status:
ds = msg.status[0]
d = {kv.key: kv.value for kv in ds.values}
d["level"] = ds.level
self._mqtt_publish("cellular", d)
def destroy_node(self):
if self._client:
self._client.loop_stop()
self._client.disconnect()
super().destroy_node()
# ── Entry point ───────────────────────────────────────────────────────────────
def main(args=None):
rclpy.init(args=args)
node = MqttBridgeNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.try_shutdown()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/saltybot_cellular
[install]
install_scripts=$base/lib/saltybot_cellular

View File

@ -0,0 +1,32 @@
from setuptools import setup
package_name = "saltybot_cellular"
setup(
name=package_name,
version="0.1.0",
packages=[package_name],
data_files=[
("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
(f"share/{package_name}", ["package.xml"]),
(f"share/{package_name}/launch", ["launch/cellular.launch.py"]),
(f"share/{package_name}/config", ["config/cellular_params.yaml"]),
],
install_requires=["setuptools", "pyserial"],
zip_safe=True,
maintainer="sl-controls",
maintainer_email="sl-controls@saltylab.local",
description="SIM7600X 4G HAT driver: GPS NavSatFix + cellular modem + MQTT bridge",
license="MIT",
tests_require=["pytest"],
entry_points={
"console_scripts": [
# NMEA GPS → /gps/fix, /gps/vel
"gps_driver_node = saltybot_cellular.gps_driver_node:main",
# AT modem monitor → /cellular/status, /cellular/rssi, /cellular/connected
"cellular_manager_node = saltybot_cellular.cellular_manager_node:main",
# ROS2 ↔ MQTT telemetry relay
"mqtt_bridge_node = saltybot_cellular.mqtt_bridge_node:main",
],
},
)

View File

@ -0,0 +1,514 @@
"""
Unit tests for saltybot_cellular node logic.
No ROS2 runtime required tests pure functions mirrored from the nodes.
Run with: pytest jetson/ros2_ws/src/saltybot_cellular/test/test_cellular.py
"""
import math
import pytest
# ── Pure function mirrors — GPS ───────────────────────────────────────────────
KNOTS_TO_MS = 0.514444
GPS_ACCURACY = 2.5
def _nmea_checksum_valid(sentence):
if '*' not in sentence:
return False
data, cs_str = sentence[1:].rsplit('*', 1)
try:
expected = int(cs_str.strip()[:2], 16)
except ValueError:
return False
computed = 0
for ch in data:
computed ^= ord(ch)
return computed == expected
def _nmea_dm_to_degrees(dm_str, direction):
if not dm_str:
return None
dot = dm_str.index('.')
deg = float(dm_str[:dot - 2])
mins = float(dm_str[dot - 2:])
dec = deg + mins / 60.0
if direction in ('S', 'W'):
dec = -dec
return dec
def _parse_gga(sentence):
raw = sentence.strip()
if raw.startswith('$'):
raw = raw[1:]
if '*' in raw:
raw = raw[:raw.index('*')]
parts = raw.split(',')
if len(parts) < 10 or not parts[0].endswith('GGA'):
return None
try:
fix_quality = int(parts[6]) if parts[6] else 0
except ValueError:
return None
if fix_quality == 0:
return None
lat = _nmea_dm_to_degrees(parts[2], parts[3])
lon = _nmea_dm_to_degrees(parts[4], parts[5])
if lat is None or lon is None:
return None
try:
num_sats = int(parts[7]) if parts[7] else 0
hdop = float(parts[8]) if parts[8] else 99.0
alt_msl = float(parts[9]) if parts[9] else 0.0
geoid_sep = float(parts[11]) if len(parts) > 11 and parts[11] else 0.0
except (ValueError, IndexError):
return None
return {
'latitude': lat,
'longitude': lon,
'altitude': alt_msl + geoid_sep,
'fix_quality': fix_quality,
'num_satellites': num_sats,
'hdop': hdop,
'utc_time': parts[1],
}
def _parse_rmc(sentence):
raw = sentence.strip()
if raw.startswith('$'):
raw = raw[1:]
if '*' in raw:
raw = raw[:raw.index('*')]
parts = raw.split(',')
if len(parts) < 9 or not parts[0].endswith('RMC'):
return None
if parts[2] != 'A':
return None
try:
speed_mps = float(parts[7]) * KNOTS_TO_MS if parts[7] else 0.0
course_deg = float(parts[8]) if parts[8] else 0.0
except ValueError:
return None
return {'speed_mps': speed_mps, 'course_deg': course_deg, 'valid': True}
def _hdop_to_covariance(hdop, accuracy=GPS_ACCURACY):
sh = hdop * accuracy
sv = sh * 1.5
return [sh**2, 0, 0, 0, sh**2, 0, 0, 0, sv**2]
def _course_to_enu(speed_mps, course_deg):
r = math.radians(course_deg)
return speed_mps * math.sin(r), speed_mps * math.cos(r) # vE, vN
# ── Pure function mirrors — AT parsing ───────────────────────────────────────
_CREG_STATUS = {
"0": "not_registered", "1": "registered_home",
"2": "searching", "3": "denied",
"4": "unknown", "5": "roaming",
}
def _parse_csq(response):
for line in response.splitlines():
line = line.strip()
if not line.startswith("+CSQ:"):
continue
parts = line[5:].strip().split(",")
if len(parts) < 2:
continue
try:
rssi_raw = int(parts[0].strip())
ber = int(parts[1].strip())
except ValueError:
continue
if rssi_raw == 99:
return {"rssi_raw": 99, "rssi_dbm": None, "ber": ber, "valid": False}
return {"rssi_raw": rssi_raw, "rssi_dbm": -113 + 2 * rssi_raw,
"ber": ber, "valid": True}
return {"rssi_raw": 99, "rssi_dbm": None, "ber": 99, "valid": False}
def _parse_creg(response):
for line in response.splitlines():
line = line.strip()
if not line.startswith("+CREG:"):
continue
parts = line[6:].strip().split(",")
stat_str = parts[-1].strip().split()[0]
return {"stat_raw": stat_str,
"status_str": _CREG_STATUS.get(stat_str, "unknown"),
"registered": stat_str in ("1", "5")}
return {"stat_raw": "?", "status_str": "unknown", "registered": False}
def _parse_cops(response):
for line in response.splitlines():
if line.strip().startswith("+COPS:"):
parts = line.strip()[6:].strip().split(",")
if len(parts) >= 3:
return parts[2].strip().strip('"')
return ""
def _rssi_to_label(rssi_dbm):
if rssi_dbm is None:
return "no_signal"
if rssi_dbm >= -70:
return "excellent"
if rssi_dbm >= -85:
return "good"
if rssi_dbm >= -100:
return "fair"
return "poor"
# ── Pure function mirrors — MQTT serialisation ────────────────────────────────
def _imu_to_dict(ax, ay, az, gx, gy, gz, sec=0):
class Acc:
pass
class Msg:
pass
m = Msg()
m.linear_acceleration = Acc()
m.angular_velocity = Acc()
m.header = Acc()
m.header.stamp = Acc()
m.linear_acceleration.x = ax
m.linear_acceleration.y = ay
m.linear_acceleration.z = az
m.angular_velocity.x = gx
m.angular_velocity.y = gy
m.angular_velocity.z = gz
m.header.stamp.sec = sec
return {
"ax": round(ax, 3), "ay": round(ay, 3), "az": round(az, 3),
"gx": round(gx, 4), "gy": round(gy, 4), "gz": round(gz, 4),
"t": sec,
}
# ── NMEA checksum tests ───────────────────────────────────────────────────────
class TestNmeaChecksum:
def test_valid_gpgga(self):
# Valid GGA from standard NMEA example
s = "$GPGGA,123519,4807.038,N,01131.000,E,1,08,0.9,545.4,M,46.9,M,,*47"
assert _nmea_checksum_valid(s) is True
def test_valid_gnrmc(self):
s = "$GNRMC,123519.00,A,4807.0380,N,01131.0000,E,0.100,0.00,010101,,,A*7F"
assert _nmea_checksum_valid(s) is True
def test_invalid_checksum(self):
s = "$GPGGA,123519,4807.038,N,01131.000,E,1,08,0.9,545.4,M,46.9,M,,*FF"
assert _nmea_checksum_valid(s) is False
def test_no_checksum(self):
assert _nmea_checksum_valid("$GPGGA,123519,4807.038") is False
def test_empty_string(self):
assert _nmea_checksum_valid("") is False
# ── NMEA DM conversion tests ──────────────────────────────────────────────────
class TestNmeaDm:
def test_north_hemisphere(self):
# 4807.038N = 48° 7.038' = 48.1173°
assert _nmea_dm_to_degrees("4807.038", "N") == pytest.approx(48.1173, abs=1e-4)
def test_south_hemisphere_negative(self):
result = _nmea_dm_to_degrees("4807.038", "S")
assert result == pytest.approx(-48.1173, abs=1e-4)
def test_east_longitude(self):
# 01131.000E = 11° 31.000' = 11.5167°
assert _nmea_dm_to_degrees("01131.000", "E") == pytest.approx(11.5167, abs=1e-4)
def test_west_longitude_negative(self):
result = _nmea_dm_to_degrees("07937.600", "W")
assert result < 0
def test_zero(self):
assert _nmea_dm_to_degrees("0000.000", "N") == pytest.approx(0.0)
def test_empty_string_none(self):
assert _nmea_dm_to_degrees("", "N") is None
# ── GGA parsing tests ─────────────────────────────────────────────────────────
_GGA_VALID = "$GPGGA,123519,4807.038,N,01131.000,E,1,08,0.9,545.4,M,46.9,M,,*47"
_GGA_DGPS = "$GPGGA,123519,4807.038,N,01131.000,E,2,08,0.9,545.4,M,46.9,M,,*44"
_GGA_NOFIX = "$GPGGA,123519,,,,,0,00,99.9,,M,,M,,*71"
_GGA_GN = "$GNGGA,123519,4807.038,N,01131.000,E,1,12,0.7,545.4,M,46.9,M,,*4B"
class TestParseGga:
def test_valid_parse(self):
r = _parse_gga(_GGA_VALID)
assert r is not None
assert r["latitude"] == pytest.approx(48.1173, abs=1e-4)
assert r["longitude"] == pytest.approx(11.5167, abs=1e-4)
assert r["fix_quality"] == 1
assert r["num_satellites"] == 8
assert r["hdop"] == pytest.approx(0.9)
def test_altitude_is_msl_plus_geoid(self):
r = _parse_gga(_GGA_VALID)
# 545.4 MSL + 46.9 geoid = 592.3m ellipsoidal
assert r["altitude"] == pytest.approx(592.3, abs=0.01)
def test_no_fix_returns_none(self):
assert _parse_gga(_GGA_NOFIX) is None
def test_dgps_fix_quality_2(self):
r = _parse_gga(_GGA_DGPS)
assert r is not None
assert r["fix_quality"] == 2
def test_gn_talker_id_accepted(self):
r = _parse_gga(_GGA_GN)
assert r is not None
assert r["num_satellites"] == 12
def test_non_gga_returns_none(self):
assert _parse_gga("$GPGSV,3,1,12,...*XX") is None
def test_short_sentence_returns_none(self):
assert _parse_gga("$GPGGA,123519") is None
def test_empty_returns_none(self):
assert _parse_gga("") is None
def test_utc_time_field(self):
r = _parse_gga(_GGA_VALID)
assert r["utc_time"] == "123519"
# ── RMC parsing tests ─────────────────────────────────────────────────────────
_RMC_VALID = "$GNRMC,123519.00,A,4807.038,N,01131.000,E,1.234,181.5,010101,,,A*XX"
_RMC_VOID = "$GNRMC,123519.00,V,,,,,,,010101,,,N*XX"
class TestParseRmc:
def test_valid_speed(self):
r = _parse_rmc(_RMC_VALID)
assert r is not None
assert r["speed_mps"] == pytest.approx(1.234 * KNOTS_TO_MS, rel=1e-3)
def test_valid_course(self):
r = _parse_rmc(_RMC_VALID)
assert r["course_deg"] == pytest.approx(181.5, abs=0.01)
def test_void_returns_none(self):
assert _parse_rmc(_RMC_VOID) is None
def test_non_rmc_returns_none(self):
assert _parse_rmc(_GGA_VALID) is None
def test_zero_speed(self):
rmc = "$GNRMC,000000.00,A,4807.038,N,01131.000,E,0.000,0.0,010101,,,A*XX"
r = _parse_rmc(rmc)
assert r is not None
assert r["speed_mps"] == pytest.approx(0.0)
# ── HDOP covariance tests ─────────────────────────────────────────────────────
class TestHdopCovariance:
def test_hdop1_gives_accuracy_squared(self):
cov = _hdop_to_covariance(1.0)
assert cov[0] == pytest.approx(GPS_ACCURACY ** 2) # sigma_h^2
assert cov[4] == pytest.approx(GPS_ACCURACY ** 2)
assert cov[8] == pytest.approx((GPS_ACCURACY * 1.5) ** 2)
def test_hdop2_doubles_sigma(self):
cov1 = _hdop_to_covariance(1.0)
cov2 = _hdop_to_covariance(2.0)
# sigma doubles → variance quadruples
assert cov2[0] == pytest.approx(4.0 * cov1[0])
def test_off_diagonal_zero(self):
cov = _hdop_to_covariance(1.0)
assert cov[1] == pytest.approx(0.0)
assert cov[2] == pytest.approx(0.0)
assert cov[3] == pytest.approx(0.0)
def test_vertical_worse_than_horizontal(self):
cov = _hdop_to_covariance(1.0)
assert cov[8] > cov[0] # sigma_v > sigma_h
# ── Course → ENU velocity tests ───────────────────────────────────────────────
class TestCourseToEnu:
def test_north_course_zero_easting(self):
vE, vN = _course_to_enu(1.0, 0.0) # 0° = due North
assert vE == pytest.approx(0.0, abs=1e-9)
assert vN == pytest.approx(1.0)
def test_east_course_zero_northing(self):
vE, vN = _course_to_enu(1.0, 90.0) # 90° = due East
assert vE == pytest.approx(1.0)
assert vN == pytest.approx(0.0, abs=1e-9)
def test_south_course_negative_northing(self):
vE, vN = _course_to_enu(1.0, 180.0) # 180° = South
assert vN == pytest.approx(-1.0)
def test_speed_preserved(self):
speed = 3.5
vE, vN = _course_to_enu(speed, 45.0)
assert math.sqrt(vE**2 + vN**2) == pytest.approx(speed)
def test_zero_speed(self):
vE, vN = _course_to_enu(0.0, 90.0)
assert vE == pytest.approx(0.0)
assert vN == pytest.approx(0.0)
# ── AT+CSQ parsing tests ──────────────────────────────────────────────────────
class TestParseCsq:
def test_nominal_rssi(self):
r = _parse_csq("+CSQ: 18,0\r\nOK\r\n")
assert r["valid"] is True
assert r["rssi_raw"] == 18
assert r["rssi_dbm"] == -113 + 2 * 18 # = -77 dBm
assert r["ber"] == 0
def test_unknown_rssi_99(self):
r = _parse_csq("+CSQ: 99,99\r\nOK\r\n")
assert r["valid"] is False
assert r["rssi_dbm"] is None
def test_min_rssi_0(self):
r = _parse_csq("+CSQ: 0,0\r\nOK\r\n")
assert r["rssi_dbm"] == -113
def test_max_rssi_31(self):
r = _parse_csq("+CSQ: 31,0\r\nOK\r\n")
assert r["rssi_dbm"] == -113 + 62 # = -51 dBm
def test_empty_response(self):
r = _parse_csq("ERROR\r\n")
assert r["valid"] is False
def test_csq_without_space(self):
# Some modems omit the space after +CSQ:
r = _parse_csq("+CSQ:20,0\r\nOK\r\n")
assert r["valid"] is True
assert r["rssi_raw"] == 20
# ── AT+CREG? parsing tests ─────────────────────────────────────────────────────
class TestParseCreg:
def test_registered_home(self):
r = _parse_creg("+CREG: 0,1\r\nOK\r\n")
assert r["registered"] is True
assert r["status_str"] == "registered_home"
def test_roaming(self):
r = _parse_creg("+CREG: 0,5\r\nOK\r\n")
assert r["registered"] is True
assert r["status_str"] == "roaming"
def test_not_registered(self):
r = _parse_creg("+CREG: 0,0\r\nOK\r\n")
assert r["registered"] is False
def test_searching(self):
r = _parse_creg("+CREG: 0,2\r\nOK\r\n")
assert r["registered"] is False
assert r["status_str"] == "searching"
def test_stat_only_format(self):
r = _parse_creg("+CREG: 1\r\nOK\r\n")
assert r["registered"] is True
def test_empty_response(self):
r = _parse_creg("ERROR\r\n")
assert r["registered"] is False
# ── AT+COPS? parsing tests ─────────────────────────────────────────────────────
class TestParseCops:
def test_operator_name(self):
r = _parse_cops('+COPS: 0,0,"Fido",7\r\nOK\r\n')
assert r == "Fido"
def test_operator_with_spaces(self):
r = _parse_cops('+COPS: 0,0,"Rogers Wireless",7\r\nOK\r\n')
assert r == "Rogers Wireless"
def test_no_operator(self):
r = _parse_cops("+COPS: 0\r\nOK\r\n")
assert r == ""
def test_error_response(self):
r = _parse_cops("ERROR\r\n")
assert r == ""
# ── Signal quality label tests ────────────────────────────────────────────────
class TestRssiLabel:
def test_excellent(self):
assert _rssi_to_label(-65) == "excellent"
def test_good(self):
assert _rssi_to_label(-80) == "good"
def test_fair(self):
assert _rssi_to_label(-95) == "fair"
def test_poor(self):
assert _rssi_to_label(-105) == "poor"
def test_none_is_no_signal(self):
assert _rssi_to_label(None) == "no_signal"
def test_boundary_good_excellent(self):
assert _rssi_to_label(-70) == "excellent" # -70 >= -70 → excellent
def test_boundary_fair_good(self):
assert _rssi_to_label(-85) == "good" # -85 >= -85 → good
# ── MQTT serialisation tests ──────────────────────────────────────────────────
class TestMqttSerialisation:
def test_imu_dict_keys(self):
d = _imu_to_dict(9.81, 0.1, -0.2, 0.01, -0.02, 0.0)
assert set(d.keys()) == {"ax", "ay", "az", "gx", "gy", "gz", "t"}
def test_imu_precision(self):
d = _imu_to_dict(9.81234, 0.0, 0.0, 0.0, 0.0, 0.0)
assert d["ax"] == pytest.approx(9.812, abs=1e-3)
def test_gps_fix_dict(self):
# Simple inline navsatfix
class S: pass
m = S(); m.status = S(); m.header = S(); m.header.stamp = S()
m.latitude = 48.1173; m.longitude = 11.5167; m.altitude = 592.3
m.status.status = 0; m.header.stamp.sec = 1234
d = {
"lat": round(m.latitude, 7), "lon": round(m.longitude, 7),
"alt": round(m.altitude, 2), "stat": int(m.status.status),
"t": m.header.stamp.sec,
}
assert d["lat"] == pytest.approx(48.1173, abs=1e-4)
assert d["alt"] == pytest.approx(592.3)