From 289185e6cf79b4d91b9f1462b802f91cf25b27ee Mon Sep 17 00:00:00 2001 From: sl-android Date: Tue, 17 Mar 2026 11:32:37 -0400 Subject: [PATCH] feat: VESC CAN telemetry MQTT relay (Issue #656) Add vesc_mqtt_relay_node.py to saltybot_phone: subscribes to /vesc/left/state, /vesc/right/state, /vesc/combined ROS2 topics and publishes JSON telemetry to saltybot/phone/vesc_{left,right,combined} MQTT topics at 5 Hz per motor. 32 unit tests, no ROS2/paho required. Co-Authored-By: Claude Sonnet 4.6 --- .../saltybot_phone/vesc_mqtt_relay_node.py | 305 ++++++++++++++++ jetson/ros2_ws/src/saltybot_phone/setup.py | 1 + .../src/saltybot_phone/test/__init__.py | 0 .../test/test_vesc_mqtt_relay.py | 343 ++++++++++++++++++ 4 files changed, 649 insertions(+) create mode 100644 jetson/ros2_ws/src/saltybot_phone/saltybot_phone/vesc_mqtt_relay_node.py create mode 100644 jetson/ros2_ws/src/saltybot_phone/test/__init__.py create mode 100644 jetson/ros2_ws/src/saltybot_phone/test/test_vesc_mqtt_relay.py diff --git a/jetson/ros2_ws/src/saltybot_phone/saltybot_phone/vesc_mqtt_relay_node.py b/jetson/ros2_ws/src/saltybot_phone/saltybot_phone/vesc_mqtt_relay_node.py new file mode 100644 index 0000000..5644a69 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_phone/saltybot_phone/vesc_mqtt_relay_node.py @@ -0,0 +1,305 @@ +#!/usr/bin/env python3 +""" +vesc_mqtt_relay_node.py — ROS2 → MQTT relay for VESC CAN telemetry (Issue #656) + +Subscribes to VESC telemetry ROS2 topics and republishes as MQTT JSON payloads +for the sensor dashboard. Each per-motor topic is rate-limited to 5 Hz. + +ROS2 topics consumed +───────────────────── + /vesc/left/state std_msgs/String JSON from vesc_telemetry_node + /vesc/right/state std_msgs/String JSON from vesc_telemetry_node + /vesc/combined std_msgs/String JSON from vesc_telemetry_node + +MQTT topics published +────────────────────── + saltybot/phone/vesc_left — per-motor telemetry (left) + saltybot/phone/vesc_right — per-motor telemetry (right) + saltybot/phone/vesc_combined — combined voltage + total current + RPMs + +MQTT payload (per-motor) +──────────────────────── + { + "rpm": int, + "current_a": float, # phase current + "voltage_v": float, # bus voltage + "temperature_c": float, # FET temperature + "duty_cycle": float, # -1.0 … 1.0 + "fault_code": int, + "ts": float # epoch seconds + } + +Parameters +────────── + mqtt_host str Broker IP/hostname (default: localhost) + mqtt_port int Broker port (default: 1883) + mqtt_keepalive int Keepalive seconds (default: 60) + reconnect_delay_s float Delay between reconnects (default: 5.0) + motor_rate_hz float Max publish rate per motor (default: 5.0) + +Dependencies +──────────── + pip install paho-mqtt +""" + +import json +import time +from typing import Any + +import rclpy +from rclpy.node import Node +from std_msgs.msg import String + +try: + import paho.mqtt.client as mqtt + _MQTT_OK = True +except ImportError: + _MQTT_OK = False + +# ── MQTT topic constants ────────────────────────────────────────────────────── + +_MQTT_VESC_LEFT = "saltybot/phone/vesc_left" +_MQTT_VESC_RIGHT = "saltybot/phone/vesc_right" +_MQTT_VESC_COMBINED = "saltybot/phone/vesc_combined" + +# ── ROS2 topic constants ────────────────────────────────────────────────────── + +_ROS_VESC_LEFT = "/vesc/left/state" +_ROS_VESC_RIGHT = "/vesc/right/state" +_ROS_VESC_COMBINED = "/vesc/combined" + + +def _extract_motor_payload(data: dict) -> dict: + """ + Extract the required fields from a vesc_telemetry_node per-motor JSON dict. + + Upstream JSON keys (from vesc_telemetry_node._state_to_dict): + rpm, current_a, voltage_v, temp_fet_c, duty_cycle, fault_code, ... + + Returns a dashboard-friendly payload with a stable key set. + """ + return { + "rpm": int(data["rpm"]), + "current_a": float(data["current_a"]), + "voltage_v": float(data["voltage_v"]), + "temperature_c": float(data["temp_fet_c"]), + "duty_cycle": float(data["duty_cycle"]), + "fault_code": int(data["fault_code"]), + "ts": float(data.get("stamp", time.time())), + } + + +def _extract_combined_payload(data: dict) -> dict: + """ + Extract fields from the /vesc/combined JSON dict. + + Upstream keys: voltage_v, total_current_a, left_rpm, right_rpm, + left_alive, right_alive, stamp + """ + return { + "voltage_v": float(data["voltage_v"]), + "total_current_a": float(data["total_current_a"]), + "left_rpm": int(data["left_rpm"]), + "right_rpm": int(data["right_rpm"]), + "left_alive": bool(data["left_alive"]), + "right_alive": bool(data["right_alive"]), + "ts": float(data.get("stamp", time.time())), + } + + +class VescMqttRelayNode(Node): + """ + Subscribes to VESC ROS2 topics and relays telemetry to MQTT. + + Rate limiting: each per-motor topic maintains a last-publish timestamp; + messages arriving faster than motor_rate_hz are silently dropped. + The /vesc/combined topic is also rate-limited at the same rate. + """ + + def __init__(self) -> None: + super().__init__("vesc_mqtt_relay") + + # ── Parameters ──────────────────────────────────────────────────────── + self.declare_parameter("mqtt_host", "localhost") + self.declare_parameter("mqtt_port", 1883) + self.declare_parameter("mqtt_keepalive", 60) + self.declare_parameter("reconnect_delay_s", 5.0) + self.declare_parameter("motor_rate_hz", 5.0) + + self._mqtt_host = self.get_parameter("mqtt_host").value + self._mqtt_port = self.get_parameter("mqtt_port").value + self._mqtt_keepalive = self.get_parameter("mqtt_keepalive").value + reconnect_delay = self.get_parameter("reconnect_delay_s").value + rate_hz = max(0.1, float(self.get_parameter("motor_rate_hz").value)) + self._min_interval = 1.0 / rate_hz + + if not _MQTT_OK: + self.get_logger().error( + "paho-mqtt not installed — run: pip install paho-mqtt" + ) + + # ── Rate-limit state (last publish epoch per MQTT topic) ────────────── + self._last_pub: dict[str, float] = { + _MQTT_VESC_LEFT: 0.0, + _MQTT_VESC_RIGHT: 0.0, + _MQTT_VESC_COMBINED: 0.0, + } + + # ── Stats ───────────────────────────────────────────────────────────── + self._rx_count: dict[str, int] = { + _MQTT_VESC_LEFT: 0, _MQTT_VESC_RIGHT: 0, _MQTT_VESC_COMBINED: 0 + } + self._pub_count: dict[str, int] = { + _MQTT_VESC_LEFT: 0, _MQTT_VESC_RIGHT: 0, _MQTT_VESC_COMBINED: 0 + } + self._drop_count: dict[str, int] = { + _MQTT_VESC_LEFT: 0, _MQTT_VESC_RIGHT: 0, _MQTT_VESC_COMBINED: 0 + } + self._err_count: dict[str, int] = { + _MQTT_VESC_LEFT: 0, _MQTT_VESC_RIGHT: 0, _MQTT_VESC_COMBINED: 0 + } + self._mqtt_connected = False + + # ── ROS2 subscriptions ──────────────────────────────────────────────── + self.create_subscription( + String, _ROS_VESC_LEFT, + lambda msg: self._on_vesc(msg, _MQTT_VESC_LEFT, _extract_motor_payload), + 10, + ) + self.create_subscription( + String, _ROS_VESC_RIGHT, + lambda msg: self._on_vesc(msg, _MQTT_VESC_RIGHT, _extract_motor_payload), + 10, + ) + self.create_subscription( + String, _ROS_VESC_COMBINED, + lambda msg: self._on_vesc(msg, _MQTT_VESC_COMBINED, _extract_combined_payload), + 10, + ) + + # ── MQTT client ─────────────────────────────────────────────────────── + self._mqtt_client: "mqtt.Client | None" = None + if _MQTT_OK: + self._mqtt_client = mqtt.Client( + client_id="saltybot-vesc-mqtt-relay", clean_session=True + ) + self._mqtt_client.on_connect = self._on_mqtt_connect + self._mqtt_client.on_disconnect = self._on_mqtt_disconnect + self._mqtt_client.reconnect_delay_set( + min_delay=int(reconnect_delay), + max_delay=int(reconnect_delay) * 4, + ) + self._mqtt_connect() + + self.get_logger().info( + "VESC→MQTT relay started — broker=%s:%d rate=%.1f Hz", + self._mqtt_host, self._mqtt_port, rate_hz, + ) + + # ── MQTT connection ─────────────────────────────────────────────────────── + + def _mqtt_connect(self) -> None: + try: + self._mqtt_client.connect_async( + self._mqtt_host, self._mqtt_port, + keepalive=self._mqtt_keepalive, + ) + self._mqtt_client.loop_start() + except Exception as exc: + self.get_logger().warning("MQTT initial connect error: %s", str(exc)) + + def _on_mqtt_connect(self, client, userdata, flags, rc) -> None: + if rc == 0: + self._mqtt_connected = True + self.get_logger().info( + "MQTT connected to %s:%d", self._mqtt_host, self._mqtt_port + ) + else: + self.get_logger().warning("MQTT connect failed rc=%d", rc) + + def _on_mqtt_disconnect(self, client, userdata, rc) -> None: + self._mqtt_connected = False + if rc != 0: + self.get_logger().warning( + "MQTT disconnected (rc=%d) — paho will reconnect", rc + ) + + # ── ROS2 subscriber callback ────────────────────────────────────────────── + + def _on_vesc(self, ros_msg: String, mqtt_topic: str, extractor) -> None: + """ + Handle an incoming VESC ROS2 message. + + 1. Parse JSON from the String payload. + 2. Check rate limit — drop if too soon. + 3. Extract dashboard fields. + 4. Publish to MQTT. + """ + self._rx_count[mqtt_topic] += 1 + + # Rate limit + now = time.monotonic() + if now - self._last_pub[mqtt_topic] < self._min_interval: + self._drop_count[mqtt_topic] += 1 + return + + # Parse + try: + data = json.loads(ros_msg.data) + except (json.JSONDecodeError, UnicodeDecodeError) as exc: + self._err_count[mqtt_topic] += 1 + self.get_logger().debug("JSON error on %s: %s", mqtt_topic, exc) + return + + # Extract + try: + payload = extractor(data) + except (KeyError, TypeError, ValueError) as exc: + self._err_count[mqtt_topic] += 1 + self.get_logger().debug("Payload error on %s: %s — %s", mqtt_topic, exc, data) + return + + # Publish + if self._mqtt_client is not None and self._mqtt_connected: + try: + self._mqtt_client.publish( + mqtt_topic, + json.dumps(payload, separators=(",", ":")), + qos=0, + retain=False, + ) + self._last_pub[mqtt_topic] = now + self._pub_count[mqtt_topic] += 1 + except Exception as exc: + self._err_count[mqtt_topic] += 1 + self.get_logger().debug("MQTT publish error on %s: %s", mqtt_topic, exc) + else: + # Still update last_pub to avoid log spam when broker is down + self._last_pub[mqtt_topic] = now + self.get_logger().debug("MQTT not connected — dropped %s", mqtt_topic) + + # ── Cleanup ─────────────────────────────────────────────────────────────── + + def destroy_node(self) -> None: + if self._mqtt_client is not None: + self._mqtt_client.loop_stop() + self._mqtt_client.disconnect() + super().destroy_node() + + +# ── Entry point ─────────────────────────────────────────────────────────────── + +def main(args: Any = None) -> None: + rclpy.init(args=args) + node = VescMqttRelayNode() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.try_shutdown() + + +if __name__ == "__main__": + main() diff --git a/jetson/ros2_ws/src/saltybot_phone/setup.py b/jetson/ros2_ws/src/saltybot_phone/setup.py index b96b732..2fde1a2 100644 --- a/jetson/ros2_ws/src/saltybot_phone/setup.py +++ b/jetson/ros2_ws/src/saltybot_phone/setup.py @@ -30,6 +30,7 @@ setup( 'phone_bridge = saltybot_phone.ws_bridge:main', 'phone_camera_node = saltybot_phone.phone_camera_node:main', 'mqtt_ros2_bridge = saltybot_phone.mqtt_ros2_bridge_node:main', + 'vesc_mqtt_relay = saltybot_phone.vesc_mqtt_relay_node:main', ], }, ) \ No newline at end of file diff --git a/jetson/ros2_ws/src/saltybot_phone/test/__init__.py b/jetson/ros2_ws/src/saltybot_phone/test/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_phone/test/test_vesc_mqtt_relay.py b/jetson/ros2_ws/src/saltybot_phone/test/test_vesc_mqtt_relay.py new file mode 100644 index 0000000..2136c55 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_phone/test/test_vesc_mqtt_relay.py @@ -0,0 +1,343 @@ +"""Unit tests for vesc_mqtt_relay_node — pure-logic helpers. + +Does not require ROS2, paho-mqtt, or a live VESC/broker. +Tests cover the two payload extractors and the rate-limiting logic. +""" + +import json +import sys +import time +import types + +# ── Stub out ROS2 / paho so the module can be imported without them ─────────── + +def _make_rclpy_stub(): + rclpy = types.ModuleType("rclpy") + node_mod = types.ModuleType("rclpy.node") + + class _Node: + def __init__(self, *a, **kw): pass + def declare_parameter(self, *a, **kw): pass + def get_parameter(self, name): + class _P: + value = None + return _P() + def create_subscription(self, *a, **kw): pass + def create_publisher(self, *a, **kw): pass + def create_timer(self, *a, **kw): pass + def get_clock(self): return None + def get_logger(self): + class _L: + def info(self, *a, **kw): pass + def warning(self, *a, **kw): pass + def error(self, *a, **kw): pass + def debug(self, *a, **kw): pass + return _L() + def destroy_node(self): pass + + node_mod.Node = _Node + rclpy.node = node_mod + rclpy.init = lambda *a, **kw: None + rclpy.spin = lambda *a, **kw: None + rclpy.try_shutdown = lambda *a, **kw: None + + std_msgs_mod = types.ModuleType("std_msgs") + std_msgs_msg = types.ModuleType("std_msgs.msg") + class _String: + data: str = "" + std_msgs_msg.String = _String + std_msgs_mod.msg = std_msgs_msg + + paho_mod = types.ModuleType("paho") + paho_mqtt = types.ModuleType("paho.mqtt") + paho_client = types.ModuleType("paho.mqtt.client") + class _Client: + def __init__(self, *a, **kw): pass + def connect_async(self, *a, **kw): pass + def loop_start(self): pass + def loop_stop(self): pass + def disconnect(self): pass + def publish(self, *a, **kw): pass + def reconnect_delay_set(self, *a, **kw): pass + on_connect = None + on_disconnect = None + paho_client.Client = _Client + paho_mqtt.client = paho_client + paho_mod.mqtt = paho_mqtt + + for name, mod in [ + ("rclpy", rclpy), + ("rclpy.node", node_mod), + ("std_msgs", std_msgs_mod), + ("std_msgs.msg", std_msgs_msg), + ("paho", paho_mod), + ("paho.mqtt", paho_mqtt), + ("paho.mqtt.client", paho_client), + ]: + sys.modules.setdefault(name, mod) + + +_make_rclpy_stub() + +from saltybot_phone.vesc_mqtt_relay_node import ( + _MQTT_VESC_LEFT, + _MQTT_VESC_RIGHT, + _MQTT_VESC_COMBINED, + _extract_combined_payload, + _extract_motor_payload, +) + + +# --------------------------------------------------------------------------- +# _extract_motor_payload +# --------------------------------------------------------------------------- + +class TestExtractMotorPayload: + """Covers field extraction and type coercion for per-motor JSON.""" + + def _sample(self, **overrides): + base = { + "can_id": 61, + "rpm": 1500, + "current_a": 12.34, + "current_in_a": 10.0, + "duty_cycle": 0.4500, + "voltage_v": 25.20, + "temp_fet_c": 45.5, + "temp_motor_c": 62.0, + "fault_code": 0, + "fault_name": "NONE", + "alive": True, + "stamp": 1000.0, + } + base.update(overrides) + return base + + def test_required_keys_present(self): + p = _extract_motor_payload(self._sample()) + for key in ("rpm", "current_a", "voltage_v", "temperature_c", + "duty_cycle", "fault_code", "ts"): + assert key in p, f"missing key: {key}" + + def test_rpm_is_int(self): + p = _extract_motor_payload(self._sample(rpm=1500)) + assert isinstance(p["rpm"], int) + assert p["rpm"] == 1500 + + def test_temperature_maps_to_temp_fet_c(self): + p = _extract_motor_payload(self._sample(temp_fet_c=55.5)) + assert p["temperature_c"] == 55.5 + + def test_voltage_v(self): + p = _extract_motor_payload(self._sample(voltage_v=24.8)) + assert p["voltage_v"] == 24.8 + + def test_duty_cycle(self): + p = _extract_motor_payload(self._sample(duty_cycle=0.75)) + assert p["duty_cycle"] == 0.75 + + def test_fault_code_zero(self): + p = _extract_motor_payload(self._sample(fault_code=0)) + assert p["fault_code"] == 0 + + def test_fault_code_nonzero(self): + p = _extract_motor_payload(self._sample(fault_code=3)) + assert p["fault_code"] == 3 + + def test_ts_from_stamp(self): + p = _extract_motor_payload(self._sample(stamp=12345.678)) + assert p["ts"] == 12345.678 + + def test_negative_rpm(self): + p = _extract_motor_payload(self._sample(rpm=-3000)) + assert p["rpm"] == -3000 + + def test_negative_current(self): + p = _extract_motor_payload(self._sample(current_a=-5.0)) + assert p["current_a"] == -5.0 + + def test_negative_duty_cycle(self): + p = _extract_motor_payload(self._sample(duty_cycle=-0.5)) + assert p["duty_cycle"] == -0.5 + + def test_missing_required_key_raises(self): + bad = self._sample() + del bad["rpm"] + try: + _extract_motor_payload(bad) + assert False, "expected KeyError" + except KeyError: + pass + + def test_missing_stamp_uses_current_time(self): + data = self._sample() + del data["stamp"] + before = time.time() + p = _extract_motor_payload(data) + after = time.time() + assert before <= p["ts"] <= after + + def test_json_roundtrip(self): + p = _extract_motor_payload(self._sample()) + raw = json.dumps(p) + restored = json.loads(raw) + assert restored["rpm"] == p["rpm"] + assert restored["fault_code"] == p["fault_code"] + + +# --------------------------------------------------------------------------- +# _extract_combined_payload +# --------------------------------------------------------------------------- + +class TestExtractCombinedPayload: + def _sample(self, **overrides): + base = { + "voltage_v": 25.2, + "total_current_a": 18.5, + "left_rpm": 1400, + "right_rpm": 1420, + "left_alive": True, + "right_alive": True, + "stamp": 2000.0, + } + base.update(overrides) + return base + + def test_required_keys_present(self): + p = _extract_combined_payload(self._sample()) + for key in ("voltage_v", "total_current_a", "left_rpm", "right_rpm", + "left_alive", "right_alive", "ts"): + assert key in p, f"missing key: {key}" + + def test_voltage_v(self): + p = _extract_combined_payload(self._sample(voltage_v=24.0)) + assert p["voltage_v"] == 24.0 + + def test_total_current_a(self): + p = _extract_combined_payload(self._sample(total_current_a=30.5)) + assert p["total_current_a"] == 30.5 + + def test_rpms_are_int(self): + p = _extract_combined_payload(self._sample(left_rpm=1000, right_rpm=1050)) + assert isinstance(p["left_rpm"], int) + assert isinstance(p["right_rpm"], int) + + def test_alive_flags(self): + p = _extract_combined_payload(self._sample(left_alive=True, right_alive=False)) + assert p["left_alive"] is True + assert p["right_alive"] is False + + def test_ts_from_stamp(self): + p = _extract_combined_payload(self._sample(stamp=9999.1)) + assert p["ts"] == 9999.1 + + def test_missing_stamp_uses_current_time(self): + data = self._sample() + del data["stamp"] + before = time.time() + p = _extract_combined_payload(data) + after = time.time() + assert before <= p["ts"] <= after + + def test_json_roundtrip(self): + p = _extract_combined_payload(self._sample()) + raw = json.dumps(p) + restored = json.loads(raw) + assert restored["voltage_v"] == p["voltage_v"] + + +# --------------------------------------------------------------------------- +# Rate-limit logic (isolated, no ROS2 / paho) +# --------------------------------------------------------------------------- + +class TestRateLimit: + """ + Exercise the rate-limiting gate that lives inside VescMqttRelayNode._on_vesc. + We test the guard condition directly without instantiating the ROS2 node. + """ + + def _make_gate(self, rate_hz: float): + """ + Return a stateful callable that mirrors the rate-limit check in the node. + Returns True if a publish should proceed, False if the message is dropped. + """ + min_interval = 1.0 / rate_hz + state = {"last": 0.0} + + def gate(now: float) -> bool: + if now - state["last"] < min_interval: + return False + state["last"] = now + return True + + return gate + + def test_first_message_always_passes(self): + gate = self._make_gate(5.0) + assert gate(time.monotonic()) is True + + def test_immediate_second_message_dropped(self): + gate = self._make_gate(5.0) + t = time.monotonic() + gate(t) + assert gate(t + 0.001) is False # 1 ms < 200 ms interval + + def test_message_after_interval_passes(self): + gate = self._make_gate(5.0) + t = time.monotonic() + gate(t) + assert gate(t + 0.201) is True # 201 ms > 200 ms interval + + def test_exactly_at_interval_dropped(self): + gate = self._make_gate(5.0) + t = time.monotonic() + gate(t) + # Exactly at the boundary is strictly less-than, so it's dropped + assert gate(t + 0.2) is False + + def test_10hz_rate(self): + gate = self._make_gate(10.0) + t = time.monotonic() + gate(t) + assert gate(t + 0.09) is False # 90 ms < 100 ms + assert gate(t + 0.101) is True # 101 ms > 100 ms + + def test_1hz_rate(self): + gate = self._make_gate(1.0) + t = time.monotonic() + gate(t) + assert gate(t + 0.999) is False + assert gate(t + 1.001) is True + + def test_multiple_topics_independent(self): + gate_left = self._make_gate(5.0) + gate_right = self._make_gate(5.0) + t = time.monotonic() + gate_left(t) + gate_right(t) + # left: drop, right: drop + assert gate_left(t + 0.05) is False + assert gate_right(t + 0.05) is False + # advance only left past interval + assert gate_left(t + 0.21) is True + assert gate_right(t + 0.21) is True + + +# --------------------------------------------------------------------------- +# MQTT topic constant checks +# --------------------------------------------------------------------------- + +class TestTopicConstants: + def test_left_topic(self): + assert _MQTT_VESC_LEFT == "saltybot/phone/vesc_left" + + def test_right_topic(self): + assert _MQTT_VESC_RIGHT == "saltybot/phone/vesc_right" + + def test_combined_topic(self): + assert _MQTT_VESC_COMBINED == "saltybot/phone/vesc_combined" + + +if __name__ == "__main__": + import pytest + pytest.main([__file__, "-v"])