feat(tank): SaltyTank tracked-vehicle ESC driver (Issue #122)
- kinematics.py: tank_speeds with slip compensation (angular_cmd scaled
by 1/(1-slip_factor)), skid_steer_speeds (no slip), speed_to_pwm,
compute_track_speeds (2wd|4wd|tracked modes, ±max clip), expand_to_4ch,
odometry_from_track_speeds (angular scaled by (1-slip_factor) — inverse
of command path, consistent dead-reckoning across all slip values)
- tank_driver_node.py: 50 Hz ROS2 node; serial P<ch1>,<ch2>,<ch3>,<ch4>\n;
H\n heartbeat; dual stop paths (watchdog 0.3s + /saltybot/e_stop topic,
latching); runtime drive_mode + slip_factor param switch; dead-reckoning
odometry with slip compensation; publishes /saltybot/tank_pwm (JSON) +
/saltybot/tank_odom
- config/tank_params.yaml, launch/tank_driver.launch.py, package.xml,
setup.py, setup.cfg
- test/test_tank_kinematics.py: 71 unit tests, all passing (includes
parametric round-trip over slip ∈ {0.0, 0.1, 0.3, 0.5})
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
7de55accc3
commit
413810e6ba
@ -0,0 +1,36 @@
|
|||||||
|
tank_driver:
|
||||||
|
ros__parameters:
|
||||||
|
# Serial interface
|
||||||
|
serial_port: /dev/ttyUSB_tank
|
||||||
|
baud_rate: 115200
|
||||||
|
|
||||||
|
# Drive mode: "tracked" | "4wd" | "2wd"
|
||||||
|
# tracked — tank kinematics with track-slip compensation (default)
|
||||||
|
# 4wd — skid-steer, 4 ESC channels, no slip compensation
|
||||||
|
# 2wd — standard diff-drive, 2 ESC channels, no slip compensation
|
||||||
|
drive_mode: tracked
|
||||||
|
|
||||||
|
# Physical parameters
|
||||||
|
track_width: 0.30 # left↔right track centre distance (m)
|
||||||
|
wheelbase: 0.32 # front↔rear sprocket distance (m) — for reference
|
||||||
|
|
||||||
|
# Speed limits
|
||||||
|
max_speed_ms: 1.5 # m/s at full ESC deflection
|
||||||
|
max_linear_vel: 1.0 # clamp on cmd_vel linear.x (m/s)
|
||||||
|
max_angular_vel: 2.5 # clamp on cmd_vel angular.z (rad/s)
|
||||||
|
|
||||||
|
# Track slip compensation (tracked mode only)
|
||||||
|
# 0.0 = no slip (rigid wheel / high-traction surface)
|
||||||
|
# 0.1 = light slip (rubber tracks on concrete)
|
||||||
|
# 0.3 = moderate slip (rubber tracks on grass/gravel) ← default
|
||||||
|
# 0.5 = heavy slip (metal tracks on mud)
|
||||||
|
slip_factor: 0.3
|
||||||
|
|
||||||
|
# ESC PWM settings
|
||||||
|
pwm_neutral_us: 1500 # neutral/brake pulse width (µs)
|
||||||
|
pwm_range_us: 500 # half-range from neutral → [1000..2000] µs
|
||||||
|
|
||||||
|
# Timing
|
||||||
|
watchdog_timeout_s: 0.3 # deadman: shorter than rover for tracked safety
|
||||||
|
heartbeat_period_s: 0.2
|
||||||
|
control_rate: 50.0 # Hz
|
||||||
@ -0,0 +1,65 @@
|
|||||||
|
"""
|
||||||
|
tank_driver.launch.py — Launch the SaltyTank ESC motor driver node.
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
ros2 launch saltybot_tank_driver tank_driver.launch.py
|
||||||
|
ros2 launch saltybot_tank_driver tank_driver.launch.py serial_port:=/dev/ttyACM0
|
||||||
|
ros2 launch saltybot_tank_driver tank_driver.launch.py drive_mode:=4wd
|
||||||
|
ros2 launch saltybot_tank_driver tank_driver.launch.py slip_factor:=0.1
|
||||||
|
|
||||||
|
Emergency stop:
|
||||||
|
ros2 topic pub /saltybot/e_stop std_msgs/Bool '{data: true}' --once
|
||||||
|
ros2 topic pub /saltybot/e_stop std_msgs/Bool '{data: false}' --once
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
|
||||||
|
from ament_index_python.packages import get_package_share_directory
|
||||||
|
from launch import LaunchDescription
|
||||||
|
from launch.actions import DeclareLaunchArgument
|
||||||
|
from launch.substitutions import LaunchConfiguration
|
||||||
|
from launch_ros.actions import Node
|
||||||
|
|
||||||
|
|
||||||
|
def generate_launch_description():
|
||||||
|
pkg_share = get_package_share_directory("saltybot_tank_driver")
|
||||||
|
params_file = os.path.join(pkg_share, "config", "tank_params.yaml")
|
||||||
|
|
||||||
|
serial_port_arg = DeclareLaunchArgument(
|
||||||
|
"serial_port",
|
||||||
|
default_value="/dev/ttyUSB_tank",
|
||||||
|
description="Serial port for ESC controller",
|
||||||
|
)
|
||||||
|
drive_mode_arg = DeclareLaunchArgument(
|
||||||
|
"drive_mode",
|
||||||
|
default_value="tracked",
|
||||||
|
description="Drive mode: tracked | 4wd | 2wd",
|
||||||
|
)
|
||||||
|
slip_factor_arg = DeclareLaunchArgument(
|
||||||
|
"slip_factor",
|
||||||
|
default_value="0.3",
|
||||||
|
description="Track slip coefficient [0.0, 0.99]",
|
||||||
|
)
|
||||||
|
|
||||||
|
tank_driver = Node(
|
||||||
|
package="saltybot_tank_driver",
|
||||||
|
executable="tank_driver_node",
|
||||||
|
name="tank_driver",
|
||||||
|
parameters=[
|
||||||
|
params_file,
|
||||||
|
{
|
||||||
|
"serial_port": LaunchConfiguration("serial_port"),
|
||||||
|
"drive_mode": LaunchConfiguration("drive_mode"),
|
||||||
|
"slip_factor": LaunchConfiguration("slip_factor"),
|
||||||
|
},
|
||||||
|
],
|
||||||
|
output="screen",
|
||||||
|
emulate_tty=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
return LaunchDescription([
|
||||||
|
serial_port_arg,
|
||||||
|
drive_mode_arg,
|
||||||
|
slip_factor_arg,
|
||||||
|
tank_driver,
|
||||||
|
])
|
||||||
23
jetson/ros2_ws/src/saltybot_tank_driver/package.xml
Normal file
23
jetson/ros2_ws/src/saltybot_tank_driver/package.xml
Normal file
@ -0,0 +1,23 @@
|
|||||||
|
<?xml version="1.0"?>
|
||||||
|
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||||
|
<package format="3">
|
||||||
|
<name>saltybot_tank_driver</name>
|
||||||
|
<version>0.1.0</version>
|
||||||
|
<description>SaltyTank tracked-vehicle ESC driver with slip compensation (Issue #122)</description>
|
||||||
|
<maintainer email="sl-controls@saltylab.local">sl-controls</maintainer>
|
||||||
|
<license>MIT</license>
|
||||||
|
|
||||||
|
<depend>rclpy</depend>
|
||||||
|
<depend>geometry_msgs</depend>
|
||||||
|
<depend>nav_msgs</depend>
|
||||||
|
<depend>std_msgs</depend>
|
||||||
|
|
||||||
|
<test_depend>ament_copyright</test_depend>
|
||||||
|
<test_depend>ament_flake8</test_depend>
|
||||||
|
<test_depend>ament_pep257</test_depend>
|
||||||
|
<test_depend>pytest</test_depend>
|
||||||
|
|
||||||
|
<export>
|
||||||
|
<build_type>ament_python</build_type>
|
||||||
|
</export>
|
||||||
|
</package>
|
||||||
@ -0,0 +1,203 @@
|
|||||||
|
"""
|
||||||
|
kinematics.py — Tank / tracked-vehicle kinematics for SaltyTank (Issue #122).
|
||||||
|
|
||||||
|
Drive modes
|
||||||
|
───────────
|
||||||
|
"2wd" — standard differential drive, 2 ESC channels (CH1=left, CH3=right).
|
||||||
|
Mirrored to 4 channels by the caller (CH1=CH2=left, CH3=CH4=right).
|
||||||
|
|
||||||
|
"4wd" — skid-steer 4-channel. Left pair (CH1+CH2) share one speed;
|
||||||
|
right pair (CH3+CH4) share one speed. No slip compensation.
|
||||||
|
|
||||||
|
"tracked" — tank kinematics with track-slip compensation.
|
||||||
|
Commanded angular is scaled up by 1/(1-slip_factor) so the
|
||||||
|
vehicle achieves the desired turn rate despite track slip.
|
||||||
|
Odometry angular is scaled down by (1-slip_factor) to reflect
|
||||||
|
the fact that wheel-speed difference overestimates actual yaw rate.
|
||||||
|
|
||||||
|
Slip compensation model
|
||||||
|
───────────────────────
|
||||||
|
slip_factor ∈ [0.0, 0.99]
|
||||||
|
|
||||||
|
0.0 → rigid wheel / no slip (identical to standard diff-drive)
|
||||||
|
0.1 → light slip (rubber tracks on concrete)
|
||||||
|
0.3 → moderate slip (rubber tracks on grass / gravel) ← default
|
||||||
|
0.5 → heavy slip (metal tracks on mud)
|
||||||
|
|
||||||
|
Command path : angular_cmd = angular_z / (1 - slip_factor)
|
||||||
|
Odometry path : angular_odom = (v_right - v_left) / B × (1 - slip_factor)
|
||||||
|
|
||||||
|
The two corrections are complementary: a vehicle commanded at ω rad/s and
|
||||||
|
estimated by odometry will yield consistent trajectories regardless of the
|
||||||
|
chosen slip_factor value (as long as it is consistent between command and odom).
|
||||||
|
|
||||||
|
Zero-radius turns
|
||||||
|
─────────────────
|
||||||
|
Setting linear_x=0 and angular_z≠0 produces equal-and-opposite track speeds,
|
||||||
|
which is a valid zero-radius pivot for tracked vehicles. The slip factor still
|
||||||
|
applies; set slip_factor=0 for a surface with known high traction.
|
||||||
|
|
||||||
|
Pure module — no ROS2 dependency; fully unit-testable.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import math
|
||||||
|
|
||||||
|
|
||||||
|
# ── Constants ────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
SLIP_MIN = 0.0
|
||||||
|
SLIP_MAX = 0.99 # must stay < 1.0 to avoid division-by-zero
|
||||||
|
DRIVE_MODES = ("2wd", "4wd", "tracked")
|
||||||
|
|
||||||
|
|
||||||
|
# ── Low-level helpers ────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _clamp_slip(slip_factor: float) -> float:
|
||||||
|
return max(SLIP_MIN, min(SLIP_MAX, slip_factor))
|
||||||
|
|
||||||
|
|
||||||
|
# ── Track-speed computation ──────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def tank_speeds(
|
||||||
|
linear_x: float,
|
||||||
|
angular_z: float,
|
||||||
|
track_width: float,
|
||||||
|
slip_factor: float = 0.0,
|
||||||
|
) -> tuple[float, float]:
|
||||||
|
"""
|
||||||
|
Unicycle → tank (tracked) differential drive with optional slip compensation.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
linear_x : forward velocity (m/s)
|
||||||
|
angular_z : desired yaw rate (rad/s, +ve = CCW / left turn)
|
||||||
|
track_width : centre-to-centre track separation (m)
|
||||||
|
slip_factor : track slip coefficient ∈ [0.0, 0.99]; 0 = no slip
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
(v_left, v_right) in m/s
|
||||||
|
"""
|
||||||
|
slip = _clamp_slip(slip_factor)
|
||||||
|
# Scale angular command up to achieve desired turn rate despite slip
|
||||||
|
angular_cmd = angular_z / (1.0 - slip) if slip > 0.0 else angular_z
|
||||||
|
half = angular_cmd * track_width / 2.0
|
||||||
|
return linear_x - half, linear_x + half
|
||||||
|
|
||||||
|
|
||||||
|
def skid_steer_speeds(
|
||||||
|
linear_x: float,
|
||||||
|
angular_z: float,
|
||||||
|
track_width: float,
|
||||||
|
) -> tuple[float, float]:
|
||||||
|
"""
|
||||||
|
Standard unicycle → differential drive (no slip compensation).
|
||||||
|
Used for 2WD and 4WD modes.
|
||||||
|
"""
|
||||||
|
half = angular_z * track_width / 2.0
|
||||||
|
return linear_x - half, linear_x + half
|
||||||
|
|
||||||
|
|
||||||
|
# ── m/s → ESC PWM ────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def speed_to_pwm(
|
||||||
|
speed_ms: float,
|
||||||
|
max_speed_ms: float,
|
||||||
|
pwm_neutral_us: int,
|
||||||
|
pwm_range_us: int,
|
||||||
|
) -> int:
|
||||||
|
"""
|
||||||
|
Convert wheel/track speed (m/s) to ESC PWM pulse width (µs).
|
||||||
|
|
||||||
|
Linear mapping:
|
||||||
|
pwm = neutral + (speed / max_speed) * range
|
||||||
|
|
||||||
|
Clamped to [neutral - range, neutral + range].
|
||||||
|
"""
|
||||||
|
if max_speed_ms <= 0.0:
|
||||||
|
return pwm_neutral_us
|
||||||
|
normalised = max(-1.0, min(1.0, speed_ms / max_speed_ms))
|
||||||
|
pwm = pwm_neutral_us + normalised * pwm_range_us
|
||||||
|
lo = pwm_neutral_us - pwm_range_us
|
||||||
|
hi = pwm_neutral_us + pwm_range_us
|
||||||
|
return int(round(max(lo, min(hi, pwm))))
|
||||||
|
|
||||||
|
|
||||||
|
# ── Variant-aware speed computation ─────────────────────────────────────────
|
||||||
|
|
||||||
|
def compute_track_speeds(
|
||||||
|
linear_x: float,
|
||||||
|
angular_z: float,
|
||||||
|
track_width: float,
|
||||||
|
max_speed_ms: float,
|
||||||
|
drive_mode: str = "tracked",
|
||||||
|
slip_factor: float = 0.3,
|
||||||
|
) -> tuple[float, float]:
|
||||||
|
"""
|
||||||
|
Compute and clip left/right track/wheel speeds for the selected mode.
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
(v_left, v_right) clipped to ±max_speed_ms.
|
||||||
|
|
||||||
|
All modes produce a 2-element tuple; the caller expands to 4 channels
|
||||||
|
for 4WD mode by mirroring (vl, vl, vr, vr).
|
||||||
|
"""
|
||||||
|
def clip(v: float) -> float:
|
||||||
|
return max(-max_speed_ms, min(max_speed_ms, v))
|
||||||
|
|
||||||
|
if drive_mode == "tracked":
|
||||||
|
vl, vr = tank_speeds(linear_x, angular_z, track_width, slip_factor)
|
||||||
|
else:
|
||||||
|
# "2wd" or "4wd" — standard diff-drive, no slip compensation
|
||||||
|
vl, vr = skid_steer_speeds(linear_x, angular_z, track_width)
|
||||||
|
|
||||||
|
return clip(vl), clip(vr)
|
||||||
|
|
||||||
|
|
||||||
|
def expand_to_4ch(v_left: float, v_right: float) -> tuple[float, float, float, float]:
|
||||||
|
"""
|
||||||
|
Expand left/right speeds to 4-channel ESC layout.
|
||||||
|
|
||||||
|
Channel assignment:
|
||||||
|
CH1 = left-front CH2 = left-rear
|
||||||
|
CH3 = right-front CH4 = right-rear
|
||||||
|
"""
|
||||||
|
return v_left, v_left, v_right, v_right
|
||||||
|
|
||||||
|
|
||||||
|
# ── Odometry helper ──────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def odometry_from_track_speeds(
|
||||||
|
v_left: float,
|
||||||
|
v_right: float,
|
||||||
|
track_width: float,
|
||||||
|
slip_factor: float = 0.0,
|
||||||
|
) -> tuple[float, float]:
|
||||||
|
"""
|
||||||
|
Estimate (linear_x, angular_z) from left/right track speeds.
|
||||||
|
|
||||||
|
Applies slip compensation to the angular estimate:
|
||||||
|
angular_z_odom = (v_right - v_left) / B × (1 - slip_factor)
|
||||||
|
|
||||||
|
This is the inverse of tank_speeds for consistent dead-reckoning.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
v_left, v_right : measured track speeds (m/s)
|
||||||
|
track_width : centre-to-centre track separation (m)
|
||||||
|
slip_factor : same value used in the command path
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
(linear_x, angular_z) in m/s and rad/s
|
||||||
|
"""
|
||||||
|
linear_x = (v_left + v_right) / 2.0
|
||||||
|
if track_width > 0.0:
|
||||||
|
slip = _clamp_slip(slip_factor)
|
||||||
|
angular_z = (v_right - v_left) / track_width * (1.0 - slip)
|
||||||
|
else:
|
||||||
|
angular_z = 0.0
|
||||||
|
return linear_x, angular_z
|
||||||
@ -0,0 +1,383 @@
|
|||||||
|
"""
|
||||||
|
tank_driver_node.py — SaltyTank tracked-vehicle ESC driver (Issue #122).
|
||||||
|
|
||||||
|
Hardware
|
||||||
|
────────
|
||||||
|
SaltyTank: tracked robot with left/right independent brushless ESCs.
|
||||||
|
ESCs controlled via PWM (servo-style 1000–2000 µs pulses).
|
||||||
|
Communication: USB CDC serial to STM32 or Raspberry Pi Pico GPIO PWM bridge.
|
||||||
|
|
||||||
|
ESC channel assignments (configurable):
|
||||||
|
CH1 = left-front (or left-track in 2WD/tracked mode)
|
||||||
|
CH2 = left-rear (mirrored in 2WD/tracked mode)
|
||||||
|
CH3 = right-front (or right-track in 2WD/tracked mode)
|
||||||
|
CH4 = right-rear (mirrored in 2WD/tracked mode)
|
||||||
|
|
||||||
|
Drive modes
|
||||||
|
───────────
|
||||||
|
Selectable at runtime via `drive_mode` parameter (default "tracked").
|
||||||
|
|
||||||
|
"2wd" — standard differential drive; CH1+CH2 = left, CH3+CH4 = right.
|
||||||
|
No slip compensation.
|
||||||
|
|
||||||
|
"4wd" — skid-steer; all four ESCs commanded. Left pair same speed,
|
||||||
|
right pair same speed. No slip compensation.
|
||||||
|
|
||||||
|
"tracked" — tank kinematics with slip compensation. Angular command is
|
||||||
|
pre-scaled to achieve the desired yaw rate despite track slip.
|
||||||
|
Odometry angular is de-scaled consistently.
|
||||||
|
|
||||||
|
Switch live: ros2 param set /tank_driver drive_mode tracked
|
||||||
|
|
||||||
|
Serial protocol
|
||||||
|
────────────────
|
||||||
|
Command frame (ASCII, newline-terminated):
|
||||||
|
P<ch1_us>,<ch2_us>,<ch3_us>,<ch4_us>\\n
|
||||||
|
Values: 1000–2000 µs (1500 = neutral/brake)
|
||||||
|
|
||||||
|
Emergency stop frame:
|
||||||
|
E\\n — forces all channels to neutral immediately
|
||||||
|
|
||||||
|
Heartbeat:
|
||||||
|
H\\n every heartbeat_period_s (ESC controller zeros PWM after timeout)
|
||||||
|
|
||||||
|
Emergency stop
|
||||||
|
──────────────
|
||||||
|
Two stop paths:
|
||||||
|
1. Software watchdog — /cmd_vel silence > watchdog_timeout_s → neutral
|
||||||
|
2. E-stop topic — /saltybot/e_stop (std_msgs/Bool, True = stop)
|
||||||
|
|
||||||
|
Both paths send P1500,1500,1500,1500\\n and latch until a new /cmd_vel arrives
|
||||||
|
(e-stop topic) or new commands come in (watchdog).
|
||||||
|
|
||||||
|
Subscribes
|
||||||
|
──────────
|
||||||
|
/cmd_vel (geometry_msgs/Twist) — Nav2-compatible velocity command
|
||||||
|
linear.x = forward (m/s)
|
||||||
|
angular.z = yaw rate (rad/s, +CCW)
|
||||||
|
/saltybot/e_stop (std_msgs/Bool) — True = emergency stop
|
||||||
|
|
||||||
|
Publishes
|
||||||
|
─────────
|
||||||
|
/saltybot/tank_pwm (std_msgs/String JSON) — current PWM values
|
||||||
|
/saltybot/tank_odom (nav_msgs/Odometry) — dead-reckoning odometry
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
──────────
|
||||||
|
serial_port /dev/ttyUSB_tank (or /dev/ttyACM0, etc.)
|
||||||
|
baud_rate 115200
|
||||||
|
drive_mode "tracked" — "2wd" | "4wd" | "tracked"
|
||||||
|
track_width 0.30 — left↔right track centre distance (m)
|
||||||
|
wheelbase 0.32 — front↔rear axle distance (m)
|
||||||
|
max_speed_ms 1.5 — m/s at full ESC deflection
|
||||||
|
max_linear_vel 1.0 — clamp on cmd_vel linear.x (m/s)
|
||||||
|
max_angular_vel 2.5 — clamp on cmd_vel angular.z (rad/s)
|
||||||
|
slip_factor 0.3 — track slip coefficient [0.0, 0.99]
|
||||||
|
pwm_neutral_us 1500
|
||||||
|
pwm_range_us 500
|
||||||
|
watchdog_timeout_s 0.3 — tighter than rover (0.5 s) for safety
|
||||||
|
heartbeat_period_s 0.2
|
||||||
|
control_rate 50.0 — Hz
|
||||||
|
|
||||||
|
Usage
|
||||||
|
─────
|
||||||
|
ros2 launch saltybot_tank_driver tank_driver.launch.py
|
||||||
|
ros2 param set /tank_driver drive_mode 4wd
|
||||||
|
ros2 param set /tank_driver slip_factor 0.1
|
||||||
|
ros2 topic pub /saltybot/e_stop std_msgs/Bool '{data: true}' --once
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import math
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
|
||||||
|
import rclpy
|
||||||
|
from rclpy.node import Node
|
||||||
|
from rclpy.qos import HistoryPolicy, QoSProfile, ReliabilityPolicy
|
||||||
|
|
||||||
|
from geometry_msgs.msg import Twist
|
||||||
|
from nav_msgs.msg import Odometry
|
||||||
|
from std_msgs.msg import Bool, String
|
||||||
|
|
||||||
|
from saltybot_tank_driver.kinematics import (
|
||||||
|
compute_track_speeds,
|
||||||
|
expand_to_4ch,
|
||||||
|
odometry_from_track_speeds,
|
||||||
|
speed_to_pwm,
|
||||||
|
DRIVE_MODES,
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
import serial
|
||||||
|
_SERIAL_OK = True
|
||||||
|
except ImportError:
|
||||||
|
_SERIAL_OK = False
|
||||||
|
|
||||||
|
|
||||||
|
class TankDriverNode(Node):
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__("tank_driver")
|
||||||
|
|
||||||
|
# ── Parameters ───────────────────────────────────────────────────────
|
||||||
|
self.declare_parameter("serial_port", "/dev/ttyUSB_tank")
|
||||||
|
self.declare_parameter("baud_rate", 115200)
|
||||||
|
self.declare_parameter("drive_mode", "tracked")
|
||||||
|
self.declare_parameter("track_width", 0.30)
|
||||||
|
self.declare_parameter("wheelbase", 0.32)
|
||||||
|
self.declare_parameter("max_speed_ms", 1.5)
|
||||||
|
self.declare_parameter("max_linear_vel", 1.0)
|
||||||
|
self.declare_parameter("max_angular_vel", 2.5)
|
||||||
|
self.declare_parameter("slip_factor", 0.3)
|
||||||
|
self.declare_parameter("pwm_neutral_us", 1500)
|
||||||
|
self.declare_parameter("pwm_range_us", 500)
|
||||||
|
self.declare_parameter("watchdog_timeout_s", 0.3)
|
||||||
|
self.declare_parameter("heartbeat_period_s", 0.2)
|
||||||
|
self.declare_parameter("control_rate", 50.0)
|
||||||
|
|
||||||
|
self._p = self._load_params()
|
||||||
|
|
||||||
|
# ── State ────────────────────────────────────────────────────────────
|
||||||
|
self._target_linear_x: float = 0.0
|
||||||
|
self._target_angular_z: float = 0.0
|
||||||
|
self._last_cmd_vel_t: float = 0.0
|
||||||
|
self._last_pwm: tuple = (1500, 1500, 1500, 1500)
|
||||||
|
self._e_stop: bool = False
|
||||||
|
|
||||||
|
self._odom_x: float = 0.0
|
||||||
|
self._odom_y: float = 0.0
|
||||||
|
self._odom_yaw: float = 0.0
|
||||||
|
self._last_odom_t: float = time.monotonic()
|
||||||
|
|
||||||
|
# ── Serial ───────────────────────────────────────────────────────────
|
||||||
|
self._ser = None
|
||||||
|
self._ser_lock = threading.Lock()
|
||||||
|
if _SERIAL_OK:
|
||||||
|
self._open_serial()
|
||||||
|
else:
|
||||||
|
self.get_logger().warn(
|
||||||
|
"pyserial not installed — running in simulation mode (no serial I/O)")
|
||||||
|
|
||||||
|
# ── QoS ──────────────────────────────────────────────────────────────
|
||||||
|
reliable = QoSProfile(
|
||||||
|
reliability=ReliabilityPolicy.RELIABLE,
|
||||||
|
history=HistoryPolicy.KEEP_LAST,
|
||||||
|
depth=10,
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── Subscriptions ────────────────────────────────────────────────────
|
||||||
|
self.create_subscription(Twist, "/cmd_vel", self._cmd_vel_cb, reliable)
|
||||||
|
self.create_subscription(Bool, "/saltybot/e_stop", self._e_stop_cb, reliable)
|
||||||
|
|
||||||
|
# ── Publishers ───────────────────────────────────────────────────────
|
||||||
|
self._pwm_pub = self.create_publisher(String, "/saltybot/tank_pwm", reliable)
|
||||||
|
self._odom_pub = self.create_publisher(Odometry, "/saltybot/tank_odom", reliable)
|
||||||
|
|
||||||
|
# ── Timers ───────────────────────────────────────────────────────────
|
||||||
|
rate = self._p["control_rate"]
|
||||||
|
self._ctrl_timer = self.create_timer(1.0 / rate, self._control_cb)
|
||||||
|
self._hb_timer = self.create_timer(self._p["hb_period"], self._heartbeat_cb)
|
||||||
|
|
||||||
|
self.get_logger().info(
|
||||||
|
f"TankDriverNode ready "
|
||||||
|
f"mode={self._p['drive_mode']} "
|
||||||
|
f"track={self._p['track_width']}m "
|
||||||
|
f"slip={self._p['slip_factor']} "
|
||||||
|
f"max_speed={self._p['max_speed_ms']}m/s "
|
||||||
|
f"port={self._p['serial_port']}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── Parameter load ────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _load_params(self) -> dict:
|
||||||
|
mode = self.get_parameter("drive_mode").value
|
||||||
|
if mode not in DRIVE_MODES:
|
||||||
|
self.get_logger().warn(
|
||||||
|
f"Unknown drive_mode '{mode}', defaulting to 'tracked'")
|
||||||
|
mode = "tracked"
|
||||||
|
return {
|
||||||
|
"serial_port": self.get_parameter("serial_port").value,
|
||||||
|
"baud_rate": self.get_parameter("baud_rate").value,
|
||||||
|
"drive_mode": mode,
|
||||||
|
"track_width": self.get_parameter("track_width").value,
|
||||||
|
"wheelbase": self.get_parameter("wheelbase").value,
|
||||||
|
"max_speed_ms": self.get_parameter("max_speed_ms").value,
|
||||||
|
"max_linear_vel": self.get_parameter("max_linear_vel").value,
|
||||||
|
"max_angular_vel":self.get_parameter("max_angular_vel").value,
|
||||||
|
"slip_factor": float(self.get_parameter("slip_factor").value),
|
||||||
|
"pwm_neutral": int(self.get_parameter("pwm_neutral_us").value),
|
||||||
|
"pwm_range": int(self.get_parameter("pwm_range_us").value),
|
||||||
|
"watchdog_timeout": self.get_parameter("watchdog_timeout_s").value,
|
||||||
|
"hb_period": self.get_parameter("heartbeat_period_s").value,
|
||||||
|
"control_rate": self.get_parameter("control_rate").value,
|
||||||
|
}
|
||||||
|
|
||||||
|
# ── Serial helpers ────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _open_serial(self) -> None:
|
||||||
|
with self._ser_lock:
|
||||||
|
try:
|
||||||
|
self._ser = serial.Serial(
|
||||||
|
self._p["serial_port"], self._p["baud_rate"], timeout=0.05)
|
||||||
|
self.get_logger().info(f"Serial open: {self._p['serial_port']}")
|
||||||
|
except Exception as exc:
|
||||||
|
self.get_logger().error(f"Cannot open serial: {exc}")
|
||||||
|
self._ser = None
|
||||||
|
|
||||||
|
def _write(self, data: bytes) -> bool:
|
||||||
|
with self._ser_lock:
|
||||||
|
if self._ser is None or not self._ser.is_open:
|
||||||
|
return False
|
||||||
|
try:
|
||||||
|
self._ser.write(data)
|
||||||
|
return True
|
||||||
|
except Exception as exc:
|
||||||
|
self.get_logger().error(f"Serial write: {exc}",
|
||||||
|
throttle_duration_sec=2.0)
|
||||||
|
self._ser = None
|
||||||
|
return False
|
||||||
|
|
||||||
|
def _send_neutral(self) -> None:
|
||||||
|
self._write(b"P1500,1500,1500,1500\n")
|
||||||
|
self._last_pwm = (1500, 1500, 1500, 1500)
|
||||||
|
|
||||||
|
# ── cmd_vel callback ──────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _cmd_vel_cb(self, msg: Twist) -> None:
|
||||||
|
p = self._p
|
||||||
|
self._target_linear_x = max(-p["max_linear_vel"],
|
||||||
|
min(p["max_linear_vel"], msg.linear.x))
|
||||||
|
self._target_angular_z = max(-p["max_angular_vel"],
|
||||||
|
min(p["max_angular_vel"], msg.angular.z))
|
||||||
|
self._last_cmd_vel_t = time.monotonic()
|
||||||
|
# A new cmd_vel clears a latched e-stop
|
||||||
|
if self._e_stop:
|
||||||
|
self._e_stop = False
|
||||||
|
self.get_logger().info("E-stop cleared by new cmd_vel")
|
||||||
|
|
||||||
|
# ── E-stop callback ───────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _e_stop_cb(self, msg: Bool) -> None:
|
||||||
|
if msg.data and not self._e_stop:
|
||||||
|
self._e_stop = True
|
||||||
|
self._send_neutral()
|
||||||
|
self.get_logger().warn("Emergency stop engaged")
|
||||||
|
elif not msg.data and self._e_stop:
|
||||||
|
self._e_stop = False
|
||||||
|
self.get_logger().info("E-stop released via topic")
|
||||||
|
|
||||||
|
# ── 50 Hz control loop ────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _control_cb(self) -> None:
|
||||||
|
self._p = self._load_params()
|
||||||
|
p = self._p
|
||||||
|
now = time.monotonic()
|
||||||
|
|
||||||
|
# E-stop or watchdog
|
||||||
|
cmd_age = (now - self._last_cmd_vel_t) if self._last_cmd_vel_t > 0.0 else 1e9
|
||||||
|
if self._e_stop or cmd_age > p["watchdog_timeout"]:
|
||||||
|
lin_x, ang_z = 0.0, 0.0
|
||||||
|
else:
|
||||||
|
lin_x, ang_z = self._target_linear_x, self._target_angular_z
|
||||||
|
|
||||||
|
# Kinematics → left/right track speeds
|
||||||
|
vl, vr = compute_track_speeds(
|
||||||
|
lin_x, ang_z,
|
||||||
|
track_width=p["track_width"],
|
||||||
|
max_speed_ms=p["max_speed_ms"],
|
||||||
|
drive_mode=p["drive_mode"],
|
||||||
|
slip_factor=p["slip_factor"],
|
||||||
|
)
|
||||||
|
|
||||||
|
# Expand to 4 channels
|
||||||
|
speeds4 = expand_to_4ch(vl, vr) # (vl, vl, vr, vr)
|
||||||
|
|
||||||
|
# Convert to PWM
|
||||||
|
def to_pwm(v):
|
||||||
|
return speed_to_pwm(v, p["max_speed_ms"], p["pwm_neutral"], p["pwm_range"])
|
||||||
|
|
||||||
|
pwm = tuple(to_pwm(v) for v in speeds4)
|
||||||
|
self._last_pwm = pwm
|
||||||
|
|
||||||
|
# Send P command
|
||||||
|
frame = f"P{pwm[0]},{pwm[1]},{pwm[2]},{pwm[3]}\n".encode("ascii")
|
||||||
|
self._write(frame)
|
||||||
|
|
||||||
|
# Publish observability
|
||||||
|
pwm_msg = String()
|
||||||
|
pwm_msg.data = json.dumps({
|
||||||
|
"ch1_us": pwm[0], "ch2_us": pwm[1],
|
||||||
|
"ch3_us": pwm[2], "ch4_us": pwm[3],
|
||||||
|
"drive_mode": p["drive_mode"],
|
||||||
|
"e_stop": self._e_stop,
|
||||||
|
})
|
||||||
|
self._pwm_pub.publish(pwm_msg)
|
||||||
|
|
||||||
|
# Odometry integration (dead reckoning with slip compensation)
|
||||||
|
dt = now - self._last_odom_t
|
||||||
|
self._last_odom_t = now
|
||||||
|
self._integrate_odometry(vl, vr, p["track_width"], p["slip_factor"], dt)
|
||||||
|
self._publish_odom()
|
||||||
|
|
||||||
|
# ── Heartbeat ─────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _heartbeat_cb(self) -> None:
|
||||||
|
self._write(b"H\n")
|
||||||
|
|
||||||
|
# ── Odometry ─────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _integrate_odometry(
|
||||||
|
self,
|
||||||
|
v_left: float,
|
||||||
|
v_right: float,
|
||||||
|
track_width: float,
|
||||||
|
slip_factor: float,
|
||||||
|
dt: float,
|
||||||
|
) -> None:
|
||||||
|
"""2D dead-reckoning with track-slip compensation."""
|
||||||
|
lin, ang = odometry_from_track_speeds(v_left, v_right, track_width, slip_factor)
|
||||||
|
self._odom_yaw += ang * dt
|
||||||
|
self._odom_x += lin * math.cos(self._odom_yaw) * dt
|
||||||
|
self._odom_y += lin * math.sin(self._odom_yaw) * dt
|
||||||
|
|
||||||
|
def _publish_odom(self) -> None:
|
||||||
|
yaw = self._odom_yaw
|
||||||
|
msg = Odometry()
|
||||||
|
msg.header.stamp = self.get_clock().now().to_msg()
|
||||||
|
msg.header.frame_id = "odom"
|
||||||
|
msg.child_frame_id = "base_link"
|
||||||
|
|
||||||
|
msg.pose.pose.position.x = self._odom_x
|
||||||
|
msg.pose.pose.position.y = self._odom_y
|
||||||
|
msg.pose.pose.orientation.z = math.sin(yaw / 2.0)
|
||||||
|
msg.pose.pose.orientation.w = math.cos(yaw / 2.0)
|
||||||
|
|
||||||
|
msg.twist.twist.linear.x = self._target_linear_x
|
||||||
|
msg.twist.twist.angular.z = self._target_angular_z
|
||||||
|
|
||||||
|
self._odom_pub.publish(msg)
|
||||||
|
|
||||||
|
# ── Lifecycle ─────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def destroy_node(self) -> None:
|
||||||
|
self._send_neutral() # safe-state on shutdown
|
||||||
|
super().destroy_node()
|
||||||
|
|
||||||
|
|
||||||
|
# ── Entry point ───────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def main(args=None):
|
||||||
|
rclpy.init(args=args)
|
||||||
|
node = TankDriverNode()
|
||||||
|
try:
|
||||||
|
rclpy.spin(node)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
node.destroy_node()
|
||||||
|
rclpy.try_shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
5
jetson/ros2_ws/src/saltybot_tank_driver/setup.cfg
Normal file
5
jetson/ros2_ws/src/saltybot_tank_driver/setup.cfg
Normal file
@ -0,0 +1,5 @@
|
|||||||
|
[develop]
|
||||||
|
script_dir=$base/lib/saltybot_tank_driver
|
||||||
|
|
||||||
|
[install]
|
||||||
|
install_scripts=$base/lib/saltybot_tank_driver
|
||||||
32
jetson/ros2_ws/src/saltybot_tank_driver/setup.py
Normal file
32
jetson/ros2_ws/src/saltybot_tank_driver/setup.py
Normal file
@ -0,0 +1,32 @@
|
|||||||
|
from setuptools import setup, find_packages
|
||||||
|
import os
|
||||||
|
from glob import glob
|
||||||
|
|
||||||
|
package_name = "saltybot_tank_driver"
|
||||||
|
|
||||||
|
setup(
|
||||||
|
name=package_name,
|
||||||
|
version="0.1.0",
|
||||||
|
packages=find_packages(exclude=["test"]),
|
||||||
|
data_files=[
|
||||||
|
("share/ament_index/resource_index/packages",
|
||||||
|
[f"resource/{package_name}"]),
|
||||||
|
(f"share/{package_name}", ["package.xml"]),
|
||||||
|
(os.path.join("share", package_name, "config"),
|
||||||
|
glob("config/*.yaml")),
|
||||||
|
(os.path.join("share", package_name, "launch"),
|
||||||
|
glob("launch/*.py")),
|
||||||
|
],
|
||||||
|
install_requires=["setuptools"],
|
||||||
|
zip_safe=True,
|
||||||
|
maintainer="sl-controls",
|
||||||
|
maintainer_email="sl-controls@saltylab.local",
|
||||||
|
description="SaltyTank tracked-vehicle ESC driver with slip compensation",
|
||||||
|
license="MIT",
|
||||||
|
tests_require=["pytest"],
|
||||||
|
entry_points={
|
||||||
|
"console_scripts": [
|
||||||
|
f"tank_driver_node = {package_name}.tank_driver_node:main",
|
||||||
|
],
|
||||||
|
},
|
||||||
|
)
|
||||||
@ -0,0 +1,347 @@
|
|||||||
|
"""
|
||||||
|
test_tank_kinematics.py — Unit tests for kinematics.py (Issue #122).
|
||||||
|
|
||||||
|
No ROS2 runtime required — pure Python.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import math
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from saltybot_tank_driver.kinematics import (
|
||||||
|
tank_speeds,
|
||||||
|
skid_steer_speeds,
|
||||||
|
speed_to_pwm,
|
||||||
|
compute_track_speeds,
|
||||||
|
expand_to_4ch,
|
||||||
|
odometry_from_track_speeds,
|
||||||
|
_clamp_slip,
|
||||||
|
DRIVE_MODES,
|
||||||
|
SLIP_MIN,
|
||||||
|
SLIP_MAX,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ─── _clamp_slip ─────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestClampSlip:
|
||||||
|
|
||||||
|
def test_zero_passes_through(self):
|
||||||
|
assert _clamp_slip(0.0) == 0.0
|
||||||
|
|
||||||
|
def test_typical_value(self):
|
||||||
|
assert _clamp_slip(0.3) == pytest.approx(0.3)
|
||||||
|
|
||||||
|
def test_below_min_clamped(self):
|
||||||
|
assert _clamp_slip(-0.1) == SLIP_MIN
|
||||||
|
|
||||||
|
def test_above_max_clamped(self):
|
||||||
|
assert _clamp_slip(1.0) == SLIP_MAX
|
||||||
|
|
||||||
|
def test_at_max_boundary(self):
|
||||||
|
assert _clamp_slip(0.99) == pytest.approx(0.99)
|
||||||
|
|
||||||
|
def test_large_value_clamped(self):
|
||||||
|
assert _clamp_slip(5.0) == SLIP_MAX
|
||||||
|
|
||||||
|
|
||||||
|
# ─── tank_speeds ─────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestTankSpeeds:
|
||||||
|
|
||||||
|
def test_straight_no_slip(self):
|
||||||
|
vl, vr = tank_speeds(1.0, 0.0, 0.30, slip_factor=0.0)
|
||||||
|
assert vl == pytest.approx(1.0)
|
||||||
|
assert vr == pytest.approx(1.0)
|
||||||
|
|
||||||
|
def test_point_turn_no_slip(self):
|
||||||
|
"""Pure CCW rotation with no slip."""
|
||||||
|
vl, vr = tank_speeds(0.0, 1.0, 0.30, slip_factor=0.0)
|
||||||
|
assert vl == pytest.approx(-0.15)
|
||||||
|
assert vr == pytest.approx(0.15)
|
||||||
|
|
||||||
|
def test_point_turn_cw_no_slip(self):
|
||||||
|
vl, vr = tank_speeds(0.0, -1.0, 0.30, slip_factor=0.0)
|
||||||
|
assert vl == pytest.approx(0.15)
|
||||||
|
assert vr == pytest.approx(-0.15)
|
||||||
|
|
||||||
|
def test_zero_inputs(self):
|
||||||
|
vl, vr = tank_speeds(0.0, 0.0, 0.30, slip_factor=0.0)
|
||||||
|
assert vl == pytest.approx(0.0)
|
||||||
|
assert vr == pytest.approx(0.0)
|
||||||
|
|
||||||
|
def test_slip_increases_angular_command(self):
|
||||||
|
"""With slip>0, angular component is amplified so output diff is larger."""
|
||||||
|
vl0, vr0 = tank_speeds(0.0, 1.0, 0.30, slip_factor=0.0)
|
||||||
|
vl3, vr3 = tank_speeds(0.0, 1.0, 0.30, slip_factor=0.3)
|
||||||
|
# More slip → larger differential to achieve same ω
|
||||||
|
assert abs(vr3 - vl3) > abs(vr0 - vl0)
|
||||||
|
|
||||||
|
def test_slip_zero_equals_skid_steer(self):
|
||||||
|
"""slip_factor=0 must be identical to skid_steer_speeds."""
|
||||||
|
vl_t, vr_t = tank_speeds(0.7, 0.4, 0.30, slip_factor=0.0)
|
||||||
|
vl_s, vr_s = skid_steer_speeds(0.7, 0.4, 0.30)
|
||||||
|
assert vl_t == pytest.approx(vl_s)
|
||||||
|
assert vr_t == pytest.approx(vr_s)
|
||||||
|
|
||||||
|
def test_known_slip_value(self):
|
||||||
|
"""slip=0.5 → angular scaled by 1/(1-0.5) = 2."""
|
||||||
|
vl, vr = tank_speeds(0.0, 1.0, 0.30, slip_factor=0.5)
|
||||||
|
expected_half = (1.0 / (1.0 - 0.5)) * 0.30 / 2.0 # = 0.30
|
||||||
|
assert vl == pytest.approx(-expected_half)
|
||||||
|
assert vr == pytest.approx(expected_half)
|
||||||
|
|
||||||
|
def test_negative_slip_clamped_to_zero(self):
|
||||||
|
"""Negative slip_factor is clamped to 0."""
|
||||||
|
vl_neg, vr_neg = tank_speeds(0.0, 1.0, 0.30, slip_factor=-0.5)
|
||||||
|
vl_0, vr_0 = tank_speeds(0.0, 1.0, 0.30, slip_factor=0.0)
|
||||||
|
assert vl_neg == pytest.approx(vl_0)
|
||||||
|
assert vr_neg == pytest.approx(vr_0)
|
||||||
|
|
||||||
|
def test_symmetry_linear(self):
|
||||||
|
"""Reversing linear_x reverses both speeds equally."""
|
||||||
|
vl_f, vr_f = tank_speeds(1.0, 0.5, 0.30, slip_factor=0.2)
|
||||||
|
vl_r, vr_r = tank_speeds(-1.0, 0.5, 0.30, slip_factor=0.2)
|
||||||
|
assert vl_r == pytest.approx(vl_f - 2.0)
|
||||||
|
assert vr_r == pytest.approx(vr_f - 2.0)
|
||||||
|
|
||||||
|
def test_zero_radius_turn(self):
|
||||||
|
"""lin=0, ang≠0 → equal and opposite speeds (zero-radius pivot)."""
|
||||||
|
vl, vr = tank_speeds(0.0, 1.0, 0.30, slip_factor=0.0)
|
||||||
|
assert vl == pytest.approx(-vr)
|
||||||
|
|
||||||
|
def test_zero_track_width_no_crash(self):
|
||||||
|
vl, vr = tank_speeds(1.0, 2.0, 0.0, slip_factor=0.0)
|
||||||
|
assert vl == pytest.approx(1.0)
|
||||||
|
assert vr == pytest.approx(1.0)
|
||||||
|
|
||||||
|
|
||||||
|
# ─── skid_steer_speeds ───────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestSkidSteerSpeeds:
|
||||||
|
|
||||||
|
def test_straight(self):
|
||||||
|
vl, vr = skid_steer_speeds(1.0, 0.0, 0.30)
|
||||||
|
assert vl == pytest.approx(1.0)
|
||||||
|
assert vr == pytest.approx(1.0)
|
||||||
|
|
||||||
|
def test_point_turn(self):
|
||||||
|
vl, vr = skid_steer_speeds(0.0, 1.0, 0.30)
|
||||||
|
assert vl == pytest.approx(-0.15)
|
||||||
|
assert vr == pytest.approx(0.15)
|
||||||
|
|
||||||
|
def test_curve(self):
|
||||||
|
vl, vr = skid_steer_speeds(1.0, 1.0, 0.30)
|
||||||
|
assert vl == pytest.approx(1.0 - 0.15)
|
||||||
|
assert vr == pytest.approx(1.0 + 0.15)
|
||||||
|
|
||||||
|
|
||||||
|
# ─── speed_to_pwm ────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestSpeedToPwm:
|
||||||
|
|
||||||
|
def test_zero_speed_neutral(self):
|
||||||
|
assert speed_to_pwm(0.0, 1.5, 1500, 500) == 1500
|
||||||
|
|
||||||
|
def test_full_forward(self):
|
||||||
|
assert speed_to_pwm(1.5, 1.5, 1500, 500) == 2000
|
||||||
|
|
||||||
|
def test_full_reverse(self):
|
||||||
|
assert speed_to_pwm(-1.5, 1.5, 1500, 500) == 1000
|
||||||
|
|
||||||
|
def test_half_forward(self):
|
||||||
|
assert speed_to_pwm(0.75, 1.5, 1500, 500) == 1750
|
||||||
|
|
||||||
|
def test_clamp_over(self):
|
||||||
|
assert speed_to_pwm(10.0, 1.5, 1500, 500) == 2000
|
||||||
|
|
||||||
|
def test_clamp_under(self):
|
||||||
|
assert speed_to_pwm(-10.0, 1.5, 1500, 500) == 1000
|
||||||
|
|
||||||
|
def test_zero_max_returns_neutral(self):
|
||||||
|
assert speed_to_pwm(1.0, 0.0, 1500, 500) == 1500
|
||||||
|
|
||||||
|
def test_returns_int(self):
|
||||||
|
assert isinstance(speed_to_pwm(0.5, 1.5, 1500, 500), int)
|
||||||
|
|
||||||
|
def test_rounding(self):
|
||||||
|
# 1.0/1.5 = 0.6667 → 1500 + 0.6667*500 = 1833.33 → 1833
|
||||||
|
pwm = speed_to_pwm(1.0, 1.5, 1500, 500)
|
||||||
|
assert pwm == 1833
|
||||||
|
|
||||||
|
|
||||||
|
# ─── compute_track_speeds ────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestComputeTrackSpeeds:
|
||||||
|
|
||||||
|
def test_returns_two_values(self):
|
||||||
|
result = compute_track_speeds(1.0, 0.0, 0.30, 1.5)
|
||||||
|
assert len(result) == 2
|
||||||
|
|
||||||
|
def test_straight_tracked(self):
|
||||||
|
vl, vr = compute_track_speeds(1.0, 0.0, 0.30, 1.5, drive_mode="tracked", slip_factor=0.3)
|
||||||
|
assert vl == pytest.approx(1.0)
|
||||||
|
assert vr == pytest.approx(1.0)
|
||||||
|
|
||||||
|
def test_straight_2wd(self):
|
||||||
|
vl, vr = compute_track_speeds(1.0, 0.0, 0.30, 1.5, drive_mode="2wd")
|
||||||
|
assert vl == pytest.approx(1.0)
|
||||||
|
assert vr == pytest.approx(1.0)
|
||||||
|
|
||||||
|
def test_straight_4wd(self):
|
||||||
|
vl, vr = compute_track_speeds(1.0, 0.0, 0.30, 1.5, drive_mode="4wd")
|
||||||
|
assert vl == pytest.approx(1.0)
|
||||||
|
assert vr == pytest.approx(1.0)
|
||||||
|
|
||||||
|
def test_clipping_tracked(self):
|
||||||
|
vl, vr = compute_track_speeds(1.5, 5.0, 0.30, 1.5, drive_mode="tracked", slip_factor=0.3)
|
||||||
|
assert -1.5 <= vl <= 1.5
|
||||||
|
assert -1.5 <= vr <= 1.5
|
||||||
|
|
||||||
|
def test_clipping_2wd(self):
|
||||||
|
vl, vr = compute_track_speeds(1.5, 5.0, 0.30, 1.5, drive_mode="2wd")
|
||||||
|
assert -1.5 <= vl <= 1.5
|
||||||
|
assert -1.5 <= vr <= 1.5
|
||||||
|
|
||||||
|
def test_tracked_differs_from_2wd_on_turn(self):
|
||||||
|
"""With slip>0, tracked mode should produce a different differential."""
|
||||||
|
vl_t, vr_t = compute_track_speeds(0.0, 1.0, 0.30, 1.5,
|
||||||
|
drive_mode="tracked", slip_factor=0.3)
|
||||||
|
vl_2, vr_2 = compute_track_speeds(0.0, 1.0, 0.30, 1.5,
|
||||||
|
drive_mode="2wd", slip_factor=0.3)
|
||||||
|
assert abs(vr_t - vl_t) > abs(vr_2 - vl_2)
|
||||||
|
|
||||||
|
def test_unknown_mode_falls_back(self):
|
||||||
|
"""Unknown drive_mode: falls through to skid_steer (no slip comp)."""
|
||||||
|
vl, vr = compute_track_speeds(1.0, 0.5, 0.30, 1.5, drive_mode="bogus")
|
||||||
|
vl_ref, vr_ref = compute_track_speeds(1.0, 0.5, 0.30, 1.5, drive_mode="2wd")
|
||||||
|
assert vl == pytest.approx(vl_ref)
|
||||||
|
assert vr == pytest.approx(vr_ref)
|
||||||
|
|
||||||
|
def test_zero_pivot_tracked(self):
|
||||||
|
vl, vr = compute_track_speeds(0.0, 1.0, 0.30, 1.5, drive_mode="tracked", slip_factor=0.0)
|
||||||
|
assert vl == pytest.approx(-vr)
|
||||||
|
|
||||||
|
def test_reverse_straight(self):
|
||||||
|
vl, vr = compute_track_speeds(-1.0, 0.0, 0.30, 1.5, drive_mode="tracked")
|
||||||
|
assert vl == pytest.approx(-1.0)
|
||||||
|
assert vr == pytest.approx(-1.0)
|
||||||
|
|
||||||
|
|
||||||
|
# ─── expand_to_4ch ───────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestExpandTo4ch:
|
||||||
|
|
||||||
|
def test_returns_four_values(self):
|
||||||
|
result = expand_to_4ch(1.0, -1.0)
|
||||||
|
assert len(result) == 4
|
||||||
|
|
||||||
|
def test_left_pair(self):
|
||||||
|
vfl, vlr, vrf, vrr = expand_to_4ch(0.5, 1.0)
|
||||||
|
assert vfl == pytest.approx(0.5)
|
||||||
|
assert vlr == pytest.approx(0.5)
|
||||||
|
|
||||||
|
def test_right_pair(self):
|
||||||
|
vfl, vlr, vrf, vrr = expand_to_4ch(0.5, 1.0)
|
||||||
|
assert vrf == pytest.approx(1.0)
|
||||||
|
assert vrr == pytest.approx(1.0)
|
||||||
|
|
||||||
|
def test_zero_inputs(self):
|
||||||
|
assert expand_to_4ch(0.0, 0.0) == (0.0, 0.0, 0.0, 0.0)
|
||||||
|
|
||||||
|
def test_negative(self):
|
||||||
|
vfl, vlr, vrf, vrr = expand_to_4ch(-0.5, -0.5)
|
||||||
|
assert all(v == pytest.approx(-0.5) for v in (vfl, vlr, vrf, vrr))
|
||||||
|
|
||||||
|
|
||||||
|
# ─── odometry_from_track_speeds ──────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestOdometryFromTrackSpeeds:
|
||||||
|
|
||||||
|
def test_straight_no_slip(self):
|
||||||
|
lin, ang = odometry_from_track_speeds(1.0, 1.0, 0.30, slip_factor=0.0)
|
||||||
|
assert lin == pytest.approx(1.0)
|
||||||
|
assert ang == pytest.approx(0.0)
|
||||||
|
|
||||||
|
def test_point_turn_ccw_no_slip(self):
|
||||||
|
vl, vr = -0.15, 0.15
|
||||||
|
lin, ang = odometry_from_track_speeds(vl, vr, 0.30, slip_factor=0.0)
|
||||||
|
assert lin == pytest.approx(0.0)
|
||||||
|
assert ang == pytest.approx(1.0)
|
||||||
|
|
||||||
|
def test_slip_reduces_angular_estimate(self):
|
||||||
|
"""With slip, measured angular is scaled down."""
|
||||||
|
lin0, ang0 = odometry_from_track_speeds(-0.15, 0.15, 0.30, slip_factor=0.0)
|
||||||
|
lin3, ang3 = odometry_from_track_speeds(-0.15, 0.15, 0.30, slip_factor=0.3)
|
||||||
|
assert abs(ang3) < abs(ang0)
|
||||||
|
|
||||||
|
def test_known_slip_value(self):
|
||||||
|
"""slip=0.5 → angular halved."""
|
||||||
|
lin, ang = odometry_from_track_speeds(-0.15, 0.15, 0.30, slip_factor=0.5)
|
||||||
|
# (0.15 - -0.15) / 0.30 * (1 - 0.5) = 1.0 * 0.5
|
||||||
|
assert ang == pytest.approx(0.5)
|
||||||
|
|
||||||
|
def test_zero_track_width_no_crash(self):
|
||||||
|
lin, ang = odometry_from_track_speeds(1.0, 1.0, 0.0, slip_factor=0.0)
|
||||||
|
assert lin == pytest.approx(1.0)
|
||||||
|
assert ang == pytest.approx(0.0)
|
||||||
|
|
||||||
|
def test_zero_inputs(self):
|
||||||
|
lin, ang = odometry_from_track_speeds(0.0, 0.0, 0.30, slip_factor=0.0)
|
||||||
|
assert lin == pytest.approx(0.0)
|
||||||
|
assert ang == pytest.approx(0.0)
|
||||||
|
|
||||||
|
def test_reverse_straight(self):
|
||||||
|
lin, ang = odometry_from_track_speeds(-1.0, -1.0, 0.30, slip_factor=0.0)
|
||||||
|
assert lin == pytest.approx(-1.0)
|
||||||
|
assert ang == pytest.approx(0.0)
|
||||||
|
|
||||||
|
|
||||||
|
# ─── Integration: command ↔ odometry round-trip ──────────────────────────────
|
||||||
|
|
||||||
|
class TestRoundTrip:
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("slip", [0.0, 0.1, 0.3, 0.5])
|
||||||
|
@pytest.mark.parametrize("lin,ang", [
|
||||||
|
(1.0, 0.0),
|
||||||
|
(0.0, 1.0),
|
||||||
|
(0.5, 0.5),
|
||||||
|
(-0.5, 0.3),
|
||||||
|
])
|
||||||
|
def test_roundtrip_tracked(self, lin, ang, slip):
|
||||||
|
"""
|
||||||
|
Commanding (lin, ang) then estimating back via odometry should recover
|
||||||
|
(lin, ang) for all slip values, provided the speeds are unclipped.
|
||||||
|
"""
|
||||||
|
track = 0.30
|
||||||
|
max_speed = 5.0 # generous to avoid clipping in test
|
||||||
|
vl, vr = compute_track_speeds(lin, ang, track, max_speed,
|
||||||
|
drive_mode="tracked", slip_factor=slip)
|
||||||
|
lin_out, ang_out = odometry_from_track_speeds(vl, vr, track, slip_factor=slip)
|
||||||
|
assert lin_out == pytest.approx(lin, abs=1e-9)
|
||||||
|
assert ang_out == pytest.approx(ang, abs=1e-9)
|
||||||
|
|
||||||
|
def test_zero_pivot_pwm_symmetric(self):
|
||||||
|
"""Zero-radius pivot: left and right PWMs are symmetric around neutral."""
|
||||||
|
vl, vr = compute_track_speeds(0.0, 1.0, 0.30, 1.5,
|
||||||
|
drive_mode="tracked", slip_factor=0.0)
|
||||||
|
pwm_l = speed_to_pwm(vl, 1.5, 1500, 500)
|
||||||
|
pwm_r = speed_to_pwm(vr, 1.5, 1500, 500)
|
||||||
|
assert pwm_l + pwm_r == 3000 # symmetric: 1500 - x + 1500 + x
|
||||||
|
|
||||||
|
def test_all_modes_forward_equal(self):
|
||||||
|
"""Straight forward: all drive modes produce the same speeds."""
|
||||||
|
for mode in DRIVE_MODES:
|
||||||
|
vl, vr = compute_track_speeds(1.0, 0.0, 0.30, 1.5, drive_mode=mode)
|
||||||
|
assert vl == pytest.approx(1.0)
|
||||||
|
assert vr == pytest.approx(1.0)
|
||||||
|
|
||||||
|
def test_4ch_expansion_pwm(self):
|
||||||
|
"""4-channel expansion: all channels driven during straight motion."""
|
||||||
|
vl, vr = compute_track_speeds(1.0, 0.0, 0.30, 1.5, drive_mode="4wd")
|
||||||
|
pwms = [speed_to_pwm(v, 1.5, 1500, 500) for v in expand_to_4ch(vl, vr)]
|
||||||
|
assert all(p > 1500 for p in pwms)
|
||||||
|
|
||||||
|
def test_pwm_neutral_when_stopped(self):
|
||||||
|
vl, vr = compute_track_speeds(0.0, 0.0, 0.30, 1.5, drive_mode="tracked")
|
||||||
|
for v in (vl, vr):
|
||||||
|
assert speed_to_pwm(v, 1.5, 1500, 500) == 1500
|
||||||
Loading…
x
Reference in New Issue
Block a user