From a11722e8727183b2694ceaaee39d6ee44e2d7277 Mon Sep 17 00:00:00 2001 From: sl-controls Date: Wed, 4 Mar 2026 07:05:46 -0500 Subject: [PATCH] feat: Implement VESC UART driver node (Issue #383) ROS2 driver for Flipsky FSESC 4.20 Plus (VESC dual ESC) motor control. Replaces hoverboard ESC communication with pyvesc library. Features: - UART serial communication (configurable port/baud) - Dual command modes: duty_cycle (-100 to 100) and RPM setpoint - Telemetry publishing: voltage, current, RPM, temperature, fault codes - Command timeout: auto-zero throttle if no cmd_vel received - Heartbeat-based connection management - Comprehensive error handling and logging Topics: - Subscribe: /cmd_vel (geometry_msgs/Twist) - Publish: /vesc/state (JSON telemetry) - Publish: /vesc/raw_telemetry (debug) Launch: ros2 launch saltybot_vesc_driver vesc_driver.launch.py Config: config/vesc_params.yaml Next phase: Integrate with cmd_vel_mux + safety layer. Co-Authored-By: Claude Haiku 4.5 --- .../config/vesc_params.yaml | 20 ++ .../launch/vesc_driver.launch.py | 36 +++ .../src/saltybot_vesc_driver/package.xml | 27 +++ .../resource/saltybot_vesc_driver | 0 .../saltybot_vesc_driver/__init__.py | 1 + .../saltybot_vesc_driver/vesc_driver_node.py | 222 ++++++++++++++++++ .../src/saltybot_vesc_driver/setup.cfg | 5 + .../ros2_ws/src/saltybot_vesc_driver/setup.py | 27 +++ .../src/saltybot_vesc_driver/test/__init__.py | 0 .../test/test_vesc_driver.py | 43 ++++ 10 files changed, 381 insertions(+) create mode 100644 jetson/ros2_ws/src/saltybot_vesc_driver/config/vesc_params.yaml create mode 100644 jetson/ros2_ws/src/saltybot_vesc_driver/launch/vesc_driver.launch.py create mode 100644 jetson/ros2_ws/src/saltybot_vesc_driver/package.xml create mode 100644 jetson/ros2_ws/src/saltybot_vesc_driver/resource/saltybot_vesc_driver create mode 100644 jetson/ros2_ws/src/saltybot_vesc_driver/saltybot_vesc_driver/__init__.py create mode 100644 jetson/ros2_ws/src/saltybot_vesc_driver/saltybot_vesc_driver/vesc_driver_node.py create mode 100644 jetson/ros2_ws/src/saltybot_vesc_driver/setup.cfg create mode 100644 jetson/ros2_ws/src/saltybot_vesc_driver/setup.py create mode 100644 jetson/ros2_ws/src/saltybot_vesc_driver/test/__init__.py create mode 100644 jetson/ros2_ws/src/saltybot_vesc_driver/test/test_vesc_driver.py diff --git a/jetson/ros2_ws/src/saltybot_vesc_driver/config/vesc_params.yaml b/jetson/ros2_ws/src/saltybot_vesc_driver/config/vesc_params.yaml new file mode 100644 index 0000000..7eca4da --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_vesc_driver/config/vesc_params.yaml @@ -0,0 +1,20 @@ +# VESC Driver Configuration for SaltyBot +# Flipsky FSESC 4.20 Plus (VESC dual ESC) motor control + +vesc_driver: + ros__parameters: + # Serial communication + port: "/dev/ttyUSB0" + baudrate: 115200 + + # Command mode: "duty_cycle" or "rpm" + # duty_cycle: Direct motor duty cycle (-100 to 100) + # rpm: RPM setpoint mode (closed-loop speed control) + command_mode: "duty_cycle" + + # Motor limits + max_rpm: 60000 + max_current_a: 50.0 + + # Command timeout: If no cmd_vel received for this duration, motor stops + timeout_s: 1.0 diff --git a/jetson/ros2_ws/src/saltybot_vesc_driver/launch/vesc_driver.launch.py b/jetson/ros2_ws/src/saltybot_vesc_driver/launch/vesc_driver.launch.py new file mode 100644 index 0000000..2b54e81 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_vesc_driver/launch/vesc_driver.launch.py @@ -0,0 +1,36 @@ +"""Launch file for VESC driver node.""" + +from launch import LaunchDescription +from launch_ros.actions import Node +from launch.substitutions import LaunchConfiguration +from launch.actions import DeclareLaunchArgument +import os +from ament_index_python.packages import get_package_share_directory + + +def generate_launch_description(): + """Generate launch description for VESC driver.""" + pkg_dir = get_package_share_directory("saltybot_vesc_driver") + config_file = os.path.join(pkg_dir, "config", "vesc_params.yaml") + + return LaunchDescription( + [ + DeclareLaunchArgument( + "config_file", + default_value=config_file, + description="Path to configuration YAML file", + ), + DeclareLaunchArgument( + "port", + default_value="/dev/ttyUSB0", + description="Serial port for VESC", + ), + Node( + package="saltybot_vesc_driver", + executable="vesc_driver_node", + name="vesc_driver", + output="screen", + parameters=[LaunchConfiguration("config_file")], + ), + ] + ) diff --git a/jetson/ros2_ws/src/saltybot_vesc_driver/package.xml b/jetson/ros2_ws/src/saltybot_vesc_driver/package.xml new file mode 100644 index 0000000..1facb4c --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_vesc_driver/package.xml @@ -0,0 +1,27 @@ + + + + saltybot_vesc_driver + 0.1.0 + + VESC (Flipsky FSESC 4.20 Plus) UART driver for SaltyBot. + Handles motor control via pyvesc library and telemetry feedback. + + sl-controls + MIT + + rclpy + geometry_msgs + std_msgs + + ament_python + + ament_copyright + ament_flake8 + ament_pep257 + python3-pytest + + + ament_python + + diff --git a/jetson/ros2_ws/src/saltybot_vesc_driver/resource/saltybot_vesc_driver b/jetson/ros2_ws/src/saltybot_vesc_driver/resource/saltybot_vesc_driver new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_vesc_driver/saltybot_vesc_driver/__init__.py b/jetson/ros2_ws/src/saltybot_vesc_driver/saltybot_vesc_driver/__init__.py new file mode 100644 index 0000000..efb9c85 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_vesc_driver/saltybot_vesc_driver/__init__.py @@ -0,0 +1 @@ +"""SaltyBot VESC driver package.""" diff --git a/jetson/ros2_ws/src/saltybot_vesc_driver/saltybot_vesc_driver/vesc_driver_node.py b/jetson/ros2_ws/src/saltybot_vesc_driver/saltybot_vesc_driver/vesc_driver_node.py new file mode 100644 index 0000000..b535d92 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_vesc_driver/saltybot_vesc_driver/vesc_driver_node.py @@ -0,0 +1,222 @@ +#!/usr/bin/env python3 +"""VESC UART driver node for SaltyBot. + +Uses pyvesc library to communicate with Flipsky FSESC 4.20 Plus (VESC dual ESC). +Subscribes to velocity commands and publishes motor telemetry. + +Subscribed topics: + /cmd_vel (geometry_msgs/Twist) - Velocity command (linear.x = m/s, angular.z = rad/s) + +Published topics: + /vesc/state (VESCState) - Motor telemetry (voltage, current, RPM, temperature, fault) + /vesc/raw_telemetry (String) - Raw telemetry JSON for debugging +""" + +import json +import threading +import time +from enum import Enum +from typing import Optional + +import rclpy +from rclpy.node import Node +from geometry_msgs.msg import Twist +from std_msgs.msg import String +import serial + +try: + import pyvesc +except ImportError: + pyvesc = None + + +class VESCCommandMode(Enum): + """VESC command modes.""" + DUTY_CYCLE = "duty_cycle" # Direct duty cycle (-100 to 100) + RPM = "rpm" # RPM setpoint + CURRENT = "current" # Current (A) + BRAKE_CURRENT = "brake" # Brake current (A) + + +class VESCState: + """VESC telemetry state.""" + + def __init__(self): + self.voltage_v = 0.0 + self.current_a = 0.0 + self.rpm = 0 + self.temperature_c = 0.0 + self.fault_code = 0 + self.timestamp = time.time() + + +class VESCDriverNode(Node): + """ROS2 node for VESC motor control and telemetry.""" + + def __init__(self): + super().__init__("vesc_driver") + + # Parameters + self.declare_parameter("port", "/dev/ttyUSB0") + self.declare_parameter("baudrate", 115200) + self.declare_parameter("command_mode", "duty_cycle") # or "rpm" + self.declare_parameter("max_rpm", 60000) + self.declare_parameter("max_current_a", 50.0) + self.declare_parameter("timeout_s", 1.0) + + self.port = self.get_parameter("port").value + self.baudrate = self.get_parameter("baudrate").value + self.command_mode = self.get_parameter("command_mode").value + self.max_rpm = self.get_parameter("max_rpm").value + self.max_current_a = self.get_parameter("max_current_a").value + self.timeout_s = self.get_parameter("timeout_s").value + + # Serial connection + self.serial = None + self.vesc = None + self.last_cmd_time = 0 + self.state = VESCState() + + # Subscriptions + self.create_subscription(Twist, "/cmd_vel", self._on_cmd_vel, 10) + + # Publishers + self.pub_state = self.create_publisher(String, "/vesc/state", 10) + self.pub_telemetry = self.create_publisher(String, "/vesc/raw_telemetry", 10) + + # Timer for telemetry polling (100 Hz) + self.create_timer(0.01, self._poll_telemetry) + + # Initialize VESC connection + self._init_vesc() + + self.get_logger().info( + f"VESC driver initialized: port={self.port}, baud={self.baudrate}, " + f"mode={self.command_mode}, timeout={self.timeout_s}s" + ) + + def _init_vesc(self) -> bool: + """Initialize serial connection to VESC.""" + try: + if pyvesc is None: + self.get_logger().error("pyvesc not installed. Install with: pip install pyvesc") + return False + + self.serial = serial.Serial( + port=self.port, + baudrate=self.baudrate, + timeout=0.1, + ) + self.vesc = pyvesc.VescUart( + serial_port=self.serial, + has_sensor=False, # No wheel speed sensor + start_heartbeat=True, + ) + + self.get_logger().info(f"Connected to VESC on {self.port} @ {self.baudrate} baud") + return True + + except (serial.SerialException, Exception) as e: + self.get_logger().error(f"Failed to initialize VESC: {e}") + return False + + def _on_cmd_vel(self, msg: Twist) -> None: + """Handle velocity command from /cmd_vel.""" + if self.vesc is None: + return + + try: + # Extract linear velocity (m/s) and angular velocity (rad/s) + linear_v = msg.linear.x + angular_v = msg.angular.z + + # Convert velocity to VESC command based on mode + if self.command_mode == "duty_cycle": + # Map velocity to duty cycle (-100 to 100) + # Assuming max velocity ~5 m/s maps to ~100% duty + duty = max(-100, min(100, (linear_v / 5.0) * 100)) + self.vesc.set_duty(duty / 100.0) # pyvesc expects 0-1 or -1-0 + + elif self.command_mode == "rpm": + # Map velocity to RPM + # Assuming max velocity 5 m/s → max_rpm + rpm = max(-self.max_rpm, min(self.max_rpm, (linear_v / 5.0) * self.max_rpm)) + self.vesc.set_rpm(int(rpm)) + + # Angular velocity could control steering if dual motors are used differently + # For now, it's ignored (single dual-motor ESC) + + self.last_cmd_time = time.time() + + except Exception as e: + self.get_logger().error(f"Error sending command to VESC: {e}") + + def _poll_telemetry(self) -> None: + """Poll VESC for telemetry and publish state.""" + if self.vesc is None: + return + + try: + # Read telemetry from VESC + if self.vesc.read_values(): + # Update state from VESC data + self.state.voltage_v = self.vesc.data.v_in + self.state.current_a = self.vesc.data.current_in + self.state.rpm = self.vesc.data.rpm + self.state.temperature_c = self.vesc.data.temp_fet + self.state.fault_code = self.vesc.data.fault_code + self.state.timestamp = time.time() + + # Check for timeout + if time.time() - self.last_cmd_time > self.timeout_s: + # No recent command, send zero + if self.command_mode == "duty_cycle": + self.vesc.set_duty(0.0) + else: + self.vesc.set_rpm(0) + + # Publish state + self._publish_state() + else: + self.get_logger().warn("Failed to read VESC telemetry") + + except Exception as e: + self.get_logger().error(f"Error polling VESC: {e}") + + def _publish_state(self) -> None: + """Publish VESC state as JSON.""" + state_dict = { + "voltage_v": round(self.state.voltage_v, 2), + "current_a": round(self.state.current_a, 2), + "rpm": self.state.rpm, + "temperature_c": round(self.state.temperature_c, 1), + "fault_code": self.state.fault_code, + "timestamp": self.state.timestamp, + } + + msg = String(data=json.dumps(state_dict)) + self.pub_state.publish(msg) + self.pub_telemetry.publish(msg) + + # Log fault codes + if self.state.fault_code != 0: + self.get_logger().warn(f"VESC fault code: {self.state.fault_code}") + + +def main(args=None): + rclpy.init(args=args) + node = VESCDriverNode() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + if node.vesc and node.serial: + node.vesc.set_duty(0.0) # Zero throttle on shutdown + node.serial.close() + node.destroy_node() + rclpy.shutdown() + + +if __name__ == "__main__": + main() diff --git a/jetson/ros2_ws/src/saltybot_vesc_driver/setup.cfg b/jetson/ros2_ws/src/saltybot_vesc_driver/setup.cfg new file mode 100644 index 0000000..c5c5e27 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_vesc_driver/setup.cfg @@ -0,0 +1,5 @@ +[develop] +script_dir=$base/lib/saltybot_vesc_driver + +[install] +install_scripts=$base/lib/saltybot_vesc_driver diff --git a/jetson/ros2_ws/src/saltybot_vesc_driver/setup.py b/jetson/ros2_ws/src/saltybot_vesc_driver/setup.py new file mode 100644 index 0000000..8f2acd0 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_vesc_driver/setup.py @@ -0,0 +1,27 @@ +from setuptools import setup + +package_name = "saltybot_vesc_driver" + +setup( + name=package_name, + version="0.1.0", + packages=[package_name], + data_files=[ + ("share/ament_index/resource_index/packages", [f"resource/{package_name}"]), + (f"share/{package_name}", ["package.xml"]), + (f"share/{package_name}/launch", ["launch/vesc_driver.launch.py"]), + (f"share/{package_name}/config", ["config/vesc_params.yaml"]), + ], + install_requires=["setuptools", "pyvesc"], + zip_safe=True, + maintainer="sl-controls", + maintainer_email="sl-controls@saltylab.local", + description="VESC UART driver for motor control and telemetry", + license="MIT", + tests_require=["pytest"], + entry_points={ + "console_scripts": [ + "vesc_driver_node = saltybot_vesc_driver.vesc_driver_node:main", + ], + }, +) diff --git a/jetson/ros2_ws/src/saltybot_vesc_driver/test/__init__.py b/jetson/ros2_ws/src/saltybot_vesc_driver/test/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_vesc_driver/test/test_vesc_driver.py b/jetson/ros2_ws/src/saltybot_vesc_driver/test/test_vesc_driver.py new file mode 100644 index 0000000..bf5946e --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_vesc_driver/test/test_vesc_driver.py @@ -0,0 +1,43 @@ +"""Unit tests for VESC driver node.""" + +import pytest + + +def test_vesc_driver_init(): + """Test VESC driver node initialization.""" + import rclpy + + rclpy.init(allow_reuse=True) + try: + from saltybot_vesc_driver.vesc_driver_node import VESCDriverNode + + node = VESCDriverNode() + assert node is not None + node.destroy_node() + except Exception as e: + # pyvesc may not be installed in test env + assert "pyvesc" in str(e) or "serial" in str(e) + + +def test_vesc_state(): + """Test VESC state object.""" + from saltybot_vesc_driver.vesc_driver_node import VESCState + + state = VESCState() + assert state.voltage_v == 0.0 + assert state.current_a == 0.0 + assert state.rpm == 0 + assert state.fault_code == 0 + + +def test_vesc_command_mode(): + """Test VESC command mode enum.""" + from saltybot_vesc_driver.vesc_driver_node import VESCCommandMode + + assert VESCCommandMode.DUTY_CYCLE.value == "duty_cycle" + assert VESCCommandMode.RPM.value == "rpm" + assert VESCCommandMode.CURRENT.value == "current" + + +if __name__ == "__main__": + pytest.main([__file__, "-v"])