diff --git a/jetson/ros2_ws/src/saltybot_phone/launch/mqtt_bridge.launch.py b/jetson/ros2_ws/src/saltybot_phone/launch/mqtt_bridge.launch.py new file mode 100644 index 0000000..af18411 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_phone/launch/mqtt_bridge.launch.py @@ -0,0 +1,69 @@ +#!/usr/bin/env python3 +""" +mqtt_bridge.launch.py — Launch the MQTT-to-ROS2 phone sensor bridge (Issue #601) + +Usage: + ros2 launch saltybot_phone mqtt_bridge.launch.py + ros2 launch saltybot_phone mqtt_bridge.launch.py mqtt_host:=192.168.1.100 + ros2 launch saltybot_phone mqtt_bridge.launch.py use_phone_timestamp:=true warn_drift_s:=0.5 +""" +from launch import LaunchDescription +from launch.actions import DeclareLaunchArgument +from launch.substitutions import LaunchConfiguration +from launch_ros.actions import Node + + +def generate_launch_description() -> LaunchDescription: + args = [ + DeclareLaunchArgument( + "mqtt_host", default_value="localhost", + description="MQTT broker IP/hostname", + ), + DeclareLaunchArgument( + "mqtt_port", default_value="1883", + description="MQTT broker port", + ), + DeclareLaunchArgument( + "reconnect_delay_s", default_value="5.0", + description="Seconds between MQTT reconnect attempts", + ), + DeclareLaunchArgument( + "use_phone_timestamp", default_value="false", + description="Use phone epoch ts for ROS2 header stamp", + ), + DeclareLaunchArgument( + "warn_drift_s", default_value="1.0", + description="Clock drift threshold for warning (s)", + ), + DeclareLaunchArgument( + "imu_accel_cov", default_value="0.01", + description="IMU accelerometer diagonal covariance (m/s²)²", + ), + DeclareLaunchArgument( + "imu_gyro_cov", default_value="0.001", + description="IMU gyroscope diagonal covariance (rad/s)²", + ), + ] + + bridge_node = Node( + package="saltybot_phone", + executable="mqtt_ros2_bridge", + name="mqtt_ros2_bridge", + parameters=[{ + "mqtt_host": LaunchConfiguration("mqtt_host"), + "mqtt_port": LaunchConfiguration("mqtt_port"), + "reconnect_delay_s": LaunchConfiguration("reconnect_delay_s"), + "frame_id_imu": "phone_imu", + "frame_id_gps": "phone_gps", + "use_phone_timestamp": LaunchConfiguration("use_phone_timestamp"), + "warn_drift_s": LaunchConfiguration("warn_drift_s"), + "imu_accel_cov": LaunchConfiguration("imu_accel_cov"), + "imu_gyro_cov": LaunchConfiguration("imu_gyro_cov"), + "gps_alt_cov_factor": 4.0, + "status_hz": 0.2, + }], + output="screen", + emulate_tty=True, + ) + + return LaunchDescription(args + [bridge_node]) diff --git a/jetson/ros2_ws/src/saltybot_phone/package.xml b/jetson/ros2_ws/src/saltybot_phone/package.xml index 7209b56..9f900ce 100644 --- a/jetson/ros2_ws/src/saltybot_phone/package.xml +++ b/jetson/ros2_ws/src/saltybot_phone/package.xml @@ -15,6 +15,7 @@ python3-numpy python3-opencv python3-launch-ros + python3-paho-mqtt ament_copyright ament_flake8 ament_pep257 diff --git a/jetson/ros2_ws/src/saltybot_phone/saltybot_phone/mqtt_ros2_bridge_node.py b/jetson/ros2_ws/src/saltybot_phone/saltybot_phone/mqtt_ros2_bridge_node.py new file mode 100644 index 0000000..bfa9e27 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_phone/saltybot_phone/mqtt_ros2_bridge_node.py @@ -0,0 +1,485 @@ +#!/usr/bin/env python3 +""" +mqtt_ros2_bridge_node.py — MQTT-to-ROS2 bridge for Termux phone sensors (Issue #601) + +Subscribes to the three MQTT topics published by phone/sensor_dashboard.py and +republishes each as a proper ROS2 message type on the Jetson. + +MQTT topics consumed (sensor_dashboard.py JSON format) +─────────────────────────────────────────────────────── + saltybot/phone/imu → sensor_msgs/Imu + saltybot/phone/gps → sensor_msgs/NavSatFix + saltybot/phone/battery → sensor_msgs/BatteryState + +ROS2 topics published +────────────────────── + /saltybot/phone/imu sensor_msgs/Imu + /saltybot/phone/gps sensor_msgs/NavSatFix + /saltybot/phone/battery sensor_msgs/BatteryState + /saltybot/phone/bridge/status std_msgs/String (JSON health/stats) + +Timestamp alignment +──────────────────── + Header stamp = ROS2 wall clock (monotonic, consistent with rest of system). + Phone epoch ts from JSON is cross-checked to detect clock drift > warn_drift_s; + if drift is within tolerance the phone timestamp can be used instead by + setting use_phone_timestamp:=true. + +Parameters +────────── + mqtt_host str Broker IP/hostname (default: localhost) + mqtt_port int Broker port (default: 1883) + mqtt_keepalive int Keepalive seconds (default: 60) + reconnect_delay_s float Delay between reconnects (default: 5.0) + frame_id_imu str TF frame for IMU (default: phone_imu) + frame_id_gps str TF frame for GPS (default: phone_gps) + use_phone_timestamp bool Use phone ts for header (default: false) + warn_drift_s float Clock drift warning thresh (default: 1.0) + imu_accel_cov float Accel diagonal covariance (default: 0.01) + imu_gyro_cov float Gyro diagonal covariance (default: 0.001) + gps_alt_cov_factor float Altitude cov = acc*factor (default: 4.0) + status_hz float Bridge status publish rate (default: 0.2) + +Dependencies +──────────── + pip install paho-mqtt (on Jetson) +""" + +import json +import math +import queue +import threading +import time +from typing import Any + +import rclpy +from rclpy.node import Node +from sensor_msgs.msg import BatteryState, Imu, NavSatFix, NavSatStatus +from std_msgs.msg import Header, String + +try: + import paho.mqtt.client as mqtt + _MQTT_OK = True +except ImportError: + _MQTT_OK = False + +# ── MQTT topic constants (must match sensor_dashboard.py) ──────────────────── + +_TOPIC_IMU = "saltybot/phone/imu" +_TOPIC_GPS = "saltybot/phone/gps" +_TOPIC_BATTERY = "saltybot/phone/battery" + +# ── BatteryState health/power-supply constants ─────────────────────────────── + +_HEALTH_MAP = { + "GOOD": BatteryState.POWER_SUPPLY_HEALTH_GOOD, + "OVERHEAT": BatteryState.POWER_SUPPLY_HEALTH_OVERHEAT, + "DEAD": BatteryState.POWER_SUPPLY_HEALTH_DEAD, + "OVER_VOLTAGE": BatteryState.POWER_SUPPLY_HEALTH_OVERVOLTAGE, + "UNSPECIFIED_FAILURE": BatteryState.POWER_SUPPLY_HEALTH_UNSPEC_FAILURE, + "COLD": BatteryState.POWER_SUPPLY_HEALTH_COLD, + "WATCHDOG_TIMER_EXPIRE": BatteryState.POWER_SUPPLY_HEALTH_WATCHDOG_TIMER_EXPIRE, + "SAFETY_TIMER_EXPIRE": BatteryState.POWER_SUPPLY_HEALTH_SAFETY_TIMER_EXPIRE, +} + +_PLUGGED_MAP = { + "AC": BatteryState.POWER_SUPPLY_STATUS_CHARGING, + "USB": BatteryState.POWER_SUPPLY_STATUS_CHARGING, + "WIRELESS": BatteryState.POWER_SUPPLY_STATUS_CHARGING, + "UNPLUGGED": BatteryState.POWER_SUPPLY_STATUS_DISCHARGING, +} + + +class MqttRos2BridgeNode(Node): + """ + Bridges phone sensor MQTT topics into ROS2. + + Architecture: + - paho-mqtt loop_start() runs the MQTT network thread independently. + - on_message callback enqueues raw (topic, payload) pairs. + - A ROS2 timer drains the queue and publishes ROS2 messages, keeping + all ROS2 operations on the executor thread. + """ + + def __init__(self) -> None: + super().__init__("mqtt_ros2_bridge") + + # ── Parameters ──────────────────────────────────────────────────────── + self.declare_parameter("mqtt_host", "localhost") + self.declare_parameter("mqtt_port", 1883) + self.declare_parameter("mqtt_keepalive", 60) + self.declare_parameter("reconnect_delay_s", 5.0) + self.declare_parameter("frame_id_imu", "phone_imu") + self.declare_parameter("frame_id_gps", "phone_gps") + self.declare_parameter("use_phone_timestamp", False) + self.declare_parameter("warn_drift_s", 1.0) + self.declare_parameter("imu_accel_cov", 0.01) + self.declare_parameter("imu_gyro_cov", 0.001) + self.declare_parameter("gps_alt_cov_factor", 4.0) + self.declare_parameter("status_hz", 0.2) + + p = self._p = self._load_params() + + if not _MQTT_OK: + self.get_logger().error( + "paho-mqtt not installed. Run: pip install paho-mqtt" + ) + + # ── Publishers ──────────────────────────────────────────────────────── + self._imu_pub = self.create_publisher(Imu, "/saltybot/phone/imu", 10) + self._gps_pub = self.create_publisher(NavSatFix, "/saltybot/phone/gps", 10) + self._bat_pub = self.create_publisher(BatteryState, "/saltybot/phone/battery", 10) + self._status_pub = self.create_publisher(String, "/saltybot/phone/bridge/status", 10) + + # ── Message queue (MQTT thread → ROS2 executor thread) ──────────────── + self._q: queue.Queue[tuple[str, bytes]] = queue.Queue(maxsize=200) + + # ── Stats ───────────────────────────────────────────────────────────── + self._rx_count = {_TOPIC_IMU: 0, _TOPIC_GPS: 0, _TOPIC_BATTERY: 0} + self._pub_count = {_TOPIC_IMU: 0, _TOPIC_GPS: 0, _TOPIC_BATTERY: 0} + self._err_count = {_TOPIC_IMU: 0, _TOPIC_GPS: 0, _TOPIC_BATTERY: 0} + self._last_rx_ts = {_TOPIC_IMU: 0.0, _TOPIC_GPS: 0.0, _TOPIC_BATTERY: 0.0} + self._mqtt_connected = False + + # ── Timers ──────────────────────────────────────────────────────────── + # Drain queue at 50 Hz (>> sensor rates so latency stays low) + self.create_timer(0.02, self._drain_queue) + # Status publisher + self.create_timer(1.0 / max(p["status_hz"], 0.01), self._publish_status) + + # ── MQTT client ─────────────────────────────────────────────────────── + if _MQTT_OK: + self._mqtt = mqtt.Client( + client_id="saltybot-mqtt-ros2-bridge", clean_session=True + ) + self._mqtt.on_connect = self._on_mqtt_connect + self._mqtt.on_disconnect = self._on_mqtt_disconnect + self._mqtt.on_message = self._on_mqtt_message + self._mqtt.reconnect_delay_set( + min_delay=int(p["reconnect_delay_s"]), + max_delay=int(p["reconnect_delay_s"]) * 4, + ) + self._mqtt_connect() + + self.get_logger().info( + "MQTT→ROS2 bridge started — broker=%s:%d use_phone_ts=%s", + p["mqtt_host"], p["mqtt_port"], p["use_phone_timestamp"], + ) + + # ── Param helper ───────────────────────────────────────────────────────── + + def _load_params(self) -> dict: + return { + "mqtt_host": self.get_parameter("mqtt_host").value, + "mqtt_port": self.get_parameter("mqtt_port").value, + "mqtt_keepalive": self.get_parameter("mqtt_keepalive").value, + "reconnect_delay_s": self.get_parameter("reconnect_delay_s").value, + "frame_id_imu": self.get_parameter("frame_id_imu").value, + "frame_id_gps": self.get_parameter("frame_id_gps").value, + "use_phone_ts": self.get_parameter("use_phone_timestamp").value, + "warn_drift_s": self.get_parameter("warn_drift_s").value, + "imu_accel_cov": self.get_parameter("imu_accel_cov").value, + "imu_gyro_cov": self.get_parameter("imu_gyro_cov").value, + "gps_alt_cov_factor": self.get_parameter("gps_alt_cov_factor").value, + "status_hz": self.get_parameter("status_hz").value, + } + + # ── MQTT connection ─────────────────────────────────────────────────────── + + def _mqtt_connect(self) -> None: + try: + self._mqtt.connect_async( + self._p["mqtt_host"], + self._p["mqtt_port"], + keepalive=self._p["mqtt_keepalive"], + ) + self._mqtt.loop_start() + except Exception as e: + self.get_logger().warning("MQTT initial connect error: %s", str(e)) + + def _on_mqtt_connect(self, client, userdata, flags, rc) -> None: + if rc == 0: + self._mqtt_connected = True + # Subscribe to all phone sensor topics + for topic in (_TOPIC_IMU, _TOPIC_GPS, _TOPIC_BATTERY): + client.subscribe(topic, qos=0) + self.get_logger().info( + "MQTT connected to %s:%d — subscribed to 3 phone topics", + self._p["mqtt_host"], self._p["mqtt_port"], + ) + else: + self.get_logger().warning("MQTT connect failed rc=%d", rc) + + def _on_mqtt_disconnect(self, client, userdata, rc) -> None: + self._mqtt_connected = False + if rc != 0: + self.get_logger().warning( + "MQTT disconnected (rc=%d) — paho will reconnect", rc + ) + + def _on_mqtt_message(self, client, userdata, message) -> None: + """Called in paho network thread — only enqueue, never publish here.""" + topic = message.topic + if topic in self._rx_count: + self._rx_count[topic] += 1 + self._last_rx_ts[topic] = time.time() + try: + self._q.put_nowait((topic, message.payload)) + except queue.Full: + self.get_logger().debug("MQTT queue full — dropping %s", topic) + + # ── Queue drain (ROS2 executor thread) ──────────────────────────────────── + + def _drain_queue(self) -> None: + """Drain up to 50 queued messages per timer tick to bound latency.""" + for _ in range(50): + try: + topic, payload = self._q.get_nowait() + except queue.Empty: + break + self._dispatch(topic, payload) + + def _dispatch(self, topic: str, payload: bytes) -> None: + try: + data = json.loads(payload) + except (json.JSONDecodeError, UnicodeDecodeError) as e: + self._err_count.get(topic, {}) # just access + if topic in self._err_count: + self._err_count[topic] += 1 + self.get_logger().debug("JSON decode error on %s: %s", topic, e) + return + + try: + if topic == _TOPIC_IMU: + self._handle_imu(data) + elif topic == _TOPIC_GPS: + self._handle_gps(data) + elif topic == _TOPIC_BATTERY: + self._handle_battery(data) + except (KeyError, TypeError, ValueError) as e: + if topic in self._err_count: + self._err_count[topic] += 1 + self.get_logger().debug("Payload error on %s: %s — %s", topic, e, data) + + # ── Timestamp helper ────────────────────────────────────────────────────── + + def _make_header(self, data: dict, frame_id: str) -> Header: + """ + Build a ROS2 Header. + + By default uses the ROS2 wall clock so timestamps are consistent with + other nodes. If use_phone_ts is true and the phone's `ts` is within + warn_drift_s of wall time, uses the phone's epoch timestamp instead. + """ + hdr = Header() + hdr.frame_id = frame_id + + phone_ts: float = float(data.get("ts", 0.0)) + now_epoch = time.time() + + if self._p["use_phone_ts"] and phone_ts > 0.0: + drift = abs(now_epoch - phone_ts) + if drift > self._p["warn_drift_s"]: + self.get_logger().warning( + "Phone clock drift %.2f s (frame_id=%s) — using ROS2 clock", + drift, frame_id, + ) + hdr.stamp = self.get_clock().now().to_msg() + else: + # Convert phone epoch to ROS2 Time + sec = int(phone_ts) + nsec = int((phone_ts - sec) * 1e9) + hdr.stamp.sec = sec + hdr.stamp.nanosec = nsec + else: + hdr.stamp = self.get_clock().now().to_msg() + + return hdr + + # ── IMU handler ─────────────────────────────────────────────────────────── + + def _handle_imu(self, data: dict) -> None: + """ + JSON: {"ts":..., "accel":{"x","y","z"}, "gyro":{"x","y","z"}} + Accel units: m/s² Gyro units: rad/s + """ + accel = data["accel"] + gyro = data["gyro"] + + # Validate finite values + for v in (accel["x"], accel["y"], accel["z"], + gyro["x"], gyro["y"], gyro["z"]): + if not math.isfinite(float(v)): + raise ValueError(f"non-finite IMU value: {v}") + + cov_a = self._p["imu_accel_cov"] + cov_g = self._p["imu_gyro_cov"] + + msg = Imu() + msg.header = self._make_header(data, self._p["frame_id_imu"]) + + msg.linear_acceleration.x = float(accel["x"]) + msg.linear_acceleration.y = float(accel["y"]) + msg.linear_acceleration.z = float(accel["z"]) + msg.linear_acceleration_covariance = [ + cov_a, 0.0, 0.0, + 0.0, cov_a, 0.0, + 0.0, 0.0, cov_a, + ] + + msg.angular_velocity.x = float(gyro["x"]) + msg.angular_velocity.y = float(gyro["y"]) + msg.angular_velocity.z = float(gyro["z"]) + msg.angular_velocity_covariance = [ + cov_g, 0.0, 0.0, + 0.0, cov_g, 0.0, + 0.0, 0.0, cov_g, + ] + + # Orientation unknown — set covariance[0] = -1 per REP-145 + msg.orientation_covariance[0] = -1.0 + + self._imu_pub.publish(msg) + self._pub_count[_TOPIC_IMU] += 1 + + # ── GPS handler ─────────────────────────────────────────────────────────── + + def _handle_gps(self, data: dict) -> None: + """ + JSON: {"ts":..., "lat","lon","alt_m","accuracy_m","speed_ms","bearing_deg","provider"} + """ + lat = float(data["lat"]) + lon = float(data["lon"]) + alt = float(data.get("alt_m", 0.0)) + acc = float(data.get("accuracy_m", -1.0)) + + if not (-90.0 <= lat <= 90.0): + raise ValueError(f"invalid latitude: {lat}") + if not (-180.0 <= lon <= 180.0): + raise ValueError(f"invalid longitude: {lon}") + + msg = NavSatFix() + msg.header = self._make_header(data, self._p["frame_id_gps"]) + + msg.latitude = lat + msg.longitude = lon + msg.altitude = alt + + # Status + provider = data.get("provider", "unknown").lower() + msg.status.service = NavSatStatus.SERVICE_GPS + if provider in ("gps", "fused"): + msg.status.status = NavSatStatus.STATUS_FIX + elif provider == "network": + msg.status.status = NavSatStatus.STATUS_FIX + msg.status.service = NavSatStatus.SERVICE_COMPASS + else: + msg.status.status = NavSatStatus.STATUS_NO_FIX + + # Covariance + if acc > 0.0: + h_var = acc * acc + v_var = h_var * self._p["gps_alt_cov_factor"] + msg.position_covariance = [ + h_var, 0.0, 0.0, + 0.0, h_var, 0.0, + 0.0, 0.0, v_var, + ] + msg.position_covariance_type = NavSatFix.COVARIANCE_TYPE_APPROXIMATED + else: + msg.position_covariance_type = NavSatFix.COVARIANCE_TYPE_UNKNOWN + + self._gps_pub.publish(msg) + self._pub_count[_TOPIC_GPS] += 1 + + # ── Battery handler ─────────────────────────────────────────────────────── + + def _handle_battery(self, data: dict) -> None: + """ + JSON: {"ts":..., "pct","charging","temp_c","health","plugged"} + """ + pct = int(data["pct"]) + charging = bool(data.get("charging", False)) + temp_c = float(data.get("temp_c", float("nan"))) + health = str(data.get("health", "UNKNOWN")).upper() + plugged = str(data.get("plugged", "UNPLUGGED")).upper() + + if not (0 <= pct <= 100): + raise ValueError(f"invalid battery percentage: {pct}") + + msg = BatteryState() + msg.header = self._make_header(data, "phone_battery") + + msg.percentage = float(pct) / 100.0 + msg.temperature = temp_c + 273.15 if math.isfinite(temp_c) else float("nan") + msg.present = True + msg.power_supply_technology = BatteryState.POWER_SUPPLY_TECHNOLOGY_LION + + msg.power_supply_status = ( + BatteryState.POWER_SUPPLY_STATUS_CHARGING + if charging else + BatteryState.POWER_SUPPLY_STATUS_DISCHARGING + ) + + msg.power_supply_health = _HEALTH_MAP.get( + health, BatteryState.POWER_SUPPLY_HEALTH_UNKNOWN + ) + + # Voltage / current unknown on Android + msg.voltage = float("nan") + msg.current = float("nan") + msg.charge = float("nan") + msg.capacity = float("nan") + msg.design_capacity = float("nan") + + msg.location = "phone" + msg.serial_number = "" + + self._bat_pub.publish(msg) + self._pub_count[_TOPIC_BATTERY] += 1 + + # ── Status publisher ───────────────────────────────────────────────────── + + def _publish_status(self) -> None: + now = time.time() + ages = { + t: round(now - self._last_rx_ts[t], 1) if self._last_rx_ts[t] > 0 else -1.0 + for t in (_TOPIC_IMU, _TOPIC_GPS, _TOPIC_BATTERY) + } + body = { + "mqtt_connected": self._mqtt_connected, + "rx": dict(self._rx_count), + "pub": dict(self._pub_count), + "err": dict(self._err_count), + "age_s": {t.split("/")[-1]: ages[t] for t in ages}, + "queue_depth": self._q.qsize(), + } + msg = String() + msg.data = json.dumps(body, separators=(",", ":")) + self._status_pub.publish(msg) + + # ── Cleanup ─────────────────────────────────────────────────────────────── + + def destroy_node(self) -> None: + if _MQTT_OK and hasattr(self, "_mqtt"): + self._mqtt.loop_stop() + self._mqtt.disconnect() + super().destroy_node() + + +# ── Entry point ─────────────────────────────────────────────────────────────── + +def main(args: Any = None) -> None: + rclpy.init(args=args) + node = MqttRos2BridgeNode() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.try_shutdown() + + +if __name__ == "__main__": + main() diff --git a/jetson/ros2_ws/src/saltybot_phone/setup.py b/jetson/ros2_ws/src/saltybot_phone/setup.py index dc3b12f..b96b732 100644 --- a/jetson/ros2_ws/src/saltybot_phone/setup.py +++ b/jetson/ros2_ws/src/saltybot_phone/setup.py @@ -29,6 +29,7 @@ setup( 'openclaw_chat = saltybot_phone.openclaw_chat_node:main', 'phone_bridge = saltybot_phone.ws_bridge:main', 'phone_camera_node = saltybot_phone.phone_camera_node:main', + 'mqtt_ros2_bridge = saltybot_phone.mqtt_ros2_bridge_node:main', ], }, ) \ No newline at end of file