feat: MQTT-to-ROS2 phone sensor bridge (Issue #601) #605
@ -0,0 +1,69 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
mqtt_bridge.launch.py — Launch the MQTT-to-ROS2 phone sensor bridge (Issue #601)
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
ros2 launch saltybot_phone mqtt_bridge.launch.py
|
||||||
|
ros2 launch saltybot_phone mqtt_bridge.launch.py mqtt_host:=192.168.1.100
|
||||||
|
ros2 launch saltybot_phone mqtt_bridge.launch.py use_phone_timestamp:=true warn_drift_s:=0.5
|
||||||
|
"""
|
||||||
|
from launch import LaunchDescription
|
||||||
|
from launch.actions import DeclareLaunchArgument
|
||||||
|
from launch.substitutions import LaunchConfiguration
|
||||||
|
from launch_ros.actions import Node
|
||||||
|
|
||||||
|
|
||||||
|
def generate_launch_description() -> LaunchDescription:
|
||||||
|
args = [
|
||||||
|
DeclareLaunchArgument(
|
||||||
|
"mqtt_host", default_value="localhost",
|
||||||
|
description="MQTT broker IP/hostname",
|
||||||
|
),
|
||||||
|
DeclareLaunchArgument(
|
||||||
|
"mqtt_port", default_value="1883",
|
||||||
|
description="MQTT broker port",
|
||||||
|
),
|
||||||
|
DeclareLaunchArgument(
|
||||||
|
"reconnect_delay_s", default_value="5.0",
|
||||||
|
description="Seconds between MQTT reconnect attempts",
|
||||||
|
),
|
||||||
|
DeclareLaunchArgument(
|
||||||
|
"use_phone_timestamp", default_value="false",
|
||||||
|
description="Use phone epoch ts for ROS2 header stamp",
|
||||||
|
),
|
||||||
|
DeclareLaunchArgument(
|
||||||
|
"warn_drift_s", default_value="1.0",
|
||||||
|
description="Clock drift threshold for warning (s)",
|
||||||
|
),
|
||||||
|
DeclareLaunchArgument(
|
||||||
|
"imu_accel_cov", default_value="0.01",
|
||||||
|
description="IMU accelerometer diagonal covariance (m/s²)²",
|
||||||
|
),
|
||||||
|
DeclareLaunchArgument(
|
||||||
|
"imu_gyro_cov", default_value="0.001",
|
||||||
|
description="IMU gyroscope diagonal covariance (rad/s)²",
|
||||||
|
),
|
||||||
|
]
|
||||||
|
|
||||||
|
bridge_node = Node(
|
||||||
|
package="saltybot_phone",
|
||||||
|
executable="mqtt_ros2_bridge",
|
||||||
|
name="mqtt_ros2_bridge",
|
||||||
|
parameters=[{
|
||||||
|
"mqtt_host": LaunchConfiguration("mqtt_host"),
|
||||||
|
"mqtt_port": LaunchConfiguration("mqtt_port"),
|
||||||
|
"reconnect_delay_s": LaunchConfiguration("reconnect_delay_s"),
|
||||||
|
"frame_id_imu": "phone_imu",
|
||||||
|
"frame_id_gps": "phone_gps",
|
||||||
|
"use_phone_timestamp": LaunchConfiguration("use_phone_timestamp"),
|
||||||
|
"warn_drift_s": LaunchConfiguration("warn_drift_s"),
|
||||||
|
"imu_accel_cov": LaunchConfiguration("imu_accel_cov"),
|
||||||
|
"imu_gyro_cov": LaunchConfiguration("imu_gyro_cov"),
|
||||||
|
"gps_alt_cov_factor": 4.0,
|
||||||
|
"status_hz": 0.2,
|
||||||
|
}],
|
||||||
|
output="screen",
|
||||||
|
emulate_tty=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
return LaunchDescription(args + [bridge_node])
|
||||||
@ -0,0 +1,485 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
mqtt_ros2_bridge_node.py — MQTT-to-ROS2 bridge for Termux phone sensors (Issue #601)
|
||||||
|
|
||||||
|
Subscribes to the three MQTT topics published by phone/sensor_dashboard.py and
|
||||||
|
republishes each as a proper ROS2 message type on the Jetson.
|
||||||
|
|
||||||
|
MQTT topics consumed (sensor_dashboard.py JSON format)
|
||||||
|
───────────────────────────────────────────────────────
|
||||||
|
saltybot/phone/imu → sensor_msgs/Imu
|
||||||
|
saltybot/phone/gps → sensor_msgs/NavSatFix
|
||||||
|
saltybot/phone/battery → sensor_msgs/BatteryState
|
||||||
|
|
||||||
|
ROS2 topics published
|
||||||
|
──────────────────────
|
||||||
|
/saltybot/phone/imu sensor_msgs/Imu
|
||||||
|
/saltybot/phone/gps sensor_msgs/NavSatFix
|
||||||
|
/saltybot/phone/battery sensor_msgs/BatteryState
|
||||||
|
/saltybot/phone/bridge/status std_msgs/String (JSON health/stats)
|
||||||
|
|
||||||
|
Timestamp alignment
|
||||||
|
────────────────────
|
||||||
|
Header stamp = ROS2 wall clock (monotonic, consistent with rest of system).
|
||||||
|
Phone epoch ts from JSON is cross-checked to detect clock drift > warn_drift_s;
|
||||||
|
if drift is within tolerance the phone timestamp can be used instead by
|
||||||
|
setting use_phone_timestamp:=true.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
──────────
|
||||||
|
mqtt_host str Broker IP/hostname (default: localhost)
|
||||||
|
mqtt_port int Broker port (default: 1883)
|
||||||
|
mqtt_keepalive int Keepalive seconds (default: 60)
|
||||||
|
reconnect_delay_s float Delay between reconnects (default: 5.0)
|
||||||
|
frame_id_imu str TF frame for IMU (default: phone_imu)
|
||||||
|
frame_id_gps str TF frame for GPS (default: phone_gps)
|
||||||
|
use_phone_timestamp bool Use phone ts for header (default: false)
|
||||||
|
warn_drift_s float Clock drift warning thresh (default: 1.0)
|
||||||
|
imu_accel_cov float Accel diagonal covariance (default: 0.01)
|
||||||
|
imu_gyro_cov float Gyro diagonal covariance (default: 0.001)
|
||||||
|
gps_alt_cov_factor float Altitude cov = acc*factor (default: 4.0)
|
||||||
|
status_hz float Bridge status publish rate (default: 0.2)
|
||||||
|
|
||||||
|
Dependencies
|
||||||
|
────────────
|
||||||
|
pip install paho-mqtt (on Jetson)
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import math
|
||||||
|
import queue
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
import rclpy
|
||||||
|
from rclpy.node import Node
|
||||||
|
from sensor_msgs.msg import BatteryState, Imu, NavSatFix, NavSatStatus
|
||||||
|
from std_msgs.msg import Header, String
|
||||||
|
|
||||||
|
try:
|
||||||
|
import paho.mqtt.client as mqtt
|
||||||
|
_MQTT_OK = True
|
||||||
|
except ImportError:
|
||||||
|
_MQTT_OK = False
|
||||||
|
|
||||||
|
# ── MQTT topic constants (must match sensor_dashboard.py) ────────────────────
|
||||||
|
|
||||||
|
_TOPIC_IMU = "saltybot/phone/imu"
|
||||||
|
_TOPIC_GPS = "saltybot/phone/gps"
|
||||||
|
_TOPIC_BATTERY = "saltybot/phone/battery"
|
||||||
|
|
||||||
|
# ── BatteryState health/power-supply constants ───────────────────────────────
|
||||||
|
|
||||||
|
_HEALTH_MAP = {
|
||||||
|
"GOOD": BatteryState.POWER_SUPPLY_HEALTH_GOOD,
|
||||||
|
"OVERHEAT": BatteryState.POWER_SUPPLY_HEALTH_OVERHEAT,
|
||||||
|
"DEAD": BatteryState.POWER_SUPPLY_HEALTH_DEAD,
|
||||||
|
"OVER_VOLTAGE": BatteryState.POWER_SUPPLY_HEALTH_OVERVOLTAGE,
|
||||||
|
"UNSPECIFIED_FAILURE": BatteryState.POWER_SUPPLY_HEALTH_UNSPEC_FAILURE,
|
||||||
|
"COLD": BatteryState.POWER_SUPPLY_HEALTH_COLD,
|
||||||
|
"WATCHDOG_TIMER_EXPIRE": BatteryState.POWER_SUPPLY_HEALTH_WATCHDOG_TIMER_EXPIRE,
|
||||||
|
"SAFETY_TIMER_EXPIRE": BatteryState.POWER_SUPPLY_HEALTH_SAFETY_TIMER_EXPIRE,
|
||||||
|
}
|
||||||
|
|
||||||
|
_PLUGGED_MAP = {
|
||||||
|
"AC": BatteryState.POWER_SUPPLY_STATUS_CHARGING,
|
||||||
|
"USB": BatteryState.POWER_SUPPLY_STATUS_CHARGING,
|
||||||
|
"WIRELESS": BatteryState.POWER_SUPPLY_STATUS_CHARGING,
|
||||||
|
"UNPLUGGED": BatteryState.POWER_SUPPLY_STATUS_DISCHARGING,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class MqttRos2BridgeNode(Node):
|
||||||
|
"""
|
||||||
|
Bridges phone sensor MQTT topics into ROS2.
|
||||||
|
|
||||||
|
Architecture:
|
||||||
|
- paho-mqtt loop_start() runs the MQTT network thread independently.
|
||||||
|
- on_message callback enqueues raw (topic, payload) pairs.
|
||||||
|
- A ROS2 timer drains the queue and publishes ROS2 messages, keeping
|
||||||
|
all ROS2 operations on the executor thread.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self) -> None:
|
||||||
|
super().__init__("mqtt_ros2_bridge")
|
||||||
|
|
||||||
|
# ── Parameters ────────────────────────────────────────────────────────
|
||||||
|
self.declare_parameter("mqtt_host", "localhost")
|
||||||
|
self.declare_parameter("mqtt_port", 1883)
|
||||||
|
self.declare_parameter("mqtt_keepalive", 60)
|
||||||
|
self.declare_parameter("reconnect_delay_s", 5.0)
|
||||||
|
self.declare_parameter("frame_id_imu", "phone_imu")
|
||||||
|
self.declare_parameter("frame_id_gps", "phone_gps")
|
||||||
|
self.declare_parameter("use_phone_timestamp", False)
|
||||||
|
self.declare_parameter("warn_drift_s", 1.0)
|
||||||
|
self.declare_parameter("imu_accel_cov", 0.01)
|
||||||
|
self.declare_parameter("imu_gyro_cov", 0.001)
|
||||||
|
self.declare_parameter("gps_alt_cov_factor", 4.0)
|
||||||
|
self.declare_parameter("status_hz", 0.2)
|
||||||
|
|
||||||
|
p = self._p = self._load_params()
|
||||||
|
|
||||||
|
if not _MQTT_OK:
|
||||||
|
self.get_logger().error(
|
||||||
|
"paho-mqtt not installed. Run: pip install paho-mqtt"
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── Publishers ────────────────────────────────────────────────────────
|
||||||
|
self._imu_pub = self.create_publisher(Imu, "/saltybot/phone/imu", 10)
|
||||||
|
self._gps_pub = self.create_publisher(NavSatFix, "/saltybot/phone/gps", 10)
|
||||||
|
self._bat_pub = self.create_publisher(BatteryState, "/saltybot/phone/battery", 10)
|
||||||
|
self._status_pub = self.create_publisher(String, "/saltybot/phone/bridge/status", 10)
|
||||||
|
|
||||||
|
# ── Message queue (MQTT thread → ROS2 executor thread) ────────────────
|
||||||
|
self._q: queue.Queue[tuple[str, bytes]] = queue.Queue(maxsize=200)
|
||||||
|
|
||||||
|
# ── Stats ─────────────────────────────────────────────────────────────
|
||||||
|
self._rx_count = {_TOPIC_IMU: 0, _TOPIC_GPS: 0, _TOPIC_BATTERY: 0}
|
||||||
|
self._pub_count = {_TOPIC_IMU: 0, _TOPIC_GPS: 0, _TOPIC_BATTERY: 0}
|
||||||
|
self._err_count = {_TOPIC_IMU: 0, _TOPIC_GPS: 0, _TOPIC_BATTERY: 0}
|
||||||
|
self._last_rx_ts = {_TOPIC_IMU: 0.0, _TOPIC_GPS: 0.0, _TOPIC_BATTERY: 0.0}
|
||||||
|
self._mqtt_connected = False
|
||||||
|
|
||||||
|
# ── Timers ────────────────────────────────────────────────────────────
|
||||||
|
# Drain queue at 50 Hz (>> sensor rates so latency stays low)
|
||||||
|
self.create_timer(0.02, self._drain_queue)
|
||||||
|
# Status publisher
|
||||||
|
self.create_timer(1.0 / max(p["status_hz"], 0.01), self._publish_status)
|
||||||
|
|
||||||
|
# ── MQTT client ───────────────────────────────────────────────────────
|
||||||
|
if _MQTT_OK:
|
||||||
|
self._mqtt = mqtt.Client(
|
||||||
|
client_id="saltybot-mqtt-ros2-bridge", clean_session=True
|
||||||
|
)
|
||||||
|
self._mqtt.on_connect = self._on_mqtt_connect
|
||||||
|
self._mqtt.on_disconnect = self._on_mqtt_disconnect
|
||||||
|
self._mqtt.on_message = self._on_mqtt_message
|
||||||
|
self._mqtt.reconnect_delay_set(
|
||||||
|
min_delay=int(p["reconnect_delay_s"]),
|
||||||
|
max_delay=int(p["reconnect_delay_s"]) * 4,
|
||||||
|
)
|
||||||
|
self._mqtt_connect()
|
||||||
|
|
||||||
|
self.get_logger().info(
|
||||||
|
"MQTT→ROS2 bridge started — broker=%s:%d use_phone_ts=%s",
|
||||||
|
p["mqtt_host"], p["mqtt_port"], p["use_phone_timestamp"],
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── Param helper ─────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _load_params(self) -> dict:
|
||||||
|
return {
|
||||||
|
"mqtt_host": self.get_parameter("mqtt_host").value,
|
||||||
|
"mqtt_port": self.get_parameter("mqtt_port").value,
|
||||||
|
"mqtt_keepalive": self.get_parameter("mqtt_keepalive").value,
|
||||||
|
"reconnect_delay_s": self.get_parameter("reconnect_delay_s").value,
|
||||||
|
"frame_id_imu": self.get_parameter("frame_id_imu").value,
|
||||||
|
"frame_id_gps": self.get_parameter("frame_id_gps").value,
|
||||||
|
"use_phone_ts": self.get_parameter("use_phone_timestamp").value,
|
||||||
|
"warn_drift_s": self.get_parameter("warn_drift_s").value,
|
||||||
|
"imu_accel_cov": self.get_parameter("imu_accel_cov").value,
|
||||||
|
"imu_gyro_cov": self.get_parameter("imu_gyro_cov").value,
|
||||||
|
"gps_alt_cov_factor": self.get_parameter("gps_alt_cov_factor").value,
|
||||||
|
"status_hz": self.get_parameter("status_hz").value,
|
||||||
|
}
|
||||||
|
|
||||||
|
# ── MQTT connection ───────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _mqtt_connect(self) -> None:
|
||||||
|
try:
|
||||||
|
self._mqtt.connect_async(
|
||||||
|
self._p["mqtt_host"],
|
||||||
|
self._p["mqtt_port"],
|
||||||
|
keepalive=self._p["mqtt_keepalive"],
|
||||||
|
)
|
||||||
|
self._mqtt.loop_start()
|
||||||
|
except Exception as e:
|
||||||
|
self.get_logger().warning("MQTT initial connect error: %s", str(e))
|
||||||
|
|
||||||
|
def _on_mqtt_connect(self, client, userdata, flags, rc) -> None:
|
||||||
|
if rc == 0:
|
||||||
|
self._mqtt_connected = True
|
||||||
|
# Subscribe to all phone sensor topics
|
||||||
|
for topic in (_TOPIC_IMU, _TOPIC_GPS, _TOPIC_BATTERY):
|
||||||
|
client.subscribe(topic, qos=0)
|
||||||
|
self.get_logger().info(
|
||||||
|
"MQTT connected to %s:%d — subscribed to 3 phone topics",
|
||||||
|
self._p["mqtt_host"], self._p["mqtt_port"],
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
self.get_logger().warning("MQTT connect failed rc=%d", rc)
|
||||||
|
|
||||||
|
def _on_mqtt_disconnect(self, client, userdata, rc) -> None:
|
||||||
|
self._mqtt_connected = False
|
||||||
|
if rc != 0:
|
||||||
|
self.get_logger().warning(
|
||||||
|
"MQTT disconnected (rc=%d) — paho will reconnect", rc
|
||||||
|
)
|
||||||
|
|
||||||
|
def _on_mqtt_message(self, client, userdata, message) -> None:
|
||||||
|
"""Called in paho network thread — only enqueue, never publish here."""
|
||||||
|
topic = message.topic
|
||||||
|
if topic in self._rx_count:
|
||||||
|
self._rx_count[topic] += 1
|
||||||
|
self._last_rx_ts[topic] = time.time()
|
||||||
|
try:
|
||||||
|
self._q.put_nowait((topic, message.payload))
|
||||||
|
except queue.Full:
|
||||||
|
self.get_logger().debug("MQTT queue full — dropping %s", topic)
|
||||||
|
|
||||||
|
# ── Queue drain (ROS2 executor thread) ────────────────────────────────────
|
||||||
|
|
||||||
|
def _drain_queue(self) -> None:
|
||||||
|
"""Drain up to 50 queued messages per timer tick to bound latency."""
|
||||||
|
for _ in range(50):
|
||||||
|
try:
|
||||||
|
topic, payload = self._q.get_nowait()
|
||||||
|
except queue.Empty:
|
||||||
|
break
|
||||||
|
self._dispatch(topic, payload)
|
||||||
|
|
||||||
|
def _dispatch(self, topic: str, payload: bytes) -> None:
|
||||||
|
try:
|
||||||
|
data = json.loads(payload)
|
||||||
|
except (json.JSONDecodeError, UnicodeDecodeError) as e:
|
||||||
|
self._err_count.get(topic, {}) # just access
|
||||||
|
if topic in self._err_count:
|
||||||
|
self._err_count[topic] += 1
|
||||||
|
self.get_logger().debug("JSON decode error on %s: %s", topic, e)
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
if topic == _TOPIC_IMU:
|
||||||
|
self._handle_imu(data)
|
||||||
|
elif topic == _TOPIC_GPS:
|
||||||
|
self._handle_gps(data)
|
||||||
|
elif topic == _TOPIC_BATTERY:
|
||||||
|
self._handle_battery(data)
|
||||||
|
except (KeyError, TypeError, ValueError) as e:
|
||||||
|
if topic in self._err_count:
|
||||||
|
self._err_count[topic] += 1
|
||||||
|
self.get_logger().debug("Payload error on %s: %s — %s", topic, e, data)
|
||||||
|
|
||||||
|
# ── Timestamp helper ──────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _make_header(self, data: dict, frame_id: str) -> Header:
|
||||||
|
"""
|
||||||
|
Build a ROS2 Header.
|
||||||
|
|
||||||
|
By default uses the ROS2 wall clock so timestamps are consistent with
|
||||||
|
other nodes. If use_phone_ts is true and the phone's `ts` is within
|
||||||
|
warn_drift_s of wall time, uses the phone's epoch timestamp instead.
|
||||||
|
"""
|
||||||
|
hdr = Header()
|
||||||
|
hdr.frame_id = frame_id
|
||||||
|
|
||||||
|
phone_ts: float = float(data.get("ts", 0.0))
|
||||||
|
now_epoch = time.time()
|
||||||
|
|
||||||
|
if self._p["use_phone_ts"] and phone_ts > 0.0:
|
||||||
|
drift = abs(now_epoch - phone_ts)
|
||||||
|
if drift > self._p["warn_drift_s"]:
|
||||||
|
self.get_logger().warning(
|
||||||
|
"Phone clock drift %.2f s (frame_id=%s) — using ROS2 clock",
|
||||||
|
drift, frame_id,
|
||||||
|
)
|
||||||
|
hdr.stamp = self.get_clock().now().to_msg()
|
||||||
|
else:
|
||||||
|
# Convert phone epoch to ROS2 Time
|
||||||
|
sec = int(phone_ts)
|
||||||
|
nsec = int((phone_ts - sec) * 1e9)
|
||||||
|
hdr.stamp.sec = sec
|
||||||
|
hdr.stamp.nanosec = nsec
|
||||||
|
else:
|
||||||
|
hdr.stamp = self.get_clock().now().to_msg()
|
||||||
|
|
||||||
|
return hdr
|
||||||
|
|
||||||
|
# ── IMU handler ───────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _handle_imu(self, data: dict) -> None:
|
||||||
|
"""
|
||||||
|
JSON: {"ts":..., "accel":{"x","y","z"}, "gyro":{"x","y","z"}}
|
||||||
|
Accel units: m/s² Gyro units: rad/s
|
||||||
|
"""
|
||||||
|
accel = data["accel"]
|
||||||
|
gyro = data["gyro"]
|
||||||
|
|
||||||
|
# Validate finite values
|
||||||
|
for v in (accel["x"], accel["y"], accel["z"],
|
||||||
|
gyro["x"], gyro["y"], gyro["z"]):
|
||||||
|
if not math.isfinite(float(v)):
|
||||||
|
raise ValueError(f"non-finite IMU value: {v}")
|
||||||
|
|
||||||
|
cov_a = self._p["imu_accel_cov"]
|
||||||
|
cov_g = self._p["imu_gyro_cov"]
|
||||||
|
|
||||||
|
msg = Imu()
|
||||||
|
msg.header = self._make_header(data, self._p["frame_id_imu"])
|
||||||
|
|
||||||
|
msg.linear_acceleration.x = float(accel["x"])
|
||||||
|
msg.linear_acceleration.y = float(accel["y"])
|
||||||
|
msg.linear_acceleration.z = float(accel["z"])
|
||||||
|
msg.linear_acceleration_covariance = [
|
||||||
|
cov_a, 0.0, 0.0,
|
||||||
|
0.0, cov_a, 0.0,
|
||||||
|
0.0, 0.0, cov_a,
|
||||||
|
]
|
||||||
|
|
||||||
|
msg.angular_velocity.x = float(gyro["x"])
|
||||||
|
msg.angular_velocity.y = float(gyro["y"])
|
||||||
|
msg.angular_velocity.z = float(gyro["z"])
|
||||||
|
msg.angular_velocity_covariance = [
|
||||||
|
cov_g, 0.0, 0.0,
|
||||||
|
0.0, cov_g, 0.0,
|
||||||
|
0.0, 0.0, cov_g,
|
||||||
|
]
|
||||||
|
|
||||||
|
# Orientation unknown — set covariance[0] = -1 per REP-145
|
||||||
|
msg.orientation_covariance[0] = -1.0
|
||||||
|
|
||||||
|
self._imu_pub.publish(msg)
|
||||||
|
self._pub_count[_TOPIC_IMU] += 1
|
||||||
|
|
||||||
|
# ── GPS handler ───────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _handle_gps(self, data: dict) -> None:
|
||||||
|
"""
|
||||||
|
JSON: {"ts":..., "lat","lon","alt_m","accuracy_m","speed_ms","bearing_deg","provider"}
|
||||||
|
"""
|
||||||
|
lat = float(data["lat"])
|
||||||
|
lon = float(data["lon"])
|
||||||
|
alt = float(data.get("alt_m", 0.0))
|
||||||
|
acc = float(data.get("accuracy_m", -1.0))
|
||||||
|
|
||||||
|
if not (-90.0 <= lat <= 90.0):
|
||||||
|
raise ValueError(f"invalid latitude: {lat}")
|
||||||
|
if not (-180.0 <= lon <= 180.0):
|
||||||
|
raise ValueError(f"invalid longitude: {lon}")
|
||||||
|
|
||||||
|
msg = NavSatFix()
|
||||||
|
msg.header = self._make_header(data, self._p["frame_id_gps"])
|
||||||
|
|
||||||
|
msg.latitude = lat
|
||||||
|
msg.longitude = lon
|
||||||
|
msg.altitude = alt
|
||||||
|
|
||||||
|
# Status
|
||||||
|
provider = data.get("provider", "unknown").lower()
|
||||||
|
msg.status.service = NavSatStatus.SERVICE_GPS
|
||||||
|
if provider in ("gps", "fused"):
|
||||||
|
msg.status.status = NavSatStatus.STATUS_FIX
|
||||||
|
elif provider == "network":
|
||||||
|
msg.status.status = NavSatStatus.STATUS_FIX
|
||||||
|
msg.status.service = NavSatStatus.SERVICE_COMPASS
|
||||||
|
else:
|
||||||
|
msg.status.status = NavSatStatus.STATUS_NO_FIX
|
||||||
|
|
||||||
|
# Covariance
|
||||||
|
if acc > 0.0:
|
||||||
|
h_var = acc * acc
|
||||||
|
v_var = h_var * self._p["gps_alt_cov_factor"]
|
||||||
|
msg.position_covariance = [
|
||||||
|
h_var, 0.0, 0.0,
|
||||||
|
0.0, h_var, 0.0,
|
||||||
|
0.0, 0.0, v_var,
|
||||||
|
]
|
||||||
|
msg.position_covariance_type = NavSatFix.COVARIANCE_TYPE_APPROXIMATED
|
||||||
|
else:
|
||||||
|
msg.position_covariance_type = NavSatFix.COVARIANCE_TYPE_UNKNOWN
|
||||||
|
|
||||||
|
self._gps_pub.publish(msg)
|
||||||
|
self._pub_count[_TOPIC_GPS] += 1
|
||||||
|
|
||||||
|
# ── Battery handler ───────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _handle_battery(self, data: dict) -> None:
|
||||||
|
"""
|
||||||
|
JSON: {"ts":..., "pct","charging","temp_c","health","plugged"}
|
||||||
|
"""
|
||||||
|
pct = int(data["pct"])
|
||||||
|
charging = bool(data.get("charging", False))
|
||||||
|
temp_c = float(data.get("temp_c", float("nan")))
|
||||||
|
health = str(data.get("health", "UNKNOWN")).upper()
|
||||||
|
plugged = str(data.get("plugged", "UNPLUGGED")).upper()
|
||||||
|
|
||||||
|
if not (0 <= pct <= 100):
|
||||||
|
raise ValueError(f"invalid battery percentage: {pct}")
|
||||||
|
|
||||||
|
msg = BatteryState()
|
||||||
|
msg.header = self._make_header(data, "phone_battery")
|
||||||
|
|
||||||
|
msg.percentage = float(pct) / 100.0
|
||||||
|
msg.temperature = temp_c + 273.15 if math.isfinite(temp_c) else float("nan")
|
||||||
|
msg.present = True
|
||||||
|
msg.power_supply_technology = BatteryState.POWER_SUPPLY_TECHNOLOGY_LION
|
||||||
|
|
||||||
|
msg.power_supply_status = (
|
||||||
|
BatteryState.POWER_SUPPLY_STATUS_CHARGING
|
||||||
|
if charging else
|
||||||
|
BatteryState.POWER_SUPPLY_STATUS_DISCHARGING
|
||||||
|
)
|
||||||
|
|
||||||
|
msg.power_supply_health = _HEALTH_MAP.get(
|
||||||
|
health, BatteryState.POWER_SUPPLY_HEALTH_UNKNOWN
|
||||||
|
)
|
||||||
|
|
||||||
|
# Voltage / current unknown on Android
|
||||||
|
msg.voltage = float("nan")
|
||||||
|
msg.current = float("nan")
|
||||||
|
msg.charge = float("nan")
|
||||||
|
msg.capacity = float("nan")
|
||||||
|
msg.design_capacity = float("nan")
|
||||||
|
|
||||||
|
msg.location = "phone"
|
||||||
|
msg.serial_number = ""
|
||||||
|
|
||||||
|
self._bat_pub.publish(msg)
|
||||||
|
self._pub_count[_TOPIC_BATTERY] += 1
|
||||||
|
|
||||||
|
# ── Status publisher ─────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _publish_status(self) -> None:
|
||||||
|
now = time.time()
|
||||||
|
ages = {
|
||||||
|
t: round(now - self._last_rx_ts[t], 1) if self._last_rx_ts[t] > 0 else -1.0
|
||||||
|
for t in (_TOPIC_IMU, _TOPIC_GPS, _TOPIC_BATTERY)
|
||||||
|
}
|
||||||
|
body = {
|
||||||
|
"mqtt_connected": self._mqtt_connected,
|
||||||
|
"rx": dict(self._rx_count),
|
||||||
|
"pub": dict(self._pub_count),
|
||||||
|
"err": dict(self._err_count),
|
||||||
|
"age_s": {t.split("/")[-1]: ages[t] for t in ages},
|
||||||
|
"queue_depth": self._q.qsize(),
|
||||||
|
}
|
||||||
|
msg = String()
|
||||||
|
msg.data = json.dumps(body, separators=(",", ":"))
|
||||||
|
self._status_pub.publish(msg)
|
||||||
|
|
||||||
|
# ── Cleanup ───────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def destroy_node(self) -> None:
|
||||||
|
if _MQTT_OK and hasattr(self, "_mqtt"):
|
||||||
|
self._mqtt.loop_stop()
|
||||||
|
self._mqtt.disconnect()
|
||||||
|
super().destroy_node()
|
||||||
|
|
||||||
|
|
||||||
|
# ── Entry point ───────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def main(args: Any = None) -> None:
|
||||||
|
rclpy.init(args=args)
|
||||||
|
node = MqttRos2BridgeNode()
|
||||||
|
try:
|
||||||
|
rclpy.spin(node)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
node.destroy_node()
|
||||||
|
rclpy.try_shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
Loading…
x
Reference in New Issue
Block a user