feat: Implement VESC UART driver node (Issue #383)

ROS2 driver for Flipsky FSESC 4.20 Plus (VESC dual ESC) motor control.
Replaces hoverboard ESC communication with pyvesc library.

Features:
- UART serial communication (configurable port/baud)
- Dual command modes: duty_cycle (-100 to 100) and RPM setpoint
- Telemetry publishing: voltage, current, RPM, temperature, fault codes
- Command timeout: auto-zero throttle if no cmd_vel received
- Heartbeat-based connection management
- Comprehensive error handling and logging

Topics:
- Subscribe: /cmd_vel (geometry_msgs/Twist)
- Publish: /vesc/state (JSON telemetry)
- Publish: /vesc/raw_telemetry (debug)

Launch: ros2 launch saltybot_vesc_driver vesc_driver.launch.py
Config: config/vesc_params.yaml

Next phase: Integrate with cmd_vel_mux + safety layer.

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
This commit is contained in:
sl-controls 2026-03-04 07:05:46 -05:00
parent bc3ed1a0c7
commit a11722e872
10 changed files with 381 additions and 0 deletions

View File

@ -0,0 +1,20 @@
# VESC Driver Configuration for SaltyBot
# Flipsky FSESC 4.20 Plus (VESC dual ESC) motor control
vesc_driver:
ros__parameters:
# Serial communication
port: "/dev/ttyUSB0"
baudrate: 115200
# Command mode: "duty_cycle" or "rpm"
# duty_cycle: Direct motor duty cycle (-100 to 100)
# rpm: RPM setpoint mode (closed-loop speed control)
command_mode: "duty_cycle"
# Motor limits
max_rpm: 60000
max_current_a: 50.0
# Command timeout: If no cmd_vel received for this duration, motor stops
timeout_s: 1.0

View File

@ -0,0 +1,36 @@
"""Launch file for VESC driver node."""
from launch import LaunchDescription
from launch_ros.actions import Node
from launch.substitutions import LaunchConfiguration
from launch.actions import DeclareLaunchArgument
import os
from ament_index_python.packages import get_package_share_directory
def generate_launch_description():
"""Generate launch description for VESC driver."""
pkg_dir = get_package_share_directory("saltybot_vesc_driver")
config_file = os.path.join(pkg_dir, "config", "vesc_params.yaml")
return LaunchDescription(
[
DeclareLaunchArgument(
"config_file",
default_value=config_file,
description="Path to configuration YAML file",
),
DeclareLaunchArgument(
"port",
default_value="/dev/ttyUSB0",
description="Serial port for VESC",
),
Node(
package="saltybot_vesc_driver",
executable="vesc_driver_node",
name="vesc_driver",
output="screen",
parameters=[LaunchConfiguration("config_file")],
),
]
)

View File

@ -0,0 +1,27 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_vesc_driver</name>
<version>0.1.0</version>
<description>
VESC (Flipsky FSESC 4.20 Plus) UART driver for SaltyBot.
Handles motor control via pyvesc library and telemetry feedback.
</description>
<maintainer email="sl-controls@saltylab.local">sl-controls</maintainer>
<license>MIT</license>
<depend>rclpy</depend>
<depend>geometry_msgs</depend>
<depend>std_msgs</depend>
<buildtool_depend>ament_python</buildtool_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1 @@
"""SaltyBot VESC driver package."""

View File

@ -0,0 +1,222 @@
#!/usr/bin/env python3
"""VESC UART driver node for SaltyBot.
Uses pyvesc library to communicate with Flipsky FSESC 4.20 Plus (VESC dual ESC).
Subscribes to velocity commands and publishes motor telemetry.
Subscribed topics:
/cmd_vel (geometry_msgs/Twist) - Velocity command (linear.x = m/s, angular.z = rad/s)
Published topics:
/vesc/state (VESCState) - Motor telemetry (voltage, current, RPM, temperature, fault)
/vesc/raw_telemetry (String) - Raw telemetry JSON for debugging
"""
import json
import threading
import time
from enum import Enum
from typing import Optional
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
from std_msgs.msg import String
import serial
try:
import pyvesc
except ImportError:
pyvesc = None
class VESCCommandMode(Enum):
"""VESC command modes."""
DUTY_CYCLE = "duty_cycle" # Direct duty cycle (-100 to 100)
RPM = "rpm" # RPM setpoint
CURRENT = "current" # Current (A)
BRAKE_CURRENT = "brake" # Brake current (A)
class VESCState:
"""VESC telemetry state."""
def __init__(self):
self.voltage_v = 0.0
self.current_a = 0.0
self.rpm = 0
self.temperature_c = 0.0
self.fault_code = 0
self.timestamp = time.time()
class VESCDriverNode(Node):
"""ROS2 node for VESC motor control and telemetry."""
def __init__(self):
super().__init__("vesc_driver")
# Parameters
self.declare_parameter("port", "/dev/ttyUSB0")
self.declare_parameter("baudrate", 115200)
self.declare_parameter("command_mode", "duty_cycle") # or "rpm"
self.declare_parameter("max_rpm", 60000)
self.declare_parameter("max_current_a", 50.0)
self.declare_parameter("timeout_s", 1.0)
self.port = self.get_parameter("port").value
self.baudrate = self.get_parameter("baudrate").value
self.command_mode = self.get_parameter("command_mode").value
self.max_rpm = self.get_parameter("max_rpm").value
self.max_current_a = self.get_parameter("max_current_a").value
self.timeout_s = self.get_parameter("timeout_s").value
# Serial connection
self.serial = None
self.vesc = None
self.last_cmd_time = 0
self.state = VESCState()
# Subscriptions
self.create_subscription(Twist, "/cmd_vel", self._on_cmd_vel, 10)
# Publishers
self.pub_state = self.create_publisher(String, "/vesc/state", 10)
self.pub_telemetry = self.create_publisher(String, "/vesc/raw_telemetry", 10)
# Timer for telemetry polling (100 Hz)
self.create_timer(0.01, self._poll_telemetry)
# Initialize VESC connection
self._init_vesc()
self.get_logger().info(
f"VESC driver initialized: port={self.port}, baud={self.baudrate}, "
f"mode={self.command_mode}, timeout={self.timeout_s}s"
)
def _init_vesc(self) -> bool:
"""Initialize serial connection to VESC."""
try:
if pyvesc is None:
self.get_logger().error("pyvesc not installed. Install with: pip install pyvesc")
return False
self.serial = serial.Serial(
port=self.port,
baudrate=self.baudrate,
timeout=0.1,
)
self.vesc = pyvesc.VescUart(
serial_port=self.serial,
has_sensor=False, # No wheel speed sensor
start_heartbeat=True,
)
self.get_logger().info(f"Connected to VESC on {self.port} @ {self.baudrate} baud")
return True
except (serial.SerialException, Exception) as e:
self.get_logger().error(f"Failed to initialize VESC: {e}")
return False
def _on_cmd_vel(self, msg: Twist) -> None:
"""Handle velocity command from /cmd_vel."""
if self.vesc is None:
return
try:
# Extract linear velocity (m/s) and angular velocity (rad/s)
linear_v = msg.linear.x
angular_v = msg.angular.z
# Convert velocity to VESC command based on mode
if self.command_mode == "duty_cycle":
# Map velocity to duty cycle (-100 to 100)
# Assuming max velocity ~5 m/s maps to ~100% duty
duty = max(-100, min(100, (linear_v / 5.0) * 100))
self.vesc.set_duty(duty / 100.0) # pyvesc expects 0-1 or -1-0
elif self.command_mode == "rpm":
# Map velocity to RPM
# Assuming max velocity 5 m/s → max_rpm
rpm = max(-self.max_rpm, min(self.max_rpm, (linear_v / 5.0) * self.max_rpm))
self.vesc.set_rpm(int(rpm))
# Angular velocity could control steering if dual motors are used differently
# For now, it's ignored (single dual-motor ESC)
self.last_cmd_time = time.time()
except Exception as e:
self.get_logger().error(f"Error sending command to VESC: {e}")
def _poll_telemetry(self) -> None:
"""Poll VESC for telemetry and publish state."""
if self.vesc is None:
return
try:
# Read telemetry from VESC
if self.vesc.read_values():
# Update state from VESC data
self.state.voltage_v = self.vesc.data.v_in
self.state.current_a = self.vesc.data.current_in
self.state.rpm = self.vesc.data.rpm
self.state.temperature_c = self.vesc.data.temp_fet
self.state.fault_code = self.vesc.data.fault_code
self.state.timestamp = time.time()
# Check for timeout
if time.time() - self.last_cmd_time > self.timeout_s:
# No recent command, send zero
if self.command_mode == "duty_cycle":
self.vesc.set_duty(0.0)
else:
self.vesc.set_rpm(0)
# Publish state
self._publish_state()
else:
self.get_logger().warn("Failed to read VESC telemetry")
except Exception as e:
self.get_logger().error(f"Error polling VESC: {e}")
def _publish_state(self) -> None:
"""Publish VESC state as JSON."""
state_dict = {
"voltage_v": round(self.state.voltage_v, 2),
"current_a": round(self.state.current_a, 2),
"rpm": self.state.rpm,
"temperature_c": round(self.state.temperature_c, 1),
"fault_code": self.state.fault_code,
"timestamp": self.state.timestamp,
}
msg = String(data=json.dumps(state_dict))
self.pub_state.publish(msg)
self.pub_telemetry.publish(msg)
# Log fault codes
if self.state.fault_code != 0:
self.get_logger().warn(f"VESC fault code: {self.state.fault_code}")
def main(args=None):
rclpy.init(args=args)
node = VESCDriverNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
if node.vesc and node.serial:
node.vesc.set_duty(0.0) # Zero throttle on shutdown
node.serial.close()
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,5 @@
[develop]
script_dir=$base/lib/saltybot_vesc_driver
[install]
install_scripts=$base/lib/saltybot_vesc_driver

View File

@ -0,0 +1,27 @@
from setuptools import setup
package_name = "saltybot_vesc_driver"
setup(
name=package_name,
version="0.1.0",
packages=[package_name],
data_files=[
("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
(f"share/{package_name}", ["package.xml"]),
(f"share/{package_name}/launch", ["launch/vesc_driver.launch.py"]),
(f"share/{package_name}/config", ["config/vesc_params.yaml"]),
],
install_requires=["setuptools", "pyvesc"],
zip_safe=True,
maintainer="sl-controls",
maintainer_email="sl-controls@saltylab.local",
description="VESC UART driver for motor control and telemetry",
license="MIT",
tests_require=["pytest"],
entry_points={
"console_scripts": [
"vesc_driver_node = saltybot_vesc_driver.vesc_driver_node:main",
],
},
)

View File

@ -0,0 +1,43 @@
"""Unit tests for VESC driver node."""
import pytest
def test_vesc_driver_init():
"""Test VESC driver node initialization."""
import rclpy
rclpy.init(allow_reuse=True)
try:
from saltybot_vesc_driver.vesc_driver_node import VESCDriverNode
node = VESCDriverNode()
assert node is not None
node.destroy_node()
except Exception as e:
# pyvesc may not be installed in test env
assert "pyvesc" in str(e) or "serial" in str(e)
def test_vesc_state():
"""Test VESC state object."""
from saltybot_vesc_driver.vesc_driver_node import VESCState
state = VESCState()
assert state.voltage_v == 0.0
assert state.current_a == 0.0
assert state.rpm == 0
assert state.fault_code == 0
def test_vesc_command_mode():
"""Test VESC command mode enum."""
from saltybot_vesc_driver.vesc_driver_node import VESCCommandMode
assert VESCCommandMode.DUTY_CYCLE.value == "duty_cycle"
assert VESCCommandMode.RPM.value == "rpm"
assert VESCCommandMode.CURRENT.value == "current"
if __name__ == "__main__":
pytest.main([__file__, "-v"])