Merge remote-tracking branch 'origin/sl-controls/issue-644-vesc-can-driver'

This commit is contained in:
Sebastien Vayrette 2026-03-17 11:27:26 -04:00
commit bf8df6af8f
4 changed files with 107 additions and 209 deletions

View File

@ -1,20 +1,10 @@
# VESC Driver Configuration for SaltyBot
# Flipsky FSESC 4.20 Plus (VESC dual ESC) motor control
vesc_driver:
vesc_can_driver:
ros__parameters:
# Serial communication
port: "/dev/ttyUSB0"
baudrate: 115200
# Command mode: "duty_cycle" or "rpm"
# duty_cycle: Direct motor duty cycle (-100 to 100)
# rpm: RPM setpoint mode (closed-loop speed control)
command_mode: "duty_cycle"
# Motor limits
max_rpm: 60000
max_current_a: 50.0
# Command timeout: If no cmd_vel received for this duration, motor stops
timeout_s: 1.0
interface: can0
bitrate: 500000
left_motor_id: 61
right_motor_id: 79
wheel_separation: 0.5
wheel_radius: 0.1
max_speed: 5.0
command_timeout: 1.0

View File

@ -1,4 +1,4 @@
"""Launch file for VESC driver node."""
"""Launch file for VESC CAN driver node."""
from launch import LaunchDescription
from launch_ros.actions import Node
@ -9,7 +9,7 @@ from ament_index_python.packages import get_package_share_directory
def generate_launch_description():
"""Generate launch description for VESC driver."""
"""Generate launch description for VESC CAN driver."""
pkg_dir = get_package_share_directory("saltybot_vesc_driver")
config_file = os.path.join(pkg_dir, "config", "vesc_params.yaml")
@ -20,15 +20,10 @@ def generate_launch_description():
default_value=config_file,
description="Path to configuration YAML file",
),
DeclareLaunchArgument(
"port",
default_value="/dev/ttyUSB0",
description="Serial port for VESC",
),
Node(
package="saltybot_vesc_driver",
executable="vesc_driver_node",
name="vesc_driver",
name="vesc_can_driver",
output="screen",
parameters=[LaunchConfiguration("config_file")],
),

View File

@ -14,6 +14,8 @@
<depend>geometry_msgs</depend>
<depend>std_msgs</depend>
<exec_depend>python3-can</exec_depend>
<buildtool_depend>ament_python</buildtool_depend>
<test_depend>ament_copyright</test_depend>

View File

@ -1,222 +1,133 @@
#!/usr/bin/env python3
"""VESC UART driver node for SaltyBot.
"""
VESC CAN driver node using python-can with SocketCAN.
Subscribes to /cmd_vel (geometry_msgs/Twist) and sends duty cycle
commands to two VESC motor controllers via CAN bus.
Uses pyvesc library to communicate with Flipsky FSESC 4.20 Plus (VESC dual ESC).
Subscribes to velocity commands and publishes motor telemetry.
Subscribed topics:
/cmd_vel (geometry_msgs/Twist) - Velocity command (linear.x = m/s, angular.z = rad/s)
Published topics:
/vesc/state (VESCState) - Motor telemetry (voltage, current, RPM, temperature, fault)
/vesc/raw_telemetry (String) - Raw telemetry JSON for debugging
VESC CAN protocol:
Extended frame ID = (CAN_PACKET_ID << 8) | VESC_ID
CAN_PACKET_SET_DUTY = 0 -> payload: int32 (duty * 100000)
"""
import json
import threading
import struct
import time
from enum import Enum
from typing import Optional
import can
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
from std_msgs.msg import String
import serial
from rclpy.node import Node
# VESC CAN packet IDs
CAN_PACKET_SET_DUTY = 0
CAN_PACKET_SET_RPM = 3
def make_can_id(packet_id: int, vesc_id: int) -> int:
"""Build extended CAN frame ID for VESC."""
return (packet_id << 8) | vesc_id
class VescCanDriver(Node):
def __init__(self):
super().__init__('vesc_can_driver')
# Declare parameters
self.declare_parameter('interface', 'can0')
self.declare_parameter('left_motor_id', 61)
self.declare_parameter('right_motor_id', 79)
self.declare_parameter('wheel_separation', 0.5)
self.declare_parameter('wheel_radius', 0.1)
self.declare_parameter('max_speed', 5.0)
self.declare_parameter('command_timeout', 1.0)
# Read parameters
self.interface = self.get_parameter('interface').value
self.left_id = self.get_parameter('left_motor_id').value
self.right_id = self.get_parameter('right_motor_id').value
self.wheel_sep = self.get_parameter('wheel_separation').value
self.max_speed = self.get_parameter('max_speed').value
self.cmd_timeout = self.get_parameter('command_timeout').value
# Open CAN bus
try:
import pyvesc
except ImportError:
pyvesc = None
self.bus = can.interface.Bus(
channel=self.interface,
bustype='socketcan',
)
self.get_logger().info(f'CAN bus opened: {self.interface}')
except Exception as e:
self.get_logger().error(f'Failed to open CAN bus: {e}')
raise
self._last_cmd_time = time.monotonic()
class VESCCommandMode(Enum):
"""VESC command modes."""
DUTY_CYCLE = "duty_cycle" # Direct duty cycle (-100 to 100)
RPM = "rpm" # RPM setpoint
CURRENT = "current" # Current (A)
BRAKE_CURRENT = "brake" # Brake current (A)
# Subscriber
self.create_subscription(Twist, '/cmd_vel', self._cmd_vel_cb, 10)
class VESCState:
"""VESC telemetry state."""
def __init__(self):
self.voltage_v = 0.0
self.current_a = 0.0
self.rpm = 0
self.temperature_c = 0.0
self.fault_code = 0
self.timestamp = time.time()
class VESCDriverNode(Node):
"""ROS2 node for VESC motor control and telemetry."""
def __init__(self):
super().__init__("vesc_driver")
# Parameters
self.declare_parameter("port", "/dev/ttyUSB0")
self.declare_parameter("baudrate", 115200)
self.declare_parameter("command_mode", "duty_cycle") # or "rpm"
self.declare_parameter("max_rpm", 60000)
self.declare_parameter("max_current_a", 50.0)
self.declare_parameter("timeout_s", 1.0)
self.port = self.get_parameter("port").value
self.baudrate = self.get_parameter("baudrate").value
self.command_mode = self.get_parameter("command_mode").value
self.max_rpm = self.get_parameter("max_rpm").value
self.max_current_a = self.get_parameter("max_current_a").value
self.timeout_s = self.get_parameter("timeout_s").value
# Serial connection
self.serial = None
self.vesc = None
self.last_cmd_time = 0
self.state = VESCState()
# Subscriptions
self.create_subscription(Twist, "/cmd_vel", self._on_cmd_vel, 10)
# Publishers
self.pub_state = self.create_publisher(String, "/vesc/state", 10)
self.pub_telemetry = self.create_publisher(String, "/vesc/raw_telemetry", 10)
# Timer for telemetry polling (100 Hz)
self.create_timer(0.01, self._poll_telemetry)
# Initialize VESC connection
self._init_vesc()
# Watchdog timer (10 Hz)
self.create_timer(0.1, self._watchdog_cb)
self.get_logger().info(
f"VESC driver initialized: port={self.port}, baud={self.baudrate}, "
f"mode={self.command_mode}, timeout={self.timeout_s}s"
f'VESC CAN driver ready — left={self.left_id} right={self.right_id}'
)
def _init_vesc(self) -> bool:
"""Initialize serial connection to VESC."""
try:
if pyvesc is None:
self.get_logger().error("pyvesc not installed. Install with: pip install pyvesc")
return False
# ------------------------------------------------------------------
self.serial = serial.Serial(
port=self.port,
baudrate=self.baudrate,
timeout=0.1,
def _cmd_vel_cb(self, msg: Twist):
self._last_cmd_time = time.monotonic()
linear = msg.linear.x
angular = msg.angular.z
left_speed = linear - (angular * self.wheel_sep / 2.0)
right_speed = linear + (angular * self.wheel_sep / 2.0)
left_duty = max(-1.0, min(1.0, left_speed / self.max_speed))
right_duty = max(-1.0, min(1.0, right_speed / self.max_speed))
self._send_duty(self.left_id, left_duty)
self._send_duty(self.right_id, right_duty)
def _watchdog_cb(self):
elapsed = time.monotonic() - self._last_cmd_time
if elapsed > self.cmd_timeout:
self._send_duty(self.left_id, 0.0)
self._send_duty(self.right_id, 0.0)
# ------------------------------------------------------------------
def _send_duty(self, vesc_id: int, duty: float):
"""Send CAN_PACKET_SET_DUTY to a VESC controller."""
can_id = make_can_id(CAN_PACKET_SET_DUTY, vesc_id)
payload = struct.pack('>i', int(duty * 100000))
msg = can.Message(
arbitration_id=can_id,
data=payload,
is_extended_id=True,
)
self.vesc = pyvesc.VescUart(
serial_port=self.serial,
has_sensor=False, # No wheel speed sensor
start_heartbeat=True,
)
self.get_logger().info(f"Connected to VESC on {self.port} @ {self.baudrate} baud")
return True
except (serial.SerialException, Exception) as e:
self.get_logger().error(f"Failed to initialize VESC: {e}")
return False
def _on_cmd_vel(self, msg: Twist) -> None:
"""Handle velocity command from /cmd_vel."""
if self.vesc is None:
return
try:
# Extract linear velocity (m/s) and angular velocity (rad/s)
linear_v = msg.linear.x
angular_v = msg.angular.z
self.bus.send(msg)
except can.CanError as e:
self.get_logger().error(f'CAN send error (id={vesc_id}): {e}')
# Convert velocity to VESC command based on mode
if self.command_mode == "duty_cycle":
# Map velocity to duty cycle (-100 to 100)
# Assuming max velocity ~5 m/s maps to ~100% duty
duty = max(-100, min(100, (linear_v / 5.0) * 100))
self.vesc.set_duty(duty / 100.0) # pyvesc expects 0-1 or -1-0
elif self.command_mode == "rpm":
# Map velocity to RPM
# Assuming max velocity 5 m/s → max_rpm
rpm = max(-self.max_rpm, min(self.max_rpm, (linear_v / 5.0) * self.max_rpm))
self.vesc.set_rpm(int(rpm))
# Angular velocity could control steering if dual motors are used differently
# For now, it's ignored (single dual-motor ESC)
self.last_cmd_time = time.time()
except Exception as e:
self.get_logger().error(f"Error sending command to VESC: {e}")
def _poll_telemetry(self) -> None:
"""Poll VESC for telemetry and publish state."""
if self.vesc is None:
return
try:
# Read telemetry from VESC
if self.vesc.read_values():
# Update state from VESC data
self.state.voltage_v = self.vesc.data.v_in
self.state.current_a = self.vesc.data.current_in
self.state.rpm = self.vesc.data.rpm
self.state.temperature_c = self.vesc.data.temp_fet
self.state.fault_code = self.vesc.data.fault_code
self.state.timestamp = time.time()
# Check for timeout
if time.time() - self.last_cmd_time > self.timeout_s:
# No recent command, send zero
if self.command_mode == "duty_cycle":
self.vesc.set_duty(0.0)
else:
self.vesc.set_rpm(0)
# Publish state
self._publish_state()
else:
self.get_logger().warn("Failed to read VESC telemetry")
except Exception as e:
self.get_logger().error(f"Error polling VESC: {e}")
def _publish_state(self) -> None:
"""Publish VESC state as JSON."""
state_dict = {
"voltage_v": round(self.state.voltage_v, 2),
"current_a": round(self.state.current_a, 2),
"rpm": self.state.rpm,
"temperature_c": round(self.state.temperature_c, 1),
"fault_code": self.state.fault_code,
"timestamp": self.state.timestamp,
}
msg = String(data=json.dumps(state_dict))
self.pub_state.publish(msg)
self.pub_telemetry.publish(msg)
# Log fault codes
if self.state.fault_code != 0:
self.get_logger().warn(f"VESC fault code: {self.state.fault_code}")
def destroy_node(self):
self._send_duty(self.left_id, 0.0)
self._send_duty(self.right_id, 0.0)
self.bus.shutdown()
super().destroy_node()
def main(args=None):
rclpy.init(args=args)
node = VESCDriverNode()
node = VescCanDriver()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
if node.vesc and node.serial:
node.vesc.set_duty(0.0) # Zero throttle on shutdown
node.serial.close()
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
if __name__ == '__main__':
main()