Compare commits

..

No commits in common. "bf8df6af8fca49c61b0496b928b7da04b1bb6af3" and "a506989af6279469107de85478623c7daf80927c" have entirely different histories.

4 changed files with 209 additions and 107 deletions

View File

@ -1,10 +1,20 @@
vesc_can_driver:
# VESC Driver Configuration for SaltyBot
# Flipsky FSESC 4.20 Plus (VESC dual ESC) motor control
vesc_driver:
ros__parameters:
interface: can0
bitrate: 500000
left_motor_id: 61
right_motor_id: 79
wheel_separation: 0.5
wheel_radius: 0.1
max_speed: 5.0
command_timeout: 1.0
# Serial communication
port: "/dev/ttyUSB0"
baudrate: 115200
# Command mode: "duty_cycle" or "rpm"
# duty_cycle: Direct motor duty cycle (-100 to 100)
# rpm: RPM setpoint mode (closed-loop speed control)
command_mode: "duty_cycle"
# Motor limits
max_rpm: 60000
max_current_a: 50.0
# Command timeout: If no cmd_vel received for this duration, motor stops
timeout_s: 1.0

View File

@ -1,4 +1,4 @@
"""Launch file for VESC CAN driver node."""
"""Launch file for VESC driver node."""
from launch import LaunchDescription
from launch_ros.actions import Node
@ -9,7 +9,7 @@ from ament_index_python.packages import get_package_share_directory
def generate_launch_description():
"""Generate launch description for VESC CAN driver."""
"""Generate launch description for VESC driver."""
pkg_dir = get_package_share_directory("saltybot_vesc_driver")
config_file = os.path.join(pkg_dir, "config", "vesc_params.yaml")
@ -20,10 +20,15 @@ def generate_launch_description():
default_value=config_file,
description="Path to configuration YAML file",
),
DeclareLaunchArgument(
"port",
default_value="/dev/ttyUSB0",
description="Serial port for VESC",
),
Node(
package="saltybot_vesc_driver",
executable="vesc_driver_node",
name="vesc_can_driver",
name="vesc_driver",
output="screen",
parameters=[LaunchConfiguration("config_file")],
),

View File

@ -14,8 +14,6 @@
<depend>geometry_msgs</depend>
<depend>std_msgs</depend>
<exec_depend>python3-can</exec_depend>
<buildtool_depend>ament_python</buildtool_depend>
<test_depend>ament_copyright</test_depend>

View File

@ -1,133 +1,222 @@
#!/usr/bin/env python3
"""
VESC CAN driver node using python-can with SocketCAN.
Subscribes to /cmd_vel (geometry_msgs/Twist) and sends duty cycle
commands to two VESC motor controllers via CAN bus.
"""VESC UART driver node for SaltyBot.
VESC CAN protocol:
Extended frame ID = (CAN_PACKET_ID << 8) | VESC_ID
CAN_PACKET_SET_DUTY = 0 -> payload: int32 (duty * 100000)
Uses pyvesc library to communicate with Flipsky FSESC 4.20 Plus (VESC dual ESC).
Subscribes to velocity commands and publishes motor telemetry.
Subscribed topics:
/cmd_vel (geometry_msgs/Twist) - Velocity command (linear.x = m/s, angular.z = rad/s)
Published topics:
/vesc/state (VESCState) - Motor telemetry (voltage, current, RPM, temperature, fault)
/vesc/raw_telemetry (String) - Raw telemetry JSON for debugging
"""
import struct
import json
import threading
import time
from enum import Enum
from typing import Optional
import can
import rclpy
from geometry_msgs.msg import Twist
from rclpy.node import Node
from geometry_msgs.msg import Twist
from std_msgs.msg import String
import serial
# VESC CAN packet IDs
CAN_PACKET_SET_DUTY = 0
CAN_PACKET_SET_RPM = 3
def make_can_id(packet_id: int, vesc_id: int) -> int:
"""Build extended CAN frame ID for VESC."""
return (packet_id << 8) | vesc_id
class VescCanDriver(Node):
def __init__(self):
super().__init__('vesc_can_driver')
# Declare parameters
self.declare_parameter('interface', 'can0')
self.declare_parameter('left_motor_id', 61)
self.declare_parameter('right_motor_id', 79)
self.declare_parameter('wheel_separation', 0.5)
self.declare_parameter('wheel_radius', 0.1)
self.declare_parameter('max_speed', 5.0)
self.declare_parameter('command_timeout', 1.0)
# Read parameters
self.interface = self.get_parameter('interface').value
self.left_id = self.get_parameter('left_motor_id').value
self.right_id = self.get_parameter('right_motor_id').value
self.wheel_sep = self.get_parameter('wheel_separation').value
self.max_speed = self.get_parameter('max_speed').value
self.cmd_timeout = self.get_parameter('command_timeout').value
# Open CAN bus
try:
self.bus = can.interface.Bus(
channel=self.interface,
bustype='socketcan',
)
self.get_logger().info(f'CAN bus opened: {self.interface}')
except Exception as e:
self.get_logger().error(f'Failed to open CAN bus: {e}')
raise
import pyvesc
except ImportError:
pyvesc = None
self._last_cmd_time = time.monotonic()
# Subscriber
self.create_subscription(Twist, '/cmd_vel', self._cmd_vel_cb, 10)
class VESCCommandMode(Enum):
"""VESC command modes."""
DUTY_CYCLE = "duty_cycle" # Direct duty cycle (-100 to 100)
RPM = "rpm" # RPM setpoint
CURRENT = "current" # Current (A)
BRAKE_CURRENT = "brake" # Brake current (A)
# Watchdog timer (10 Hz)
self.create_timer(0.1, self._watchdog_cb)
class VESCState:
"""VESC telemetry state."""
def __init__(self):
self.voltage_v = 0.0
self.current_a = 0.0
self.rpm = 0
self.temperature_c = 0.0
self.fault_code = 0
self.timestamp = time.time()
class VESCDriverNode(Node):
"""ROS2 node for VESC motor control and telemetry."""
def __init__(self):
super().__init__("vesc_driver")
# Parameters
self.declare_parameter("port", "/dev/ttyUSB0")
self.declare_parameter("baudrate", 115200)
self.declare_parameter("command_mode", "duty_cycle") # or "rpm"
self.declare_parameter("max_rpm", 60000)
self.declare_parameter("max_current_a", 50.0)
self.declare_parameter("timeout_s", 1.0)
self.port = self.get_parameter("port").value
self.baudrate = self.get_parameter("baudrate").value
self.command_mode = self.get_parameter("command_mode").value
self.max_rpm = self.get_parameter("max_rpm").value
self.max_current_a = self.get_parameter("max_current_a").value
self.timeout_s = self.get_parameter("timeout_s").value
# Serial connection
self.serial = None
self.vesc = None
self.last_cmd_time = 0
self.state = VESCState()
# Subscriptions
self.create_subscription(Twist, "/cmd_vel", self._on_cmd_vel, 10)
# Publishers
self.pub_state = self.create_publisher(String, "/vesc/state", 10)
self.pub_telemetry = self.create_publisher(String, "/vesc/raw_telemetry", 10)
# Timer for telemetry polling (100 Hz)
self.create_timer(0.01, self._poll_telemetry)
# Initialize VESC connection
self._init_vesc()
self.get_logger().info(
f'VESC CAN driver ready — left={self.left_id} right={self.right_id}'
f"VESC driver initialized: port={self.port}, baud={self.baudrate}, "
f"mode={self.command_mode}, timeout={self.timeout_s}s"
)
# ------------------------------------------------------------------
def _cmd_vel_cb(self, msg: Twist):
self._last_cmd_time = time.monotonic()
linear = msg.linear.x
angular = msg.angular.z
left_speed = linear - (angular * self.wheel_sep / 2.0)
right_speed = linear + (angular * self.wheel_sep / 2.0)
left_duty = max(-1.0, min(1.0, left_speed / self.max_speed))
right_duty = max(-1.0, min(1.0, right_speed / self.max_speed))
self._send_duty(self.left_id, left_duty)
self._send_duty(self.right_id, right_duty)
def _watchdog_cb(self):
elapsed = time.monotonic() - self._last_cmd_time
if elapsed > self.cmd_timeout:
self._send_duty(self.left_id, 0.0)
self._send_duty(self.right_id, 0.0)
# ------------------------------------------------------------------
def _send_duty(self, vesc_id: int, duty: float):
"""Send CAN_PACKET_SET_DUTY to a VESC controller."""
can_id = make_can_id(CAN_PACKET_SET_DUTY, vesc_id)
payload = struct.pack('>i', int(duty * 100000))
msg = can.Message(
arbitration_id=can_id,
data=payload,
is_extended_id=True,
)
def _init_vesc(self) -> bool:
"""Initialize serial connection to VESC."""
try:
self.bus.send(msg)
except can.CanError as e:
self.get_logger().error(f'CAN send error (id={vesc_id}): {e}')
if pyvesc is None:
self.get_logger().error("pyvesc not installed. Install with: pip install pyvesc")
return False
def destroy_node(self):
self._send_duty(self.left_id, 0.0)
self._send_duty(self.right_id, 0.0)
self.bus.shutdown()
super().destroy_node()
self.serial = serial.Serial(
port=self.port,
baudrate=self.baudrate,
timeout=0.1,
)
self.vesc = pyvesc.VescUart(
serial_port=self.serial,
has_sensor=False, # No wheel speed sensor
start_heartbeat=True,
)
self.get_logger().info(f"Connected to VESC on {self.port} @ {self.baudrate} baud")
return True
except (serial.SerialException, Exception) as e:
self.get_logger().error(f"Failed to initialize VESC: {e}")
return False
def _on_cmd_vel(self, msg: Twist) -> None:
"""Handle velocity command from /cmd_vel."""
if self.vesc is None:
return
try:
# Extract linear velocity (m/s) and angular velocity (rad/s)
linear_v = msg.linear.x
angular_v = msg.angular.z
# Convert velocity to VESC command based on mode
if self.command_mode == "duty_cycle":
# Map velocity to duty cycle (-100 to 100)
# Assuming max velocity ~5 m/s maps to ~100% duty
duty = max(-100, min(100, (linear_v / 5.0) * 100))
self.vesc.set_duty(duty / 100.0) # pyvesc expects 0-1 or -1-0
elif self.command_mode == "rpm":
# Map velocity to RPM
# Assuming max velocity 5 m/s → max_rpm
rpm = max(-self.max_rpm, min(self.max_rpm, (linear_v / 5.0) * self.max_rpm))
self.vesc.set_rpm(int(rpm))
# Angular velocity could control steering if dual motors are used differently
# For now, it's ignored (single dual-motor ESC)
self.last_cmd_time = time.time()
except Exception as e:
self.get_logger().error(f"Error sending command to VESC: {e}")
def _poll_telemetry(self) -> None:
"""Poll VESC for telemetry and publish state."""
if self.vesc is None:
return
try:
# Read telemetry from VESC
if self.vesc.read_values():
# Update state from VESC data
self.state.voltage_v = self.vesc.data.v_in
self.state.current_a = self.vesc.data.current_in
self.state.rpm = self.vesc.data.rpm
self.state.temperature_c = self.vesc.data.temp_fet
self.state.fault_code = self.vesc.data.fault_code
self.state.timestamp = time.time()
# Check for timeout
if time.time() - self.last_cmd_time > self.timeout_s:
# No recent command, send zero
if self.command_mode == "duty_cycle":
self.vesc.set_duty(0.0)
else:
self.vesc.set_rpm(0)
# Publish state
self._publish_state()
else:
self.get_logger().warn("Failed to read VESC telemetry")
except Exception as e:
self.get_logger().error(f"Error polling VESC: {e}")
def _publish_state(self) -> None:
"""Publish VESC state as JSON."""
state_dict = {
"voltage_v": round(self.state.voltage_v, 2),
"current_a": round(self.state.current_a, 2),
"rpm": self.state.rpm,
"temperature_c": round(self.state.temperature_c, 1),
"fault_code": self.state.fault_code,
"timestamp": self.state.timestamp,
}
msg = String(data=json.dumps(state_dict))
self.pub_state.publish(msg)
self.pub_telemetry.publish(msg)
# Log fault codes
if self.state.fault_code != 0:
self.get_logger().warn(f"VESC fault code: {self.state.fault_code}")
def main(args=None):
rclpy.init(args=args)
node = VescCanDriver()
node = VESCDriverNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
if node.vesc and node.serial:
node.vesc.set_duty(0.0) # Zero throttle on shutdown
node.serial.close()
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
if __name__ == "__main__":
main()