feat: VESC SocketCAN dual-motor driver IDs 61/79 (Issue #644)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
sl-controls 2026-03-17 09:47:57 -04:00
parent 0fcad75cb4
commit 1d87899270
7 changed files with 199 additions and 209 deletions

View File

@ -0,0 +1,21 @@
[Unit]
Description=CANable 2.0 CAN bus bringup (can0, 500 kbps)
Documentation=https://gitea.vayrette.com/seb/saltylab-firmware/issues/643
# Wait until the gs_usb net device appears; udev fires After=sys-subsystem-net-devices-can0.device
After=network.target sys-subsystem-net-devices-can0.device
Requires=sys-subsystem-net-devices-can0.device
BindsTo=sys-subsystem-net-devices-can0.device
[Service]
Type=oneshot
RemainAfterExit=yes
ExecStart=/usr/sbin/ip link set can0 up type can bitrate 500000
ExecStop=/usr/sbin/ip link set can0 down
StandardOutput=journal
StandardError=journal
SyslogIdentifier=can-bringup
[Install]
WantedBy=multi-user.target

View File

@ -0,0 +1,19 @@
# CANable 2.0 USB-CAN adapter (gs_usb driver)
# Exposes the adapter as can0 via the SocketCAN subsystem.
# Issue: https://gitea.vayrette.com/seb/saltylab-firmware/issues/643
#
# Install:
# sudo cp 70-canable.rules /etc/udev/rules.d/
# sudo udevadm control --reload && sudo udevadm trigger
#
# CANable 2.0 USB IDs (Geschwister Schneider / candleLight firmware):
# idVendor = 1d50 (OpenMoko)
# idProduct = 606f (candleLight / gs_usb)
#
# Verify with: lsusb | grep -i candl
# ip link show can0
SUBSYSTEM=="net", ACTION=="add", \
ATTRS{idVendor}=="1d50", ATTRS{idProduct}=="606f", \
NAME="can0", \
RUN+="/sbin/ip link set can0 up type can bitrate 500000"

View File

@ -1,20 +1,10 @@
# VESC Driver Configuration for SaltyBot vesc_can_driver:
# Flipsky FSESC 4.20 Plus (VESC dual ESC) motor control
vesc_driver:
ros__parameters: ros__parameters:
# Serial communication interface: can0
port: "/dev/ttyUSB0" bitrate: 500000
baudrate: 115200 left_motor_id: 61
right_motor_id: 79
# Command mode: "duty_cycle" or "rpm" wheel_separation: 0.5
# duty_cycle: Direct motor duty cycle (-100 to 100) wheel_radius: 0.1
# rpm: RPM setpoint mode (closed-loop speed control) max_speed: 5.0
command_mode: "duty_cycle" command_timeout: 1.0
# Motor limits
max_rpm: 60000
max_current_a: 50.0
# Command timeout: If no cmd_vel received for this duration, motor stops
timeout_s: 1.0

View File

@ -1,4 +1,4 @@
"""Launch file for VESC driver node.""" """Launch file for VESC CAN driver node."""
from launch import LaunchDescription from launch import LaunchDescription
from launch_ros.actions import Node from launch_ros.actions import Node
@ -9,7 +9,7 @@ from ament_index_python.packages import get_package_share_directory
def generate_launch_description(): def generate_launch_description():
"""Generate launch description for VESC driver.""" """Generate launch description for VESC CAN driver."""
pkg_dir = get_package_share_directory("saltybot_vesc_driver") pkg_dir = get_package_share_directory("saltybot_vesc_driver")
config_file = os.path.join(pkg_dir, "config", "vesc_params.yaml") config_file = os.path.join(pkg_dir, "config", "vesc_params.yaml")
@ -20,15 +20,10 @@ def generate_launch_description():
default_value=config_file, default_value=config_file,
description="Path to configuration YAML file", description="Path to configuration YAML file",
), ),
DeclareLaunchArgument(
"port",
default_value="/dev/ttyUSB0",
description="Serial port for VESC",
),
Node( Node(
package="saltybot_vesc_driver", package="saltybot_vesc_driver",
executable="vesc_driver_node", executable="vesc_driver_node",
name="vesc_driver", name="vesc_can_driver",
output="screen", output="screen",
parameters=[LaunchConfiguration("config_file")], parameters=[LaunchConfiguration("config_file")],
), ),

View File

@ -14,6 +14,8 @@
<depend>geometry_msgs</depend> <depend>geometry_msgs</depend>
<depend>std_msgs</depend> <depend>std_msgs</depend>
<exec_depend>python3-can</exec_depend>
<buildtool_depend>ament_python</buildtool_depend> <buildtool_depend>ament_python</buildtool_depend>
<test_depend>ament_copyright</test_depend> <test_depend>ament_copyright</test_depend>

View File

@ -1,222 +1,133 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
"""VESC UART driver node for SaltyBot. """
VESC CAN driver node using python-can with SocketCAN.
Subscribes to /cmd_vel (geometry_msgs/Twist) and sends duty cycle
commands to two VESC motor controllers via CAN bus.
Uses pyvesc library to communicate with Flipsky FSESC 4.20 Plus (VESC dual ESC). VESC CAN protocol:
Subscribes to velocity commands and publishes motor telemetry. Extended frame ID = (CAN_PACKET_ID << 8) | VESC_ID
CAN_PACKET_SET_DUTY = 0 -> payload: int32 (duty * 100000)
Subscribed topics:
/cmd_vel (geometry_msgs/Twist) - Velocity command (linear.x = m/s, angular.z = rad/s)
Published topics:
/vesc/state (VESCState) - Motor telemetry (voltage, current, RPM, temperature, fault)
/vesc/raw_telemetry (String) - Raw telemetry JSON for debugging
""" """
import json import struct
import threading
import time import time
from enum import Enum
from typing import Optional
import can
import rclpy import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist from geometry_msgs.msg import Twist
from std_msgs.msg import String from rclpy.node import Node
import serial
# VESC CAN packet IDs
CAN_PACKET_SET_DUTY = 0
CAN_PACKET_SET_RPM = 3
def make_can_id(packet_id: int, vesc_id: int) -> int:
"""Build extended CAN frame ID for VESC."""
return (packet_id << 8) | vesc_id
class VescCanDriver(Node):
def __init__(self):
super().__init__('vesc_can_driver')
# Declare parameters
self.declare_parameter('interface', 'can0')
self.declare_parameter('left_motor_id', 61)
self.declare_parameter('right_motor_id', 79)
self.declare_parameter('wheel_separation', 0.5)
self.declare_parameter('wheel_radius', 0.1)
self.declare_parameter('max_speed', 5.0)
self.declare_parameter('command_timeout', 1.0)
# Read parameters
self.interface = self.get_parameter('interface').value
self.left_id = self.get_parameter('left_motor_id').value
self.right_id = self.get_parameter('right_motor_id').value
self.wheel_sep = self.get_parameter('wheel_separation').value
self.max_speed = self.get_parameter('max_speed').value
self.cmd_timeout = self.get_parameter('command_timeout').value
# Open CAN bus
try: try:
import pyvesc self.bus = can.interface.Bus(
except ImportError: channel=self.interface,
pyvesc = None bustype='socketcan',
)
self.get_logger().info(f'CAN bus opened: {self.interface}')
except Exception as e:
self.get_logger().error(f'Failed to open CAN bus: {e}')
raise
self._last_cmd_time = time.monotonic()
class VESCCommandMode(Enum): # Subscriber
"""VESC command modes.""" self.create_subscription(Twist, '/cmd_vel', self._cmd_vel_cb, 10)
DUTY_CYCLE = "duty_cycle" # Direct duty cycle (-100 to 100)
RPM = "rpm" # RPM setpoint
CURRENT = "current" # Current (A)
BRAKE_CURRENT = "brake" # Brake current (A)
# Watchdog timer (10 Hz)
class VESCState: self.create_timer(0.1, self._watchdog_cb)
"""VESC telemetry state."""
def __init__(self):
self.voltage_v = 0.0
self.current_a = 0.0
self.rpm = 0
self.temperature_c = 0.0
self.fault_code = 0
self.timestamp = time.time()
class VESCDriverNode(Node):
"""ROS2 node for VESC motor control and telemetry."""
def __init__(self):
super().__init__("vesc_driver")
# Parameters
self.declare_parameter("port", "/dev/ttyUSB0")
self.declare_parameter("baudrate", 115200)
self.declare_parameter("command_mode", "duty_cycle") # or "rpm"
self.declare_parameter("max_rpm", 60000)
self.declare_parameter("max_current_a", 50.0)
self.declare_parameter("timeout_s", 1.0)
self.port = self.get_parameter("port").value
self.baudrate = self.get_parameter("baudrate").value
self.command_mode = self.get_parameter("command_mode").value
self.max_rpm = self.get_parameter("max_rpm").value
self.max_current_a = self.get_parameter("max_current_a").value
self.timeout_s = self.get_parameter("timeout_s").value
# Serial connection
self.serial = None
self.vesc = None
self.last_cmd_time = 0
self.state = VESCState()
# Subscriptions
self.create_subscription(Twist, "/cmd_vel", self._on_cmd_vel, 10)
# Publishers
self.pub_state = self.create_publisher(String, "/vesc/state", 10)
self.pub_telemetry = self.create_publisher(String, "/vesc/raw_telemetry", 10)
# Timer for telemetry polling (100 Hz)
self.create_timer(0.01, self._poll_telemetry)
# Initialize VESC connection
self._init_vesc()
self.get_logger().info( self.get_logger().info(
f"VESC driver initialized: port={self.port}, baud={self.baudrate}, " f'VESC CAN driver ready — left={self.left_id} right={self.right_id}'
f"mode={self.command_mode}, timeout={self.timeout_s}s"
) )
def _init_vesc(self) -> bool: # ------------------------------------------------------------------
"""Initialize serial connection to VESC."""
try:
if pyvesc is None:
self.get_logger().error("pyvesc not installed. Install with: pip install pyvesc")
return False
self.serial = serial.Serial( def _cmd_vel_cb(self, msg: Twist):
port=self.port, self._last_cmd_time = time.monotonic()
baudrate=self.baudrate, linear = msg.linear.x
timeout=0.1, angular = msg.angular.z
left_speed = linear - (angular * self.wheel_sep / 2.0)
right_speed = linear + (angular * self.wheel_sep / 2.0)
left_duty = max(-1.0, min(1.0, left_speed / self.max_speed))
right_duty = max(-1.0, min(1.0, right_speed / self.max_speed))
self._send_duty(self.left_id, left_duty)
self._send_duty(self.right_id, right_duty)
def _watchdog_cb(self):
elapsed = time.monotonic() - self._last_cmd_time
if elapsed > self.cmd_timeout:
self._send_duty(self.left_id, 0.0)
self._send_duty(self.right_id, 0.0)
# ------------------------------------------------------------------
def _send_duty(self, vesc_id: int, duty: float):
"""Send CAN_PACKET_SET_DUTY to a VESC controller."""
can_id = make_can_id(CAN_PACKET_SET_DUTY, vesc_id)
payload = struct.pack('>i', int(duty * 100000))
msg = can.Message(
arbitration_id=can_id,
data=payload,
is_extended_id=True,
) )
self.vesc = pyvesc.VescUart(
serial_port=self.serial,
has_sensor=False, # No wheel speed sensor
start_heartbeat=True,
)
self.get_logger().info(f"Connected to VESC on {self.port} @ {self.baudrate} baud")
return True
except (serial.SerialException, Exception) as e:
self.get_logger().error(f"Failed to initialize VESC: {e}")
return False
def _on_cmd_vel(self, msg: Twist) -> None:
"""Handle velocity command from /cmd_vel."""
if self.vesc is None:
return
try: try:
# Extract linear velocity (m/s) and angular velocity (rad/s) self.bus.send(msg)
linear_v = msg.linear.x except can.CanError as e:
angular_v = msg.angular.z self.get_logger().error(f'CAN send error (id={vesc_id}): {e}')
# Convert velocity to VESC command based on mode def destroy_node(self):
if self.command_mode == "duty_cycle": self._send_duty(self.left_id, 0.0)
# Map velocity to duty cycle (-100 to 100) self._send_duty(self.right_id, 0.0)
# Assuming max velocity ~5 m/s maps to ~100% duty self.bus.shutdown()
duty = max(-100, min(100, (linear_v / 5.0) * 100)) super().destroy_node()
self.vesc.set_duty(duty / 100.0) # pyvesc expects 0-1 or -1-0
elif self.command_mode == "rpm":
# Map velocity to RPM
# Assuming max velocity 5 m/s → max_rpm
rpm = max(-self.max_rpm, min(self.max_rpm, (linear_v / 5.0) * self.max_rpm))
self.vesc.set_rpm(int(rpm))
# Angular velocity could control steering if dual motors are used differently
# For now, it's ignored (single dual-motor ESC)
self.last_cmd_time = time.time()
except Exception as e:
self.get_logger().error(f"Error sending command to VESC: {e}")
def _poll_telemetry(self) -> None:
"""Poll VESC for telemetry and publish state."""
if self.vesc is None:
return
try:
# Read telemetry from VESC
if self.vesc.read_values():
# Update state from VESC data
self.state.voltage_v = self.vesc.data.v_in
self.state.current_a = self.vesc.data.current_in
self.state.rpm = self.vesc.data.rpm
self.state.temperature_c = self.vesc.data.temp_fet
self.state.fault_code = self.vesc.data.fault_code
self.state.timestamp = time.time()
# Check for timeout
if time.time() - self.last_cmd_time > self.timeout_s:
# No recent command, send zero
if self.command_mode == "duty_cycle":
self.vesc.set_duty(0.0)
else:
self.vesc.set_rpm(0)
# Publish state
self._publish_state()
else:
self.get_logger().warn("Failed to read VESC telemetry")
except Exception as e:
self.get_logger().error(f"Error polling VESC: {e}")
def _publish_state(self) -> None:
"""Publish VESC state as JSON."""
state_dict = {
"voltage_v": round(self.state.voltage_v, 2),
"current_a": round(self.state.current_a, 2),
"rpm": self.state.rpm,
"temperature_c": round(self.state.temperature_c, 1),
"fault_code": self.state.fault_code,
"timestamp": self.state.timestamp,
}
msg = String(data=json.dumps(state_dict))
self.pub_state.publish(msg)
self.pub_telemetry.publish(msg)
# Log fault codes
if self.state.fault_code != 0:
self.get_logger().warn(f"VESC fault code: {self.state.fault_code}")
def main(args=None): def main(args=None):
rclpy.init(args=args) rclpy.init(args=args)
node = VESCDriverNode() node = VescCanDriver()
try: try:
rclpy.spin(node) rclpy.spin(node)
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass
finally: finally:
if node.vesc and node.serial:
node.vesc.set_duty(0.0) # Zero throttle on shutdown
node.serial.close()
node.destroy_node() node.destroy_node()
rclpy.shutdown() rclpy.shutdown()
if __name__ == "__main__": if __name__ == '__main__':
main() main()

52
jetson/scripts/can_setup.sh Executable file
View File

@ -0,0 +1,52 @@
#!/usr/bin/env bash
# can_setup.sh — Bring up CANable 2.0 (gs_usb) as can0 at 500 kbps
# Issue: https://gitea.vayrette.com/seb/saltylab-firmware/issues/643
#
# Usage:
# sudo ./can_setup.sh # bring up
# sudo ./can_setup.sh down # bring down
# sudo ./can_setup.sh verify # candump one-shot check (Ctrl-C to stop)
#
# VESCs on bus: CAN ID 61 (0x3D) and CAN ID 79 (0x4F), 500 kbps
set -euo pipefail
IFACE="${CAN_IFACE:-can0}"
BITRATE="${CAN_BITRATE:-500000}"
log() { echo "[can_setup] $*"; }
die() { echo "[can_setup] ERROR: $*" >&2; exit 1; }
cmd="${1:-up}"
case "$cmd" in
up)
# Verify the interface exists (gs_usb module loaded by kernel on plug-in)
if ! ip link show "$IFACE" &>/dev/null; then
die "$IFACE not found — is CANable 2.0 plugged in and gs_usb loaded?"
fi
log "Bringing up $IFACE at ${BITRATE} bps..."
ip link set "$IFACE" down 2>/dev/null || true
ip link set "$IFACE" up type can bitrate "$BITRATE"
ip link set "$IFACE" up
log "$IFACE is up."
ip -details link show "$IFACE"
;;
down)
log "Bringing down $IFACE..."
ip link set "$IFACE" down
log "$IFACE is down."
;;
verify)
log "Listening on $IFACE — expecting frames from VESC IDs 0x3D (61) and 0x4F (79)"
log "Press Ctrl-C to stop."
exec candump "$IFACE"
;;
*)
echo "Usage: $0 [up|down|verify]"
exit 1
;;
esac