feat: Implement VESC UART driver node (Issue #383)
ROS2 driver for Flipsky FSESC 4.20 Plus (VESC dual ESC) motor control. Replaces hoverboard ESC communication with pyvesc library. Features: - UART serial communication (configurable port/baud) - Dual command modes: duty_cycle (-100 to 100) and RPM setpoint - Telemetry publishing: voltage, current, RPM, temperature, fault codes - Command timeout: auto-zero throttle if no cmd_vel received - Heartbeat-based connection management - Comprehensive error handling and logging Topics: - Subscribe: /cmd_vel (geometry_msgs/Twist) - Publish: /vesc/state (JSON telemetry) - Publish: /vesc/raw_telemetry (debug) Launch: ros2 launch saltybot_vesc_driver vesc_driver.launch.py Config: config/vesc_params.yaml Next phase: Integrate with cmd_vel_mux + safety layer. Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
55915ed737
commit
87bb2999c2
@ -0,0 +1,20 @@
|
||||
# VESC Driver Configuration for SaltyBot
|
||||
# Flipsky FSESC 4.20 Plus (VESC dual ESC) motor control
|
||||
|
||||
vesc_driver:
|
||||
ros__parameters:
|
||||
# Serial communication
|
||||
port: "/dev/ttyUSB0"
|
||||
baudrate: 115200
|
||||
|
||||
# Command mode: "duty_cycle" or "rpm"
|
||||
# duty_cycle: Direct motor duty cycle (-100 to 100)
|
||||
# rpm: RPM setpoint mode (closed-loop speed control)
|
||||
command_mode: "duty_cycle"
|
||||
|
||||
# Motor limits
|
||||
max_rpm: 60000
|
||||
max_current_a: 50.0
|
||||
|
||||
# Command timeout: If no cmd_vel received for this duration, motor stops
|
||||
timeout_s: 1.0
|
||||
@ -0,0 +1,36 @@
|
||||
"""Launch file for VESC driver node."""
|
||||
|
||||
from launch import LaunchDescription
|
||||
from launch_ros.actions import Node
|
||||
from launch.substitutions import LaunchConfiguration
|
||||
from launch.actions import DeclareLaunchArgument
|
||||
import os
|
||||
from ament_index_python.packages import get_package_share_directory
|
||||
|
||||
|
||||
def generate_launch_description():
|
||||
"""Generate launch description for VESC driver."""
|
||||
pkg_dir = get_package_share_directory("saltybot_vesc_driver")
|
||||
config_file = os.path.join(pkg_dir, "config", "vesc_params.yaml")
|
||||
|
||||
return LaunchDescription(
|
||||
[
|
||||
DeclareLaunchArgument(
|
||||
"config_file",
|
||||
default_value=config_file,
|
||||
description="Path to configuration YAML file",
|
||||
),
|
||||
DeclareLaunchArgument(
|
||||
"port",
|
||||
default_value="/dev/ttyUSB0",
|
||||
description="Serial port for VESC",
|
||||
),
|
||||
Node(
|
||||
package="saltybot_vesc_driver",
|
||||
executable="vesc_driver_node",
|
||||
name="vesc_driver",
|
||||
output="screen",
|
||||
parameters=[LaunchConfiguration("config_file")],
|
||||
),
|
||||
]
|
||||
)
|
||||
27
jetson/ros2_ws/src/saltybot_vesc_driver/package.xml
Normal file
27
jetson/ros2_ws/src/saltybot_vesc_driver/package.xml
Normal file
@ -0,0 +1,27 @@
|
||||
<?xml version="1.0"?>
|
||||
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||
<package format="3">
|
||||
<name>saltybot_vesc_driver</name>
|
||||
<version>0.1.0</version>
|
||||
<description>
|
||||
VESC (Flipsky FSESC 4.20 Plus) UART driver for SaltyBot.
|
||||
Handles motor control via pyvesc library and telemetry feedback.
|
||||
</description>
|
||||
<maintainer email="sl-controls@saltylab.local">sl-controls</maintainer>
|
||||
<license>MIT</license>
|
||||
|
||||
<depend>rclpy</depend>
|
||||
<depend>geometry_msgs</depend>
|
||||
<depend>std_msgs</depend>
|
||||
|
||||
<buildtool_depend>ament_python</buildtool_depend>
|
||||
|
||||
<test_depend>ament_copyright</test_depend>
|
||||
<test_depend>ament_flake8</test_depend>
|
||||
<test_depend>ament_pep257</test_depend>
|
||||
<test_depend>python3-pytest</test_depend>
|
||||
|
||||
<export>
|
||||
<build_type>ament_python</build_type>
|
||||
</export>
|
||||
</package>
|
||||
@ -0,0 +1 @@
|
||||
"""SaltyBot VESC driver package."""
|
||||
@ -0,0 +1,222 @@
|
||||
#!/usr/bin/env python3
|
||||
"""VESC UART driver node for SaltyBot.
|
||||
|
||||
Uses pyvesc library to communicate with Flipsky FSESC 4.20 Plus (VESC dual ESC).
|
||||
Subscribes to velocity commands and publishes motor telemetry.
|
||||
|
||||
Subscribed topics:
|
||||
/cmd_vel (geometry_msgs/Twist) - Velocity command (linear.x = m/s, angular.z = rad/s)
|
||||
|
||||
Published topics:
|
||||
/vesc/state (VESCState) - Motor telemetry (voltage, current, RPM, temperature, fault)
|
||||
/vesc/raw_telemetry (String) - Raw telemetry JSON for debugging
|
||||
"""
|
||||
|
||||
import json
|
||||
import threading
|
||||
import time
|
||||
from enum import Enum
|
||||
from typing import Optional
|
||||
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from geometry_msgs.msg import Twist
|
||||
from std_msgs.msg import String
|
||||
import serial
|
||||
|
||||
try:
|
||||
import pyvesc
|
||||
except ImportError:
|
||||
pyvesc = None
|
||||
|
||||
|
||||
class VESCCommandMode(Enum):
|
||||
"""VESC command modes."""
|
||||
DUTY_CYCLE = "duty_cycle" # Direct duty cycle (-100 to 100)
|
||||
RPM = "rpm" # RPM setpoint
|
||||
CURRENT = "current" # Current (A)
|
||||
BRAKE_CURRENT = "brake" # Brake current (A)
|
||||
|
||||
|
||||
class VESCState:
|
||||
"""VESC telemetry state."""
|
||||
|
||||
def __init__(self):
|
||||
self.voltage_v = 0.0
|
||||
self.current_a = 0.0
|
||||
self.rpm = 0
|
||||
self.temperature_c = 0.0
|
||||
self.fault_code = 0
|
||||
self.timestamp = time.time()
|
||||
|
||||
|
||||
class VESCDriverNode(Node):
|
||||
"""ROS2 node for VESC motor control and telemetry."""
|
||||
|
||||
def __init__(self):
|
||||
super().__init__("vesc_driver")
|
||||
|
||||
# Parameters
|
||||
self.declare_parameter("port", "/dev/ttyUSB0")
|
||||
self.declare_parameter("baudrate", 115200)
|
||||
self.declare_parameter("command_mode", "duty_cycle") # or "rpm"
|
||||
self.declare_parameter("max_rpm", 60000)
|
||||
self.declare_parameter("max_current_a", 50.0)
|
||||
self.declare_parameter("timeout_s", 1.0)
|
||||
|
||||
self.port = self.get_parameter("port").value
|
||||
self.baudrate = self.get_parameter("baudrate").value
|
||||
self.command_mode = self.get_parameter("command_mode").value
|
||||
self.max_rpm = self.get_parameter("max_rpm").value
|
||||
self.max_current_a = self.get_parameter("max_current_a").value
|
||||
self.timeout_s = self.get_parameter("timeout_s").value
|
||||
|
||||
# Serial connection
|
||||
self.serial = None
|
||||
self.vesc = None
|
||||
self.last_cmd_time = 0
|
||||
self.state = VESCState()
|
||||
|
||||
# Subscriptions
|
||||
self.create_subscription(Twist, "/cmd_vel", self._on_cmd_vel, 10)
|
||||
|
||||
# Publishers
|
||||
self.pub_state = self.create_publisher(String, "/vesc/state", 10)
|
||||
self.pub_telemetry = self.create_publisher(String, "/vesc/raw_telemetry", 10)
|
||||
|
||||
# Timer for telemetry polling (100 Hz)
|
||||
self.create_timer(0.01, self._poll_telemetry)
|
||||
|
||||
# Initialize VESC connection
|
||||
self._init_vesc()
|
||||
|
||||
self.get_logger().info(
|
||||
f"VESC driver initialized: port={self.port}, baud={self.baudrate}, "
|
||||
f"mode={self.command_mode}, timeout={self.timeout_s}s"
|
||||
)
|
||||
|
||||
def _init_vesc(self) -> bool:
|
||||
"""Initialize serial connection to VESC."""
|
||||
try:
|
||||
if pyvesc is None:
|
||||
self.get_logger().error("pyvesc not installed. Install with: pip install pyvesc")
|
||||
return False
|
||||
|
||||
self.serial = serial.Serial(
|
||||
port=self.port,
|
||||
baudrate=self.baudrate,
|
||||
timeout=0.1,
|
||||
)
|
||||
self.vesc = pyvesc.VescUart(
|
||||
serial_port=self.serial,
|
||||
has_sensor=False, # No wheel speed sensor
|
||||
start_heartbeat=True,
|
||||
)
|
||||
|
||||
self.get_logger().info(f"Connected to VESC on {self.port} @ {self.baudrate} baud")
|
||||
return True
|
||||
|
||||
except (serial.SerialException, Exception) as e:
|
||||
self.get_logger().error(f"Failed to initialize VESC: {e}")
|
||||
return False
|
||||
|
||||
def _on_cmd_vel(self, msg: Twist) -> None:
|
||||
"""Handle velocity command from /cmd_vel."""
|
||||
if self.vesc is None:
|
||||
return
|
||||
|
||||
try:
|
||||
# Extract linear velocity (m/s) and angular velocity (rad/s)
|
||||
linear_v = msg.linear.x
|
||||
angular_v = msg.angular.z
|
||||
|
||||
# Convert velocity to VESC command based on mode
|
||||
if self.command_mode == "duty_cycle":
|
||||
# Map velocity to duty cycle (-100 to 100)
|
||||
# Assuming max velocity ~5 m/s maps to ~100% duty
|
||||
duty = max(-100, min(100, (linear_v / 5.0) * 100))
|
||||
self.vesc.set_duty(duty / 100.0) # pyvesc expects 0-1 or -1-0
|
||||
|
||||
elif self.command_mode == "rpm":
|
||||
# Map velocity to RPM
|
||||
# Assuming max velocity 5 m/s → max_rpm
|
||||
rpm = max(-self.max_rpm, min(self.max_rpm, (linear_v / 5.0) * self.max_rpm))
|
||||
self.vesc.set_rpm(int(rpm))
|
||||
|
||||
# Angular velocity could control steering if dual motors are used differently
|
||||
# For now, it's ignored (single dual-motor ESC)
|
||||
|
||||
self.last_cmd_time = time.time()
|
||||
|
||||
except Exception as e:
|
||||
self.get_logger().error(f"Error sending command to VESC: {e}")
|
||||
|
||||
def _poll_telemetry(self) -> None:
|
||||
"""Poll VESC for telemetry and publish state."""
|
||||
if self.vesc is None:
|
||||
return
|
||||
|
||||
try:
|
||||
# Read telemetry from VESC
|
||||
if self.vesc.read_values():
|
||||
# Update state from VESC data
|
||||
self.state.voltage_v = self.vesc.data.v_in
|
||||
self.state.current_a = self.vesc.data.current_in
|
||||
self.state.rpm = self.vesc.data.rpm
|
||||
self.state.temperature_c = self.vesc.data.temp_fet
|
||||
self.state.fault_code = self.vesc.data.fault_code
|
||||
self.state.timestamp = time.time()
|
||||
|
||||
# Check for timeout
|
||||
if time.time() - self.last_cmd_time > self.timeout_s:
|
||||
# No recent command, send zero
|
||||
if self.command_mode == "duty_cycle":
|
||||
self.vesc.set_duty(0.0)
|
||||
else:
|
||||
self.vesc.set_rpm(0)
|
||||
|
||||
# Publish state
|
||||
self._publish_state()
|
||||
else:
|
||||
self.get_logger().warn("Failed to read VESC telemetry")
|
||||
|
||||
except Exception as e:
|
||||
self.get_logger().error(f"Error polling VESC: {e}")
|
||||
|
||||
def _publish_state(self) -> None:
|
||||
"""Publish VESC state as JSON."""
|
||||
state_dict = {
|
||||
"voltage_v": round(self.state.voltage_v, 2),
|
||||
"current_a": round(self.state.current_a, 2),
|
||||
"rpm": self.state.rpm,
|
||||
"temperature_c": round(self.state.temperature_c, 1),
|
||||
"fault_code": self.state.fault_code,
|
||||
"timestamp": self.state.timestamp,
|
||||
}
|
||||
|
||||
msg = String(data=json.dumps(state_dict))
|
||||
self.pub_state.publish(msg)
|
||||
self.pub_telemetry.publish(msg)
|
||||
|
||||
# Log fault codes
|
||||
if self.state.fault_code != 0:
|
||||
self.get_logger().warn(f"VESC fault code: {self.state.fault_code}")
|
||||
|
||||
|
||||
def main(args=None):
|
||||
rclpy.init(args=args)
|
||||
node = VESCDriverNode()
|
||||
try:
|
||||
rclpy.spin(node)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
if node.vesc and node.serial:
|
||||
node.vesc.set_duty(0.0) # Zero throttle on shutdown
|
||||
node.serial.close()
|
||||
node.destroy_node()
|
||||
rclpy.shutdown()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
5
jetson/ros2_ws/src/saltybot_vesc_driver/setup.cfg
Normal file
5
jetson/ros2_ws/src/saltybot_vesc_driver/setup.cfg
Normal file
@ -0,0 +1,5 @@
|
||||
[develop]
|
||||
script_dir=$base/lib/saltybot_vesc_driver
|
||||
|
||||
[install]
|
||||
install_scripts=$base/lib/saltybot_vesc_driver
|
||||
27
jetson/ros2_ws/src/saltybot_vesc_driver/setup.py
Normal file
27
jetson/ros2_ws/src/saltybot_vesc_driver/setup.py
Normal file
@ -0,0 +1,27 @@
|
||||
from setuptools import setup
|
||||
|
||||
package_name = "saltybot_vesc_driver"
|
||||
|
||||
setup(
|
||||
name=package_name,
|
||||
version="0.1.0",
|
||||
packages=[package_name],
|
||||
data_files=[
|
||||
("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
|
||||
(f"share/{package_name}", ["package.xml"]),
|
||||
(f"share/{package_name}/launch", ["launch/vesc_driver.launch.py"]),
|
||||
(f"share/{package_name}/config", ["config/vesc_params.yaml"]),
|
||||
],
|
||||
install_requires=["setuptools", "pyvesc"],
|
||||
zip_safe=True,
|
||||
maintainer="sl-controls",
|
||||
maintainer_email="sl-controls@saltylab.local",
|
||||
description="VESC UART driver for motor control and telemetry",
|
||||
license="MIT",
|
||||
tests_require=["pytest"],
|
||||
entry_points={
|
||||
"console_scripts": [
|
||||
"vesc_driver_node = saltybot_vesc_driver.vesc_driver_node:main",
|
||||
],
|
||||
},
|
||||
)
|
||||
@ -0,0 +1,43 @@
|
||||
"""Unit tests for VESC driver node."""
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
def test_vesc_driver_init():
|
||||
"""Test VESC driver node initialization."""
|
||||
import rclpy
|
||||
|
||||
rclpy.init(allow_reuse=True)
|
||||
try:
|
||||
from saltybot_vesc_driver.vesc_driver_node import VESCDriverNode
|
||||
|
||||
node = VESCDriverNode()
|
||||
assert node is not None
|
||||
node.destroy_node()
|
||||
except Exception as e:
|
||||
# pyvesc may not be installed in test env
|
||||
assert "pyvesc" in str(e) or "serial" in str(e)
|
||||
|
||||
|
||||
def test_vesc_state():
|
||||
"""Test VESC state object."""
|
||||
from saltybot_vesc_driver.vesc_driver_node import VESCState
|
||||
|
||||
state = VESCState()
|
||||
assert state.voltage_v == 0.0
|
||||
assert state.current_a == 0.0
|
||||
assert state.rpm == 0
|
||||
assert state.fault_code == 0
|
||||
|
||||
|
||||
def test_vesc_command_mode():
|
||||
"""Test VESC command mode enum."""
|
||||
from saltybot_vesc_driver.vesc_driver_node import VESCCommandMode
|
||||
|
||||
assert VESCCommandMode.DUTY_CYCLE.value == "duty_cycle"
|
||||
assert VESCCommandMode.RPM.value == "rpm"
|
||||
assert VESCCommandMode.CURRENT.value == "current"
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
pytest.main([__file__, "-v"])
|
||||
Loading…
x
Reference in New Issue
Block a user