feat: MQTT-to-ROS2 phone sensor bridge (Issue #601) #605

Merged
sl-jetson merged 2 commits from sl-android/issue-601-mqtt-ros2-bridge into main 2026-03-14 15:55:23 -04:00
4 changed files with 556 additions and 0 deletions

View File

@ -0,0 +1,69 @@
#!/usr/bin/env python3
"""
mqtt_bridge.launch.py Launch the MQTT-to-ROS2 phone sensor bridge (Issue #601)
Usage:
ros2 launch saltybot_phone mqtt_bridge.launch.py
ros2 launch saltybot_phone mqtt_bridge.launch.py mqtt_host:=192.168.1.100
ros2 launch saltybot_phone mqtt_bridge.launch.py use_phone_timestamp:=true warn_drift_s:=0.5
"""
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def generate_launch_description() -> LaunchDescription:
args = [
DeclareLaunchArgument(
"mqtt_host", default_value="localhost",
description="MQTT broker IP/hostname",
),
DeclareLaunchArgument(
"mqtt_port", default_value="1883",
description="MQTT broker port",
),
DeclareLaunchArgument(
"reconnect_delay_s", default_value="5.0",
description="Seconds between MQTT reconnect attempts",
),
DeclareLaunchArgument(
"use_phone_timestamp", default_value="false",
description="Use phone epoch ts for ROS2 header stamp",
),
DeclareLaunchArgument(
"warn_drift_s", default_value="1.0",
description="Clock drift threshold for warning (s)",
),
DeclareLaunchArgument(
"imu_accel_cov", default_value="0.01",
description="IMU accelerometer diagonal covariance (m/s²)²",
),
DeclareLaunchArgument(
"imu_gyro_cov", default_value="0.001",
description="IMU gyroscope diagonal covariance (rad/s)²",
),
]
bridge_node = Node(
package="saltybot_phone",
executable="mqtt_ros2_bridge",
name="mqtt_ros2_bridge",
parameters=[{
"mqtt_host": LaunchConfiguration("mqtt_host"),
"mqtt_port": LaunchConfiguration("mqtt_port"),
"reconnect_delay_s": LaunchConfiguration("reconnect_delay_s"),
"frame_id_imu": "phone_imu",
"frame_id_gps": "phone_gps",
"use_phone_timestamp": LaunchConfiguration("use_phone_timestamp"),
"warn_drift_s": LaunchConfiguration("warn_drift_s"),
"imu_accel_cov": LaunchConfiguration("imu_accel_cov"),
"imu_gyro_cov": LaunchConfiguration("imu_gyro_cov"),
"gps_alt_cov_factor": 4.0,
"status_hz": 0.2,
}],
output="screen",
emulate_tty=True,
)
return LaunchDescription(args + [bridge_node])

View File

@ -15,6 +15,7 @@
<exec_depend>python3-numpy</exec_depend>
<exec_depend>python3-opencv</exec_depend>
<exec_depend>python3-launch-ros</exec_depend>
<exec_depend>python3-paho-mqtt</exec_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>

View File

@ -0,0 +1,485 @@
#!/usr/bin/env python3
"""
mqtt_ros2_bridge_node.py MQTT-to-ROS2 bridge for Termux phone sensors (Issue #601)
Subscribes to the three MQTT topics published by phone/sensor_dashboard.py and
republishes each as a proper ROS2 message type on the Jetson.
MQTT topics consumed (sensor_dashboard.py JSON format)
saltybot/phone/imu sensor_msgs/Imu
saltybot/phone/gps sensor_msgs/NavSatFix
saltybot/phone/battery sensor_msgs/BatteryState
ROS2 topics published
/saltybot/phone/imu sensor_msgs/Imu
/saltybot/phone/gps sensor_msgs/NavSatFix
/saltybot/phone/battery sensor_msgs/BatteryState
/saltybot/phone/bridge/status std_msgs/String (JSON health/stats)
Timestamp alignment
Header stamp = ROS2 wall clock (monotonic, consistent with rest of system).
Phone epoch ts from JSON is cross-checked to detect clock drift > warn_drift_s;
if drift is within tolerance the phone timestamp can be used instead by
setting use_phone_timestamp:=true.
Parameters
mqtt_host str Broker IP/hostname (default: localhost)
mqtt_port int Broker port (default: 1883)
mqtt_keepalive int Keepalive seconds (default: 60)
reconnect_delay_s float Delay between reconnects (default: 5.0)
frame_id_imu str TF frame for IMU (default: phone_imu)
frame_id_gps str TF frame for GPS (default: phone_gps)
use_phone_timestamp bool Use phone ts for header (default: false)
warn_drift_s float Clock drift warning thresh (default: 1.0)
imu_accel_cov float Accel diagonal covariance (default: 0.01)
imu_gyro_cov float Gyro diagonal covariance (default: 0.001)
gps_alt_cov_factor float Altitude cov = acc*factor (default: 4.0)
status_hz float Bridge status publish rate (default: 0.2)
Dependencies
pip install paho-mqtt (on Jetson)
"""
import json
import math
import queue
import threading
import time
from typing import Any
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import BatteryState, Imu, NavSatFix, NavSatStatus
from std_msgs.msg import Header, String
try:
import paho.mqtt.client as mqtt
_MQTT_OK = True
except ImportError:
_MQTT_OK = False
# ── MQTT topic constants (must match sensor_dashboard.py) ────────────────────
_TOPIC_IMU = "saltybot/phone/imu"
_TOPIC_GPS = "saltybot/phone/gps"
_TOPIC_BATTERY = "saltybot/phone/battery"
# ── BatteryState health/power-supply constants ───────────────────────────────
_HEALTH_MAP = {
"GOOD": BatteryState.POWER_SUPPLY_HEALTH_GOOD,
"OVERHEAT": BatteryState.POWER_SUPPLY_HEALTH_OVERHEAT,
"DEAD": BatteryState.POWER_SUPPLY_HEALTH_DEAD,
"OVER_VOLTAGE": BatteryState.POWER_SUPPLY_HEALTH_OVERVOLTAGE,
"UNSPECIFIED_FAILURE": BatteryState.POWER_SUPPLY_HEALTH_UNSPEC_FAILURE,
"COLD": BatteryState.POWER_SUPPLY_HEALTH_COLD,
"WATCHDOG_TIMER_EXPIRE": BatteryState.POWER_SUPPLY_HEALTH_WATCHDOG_TIMER_EXPIRE,
"SAFETY_TIMER_EXPIRE": BatteryState.POWER_SUPPLY_HEALTH_SAFETY_TIMER_EXPIRE,
}
_PLUGGED_MAP = {
"AC": BatteryState.POWER_SUPPLY_STATUS_CHARGING,
"USB": BatteryState.POWER_SUPPLY_STATUS_CHARGING,
"WIRELESS": BatteryState.POWER_SUPPLY_STATUS_CHARGING,
"UNPLUGGED": BatteryState.POWER_SUPPLY_STATUS_DISCHARGING,
}
class MqttRos2BridgeNode(Node):
"""
Bridges phone sensor MQTT topics into ROS2.
Architecture:
- paho-mqtt loop_start() runs the MQTT network thread independently.
- on_message callback enqueues raw (topic, payload) pairs.
- A ROS2 timer drains the queue and publishes ROS2 messages, keeping
all ROS2 operations on the executor thread.
"""
def __init__(self) -> None:
super().__init__("mqtt_ros2_bridge")
# ── Parameters ────────────────────────────────────────────────────────
self.declare_parameter("mqtt_host", "localhost")
self.declare_parameter("mqtt_port", 1883)
self.declare_parameter("mqtt_keepalive", 60)
self.declare_parameter("reconnect_delay_s", 5.0)
self.declare_parameter("frame_id_imu", "phone_imu")
self.declare_parameter("frame_id_gps", "phone_gps")
self.declare_parameter("use_phone_timestamp", False)
self.declare_parameter("warn_drift_s", 1.0)
self.declare_parameter("imu_accel_cov", 0.01)
self.declare_parameter("imu_gyro_cov", 0.001)
self.declare_parameter("gps_alt_cov_factor", 4.0)
self.declare_parameter("status_hz", 0.2)
p = self._p = self._load_params()
if not _MQTT_OK:
self.get_logger().error(
"paho-mqtt not installed. Run: pip install paho-mqtt"
)
# ── Publishers ────────────────────────────────────────────────────────
self._imu_pub = self.create_publisher(Imu, "/saltybot/phone/imu", 10)
self._gps_pub = self.create_publisher(NavSatFix, "/saltybot/phone/gps", 10)
self._bat_pub = self.create_publisher(BatteryState, "/saltybot/phone/battery", 10)
self._status_pub = self.create_publisher(String, "/saltybot/phone/bridge/status", 10)
# ── Message queue (MQTT thread → ROS2 executor thread) ────────────────
self._q: queue.Queue[tuple[str, bytes]] = queue.Queue(maxsize=200)
# ── Stats ─────────────────────────────────────────────────────────────
self._rx_count = {_TOPIC_IMU: 0, _TOPIC_GPS: 0, _TOPIC_BATTERY: 0}
self._pub_count = {_TOPIC_IMU: 0, _TOPIC_GPS: 0, _TOPIC_BATTERY: 0}
self._err_count = {_TOPIC_IMU: 0, _TOPIC_GPS: 0, _TOPIC_BATTERY: 0}
self._last_rx_ts = {_TOPIC_IMU: 0.0, _TOPIC_GPS: 0.0, _TOPIC_BATTERY: 0.0}
self._mqtt_connected = False
# ── Timers ────────────────────────────────────────────────────────────
# Drain queue at 50 Hz (>> sensor rates so latency stays low)
self.create_timer(0.02, self._drain_queue)
# Status publisher
self.create_timer(1.0 / max(p["status_hz"], 0.01), self._publish_status)
# ── MQTT client ───────────────────────────────────────────────────────
if _MQTT_OK:
self._mqtt = mqtt.Client(
client_id="saltybot-mqtt-ros2-bridge", clean_session=True
)
self._mqtt.on_connect = self._on_mqtt_connect
self._mqtt.on_disconnect = self._on_mqtt_disconnect
self._mqtt.on_message = self._on_mqtt_message
self._mqtt.reconnect_delay_set(
min_delay=int(p["reconnect_delay_s"]),
max_delay=int(p["reconnect_delay_s"]) * 4,
)
self._mqtt_connect()
self.get_logger().info(
"MQTT→ROS2 bridge started — broker=%s:%d use_phone_ts=%s",
p["mqtt_host"], p["mqtt_port"], p["use_phone_timestamp"],
)
# ── Param helper ─────────────────────────────────────────────────────────
def _load_params(self) -> dict:
return {
"mqtt_host": self.get_parameter("mqtt_host").value,
"mqtt_port": self.get_parameter("mqtt_port").value,
"mqtt_keepalive": self.get_parameter("mqtt_keepalive").value,
"reconnect_delay_s": self.get_parameter("reconnect_delay_s").value,
"frame_id_imu": self.get_parameter("frame_id_imu").value,
"frame_id_gps": self.get_parameter("frame_id_gps").value,
"use_phone_ts": self.get_parameter("use_phone_timestamp").value,
"warn_drift_s": self.get_parameter("warn_drift_s").value,
"imu_accel_cov": self.get_parameter("imu_accel_cov").value,
"imu_gyro_cov": self.get_parameter("imu_gyro_cov").value,
"gps_alt_cov_factor": self.get_parameter("gps_alt_cov_factor").value,
"status_hz": self.get_parameter("status_hz").value,
}
# ── MQTT connection ───────────────────────────────────────────────────────
def _mqtt_connect(self) -> None:
try:
self._mqtt.connect_async(
self._p["mqtt_host"],
self._p["mqtt_port"],
keepalive=self._p["mqtt_keepalive"],
)
self._mqtt.loop_start()
except Exception as e:
self.get_logger().warning("MQTT initial connect error: %s", str(e))
def _on_mqtt_connect(self, client, userdata, flags, rc) -> None:
if rc == 0:
self._mqtt_connected = True
# Subscribe to all phone sensor topics
for topic in (_TOPIC_IMU, _TOPIC_GPS, _TOPIC_BATTERY):
client.subscribe(topic, qos=0)
self.get_logger().info(
"MQTT connected to %s:%d — subscribed to 3 phone topics",
self._p["mqtt_host"], self._p["mqtt_port"],
)
else:
self.get_logger().warning("MQTT connect failed rc=%d", rc)
def _on_mqtt_disconnect(self, client, userdata, rc) -> None:
self._mqtt_connected = False
if rc != 0:
self.get_logger().warning(
"MQTT disconnected (rc=%d) — paho will reconnect", rc
)
def _on_mqtt_message(self, client, userdata, message) -> None:
"""Called in paho network thread — only enqueue, never publish here."""
topic = message.topic
if topic in self._rx_count:
self._rx_count[topic] += 1
self._last_rx_ts[topic] = time.time()
try:
self._q.put_nowait((topic, message.payload))
except queue.Full:
self.get_logger().debug("MQTT queue full — dropping %s", topic)
# ── Queue drain (ROS2 executor thread) ────────────────────────────────────
def _drain_queue(self) -> None:
"""Drain up to 50 queued messages per timer tick to bound latency."""
for _ in range(50):
try:
topic, payload = self._q.get_nowait()
except queue.Empty:
break
self._dispatch(topic, payload)
def _dispatch(self, topic: str, payload: bytes) -> None:
try:
data = json.loads(payload)
except (json.JSONDecodeError, UnicodeDecodeError) as e:
self._err_count.get(topic, {}) # just access
if topic in self._err_count:
self._err_count[topic] += 1
self.get_logger().debug("JSON decode error on %s: %s", topic, e)
return
try:
if topic == _TOPIC_IMU:
self._handle_imu(data)
elif topic == _TOPIC_GPS:
self._handle_gps(data)
elif topic == _TOPIC_BATTERY:
self._handle_battery(data)
except (KeyError, TypeError, ValueError) as e:
if topic in self._err_count:
self._err_count[topic] += 1
self.get_logger().debug("Payload error on %s: %s%s", topic, e, data)
# ── Timestamp helper ──────────────────────────────────────────────────────
def _make_header(self, data: dict, frame_id: str) -> Header:
"""
Build a ROS2 Header.
By default uses the ROS2 wall clock so timestamps are consistent with
other nodes. If use_phone_ts is true and the phone's `ts` is within
warn_drift_s of wall time, uses the phone's epoch timestamp instead.
"""
hdr = Header()
hdr.frame_id = frame_id
phone_ts: float = float(data.get("ts", 0.0))
now_epoch = time.time()
if self._p["use_phone_ts"] and phone_ts > 0.0:
drift = abs(now_epoch - phone_ts)
if drift > self._p["warn_drift_s"]:
self.get_logger().warning(
"Phone clock drift %.2f s (frame_id=%s) — using ROS2 clock",
drift, frame_id,
)
hdr.stamp = self.get_clock().now().to_msg()
else:
# Convert phone epoch to ROS2 Time
sec = int(phone_ts)
nsec = int((phone_ts - sec) * 1e9)
hdr.stamp.sec = sec
hdr.stamp.nanosec = nsec
else:
hdr.stamp = self.get_clock().now().to_msg()
return hdr
# ── IMU handler ───────────────────────────────────────────────────────────
def _handle_imu(self, data: dict) -> None:
"""
JSON: {"ts":..., "accel":{"x","y","z"}, "gyro":{"x","y","z"}}
Accel units: m/ Gyro units: rad/s
"""
accel = data["accel"]
gyro = data["gyro"]
# Validate finite values
for v in (accel["x"], accel["y"], accel["z"],
gyro["x"], gyro["y"], gyro["z"]):
if not math.isfinite(float(v)):
raise ValueError(f"non-finite IMU value: {v}")
cov_a = self._p["imu_accel_cov"]
cov_g = self._p["imu_gyro_cov"]
msg = Imu()
msg.header = self._make_header(data, self._p["frame_id_imu"])
msg.linear_acceleration.x = float(accel["x"])
msg.linear_acceleration.y = float(accel["y"])
msg.linear_acceleration.z = float(accel["z"])
msg.linear_acceleration_covariance = [
cov_a, 0.0, 0.0,
0.0, cov_a, 0.0,
0.0, 0.0, cov_a,
]
msg.angular_velocity.x = float(gyro["x"])
msg.angular_velocity.y = float(gyro["y"])
msg.angular_velocity.z = float(gyro["z"])
msg.angular_velocity_covariance = [
cov_g, 0.0, 0.0,
0.0, cov_g, 0.0,
0.0, 0.0, cov_g,
]
# Orientation unknown — set covariance[0] = -1 per REP-145
msg.orientation_covariance[0] = -1.0
self._imu_pub.publish(msg)
self._pub_count[_TOPIC_IMU] += 1
# ── GPS handler ───────────────────────────────────────────────────────────
def _handle_gps(self, data: dict) -> None:
"""
JSON: {"ts":..., "lat","lon","alt_m","accuracy_m","speed_ms","bearing_deg","provider"}
"""
lat = float(data["lat"])
lon = float(data["lon"])
alt = float(data.get("alt_m", 0.0))
acc = float(data.get("accuracy_m", -1.0))
if not (-90.0 <= lat <= 90.0):
raise ValueError(f"invalid latitude: {lat}")
if not (-180.0 <= lon <= 180.0):
raise ValueError(f"invalid longitude: {lon}")
msg = NavSatFix()
msg.header = self._make_header(data, self._p["frame_id_gps"])
msg.latitude = lat
msg.longitude = lon
msg.altitude = alt
# Status
provider = data.get("provider", "unknown").lower()
msg.status.service = NavSatStatus.SERVICE_GPS
if provider in ("gps", "fused"):
msg.status.status = NavSatStatus.STATUS_FIX
elif provider == "network":
msg.status.status = NavSatStatus.STATUS_FIX
msg.status.service = NavSatStatus.SERVICE_COMPASS
else:
msg.status.status = NavSatStatus.STATUS_NO_FIX
# Covariance
if acc > 0.0:
h_var = acc * acc
v_var = h_var * self._p["gps_alt_cov_factor"]
msg.position_covariance = [
h_var, 0.0, 0.0,
0.0, h_var, 0.0,
0.0, 0.0, v_var,
]
msg.position_covariance_type = NavSatFix.COVARIANCE_TYPE_APPROXIMATED
else:
msg.position_covariance_type = NavSatFix.COVARIANCE_TYPE_UNKNOWN
self._gps_pub.publish(msg)
self._pub_count[_TOPIC_GPS] += 1
# ── Battery handler ───────────────────────────────────────────────────────
def _handle_battery(self, data: dict) -> None:
"""
JSON: {"ts":..., "pct","charging","temp_c","health","plugged"}
"""
pct = int(data["pct"])
charging = bool(data.get("charging", False))
temp_c = float(data.get("temp_c", float("nan")))
health = str(data.get("health", "UNKNOWN")).upper()
plugged = str(data.get("plugged", "UNPLUGGED")).upper()
if not (0 <= pct <= 100):
raise ValueError(f"invalid battery percentage: {pct}")
msg = BatteryState()
msg.header = self._make_header(data, "phone_battery")
msg.percentage = float(pct) / 100.0
msg.temperature = temp_c + 273.15 if math.isfinite(temp_c) else float("nan")
msg.present = True
msg.power_supply_technology = BatteryState.POWER_SUPPLY_TECHNOLOGY_LION
msg.power_supply_status = (
BatteryState.POWER_SUPPLY_STATUS_CHARGING
if charging else
BatteryState.POWER_SUPPLY_STATUS_DISCHARGING
)
msg.power_supply_health = _HEALTH_MAP.get(
health, BatteryState.POWER_SUPPLY_HEALTH_UNKNOWN
)
# Voltage / current unknown on Android
msg.voltage = float("nan")
msg.current = float("nan")
msg.charge = float("nan")
msg.capacity = float("nan")
msg.design_capacity = float("nan")
msg.location = "phone"
msg.serial_number = ""
self._bat_pub.publish(msg)
self._pub_count[_TOPIC_BATTERY] += 1
# ── Status publisher ─────────────────────────────────────────────────────
def _publish_status(self) -> None:
now = time.time()
ages = {
t: round(now - self._last_rx_ts[t], 1) if self._last_rx_ts[t] > 0 else -1.0
for t in (_TOPIC_IMU, _TOPIC_GPS, _TOPIC_BATTERY)
}
body = {
"mqtt_connected": self._mqtt_connected,
"rx": dict(self._rx_count),
"pub": dict(self._pub_count),
"err": dict(self._err_count),
"age_s": {t.split("/")[-1]: ages[t] for t in ages},
"queue_depth": self._q.qsize(),
}
msg = String()
msg.data = json.dumps(body, separators=(",", ":"))
self._status_pub.publish(msg)
# ── Cleanup ───────────────────────────────────────────────────────────────
def destroy_node(self) -> None:
if _MQTT_OK and hasattr(self, "_mqtt"):
self._mqtt.loop_stop()
self._mqtt.disconnect()
super().destroy_node()
# ── Entry point ───────────────────────────────────────────────────────────────
def main(args: Any = None) -> None:
rclpy.init(args=args)
node = MqttRos2BridgeNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.try_shutdown()
if __name__ == "__main__":
main()

View File

@ -29,6 +29,7 @@ setup(
'openclaw_chat = saltybot_phone.openclaw_chat_node:main',
'phone_bridge = saltybot_phone.ws_bridge:main',
'phone_camera_node = saltybot_phone.phone_camera_node:main',
'mqtt_ros2_bridge = saltybot_phone.mqtt_ros2_bridge_node:main',
],
},
)