Merge pull request 'feat: VESC MQTT telemetry relay (Issue #656)' (#660) from sl-android/issue-656-vesc-mqtt-relay into main

This commit is contained in:
sl-jetson 2026-03-18 07:55:42 -04:00
commit 6561e35fd6
4 changed files with 649 additions and 0 deletions

View File

@ -0,0 +1,305 @@
#!/usr/bin/env python3
"""
vesc_mqtt_relay_node.py ROS2 MQTT relay for VESC CAN telemetry (Issue #656)
Subscribes to VESC telemetry ROS2 topics and republishes as MQTT JSON payloads
for the sensor dashboard. Each per-motor topic is rate-limited to 5 Hz.
ROS2 topics consumed
/vesc/left/state std_msgs/String JSON from vesc_telemetry_node
/vesc/right/state std_msgs/String JSON from vesc_telemetry_node
/vesc/combined std_msgs/String JSON from vesc_telemetry_node
MQTT topics published
saltybot/phone/vesc_left per-motor telemetry (left)
saltybot/phone/vesc_right per-motor telemetry (right)
saltybot/phone/vesc_combined combined voltage + total current + RPMs
MQTT payload (per-motor)
{
"rpm": int,
"current_a": float, # phase current
"voltage_v": float, # bus voltage
"temperature_c": float, # FET temperature
"duty_cycle": float, # -1.0 … 1.0
"fault_code": int,
"ts": float # epoch seconds
}
Parameters
mqtt_host str Broker IP/hostname (default: localhost)
mqtt_port int Broker port (default: 1883)
mqtt_keepalive int Keepalive seconds (default: 60)
reconnect_delay_s float Delay between reconnects (default: 5.0)
motor_rate_hz float Max publish rate per motor (default: 5.0)
Dependencies
pip install paho-mqtt
"""
import json
import time
from typing import Any
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
try:
import paho.mqtt.client as mqtt
_MQTT_OK = True
except ImportError:
_MQTT_OK = False
# ── MQTT topic constants ──────────────────────────────────────────────────────
_MQTT_VESC_LEFT = "saltybot/phone/vesc_left"
_MQTT_VESC_RIGHT = "saltybot/phone/vesc_right"
_MQTT_VESC_COMBINED = "saltybot/phone/vesc_combined"
# ── ROS2 topic constants ──────────────────────────────────────────────────────
_ROS_VESC_LEFT = "/vesc/left/state"
_ROS_VESC_RIGHT = "/vesc/right/state"
_ROS_VESC_COMBINED = "/vesc/combined"
def _extract_motor_payload(data: dict) -> dict:
"""
Extract the required fields from a vesc_telemetry_node per-motor JSON dict.
Upstream JSON keys (from vesc_telemetry_node._state_to_dict):
rpm, current_a, voltage_v, temp_fet_c, duty_cycle, fault_code, ...
Returns a dashboard-friendly payload with a stable key set.
"""
return {
"rpm": int(data["rpm"]),
"current_a": float(data["current_a"]),
"voltage_v": float(data["voltage_v"]),
"temperature_c": float(data["temp_fet_c"]),
"duty_cycle": float(data["duty_cycle"]),
"fault_code": int(data["fault_code"]),
"ts": float(data.get("stamp", time.time())),
}
def _extract_combined_payload(data: dict) -> dict:
"""
Extract fields from the /vesc/combined JSON dict.
Upstream keys: voltage_v, total_current_a, left_rpm, right_rpm,
left_alive, right_alive, stamp
"""
return {
"voltage_v": float(data["voltage_v"]),
"total_current_a": float(data["total_current_a"]),
"left_rpm": int(data["left_rpm"]),
"right_rpm": int(data["right_rpm"]),
"left_alive": bool(data["left_alive"]),
"right_alive": bool(data["right_alive"]),
"ts": float(data.get("stamp", time.time())),
}
class VescMqttRelayNode(Node):
"""
Subscribes to VESC ROS2 topics and relays telemetry to MQTT.
Rate limiting: each per-motor topic maintains a last-publish timestamp;
messages arriving faster than motor_rate_hz are silently dropped.
The /vesc/combined topic is also rate-limited at the same rate.
"""
def __init__(self) -> None:
super().__init__("vesc_mqtt_relay")
# ── Parameters ────────────────────────────────────────────────────────
self.declare_parameter("mqtt_host", "localhost")
self.declare_parameter("mqtt_port", 1883)
self.declare_parameter("mqtt_keepalive", 60)
self.declare_parameter("reconnect_delay_s", 5.0)
self.declare_parameter("motor_rate_hz", 5.0)
self._mqtt_host = self.get_parameter("mqtt_host").value
self._mqtt_port = self.get_parameter("mqtt_port").value
self._mqtt_keepalive = self.get_parameter("mqtt_keepalive").value
reconnect_delay = self.get_parameter("reconnect_delay_s").value
rate_hz = max(0.1, float(self.get_parameter("motor_rate_hz").value))
self._min_interval = 1.0 / rate_hz
if not _MQTT_OK:
self.get_logger().error(
"paho-mqtt not installed — run: pip install paho-mqtt"
)
# ── Rate-limit state (last publish epoch per MQTT topic) ──────────────
self._last_pub: dict[str, float] = {
_MQTT_VESC_LEFT: 0.0,
_MQTT_VESC_RIGHT: 0.0,
_MQTT_VESC_COMBINED: 0.0,
}
# ── Stats ─────────────────────────────────────────────────────────────
self._rx_count: dict[str, int] = {
_MQTT_VESC_LEFT: 0, _MQTT_VESC_RIGHT: 0, _MQTT_VESC_COMBINED: 0
}
self._pub_count: dict[str, int] = {
_MQTT_VESC_LEFT: 0, _MQTT_VESC_RIGHT: 0, _MQTT_VESC_COMBINED: 0
}
self._drop_count: dict[str, int] = {
_MQTT_VESC_LEFT: 0, _MQTT_VESC_RIGHT: 0, _MQTT_VESC_COMBINED: 0
}
self._err_count: dict[str, int] = {
_MQTT_VESC_LEFT: 0, _MQTT_VESC_RIGHT: 0, _MQTT_VESC_COMBINED: 0
}
self._mqtt_connected = False
# ── ROS2 subscriptions ────────────────────────────────────────────────
self.create_subscription(
String, _ROS_VESC_LEFT,
lambda msg: self._on_vesc(msg, _MQTT_VESC_LEFT, _extract_motor_payload),
10,
)
self.create_subscription(
String, _ROS_VESC_RIGHT,
lambda msg: self._on_vesc(msg, _MQTT_VESC_RIGHT, _extract_motor_payload),
10,
)
self.create_subscription(
String, _ROS_VESC_COMBINED,
lambda msg: self._on_vesc(msg, _MQTT_VESC_COMBINED, _extract_combined_payload),
10,
)
# ── MQTT client ───────────────────────────────────────────────────────
self._mqtt_client: "mqtt.Client | None" = None
if _MQTT_OK:
self._mqtt_client = mqtt.Client(
client_id="saltybot-vesc-mqtt-relay", clean_session=True
)
self._mqtt_client.on_connect = self._on_mqtt_connect
self._mqtt_client.on_disconnect = self._on_mqtt_disconnect
self._mqtt_client.reconnect_delay_set(
min_delay=int(reconnect_delay),
max_delay=int(reconnect_delay) * 4,
)
self._mqtt_connect()
self.get_logger().info(
"VESC→MQTT relay started — broker=%s:%d rate=%.1f Hz",
self._mqtt_host, self._mqtt_port, rate_hz,
)
# ── MQTT connection ───────────────────────────────────────────────────────
def _mqtt_connect(self) -> None:
try:
self._mqtt_client.connect_async(
self._mqtt_host, self._mqtt_port,
keepalive=self._mqtt_keepalive,
)
self._mqtt_client.loop_start()
except Exception as exc:
self.get_logger().warning("MQTT initial connect error: %s", str(exc))
def _on_mqtt_connect(self, client, userdata, flags, rc) -> None:
if rc == 0:
self._mqtt_connected = True
self.get_logger().info(
"MQTT connected to %s:%d", self._mqtt_host, self._mqtt_port
)
else:
self.get_logger().warning("MQTT connect failed rc=%d", rc)
def _on_mqtt_disconnect(self, client, userdata, rc) -> None:
self._mqtt_connected = False
if rc != 0:
self.get_logger().warning(
"MQTT disconnected (rc=%d) — paho will reconnect", rc
)
# ── ROS2 subscriber callback ──────────────────────────────────────────────
def _on_vesc(self, ros_msg: String, mqtt_topic: str, extractor) -> None:
"""
Handle an incoming VESC ROS2 message.
1. Parse JSON from the String payload.
2. Check rate limit drop if too soon.
3. Extract dashboard fields.
4. Publish to MQTT.
"""
self._rx_count[mqtt_topic] += 1
# Rate limit
now = time.monotonic()
if now - self._last_pub[mqtt_topic] < self._min_interval:
self._drop_count[mqtt_topic] += 1
return
# Parse
try:
data = json.loads(ros_msg.data)
except (json.JSONDecodeError, UnicodeDecodeError) as exc:
self._err_count[mqtt_topic] += 1
self.get_logger().debug("JSON error on %s: %s", mqtt_topic, exc)
return
# Extract
try:
payload = extractor(data)
except (KeyError, TypeError, ValueError) as exc:
self._err_count[mqtt_topic] += 1
self.get_logger().debug("Payload error on %s: %s%s", mqtt_topic, exc, data)
return
# Publish
if self._mqtt_client is not None and self._mqtt_connected:
try:
self._mqtt_client.publish(
mqtt_topic,
json.dumps(payload, separators=(",", ":")),
qos=0,
retain=False,
)
self._last_pub[mqtt_topic] = now
self._pub_count[mqtt_topic] += 1
except Exception as exc:
self._err_count[mqtt_topic] += 1
self.get_logger().debug("MQTT publish error on %s: %s", mqtt_topic, exc)
else:
# Still update last_pub to avoid log spam when broker is down
self._last_pub[mqtt_topic] = now
self.get_logger().debug("MQTT not connected — dropped %s", mqtt_topic)
# ── Cleanup ───────────────────────────────────────────────────────────────
def destroy_node(self) -> None:
if self._mqtt_client is not None:
self._mqtt_client.loop_stop()
self._mqtt_client.disconnect()
super().destroy_node()
# ── Entry point ───────────────────────────────────────────────────────────────
def main(args: Any = None) -> None:
rclpy.init(args=args)
node = VescMqttRelayNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.try_shutdown()
if __name__ == "__main__":
main()

View File

@ -30,6 +30,7 @@ setup(
'phone_bridge = saltybot_phone.ws_bridge:main', 'phone_bridge = saltybot_phone.ws_bridge:main',
'phone_camera_node = saltybot_phone.phone_camera_node:main', 'phone_camera_node = saltybot_phone.phone_camera_node:main',
'mqtt_ros2_bridge = saltybot_phone.mqtt_ros2_bridge_node:main', 'mqtt_ros2_bridge = saltybot_phone.mqtt_ros2_bridge_node:main',
'vesc_mqtt_relay = saltybot_phone.vesc_mqtt_relay_node:main',
], ],
}, },
) )

View File

@ -0,0 +1,343 @@
"""Unit tests for vesc_mqtt_relay_node — pure-logic helpers.
Does not require ROS2, paho-mqtt, or a live VESC/broker.
Tests cover the two payload extractors and the rate-limiting logic.
"""
import json
import sys
import time
import types
# ── Stub out ROS2 / paho so the module can be imported without them ───────────
def _make_rclpy_stub():
rclpy = types.ModuleType("rclpy")
node_mod = types.ModuleType("rclpy.node")
class _Node:
def __init__(self, *a, **kw): pass
def declare_parameter(self, *a, **kw): pass
def get_parameter(self, name):
class _P:
value = None
return _P()
def create_subscription(self, *a, **kw): pass
def create_publisher(self, *a, **kw): pass
def create_timer(self, *a, **kw): pass
def get_clock(self): return None
def get_logger(self):
class _L:
def info(self, *a, **kw): pass
def warning(self, *a, **kw): pass
def error(self, *a, **kw): pass
def debug(self, *a, **kw): pass
return _L()
def destroy_node(self): pass
node_mod.Node = _Node
rclpy.node = node_mod
rclpy.init = lambda *a, **kw: None
rclpy.spin = lambda *a, **kw: None
rclpy.try_shutdown = lambda *a, **kw: None
std_msgs_mod = types.ModuleType("std_msgs")
std_msgs_msg = types.ModuleType("std_msgs.msg")
class _String:
data: str = ""
std_msgs_msg.String = _String
std_msgs_mod.msg = std_msgs_msg
paho_mod = types.ModuleType("paho")
paho_mqtt = types.ModuleType("paho.mqtt")
paho_client = types.ModuleType("paho.mqtt.client")
class _Client:
def __init__(self, *a, **kw): pass
def connect_async(self, *a, **kw): pass
def loop_start(self): pass
def loop_stop(self): pass
def disconnect(self): pass
def publish(self, *a, **kw): pass
def reconnect_delay_set(self, *a, **kw): pass
on_connect = None
on_disconnect = None
paho_client.Client = _Client
paho_mqtt.client = paho_client
paho_mod.mqtt = paho_mqtt
for name, mod in [
("rclpy", rclpy),
("rclpy.node", node_mod),
("std_msgs", std_msgs_mod),
("std_msgs.msg", std_msgs_msg),
("paho", paho_mod),
("paho.mqtt", paho_mqtt),
("paho.mqtt.client", paho_client),
]:
sys.modules.setdefault(name, mod)
_make_rclpy_stub()
from saltybot_phone.vesc_mqtt_relay_node import (
_MQTT_VESC_LEFT,
_MQTT_VESC_RIGHT,
_MQTT_VESC_COMBINED,
_extract_combined_payload,
_extract_motor_payload,
)
# ---------------------------------------------------------------------------
# _extract_motor_payload
# ---------------------------------------------------------------------------
class TestExtractMotorPayload:
"""Covers field extraction and type coercion for per-motor JSON."""
def _sample(self, **overrides):
base = {
"can_id": 61,
"rpm": 1500,
"current_a": 12.34,
"current_in_a": 10.0,
"duty_cycle": 0.4500,
"voltage_v": 25.20,
"temp_fet_c": 45.5,
"temp_motor_c": 62.0,
"fault_code": 0,
"fault_name": "NONE",
"alive": True,
"stamp": 1000.0,
}
base.update(overrides)
return base
def test_required_keys_present(self):
p = _extract_motor_payload(self._sample())
for key in ("rpm", "current_a", "voltage_v", "temperature_c",
"duty_cycle", "fault_code", "ts"):
assert key in p, f"missing key: {key}"
def test_rpm_is_int(self):
p = _extract_motor_payload(self._sample(rpm=1500))
assert isinstance(p["rpm"], int)
assert p["rpm"] == 1500
def test_temperature_maps_to_temp_fet_c(self):
p = _extract_motor_payload(self._sample(temp_fet_c=55.5))
assert p["temperature_c"] == 55.5
def test_voltage_v(self):
p = _extract_motor_payload(self._sample(voltage_v=24.8))
assert p["voltage_v"] == 24.8
def test_duty_cycle(self):
p = _extract_motor_payload(self._sample(duty_cycle=0.75))
assert p["duty_cycle"] == 0.75
def test_fault_code_zero(self):
p = _extract_motor_payload(self._sample(fault_code=0))
assert p["fault_code"] == 0
def test_fault_code_nonzero(self):
p = _extract_motor_payload(self._sample(fault_code=3))
assert p["fault_code"] == 3
def test_ts_from_stamp(self):
p = _extract_motor_payload(self._sample(stamp=12345.678))
assert p["ts"] == 12345.678
def test_negative_rpm(self):
p = _extract_motor_payload(self._sample(rpm=-3000))
assert p["rpm"] == -3000
def test_negative_current(self):
p = _extract_motor_payload(self._sample(current_a=-5.0))
assert p["current_a"] == -5.0
def test_negative_duty_cycle(self):
p = _extract_motor_payload(self._sample(duty_cycle=-0.5))
assert p["duty_cycle"] == -0.5
def test_missing_required_key_raises(self):
bad = self._sample()
del bad["rpm"]
try:
_extract_motor_payload(bad)
assert False, "expected KeyError"
except KeyError:
pass
def test_missing_stamp_uses_current_time(self):
data = self._sample()
del data["stamp"]
before = time.time()
p = _extract_motor_payload(data)
after = time.time()
assert before <= p["ts"] <= after
def test_json_roundtrip(self):
p = _extract_motor_payload(self._sample())
raw = json.dumps(p)
restored = json.loads(raw)
assert restored["rpm"] == p["rpm"]
assert restored["fault_code"] == p["fault_code"]
# ---------------------------------------------------------------------------
# _extract_combined_payload
# ---------------------------------------------------------------------------
class TestExtractCombinedPayload:
def _sample(self, **overrides):
base = {
"voltage_v": 25.2,
"total_current_a": 18.5,
"left_rpm": 1400,
"right_rpm": 1420,
"left_alive": True,
"right_alive": True,
"stamp": 2000.0,
}
base.update(overrides)
return base
def test_required_keys_present(self):
p = _extract_combined_payload(self._sample())
for key in ("voltage_v", "total_current_a", "left_rpm", "right_rpm",
"left_alive", "right_alive", "ts"):
assert key in p, f"missing key: {key}"
def test_voltage_v(self):
p = _extract_combined_payload(self._sample(voltage_v=24.0))
assert p["voltage_v"] == 24.0
def test_total_current_a(self):
p = _extract_combined_payload(self._sample(total_current_a=30.5))
assert p["total_current_a"] == 30.5
def test_rpms_are_int(self):
p = _extract_combined_payload(self._sample(left_rpm=1000, right_rpm=1050))
assert isinstance(p["left_rpm"], int)
assert isinstance(p["right_rpm"], int)
def test_alive_flags(self):
p = _extract_combined_payload(self._sample(left_alive=True, right_alive=False))
assert p["left_alive"] is True
assert p["right_alive"] is False
def test_ts_from_stamp(self):
p = _extract_combined_payload(self._sample(stamp=9999.1))
assert p["ts"] == 9999.1
def test_missing_stamp_uses_current_time(self):
data = self._sample()
del data["stamp"]
before = time.time()
p = _extract_combined_payload(data)
after = time.time()
assert before <= p["ts"] <= after
def test_json_roundtrip(self):
p = _extract_combined_payload(self._sample())
raw = json.dumps(p)
restored = json.loads(raw)
assert restored["voltage_v"] == p["voltage_v"]
# ---------------------------------------------------------------------------
# Rate-limit logic (isolated, no ROS2 / paho)
# ---------------------------------------------------------------------------
class TestRateLimit:
"""
Exercise the rate-limiting gate that lives inside VescMqttRelayNode._on_vesc.
We test the guard condition directly without instantiating the ROS2 node.
"""
def _make_gate(self, rate_hz: float):
"""
Return a stateful callable that mirrors the rate-limit check in the node.
Returns True if a publish should proceed, False if the message is dropped.
"""
min_interval = 1.0 / rate_hz
state = {"last": 0.0}
def gate(now: float) -> bool:
if now - state["last"] < min_interval:
return False
state["last"] = now
return True
return gate
def test_first_message_always_passes(self):
gate = self._make_gate(5.0)
assert gate(time.monotonic()) is True
def test_immediate_second_message_dropped(self):
gate = self._make_gate(5.0)
t = time.monotonic()
gate(t)
assert gate(t + 0.001) is False # 1 ms < 200 ms interval
def test_message_after_interval_passes(self):
gate = self._make_gate(5.0)
t = time.monotonic()
gate(t)
assert gate(t + 0.201) is True # 201 ms > 200 ms interval
def test_exactly_at_interval_dropped(self):
gate = self._make_gate(5.0)
t = time.monotonic()
gate(t)
# Exactly at the boundary is strictly less-than, so it's dropped
assert gate(t + 0.2) is False
def test_10hz_rate(self):
gate = self._make_gate(10.0)
t = time.monotonic()
gate(t)
assert gate(t + 0.09) is False # 90 ms < 100 ms
assert gate(t + 0.101) is True # 101 ms > 100 ms
def test_1hz_rate(self):
gate = self._make_gate(1.0)
t = time.monotonic()
gate(t)
assert gate(t + 0.999) is False
assert gate(t + 1.001) is True
def test_multiple_topics_independent(self):
gate_left = self._make_gate(5.0)
gate_right = self._make_gate(5.0)
t = time.monotonic()
gate_left(t)
gate_right(t)
# left: drop, right: drop
assert gate_left(t + 0.05) is False
assert gate_right(t + 0.05) is False
# advance only left past interval
assert gate_left(t + 0.21) is True
assert gate_right(t + 0.21) is True
# ---------------------------------------------------------------------------
# MQTT topic constant checks
# ---------------------------------------------------------------------------
class TestTopicConstants:
def test_left_topic(self):
assert _MQTT_VESC_LEFT == "saltybot/phone/vesc_left"
def test_right_topic(self):
assert _MQTT_VESC_RIGHT == "saltybot/phone/vesc_right"
def test_combined_topic(self):
assert _MQTT_VESC_COMBINED == "saltybot/phone/vesc_combined"
if __name__ == "__main__":
import pytest
pytest.main([__file__, "-v"])