feat: VESC CAN telemetry for dual motors (Issue #645)

New saltybot_vesc_telemetry ROS2 package — SocketCAN (python-can, can0)
telemetry for dual FSESC 6.7 Pro (FW 6.6) on CAN IDs 61 (left) and 79 (right).

- vesc_can_protocol.py: STATUS/STATUS_4/STATUS_5 frame parsers, VescState
  dataclass, GET_VALUES request builder (CAN_PACKET_PROCESS_SHORT_BUFFER)
- vesc_telemetry_node.py: ROS2 node; background CAN RX thread; publishes
  /vesc/left/state, /vesc/right/state, /vesc/combined (JSON String msgs),
  /diagnostics (DiagnosticArray); overcurrent/overtemp/fault alerting;
  configurable poll rate 10-50 Hz (default 20 Hz)
- test_vesc_telemetry.py: 31 unit tests, all passing (no ROS/CAN required)
- config/vesc_telemetry_params.yaml, launch file

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
sl-firmware 2026-03-17 09:53:09 -04:00
parent 0fcad75cb4
commit b2c9f368f6
11 changed files with 917 additions and 0 deletions

View File

@ -0,0 +1,29 @@
# VESC CAN Telemetry Node — SaltyBot dual FSESC 6.7 Pro (FW 6.6)
# SocketCAN interface: can0 (SN65HVD230 transceiver on MAMBA F722S CAN2)
vesc_telemetry:
ros__parameters:
# SocketCAN interface name
can_interface: "can0"
# CAN IDs assigned to each VESC in VESC Tool
# Left motor VESC: ID 61 (0x3D)
# Right motor VESC: ID 79 (0x4F)
left_can_id: 61
right_can_id: 79
# This node's CAN ID used as sender_id in GET_VALUES requests (0-127)
sender_can_id: 127
# Telemetry publish rate and GET_VALUES poll rate (10-50 Hz)
poll_rate_hz: 20
# Fault alert thresholds
# Phase current threshold — FSESC 6.7 Pro rated 80 A peak; alert at 60 A
overcurrent_threshold_a: 60.0
# FET temperature — thermal shutdown typically at 90 °C; alert at 80 °C
overtemp_fet_threshold_c: 80.0
# Motor temperature — alert at 100 °C
overtemp_motor_threshold_c: 100.0

View File

@ -0,0 +1,44 @@
"""Launch file for VESC CAN telemetry node."""
import os
from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def generate_launch_description():
pkg_dir = get_package_share_directory("saltybot_vesc_telemetry")
config_file = os.path.join(pkg_dir, "config", "vesc_telemetry_params.yaml")
return LaunchDescription([
DeclareLaunchArgument(
"config_file",
default_value=config_file,
description="Path to vesc_telemetry_params.yaml",
),
DeclareLaunchArgument(
"can_interface",
default_value="can0",
description="SocketCAN interface (e.g. can0, vcan0 for testing)",
),
DeclareLaunchArgument(
"poll_rate_hz",
default_value="20",
description="Telemetry publish + GET_VALUES poll rate (10-50 Hz)",
),
Node(
package="saltybot_vesc_telemetry",
executable="vesc_telemetry_node",
name="vesc_telemetry",
output="screen",
parameters=[
LaunchConfiguration("config_file"),
{
"can_interface": LaunchConfiguration("can_interface"),
"poll_rate_hz": LaunchConfiguration("poll_rate_hz"),
},
],
),
])

View File

@ -0,0 +1,30 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_vesc_telemetry</name>
<version>0.1.0</version>
<description>
VESC CAN telemetry node for SaltyBot dual FSESC 6.7 Pro (FW 6.6).
Polls CAN IDs 61 (left) and 79 (right) via SocketCAN (python-can, can0).
Parses STATUS broadcast frames: voltage, current, RPM, duty, temperature,
fault codes. Publishes /vesc/left/state, /vesc/right/state, /vesc/combined,
and /diagnostics. Issues #645.
</description>
<maintainer email="sl-firmware@saltylab.local">sl-firmware</maintainer>
<license>MIT</license>
<depend>rclpy</depend>
<depend>std_msgs</depend>
<depend>diagnostic_msgs</depend>
<buildtool_depend>ament_python</buildtool_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,176 @@
"""VESC CAN protocol constants and frame parsers (FW 6.6).
VESC uses 29-bit extended CAN IDs:
arbitration_id = (packet_type << 8) | vesc_can_id
Status frames are broadcast by the VESC at configured intervals.
GET_VALUES requests use CAN_PACKET_PROCESS_SHORT_BUFFER.
"""
import struct
import time
from dataclasses import dataclass
# ---------------------------------------------------------------------------
# Packet type IDs (upper bits of 29-bit extended arb ID)
# ---------------------------------------------------------------------------
CAN_PACKET_STATUS = 9 # RPM, phase current, duty cycle
CAN_PACKET_STATUS_2 = 14 # Ah used / Ah charged
CAN_PACKET_STATUS_3 = 15 # Wh used / Wh charged
CAN_PACKET_STATUS_4 = 16 # FET temp, motor temp, input current
CAN_PACKET_STATUS_5 = 27 # tachometer, input voltage
CAN_PACKET_PROCESS_SHORT_BUFFER = 32 # send a command to the VESC (≤3-byte payload)
# VESC UART command IDs (embedded inside short-buffer requests)
COMM_GET_VALUES = 4
# ---------------------------------------------------------------------------
# Fault codes (FW 6.6)
# ---------------------------------------------------------------------------
FAULT_CODE_NONE = 0
FAULT_CODE_OVER_VOLTAGE = 1
FAULT_CODE_UNDER_VOLTAGE = 2
FAULT_CODE_DRV = 3
FAULT_CODE_ABS_OVER_CURRENT = 4
FAULT_CODE_OVER_TEMP_FET = 5
FAULT_CODE_OVER_TEMP_MOTOR = 6
FAULT_CODE_GATE_DRIVER_OVER_VOLTAGE = 7
FAULT_CODE_GATE_DRIVER_UNDER_VOLTAGE = 8
FAULT_CODE_MCU_UNDER_VOLTAGE = 9
FAULT_CODE_BOOTING_FROM_WATCHDOG_RESET = 10
FAULT_NAMES: dict[int, str] = {
0: "NONE",
1: "OVER_VOLTAGE",
2: "UNDER_VOLTAGE",
3: "DRV",
4: "ABS_OVER_CURRENT",
5: "OVER_TEMP_FET",
6: "OVER_TEMP_MOTOR",
7: "GATE_DRIVER_OVER_VOLTAGE",
8: "GATE_DRIVER_UNDER_VOLTAGE",
9: "MCU_UNDER_VOLTAGE",
10: "BOOTING_FROM_WATCHDOG_RESET",
}
# ---------------------------------------------------------------------------
# State dataclass
# ---------------------------------------------------------------------------
@dataclass
class VescState:
"""Telemetry snapshot for one VESC, populated from status broadcast frames."""
can_id: int = 0
# From STATUS (9)
rpm: int = 0
current_a: float = 0.0 # phase current (A)
duty_cycle: float = 0.0 # -1.0 .. +1.0
# From STATUS_4 (16)
temp_fet_c: float = 0.0
temp_motor_c: float = 0.0
current_in_a: float = 0.0 # battery/input current (A)
# From STATUS_5 (27)
voltage_v: float = 0.0
# Fault (embedded in STATUS per FW 6.6 — lower byte of duty word)
fault_code: int = 0
# Timestamps (monotonic seconds)
last_status_ts: float = 0.0
last_status5_ts: float = 0.0
ALIVE_TIMEOUT_S: float = 1.0 # mark offline if no STATUS frame for this long
@property
def fault_name(self) -> str:
return FAULT_NAMES.get(self.fault_code, f"UNKNOWN_{self.fault_code}")
@property
def is_alive(self) -> bool:
return (time.monotonic() - self.last_status_ts) < self.ALIVE_TIMEOUT_S
@property
def has_fault(self) -> bool:
return self.fault_code != FAULT_CODE_NONE
# ---------------------------------------------------------------------------
# Frame parsers
# ---------------------------------------------------------------------------
def parse_status(data: bytes) -> dict | None:
"""Parse CAN_PACKET_STATUS (9): RPM, phase current, duty cycle.
Wire format (8 bytes, all big-endian):
bytes [0:4] RPM : int32
bytes [4:6] current×10 : int16 ÷10 = A
bytes [6:8] duty×1000 : int16 ÷1000 = fraction (-1..+1)
"""
if len(data) < 8:
return None
rpm, current_x10, duty_x1000 = struct.unpack_from(">ihh", data, 0)
return {
"rpm": rpm,
"current_a": current_x10 / 10.0,
"duty_cycle": duty_x1000 / 1000.0,
}
def parse_status_4(data: bytes) -> dict | None:
"""Parse CAN_PACKET_STATUS_4 (16): temperatures and input current.
Wire format (8 bytes, all big-endian):
bytes [0:2] temp_fet×10 : int16 ÷10 = °C
bytes [2:4] temp_motor×10 : int16 ÷10 = °C
bytes [4:6] current_in×10 : int16 ÷10 = A
bytes [6:8] pid_pos×50 : int16 (not used here)
"""
if len(data) < 8:
return None
tf_x10, tm_x10, ci_x10, _pid = struct.unpack_from(">hhhh", data, 0)
return {
"temp_fet_c": tf_x10 / 10.0,
"temp_motor_c": tm_x10 / 10.0,
"current_in_a": ci_x10 / 10.0,
}
def parse_status_5(data: bytes) -> dict | None:
"""Parse CAN_PACKET_STATUS_5 (27): input voltage (and tachometer).
Wire format (8 bytes, all big-endian):
bytes [0:4] tachometer : int32 (unused here)
bytes [4:6] v_in×10 : int16 ÷10 = V
bytes [6:8] reserved
"""
if len(data) < 6:
return None
_tacho, v_in_x10 = struct.unpack_from(">ih", data, 0)
return {
"voltage_v": v_in_x10 / 10.0,
}
# ---------------------------------------------------------------------------
# Request builder
# ---------------------------------------------------------------------------
def make_get_values_request(sender_id: int, target_id: int) -> tuple[int, bytes]:
"""Build a CAN_PACKET_PROCESS_SHORT_BUFFER GET_VALUES request frame.
The VESC responds by refreshing its STATUS broadcast cadence.
Args:
sender_id: This node's CAN ID (0-127; use 127 for a host controller).
target_id: CAN ID of the target VESC.
Returns:
(arbitration_id, data) pass directly to can.Message().
"""
arb_id = (CAN_PACKET_PROCESS_SHORT_BUFFER << 8) | (target_id & 0xFF)
payload = bytes([sender_id & 0xFF, 0x00, COMM_GET_VALUES])
return arb_id, payload

View File

@ -0,0 +1,367 @@
#!/usr/bin/env python3
"""VESC CAN telemetry node for SaltyBot dual FSESC 6.7 Pro (FW 6.6).
Listens on SocketCAN (can0) for VESC status broadcast frames from two VESCs
and publishes parsed telemetry to ROS2 topics. Also sends periodic GET_VALUES
requests to trigger fresh STATUS broadcasts.
Published topics
----------------
/vesc/left/state (std_msgs/String) JSON telemetry for left VESC
/vesc/right/state (std_msgs/String) JSON telemetry for right VESC
/vesc/combined (std_msgs/String) voltage, total current, both RPMs
/diagnostics (diagnostic_msgs/DiagnosticArray)
Parameters
----------
can_interface str 'can0' SocketCAN interface
left_can_id int 61 CAN ID of left VESC
right_can_id int 79 CAN ID of right VESC
poll_rate_hz int 20 Publish + request rate (10-50 Hz)
sender_can_id int 127 This node's CAN ID
overcurrent_threshold_a float 60.0 Fault alert threshold (A)
overtemp_fet_threshold_c float 80.0 FET overtemp alert threshold (°C)
overtemp_motor_threshold_c float 100.0 Motor overtemp alert threshold (°C)
"""
import json
import threading
import time
from typing import Optional
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus, KeyValue
try:
import can
_CAN_AVAILABLE = True
except ImportError:
_CAN_AVAILABLE = False
from .vesc_can_protocol import (
CAN_PACKET_STATUS,
CAN_PACKET_STATUS_4,
CAN_PACKET_STATUS_5,
VescState,
make_get_values_request,
parse_status,
parse_status_4,
parse_status_5,
)
class VescTelemetryNode(Node):
"""ROS2 node: VESC CAN telemetry via SocketCAN."""
def __init__(self):
super().__init__("vesc_telemetry")
# ----------------------------------------------------------------
# Parameters
# ----------------------------------------------------------------
self.declare_parameter("can_interface", "can0")
self.declare_parameter("left_can_id", 61)
self.declare_parameter("right_can_id", 79)
self.declare_parameter("poll_rate_hz", 20)
self.declare_parameter("sender_can_id", 127)
self.declare_parameter("overcurrent_threshold_a", 60.0)
self.declare_parameter("overtemp_fet_threshold_c", 80.0)
self.declare_parameter("overtemp_motor_threshold_c", 100.0)
self._iface = self.get_parameter("can_interface").value
self._left_id = self.get_parameter("left_can_id").value
self._right_id = self.get_parameter("right_can_id").value
hz = int(max(10, min(50, self.get_parameter("poll_rate_hz").value)))
self._sender_id = self.get_parameter("sender_can_id").value
self._oc_thresh = self.get_parameter("overcurrent_threshold_a").value
self._otf_thresh = self.get_parameter("overtemp_fet_threshold_c").value
self._otm_thresh = self.get_parameter("overtemp_motor_threshold_c").value
# ----------------------------------------------------------------
# Per-VESC state (protected by _state_lock)
# ----------------------------------------------------------------
self._state: dict[int, VescState] = {
self._left_id: VescState(can_id=self._left_id),
self._right_id: VescState(can_id=self._right_id),
}
self._state_lock = threading.Lock()
# Track which faults we have already logged to avoid log spam
self._fault_logged: dict[int, int] = {
self._left_id: 0, self._right_id: 0
}
# ----------------------------------------------------------------
# Publishers
# ----------------------------------------------------------------
self._pub_left = self.create_publisher(String, "/vesc/left/state", 10)
self._pub_right = self.create_publisher(String, "/vesc/right/state", 10)
self._pub_combined = self.create_publisher(String, "/vesc/combined", 10)
self._pub_diag = self.create_publisher(DiagnosticArray, "/diagnostics", 10)
# ----------------------------------------------------------------
# SocketCAN
# ----------------------------------------------------------------
self._bus: Optional["can.BusABC"] = None
self._rx_thread: Optional[threading.Thread] = None
self._running = False
self._init_can()
# ----------------------------------------------------------------
# Publish + poll timer
# ----------------------------------------------------------------
self.create_timer(1.0 / hz, self._on_timer)
self.get_logger().info(
f"vesc_telemetry: iface={self._iface}, "
f"left_id={self._left_id}, right_id={self._right_id}, "
f"poll={hz} Hz, sender_id={self._sender_id}"
)
# ----------------------------------------------------------------
# CAN initialisation
# ----------------------------------------------------------------
def _init_can(self) -> None:
if not _CAN_AVAILABLE:
self.get_logger().error(
"python-can not installed — run: pip install python-can"
)
return
try:
self._bus = can.interface.Bus(channel=self._iface, bustype="socketcan")
self._running = True
self._rx_thread = threading.Thread(
target=self._rx_loop, name="vesc_can_rx", daemon=True
)
self._rx_thread.start()
self.get_logger().info(f"SocketCAN opened: {self._iface}")
except Exception as exc:
self.get_logger().error(f"Failed to open {self._iface}: {exc}")
# ----------------------------------------------------------------
# Receive loop (background thread)
# ----------------------------------------------------------------
def _rx_loop(self) -> None:
"""Drain SocketCAN frames and update VESC state."""
while self._running and self._bus is not None:
try:
msg = self._bus.recv(timeout=0.1)
except Exception:
continue
if msg is None or not msg.is_extended_id:
continue
self._dispatch_frame(msg.arbitration_id, bytes(msg.data))
def _dispatch_frame(self, arb_id: int, data: bytes) -> None:
"""Decode extended CAN ID and update the matching VescState."""
packet_type = (arb_id >> 8) & 0xFFFF
vesc_id = arb_id & 0xFF
if vesc_id not in self._state:
return
now = time.monotonic()
with self._state_lock:
s = self._state[vesc_id]
if packet_type == CAN_PACKET_STATUS:
parsed = parse_status(data)
if parsed:
s.rpm = parsed["rpm"]
s.current_a = parsed["current_a"]
s.duty_cycle = parsed["duty_cycle"]
s.last_status_ts = now
elif packet_type == CAN_PACKET_STATUS_4:
parsed = parse_status_4(data)
if parsed:
s.temp_fet_c = parsed["temp_fet_c"]
s.temp_motor_c = parsed["temp_motor_c"]
s.current_in_a = parsed["current_in_a"]
elif packet_type == CAN_PACKET_STATUS_5:
parsed = parse_status_5(data)
if parsed:
s.voltage_v = parsed["voltage_v"]
s.last_status5_ts = now
# ----------------------------------------------------------------
# Timer: poll + publish
# ----------------------------------------------------------------
def _on_timer(self) -> None:
self._send_poll_requests()
self._publish_all()
def _send_poll_requests(self) -> None:
"""Send GET_VALUES to both VESCs to prompt STATUS broadcast refresh."""
if self._bus is None:
return
for vesc_id in (self._left_id, self._right_id):
try:
arb_id, payload = make_get_values_request(self._sender_id, vesc_id)
self._bus.send(can.Message(
arbitration_id=arb_id,
data=payload,
is_extended_id=True,
))
except Exception as exc:
self.get_logger().debug(
f"GET_VALUES send failed (vesc_id={vesc_id}): {exc}"
)
def _publish_all(self) -> None:
with self._state_lock:
left = self._state[self._left_id]
right = self._state[self._right_id]
# Snapshot dicts while holding lock
l_dict = self._state_to_dict(left)
r_dict = self._state_to_dict(right)
left_alive = left.is_alive
right_alive = right.is_alive
voltage_v = left.voltage_v if left_alive else right.voltage_v
combined = {
"voltage_v": round(voltage_v, 2),
"total_current_a": round(left.current_in_a + right.current_in_a, 2),
"left_rpm": left.rpm,
"right_rpm": right.rpm,
"left_alive": left_alive,
"right_alive": right_alive,
"stamp": time.time(),
}
# Build diagnostics while holding lock so values are consistent
diag_array = self._build_diagnostics(left, right)
self._pub_left.publish(String(data=json.dumps(l_dict)))
self._pub_right.publish(String(data=json.dumps(r_dict)))
self._pub_combined.publish(String(data=json.dumps(combined)))
self._pub_diag.publish(diag_array)
# Fault alerting (outside lock — uses snapshots)
self._check_faults_from_dict(l_dict, self._left_id, "left")
self._check_faults_from_dict(r_dict, self._right_id, "right")
@staticmethod
def _state_to_dict(s: VescState) -> dict:
return {
"can_id": s.can_id,
"rpm": s.rpm,
"current_a": round(s.current_a, 2),
"current_in_a": round(s.current_in_a, 2),
"duty_cycle": round(s.duty_cycle, 4),
"voltage_v": round(s.voltage_v, 2),
"temp_fet_c": round(s.temp_fet_c, 1),
"temp_motor_c": round(s.temp_motor_c, 1),
"fault_code": s.fault_code,
"fault_name": s.fault_name,
"alive": s.is_alive,
"stamp": time.time(),
}
def _build_diagnostics(
self, left: VescState, right: VescState
) -> DiagnosticArray:
array = DiagnosticArray()
array.header.stamp = self.get_clock().now().to_msg()
for s, label in ((left, "left"), (right, "right")):
status = DiagnosticStatus()
status.name = f"VESC/{label} (CAN ID {s.can_id})"
status.hardware_id = f"vesc_can_{s.can_id}"
if not s.is_alive:
status.level = DiagnosticStatus.ERROR
status.message = "No CAN frames received (offline)"
elif s.has_fault:
status.level = DiagnosticStatus.ERROR
status.message = f"Fault: {s.fault_name}"
elif (
s.temp_fet_c > self._otf_thresh
or s.temp_motor_c > self._otm_thresh
or abs(s.current_a) > self._oc_thresh
):
status.level = DiagnosticStatus.WARN
status.message = "Threshold exceeded"
else:
status.level = DiagnosticStatus.OK
status.message = "OK"
status.values = [
KeyValue(key="rpm", value=str(s.rpm)),
KeyValue(key="current_a", value=f"{s.current_a:.2f}"),
KeyValue(key="current_in_a", value=f"{s.current_in_a:.2f}"),
KeyValue(key="duty_cycle", value=f"{s.duty_cycle:.4f}"),
KeyValue(key="voltage_v", value=f"{s.voltage_v:.2f}"),
KeyValue(key="temp_fet_c", value=f"{s.temp_fet_c:.1f}"),
KeyValue(key="temp_motor_c", value=f"{s.temp_motor_c:.1f}"),
KeyValue(key="fault_code", value=str(s.fault_code)),
KeyValue(key="fault_name", value=s.fault_name),
]
array.status.append(status)
return array
def _check_faults_from_dict(self, d: dict, vesc_id: int, label: str) -> None:
"""Log fault/threshold warnings; suppress repeated identical messages."""
if not d["alive"]:
return
if abs(d["current_a"]) > self._oc_thresh:
self.get_logger().warn(
f"VESC {label}: overcurrent {d['current_a']:.1f} A "
f"(threshold {self._oc_thresh:.0f} A)"
)
if d["temp_fet_c"] > self._otf_thresh:
self.get_logger().warn(
f"VESC {label}: FET overtemp {d['temp_fet_c']:.1f} °C "
f"(threshold {self._otf_thresh:.0f} °C)"
)
if d["temp_motor_c"] > self._otm_thresh:
self.get_logger().warn(
f"VESC {label}: motor overtemp {d['temp_motor_c']:.1f} °C "
f"(threshold {self._otm_thresh:.0f} °C)"
)
fault_code = d["fault_code"]
if fault_code != 0 and self._fault_logged.get(vesc_id) != fault_code:
self.get_logger().error(
f"VESC {label}: fault {fault_code} ({d['fault_name']})"
)
self._fault_logged[vesc_id] = fault_code
elif fault_code == 0 and self._fault_logged.get(vesc_id, 0) != 0:
self.get_logger().info(f"VESC {label}: fault cleared")
self._fault_logged[vesc_id] = 0
# ----------------------------------------------------------------
# Cleanup
# ----------------------------------------------------------------
def destroy_node(self) -> None:
self._running = False
if self._rx_thread is not None:
self._rx_thread.join(timeout=1.0)
if self._bus is not None:
self._bus.shutdown()
super().destroy_node()
def main(args=None):
rclpy.init(args=args)
node = VescTelemetryNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/saltybot_vesc_telemetry
[install]
install_scripts=$base/lib/saltybot_vesc_telemetry

View File

@ -0,0 +1,27 @@
from setuptools import setup
package_name = "saltybot_vesc_telemetry"
setup(
name=package_name,
version="0.1.0",
packages=[package_name],
data_files=[
("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
(f"share/{package_name}", ["package.xml"]),
(f"share/{package_name}/launch", ["launch/vesc_telemetry.launch.py"]),
(f"share/{package_name}/config", ["config/vesc_telemetry_params.yaml"]),
],
install_requires=["setuptools", "python-can"],
zip_safe=True,
maintainer="sl-firmware",
maintainer_email="sl-firmware@saltylab.local",
description="VESC CAN telemetry node for dual FSESC 6.7 Pro",
license="MIT",
tests_require=["pytest"],
entry_points={
"console_scripts": [
"vesc_telemetry_node = saltybot_vesc_telemetry.vesc_telemetry_node:main",
],
},
)

View File

@ -0,0 +1,240 @@
"""Unit tests for VESC CAN telemetry — protocol parsing and state logic.
These tests cover pure functions in vesc_can_protocol and do not require
ROS2, python-can, or a live CAN bus.
"""
import struct
import time
import pytest
from saltybot_vesc_telemetry.vesc_can_protocol import (
CAN_PACKET_STATUS,
CAN_PACKET_STATUS_4,
CAN_PACKET_STATUS_5,
CAN_PACKET_PROCESS_SHORT_BUFFER,
COMM_GET_VALUES,
FAULT_CODE_NONE,
FAULT_CODE_ABS_OVER_CURRENT,
FAULT_CODE_OVER_TEMP_FET,
FAULT_CODE_OVER_TEMP_MOTOR,
FAULT_NAMES,
VescState,
make_get_values_request,
parse_status,
parse_status_4,
parse_status_5,
)
# ---------------------------------------------------------------------------
# parse_status (CAN_PACKET_STATUS = 9)
# ---------------------------------------------------------------------------
class TestParseStatus:
def test_forward_rpm(self):
# 1000 RPM, 5.0 A, 0.300 duty
data = struct.pack(">ihh", 1000, 50, 300)
r = parse_status(data)
assert r is not None
assert r["rpm"] == 1000
assert r["current_a"] == pytest.approx(5.0)
assert r["duty_cycle"] == pytest.approx(0.300)
def test_reverse_rpm(self):
data = struct.pack(">ihh", -3000, -80, -500)
r = parse_status(data)
assert r["rpm"] == -3000
assert r["current_a"] == pytest.approx(-8.0)
assert r["duty_cycle"] == pytest.approx(-0.5)
def test_zero(self):
data = struct.pack(">ihh", 0, 0, 0)
r = parse_status(data)
assert r["rpm"] == 0
assert r["current_a"] == pytest.approx(0.0)
assert r["duty_cycle"] == pytest.approx(0.0)
def test_max_duty(self):
# duty 1.0 → encoded as 1000
data = struct.pack(">ihh", 50000, 200, 1000)
r = parse_status(data)
assert r["duty_cycle"] == pytest.approx(1.0)
def test_too_short_returns_none(self):
assert parse_status(b"\x00\x01\x02") is None
def test_exact_minimum_length(self):
data = struct.pack(">ihh", 42, 10, 100)
assert parse_status(data) is not None
# ---------------------------------------------------------------------------
# parse_status_4 (CAN_PACKET_STATUS_4 = 16)
# ---------------------------------------------------------------------------
class TestParseStatus4:
def _make(self, temp_fet, temp_motor, current_in, pid_pos=0):
return struct.pack(
">hhhh",
int(temp_fet * 10),
int(temp_motor * 10),
int(current_in * 10),
pid_pos,
)
def test_normal_temps(self):
data = self._make(45.5, 62.3, 12.0)
r = parse_status_4(data)
assert r is not None
assert r["temp_fet_c"] == pytest.approx(45.5)
assert r["temp_motor_c"] == pytest.approx(62.3)
assert r["current_in_a"] == pytest.approx(12.0)
def test_negative_current_in(self):
data = self._make(30.0, 40.0, -5.0)
r = parse_status_4(data)
assert r["current_in_a"] == pytest.approx(-5.0)
def test_too_short_returns_none(self):
assert parse_status_4(b"\x00" * 7) is None
def test_zero_values(self):
data = self._make(0, 0, 0)
r = parse_status_4(data)
assert r["temp_fet_c"] == pytest.approx(0.0)
assert r["temp_motor_c"] == pytest.approx(0.0)
assert r["current_in_a"] == pytest.approx(0.0)
# ---------------------------------------------------------------------------
# parse_status_5 (CAN_PACKET_STATUS_5 = 27)
# ---------------------------------------------------------------------------
class TestParseStatus5:
def _make(self, voltage_v, tacho=0):
return struct.pack(">ih", tacho, int(voltage_v * 10))
def test_nominal_voltage(self):
data = self._make(25.2)
r = parse_status_5(data)
assert r is not None
assert r["voltage_v"] == pytest.approx(25.2)
def test_low_voltage(self):
data = self._make(18.6)
r = parse_status_5(data)
assert r["voltage_v"] == pytest.approx(18.6)
def test_too_short_returns_none(self):
assert parse_status_5(b"\x00\x01\x02\x03\x04") is None
def test_with_tachometer(self):
data = struct.pack(">ih", 123456, 252)
r = parse_status_5(data)
assert r["voltage_v"] == pytest.approx(25.2)
# ---------------------------------------------------------------------------
# make_get_values_request
# ---------------------------------------------------------------------------
class TestMakeGetValuesRequest:
def test_arb_id_encodes_packet_type_and_target(self):
arb_id, payload = make_get_values_request(sender_id=127, target_id=61)
expected_arb = (CAN_PACKET_PROCESS_SHORT_BUFFER << 8) | 61
assert arb_id == expected_arb
def test_payload_contains_sender_command(self):
_, payload = make_get_values_request(sender_id=127, target_id=61)
assert len(payload) == 3
assert payload[0] == 127 # sender_id
assert payload[1] == 0x00 # send_mode
assert payload[2] == COMM_GET_VALUES
def test_right_vesc(self):
arb_id, payload = make_get_values_request(sender_id=127, target_id=79)
assert (arb_id & 0xFF) == 79
assert payload[2] == COMM_GET_VALUES
def test_sender_id_in_payload(self):
_, payload = make_get_values_request(sender_id=42, target_id=61)
assert payload[0] == 42
def test_arb_id_is_extended(self):
# Extended IDs can exceed 0x7FF (11-bit limit)
arb_id, _ = make_get_values_request(127, 61)
assert arb_id > 0x7FF
# ---------------------------------------------------------------------------
# VescState properties
# ---------------------------------------------------------------------------
class TestVescState:
def test_defaults(self):
s = VescState(can_id=61)
assert s.rpm == 0
assert s.current_a == pytest.approx(0.0)
assert s.voltage_v == pytest.approx(0.0)
assert s.fault_code == FAULT_CODE_NONE
assert s.has_fault is False
def test_is_alive_fresh(self):
s = VescState(can_id=61)
s.last_status_ts = time.monotonic()
assert s.is_alive is True
def test_is_alive_stale(self):
s = VescState(can_id=61)
s.last_status_ts = time.monotonic() - 2.0 # 2 s ago > 1 s timeout
assert s.is_alive is False
def test_is_alive_never_received(self):
s = VescState(can_id=61)
# last_status_ts defaults to 0.0 — monotonic() will be >> 1 s
assert s.is_alive is False
def test_has_fault_true(self):
s = VescState(can_id=61)
s.fault_code = FAULT_CODE_ABS_OVER_CURRENT
assert s.has_fault is True
def test_has_fault_false_on_none(self):
s = VescState(can_id=61)
s.fault_code = FAULT_CODE_NONE
assert s.has_fault is False
def test_fault_name_known(self):
s = VescState(can_id=61)
s.fault_code = FAULT_CODE_OVER_TEMP_FET
assert s.fault_name == "OVER_TEMP_FET"
def test_fault_name_unknown(self):
s = VescState(can_id=61)
s.fault_code = 99
assert "UNKNOWN" in s.fault_name
def test_fault_name_none(self):
s = VescState(can_id=61)
assert s.fault_name == "NONE"
# ---------------------------------------------------------------------------
# FAULT_NAMES completeness
# ---------------------------------------------------------------------------
class TestFaultNames:
def test_all_common_codes_named(self):
for code in (0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10):
assert code in FAULT_NAMES, f"fault code {code} missing from FAULT_NAMES"
def test_none_is_zero(self):
assert FAULT_NAMES[0] == "NONE"
def test_overcurrent_code(self):
assert FAULT_NAMES[FAULT_CODE_ABS_OVER_CURRENT] == "ABS_OVER_CURRENT"
if __name__ == "__main__":
pytest.main([__file__, "-v"])