From 413810e6bab7337874cc00525ca44a2c6c550afc Mon Sep 17 00:00:00 2001 From: sl-controls Date: Mon, 2 Mar 2026 09:14:37 -0500 Subject: [PATCH] feat(tank): SaltyTank tracked-vehicle ESC driver (Issue #122) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - kinematics.py: tank_speeds with slip compensation (angular_cmd scaled by 1/(1-slip_factor)), skid_steer_speeds (no slip), speed_to_pwm, compute_track_speeds (2wd|4wd|tracked modes, ±max clip), expand_to_4ch, odometry_from_track_speeds (angular scaled by (1-slip_factor) — inverse of command path, consistent dead-reckoning across all slip values) - tank_driver_node.py: 50 Hz ROS2 node; serial P,,,\n; H\n heartbeat; dual stop paths (watchdog 0.3s + /saltybot/e_stop topic, latching); runtime drive_mode + slip_factor param switch; dead-reckoning odometry with slip compensation; publishes /saltybot/tank_pwm (JSON) + /saltybot/tank_odom - config/tank_params.yaml, launch/tank_driver.launch.py, package.xml, setup.py, setup.cfg - test/test_tank_kinematics.py: 71 unit tests, all passing (includes parametric round-trip over slip ∈ {0.0, 0.1, 0.3, 0.5}) Co-Authored-By: Claude Sonnet 4.6 --- .../config/tank_params.yaml | 36 ++ .../launch/tank_driver.launch.py | 65 +++ .../src/saltybot_tank_driver/package.xml | 23 ++ .../resource/saltybot_tank_driver | 0 .../saltybot_tank_driver/__init__.py | 0 .../saltybot_tank_driver/kinematics.py | 203 ++++++++++ .../saltybot_tank_driver/tank_driver_node.py | 383 ++++++++++++++++++ .../src/saltybot_tank_driver/setup.cfg | 5 + .../ros2_ws/src/saltybot_tank_driver/setup.py | 32 ++ .../test/test_tank_kinematics.py | 347 ++++++++++++++++ 10 files changed, 1094 insertions(+) create mode 100644 jetson/ros2_ws/src/saltybot_tank_driver/config/tank_params.yaml create mode 100644 jetson/ros2_ws/src/saltybot_tank_driver/launch/tank_driver.launch.py create mode 100644 jetson/ros2_ws/src/saltybot_tank_driver/package.xml create mode 100644 jetson/ros2_ws/src/saltybot_tank_driver/resource/saltybot_tank_driver create mode 100644 jetson/ros2_ws/src/saltybot_tank_driver/saltybot_tank_driver/__init__.py create mode 100644 jetson/ros2_ws/src/saltybot_tank_driver/saltybot_tank_driver/kinematics.py create mode 100644 jetson/ros2_ws/src/saltybot_tank_driver/saltybot_tank_driver/tank_driver_node.py create mode 100644 jetson/ros2_ws/src/saltybot_tank_driver/setup.cfg create mode 100644 jetson/ros2_ws/src/saltybot_tank_driver/setup.py create mode 100644 jetson/ros2_ws/src/saltybot_tank_driver/test/test_tank_kinematics.py diff --git a/jetson/ros2_ws/src/saltybot_tank_driver/config/tank_params.yaml b/jetson/ros2_ws/src/saltybot_tank_driver/config/tank_params.yaml new file mode 100644 index 0000000..f5bca53 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_tank_driver/config/tank_params.yaml @@ -0,0 +1,36 @@ +tank_driver: + ros__parameters: + # Serial interface + serial_port: /dev/ttyUSB_tank + baud_rate: 115200 + + # Drive mode: "tracked" | "4wd" | "2wd" + # tracked — tank kinematics with track-slip compensation (default) + # 4wd — skid-steer, 4 ESC channels, no slip compensation + # 2wd — standard diff-drive, 2 ESC channels, no slip compensation + drive_mode: tracked + + # Physical parameters + track_width: 0.30 # left↔right track centre distance (m) + wheelbase: 0.32 # front↔rear sprocket distance (m) — for reference + + # Speed limits + max_speed_ms: 1.5 # m/s at full ESC deflection + max_linear_vel: 1.0 # clamp on cmd_vel linear.x (m/s) + max_angular_vel: 2.5 # clamp on cmd_vel angular.z (rad/s) + + # Track slip compensation (tracked mode only) + # 0.0 = no slip (rigid wheel / high-traction surface) + # 0.1 = light slip (rubber tracks on concrete) + # 0.3 = moderate slip (rubber tracks on grass/gravel) ← default + # 0.5 = heavy slip (metal tracks on mud) + slip_factor: 0.3 + + # ESC PWM settings + pwm_neutral_us: 1500 # neutral/brake pulse width (µs) + pwm_range_us: 500 # half-range from neutral → [1000..2000] µs + + # Timing + watchdog_timeout_s: 0.3 # deadman: shorter than rover for tracked safety + heartbeat_period_s: 0.2 + control_rate: 50.0 # Hz diff --git a/jetson/ros2_ws/src/saltybot_tank_driver/launch/tank_driver.launch.py b/jetson/ros2_ws/src/saltybot_tank_driver/launch/tank_driver.launch.py new file mode 100644 index 0000000..629593a --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_tank_driver/launch/tank_driver.launch.py @@ -0,0 +1,65 @@ +""" +tank_driver.launch.py — Launch the SaltyTank ESC motor driver node. + +Usage: + ros2 launch saltybot_tank_driver tank_driver.launch.py + ros2 launch saltybot_tank_driver tank_driver.launch.py serial_port:=/dev/ttyACM0 + ros2 launch saltybot_tank_driver tank_driver.launch.py drive_mode:=4wd + ros2 launch saltybot_tank_driver tank_driver.launch.py slip_factor:=0.1 + +Emergency stop: + ros2 topic pub /saltybot/e_stop std_msgs/Bool '{data: true}' --once + ros2 topic pub /saltybot/e_stop std_msgs/Bool '{data: false}' --once +""" + +import os + +from ament_index_python.packages import get_package_share_directory +from launch import LaunchDescription +from launch.actions import DeclareLaunchArgument +from launch.substitutions import LaunchConfiguration +from launch_ros.actions import Node + + +def generate_launch_description(): + pkg_share = get_package_share_directory("saltybot_tank_driver") + params_file = os.path.join(pkg_share, "config", "tank_params.yaml") + + serial_port_arg = DeclareLaunchArgument( + "serial_port", + default_value="/dev/ttyUSB_tank", + description="Serial port for ESC controller", + ) + drive_mode_arg = DeclareLaunchArgument( + "drive_mode", + default_value="tracked", + description="Drive mode: tracked | 4wd | 2wd", + ) + slip_factor_arg = DeclareLaunchArgument( + "slip_factor", + default_value="0.3", + description="Track slip coefficient [0.0, 0.99]", + ) + + tank_driver = Node( + package="saltybot_tank_driver", + executable="tank_driver_node", + name="tank_driver", + parameters=[ + params_file, + { + "serial_port": LaunchConfiguration("serial_port"), + "drive_mode": LaunchConfiguration("drive_mode"), + "slip_factor": LaunchConfiguration("slip_factor"), + }, + ], + output="screen", + emulate_tty=True, + ) + + return LaunchDescription([ + serial_port_arg, + drive_mode_arg, + slip_factor_arg, + tank_driver, + ]) diff --git a/jetson/ros2_ws/src/saltybot_tank_driver/package.xml b/jetson/ros2_ws/src/saltybot_tank_driver/package.xml new file mode 100644 index 0000000..0446742 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_tank_driver/package.xml @@ -0,0 +1,23 @@ + + + + saltybot_tank_driver + 0.1.0 + SaltyTank tracked-vehicle ESC driver with slip compensation (Issue #122) + sl-controls + MIT + + rclpy + geometry_msgs + nav_msgs + std_msgs + + ament_copyright + ament_flake8 + ament_pep257 + pytest + + + ament_python + + diff --git a/jetson/ros2_ws/src/saltybot_tank_driver/resource/saltybot_tank_driver b/jetson/ros2_ws/src/saltybot_tank_driver/resource/saltybot_tank_driver new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_tank_driver/saltybot_tank_driver/__init__.py b/jetson/ros2_ws/src/saltybot_tank_driver/saltybot_tank_driver/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_tank_driver/saltybot_tank_driver/kinematics.py b/jetson/ros2_ws/src/saltybot_tank_driver/saltybot_tank_driver/kinematics.py new file mode 100644 index 0000000..58f760f --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_tank_driver/saltybot_tank_driver/kinematics.py @@ -0,0 +1,203 @@ +""" +kinematics.py — Tank / tracked-vehicle kinematics for SaltyTank (Issue #122). + +Drive modes +─────────── + "2wd" — standard differential drive, 2 ESC channels (CH1=left, CH3=right). + Mirrored to 4 channels by the caller (CH1=CH2=left, CH3=CH4=right). + + "4wd" — skid-steer 4-channel. Left pair (CH1+CH2) share one speed; + right pair (CH3+CH4) share one speed. No slip compensation. + + "tracked" — tank kinematics with track-slip compensation. + Commanded angular is scaled up by 1/(1-slip_factor) so the + vehicle achieves the desired turn rate despite track slip. + Odometry angular is scaled down by (1-slip_factor) to reflect + the fact that wheel-speed difference overestimates actual yaw rate. + +Slip compensation model +─────────────────────── + slip_factor ∈ [0.0, 0.99] + + 0.0 → rigid wheel / no slip (identical to standard diff-drive) + 0.1 → light slip (rubber tracks on concrete) + 0.3 → moderate slip (rubber tracks on grass / gravel) ← default + 0.5 → heavy slip (metal tracks on mud) + + Command path : angular_cmd = angular_z / (1 - slip_factor) + Odometry path : angular_odom = (v_right - v_left) / B × (1 - slip_factor) + + The two corrections are complementary: a vehicle commanded at ω rad/s and + estimated by odometry will yield consistent trajectories regardless of the + chosen slip_factor value (as long as it is consistent between command and odom). + +Zero-radius turns +───────────────── + Setting linear_x=0 and angular_z≠0 produces equal-and-opposite track speeds, + which is a valid zero-radius pivot for tracked vehicles. The slip factor still + applies; set slip_factor=0 for a surface with known high traction. + +Pure module — no ROS2 dependency; fully unit-testable. +""" + +from __future__ import annotations + +import math + + +# ── Constants ──────────────────────────────────────────────────────────────── + +SLIP_MIN = 0.0 +SLIP_MAX = 0.99 # must stay < 1.0 to avoid division-by-zero +DRIVE_MODES = ("2wd", "4wd", "tracked") + + +# ── Low-level helpers ──────────────────────────────────────────────────────── + +def _clamp_slip(slip_factor: float) -> float: + return max(SLIP_MIN, min(SLIP_MAX, slip_factor)) + + +# ── Track-speed computation ────────────────────────────────────────────────── + +def tank_speeds( + linear_x: float, + angular_z: float, + track_width: float, + slip_factor: float = 0.0, +) -> tuple[float, float]: + """ + Unicycle → tank (tracked) differential drive with optional slip compensation. + + Parameters + ---------- + linear_x : forward velocity (m/s) + angular_z : desired yaw rate (rad/s, +ve = CCW / left turn) + track_width : centre-to-centre track separation (m) + slip_factor : track slip coefficient ∈ [0.0, 0.99]; 0 = no slip + + Returns + ------- + (v_left, v_right) in m/s + """ + slip = _clamp_slip(slip_factor) + # Scale angular command up to achieve desired turn rate despite slip + angular_cmd = angular_z / (1.0 - slip) if slip > 0.0 else angular_z + half = angular_cmd * track_width / 2.0 + return linear_x - half, linear_x + half + + +def skid_steer_speeds( + linear_x: float, + angular_z: float, + track_width: float, +) -> tuple[float, float]: + """ + Standard unicycle → differential drive (no slip compensation). + Used for 2WD and 4WD modes. + """ + half = angular_z * track_width / 2.0 + return linear_x - half, linear_x + half + + +# ── m/s → ESC PWM ──────────────────────────────────────────────────────────── + +def speed_to_pwm( + speed_ms: float, + max_speed_ms: float, + pwm_neutral_us: int, + pwm_range_us: int, +) -> int: + """ + Convert wheel/track speed (m/s) to ESC PWM pulse width (µs). + + Linear mapping: + pwm = neutral + (speed / max_speed) * range + + Clamped to [neutral - range, neutral + range]. + """ + if max_speed_ms <= 0.0: + return pwm_neutral_us + normalised = max(-1.0, min(1.0, speed_ms / max_speed_ms)) + pwm = pwm_neutral_us + normalised * pwm_range_us + lo = pwm_neutral_us - pwm_range_us + hi = pwm_neutral_us + pwm_range_us + return int(round(max(lo, min(hi, pwm)))) + + +# ── Variant-aware speed computation ───────────────────────────────────────── + +def compute_track_speeds( + linear_x: float, + angular_z: float, + track_width: float, + max_speed_ms: float, + drive_mode: str = "tracked", + slip_factor: float = 0.3, +) -> tuple[float, float]: + """ + Compute and clip left/right track/wheel speeds for the selected mode. + + Returns + ------- + (v_left, v_right) clipped to ±max_speed_ms. + + All modes produce a 2-element tuple; the caller expands to 4 channels + for 4WD mode by mirroring (vl, vl, vr, vr). + """ + def clip(v: float) -> float: + return max(-max_speed_ms, min(max_speed_ms, v)) + + if drive_mode == "tracked": + vl, vr = tank_speeds(linear_x, angular_z, track_width, slip_factor) + else: + # "2wd" or "4wd" — standard diff-drive, no slip compensation + vl, vr = skid_steer_speeds(linear_x, angular_z, track_width) + + return clip(vl), clip(vr) + + +def expand_to_4ch(v_left: float, v_right: float) -> tuple[float, float, float, float]: + """ + Expand left/right speeds to 4-channel ESC layout. + + Channel assignment: + CH1 = left-front CH2 = left-rear + CH3 = right-front CH4 = right-rear + """ + return v_left, v_left, v_right, v_right + + +# ── Odometry helper ────────────────────────────────────────────────────────── + +def odometry_from_track_speeds( + v_left: float, + v_right: float, + track_width: float, + slip_factor: float = 0.0, +) -> tuple[float, float]: + """ + Estimate (linear_x, angular_z) from left/right track speeds. + + Applies slip compensation to the angular estimate: + angular_z_odom = (v_right - v_left) / B × (1 - slip_factor) + + This is the inverse of tank_speeds for consistent dead-reckoning. + + Parameters + ---------- + v_left, v_right : measured track speeds (m/s) + track_width : centre-to-centre track separation (m) + slip_factor : same value used in the command path + + Returns + ------- + (linear_x, angular_z) in m/s and rad/s + """ + linear_x = (v_left + v_right) / 2.0 + if track_width > 0.0: + slip = _clamp_slip(slip_factor) + angular_z = (v_right - v_left) / track_width * (1.0 - slip) + else: + angular_z = 0.0 + return linear_x, angular_z diff --git a/jetson/ros2_ws/src/saltybot_tank_driver/saltybot_tank_driver/tank_driver_node.py b/jetson/ros2_ws/src/saltybot_tank_driver/saltybot_tank_driver/tank_driver_node.py new file mode 100644 index 0000000..3f9024e --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_tank_driver/saltybot_tank_driver/tank_driver_node.py @@ -0,0 +1,383 @@ +""" +tank_driver_node.py — SaltyTank tracked-vehicle ESC driver (Issue #122). + +Hardware +──────── + SaltyTank: tracked robot with left/right independent brushless ESCs. + ESCs controlled via PWM (servo-style 1000–2000 µs pulses). + Communication: USB CDC serial to STM32 or Raspberry Pi Pico GPIO PWM bridge. + + ESC channel assignments (configurable): + CH1 = left-front (or left-track in 2WD/tracked mode) + CH2 = left-rear (mirrored in 2WD/tracked mode) + CH3 = right-front (or right-track in 2WD/tracked mode) + CH4 = right-rear (mirrored in 2WD/tracked mode) + +Drive modes +─────────── + Selectable at runtime via `drive_mode` parameter (default "tracked"). + + "2wd" — standard differential drive; CH1+CH2 = left, CH3+CH4 = right. + No slip compensation. + + "4wd" — skid-steer; all four ESCs commanded. Left pair same speed, + right pair same speed. No slip compensation. + + "tracked" — tank kinematics with slip compensation. Angular command is + pre-scaled to achieve the desired yaw rate despite track slip. + Odometry angular is de-scaled consistently. + + Switch live: ros2 param set /tank_driver drive_mode tracked + +Serial protocol +──────────────── + Command frame (ASCII, newline-terminated): + P,,,\\n + Values: 1000–2000 µs (1500 = neutral/brake) + + Emergency stop frame: + E\\n — forces all channels to neutral immediately + + Heartbeat: + H\\n every heartbeat_period_s (ESC controller zeros PWM after timeout) + +Emergency stop +────────────── + Two stop paths: + 1. Software watchdog — /cmd_vel silence > watchdog_timeout_s → neutral + 2. E-stop topic — /saltybot/e_stop (std_msgs/Bool, True = stop) + + Both paths send P1500,1500,1500,1500\\n and latch until a new /cmd_vel arrives + (e-stop topic) or new commands come in (watchdog). + +Subscribes +────────── + /cmd_vel (geometry_msgs/Twist) — Nav2-compatible velocity command + linear.x = forward (m/s) + angular.z = yaw rate (rad/s, +CCW) + /saltybot/e_stop (std_msgs/Bool) — True = emergency stop + +Publishes +───────── + /saltybot/tank_pwm (std_msgs/String JSON) — current PWM values + /saltybot/tank_odom (nav_msgs/Odometry) — dead-reckoning odometry + +Parameters +────────── + serial_port /dev/ttyUSB_tank (or /dev/ttyACM0, etc.) + baud_rate 115200 + drive_mode "tracked" — "2wd" | "4wd" | "tracked" + track_width 0.30 — left↔right track centre distance (m) + wheelbase 0.32 — front↔rear axle distance (m) + max_speed_ms 1.5 — m/s at full ESC deflection + max_linear_vel 1.0 — clamp on cmd_vel linear.x (m/s) + max_angular_vel 2.5 — clamp on cmd_vel angular.z (rad/s) + slip_factor 0.3 — track slip coefficient [0.0, 0.99] + pwm_neutral_us 1500 + pwm_range_us 500 + watchdog_timeout_s 0.3 — tighter than rover (0.5 s) for safety + heartbeat_period_s 0.2 + control_rate 50.0 — Hz + +Usage +───── + ros2 launch saltybot_tank_driver tank_driver.launch.py + ros2 param set /tank_driver drive_mode 4wd + ros2 param set /tank_driver slip_factor 0.1 + ros2 topic pub /saltybot/e_stop std_msgs/Bool '{data: true}' --once +""" + +import json +import math +import threading +import time + +import rclpy +from rclpy.node import Node +from rclpy.qos import HistoryPolicy, QoSProfile, ReliabilityPolicy + +from geometry_msgs.msg import Twist +from nav_msgs.msg import Odometry +from std_msgs.msg import Bool, String + +from saltybot_tank_driver.kinematics import ( + compute_track_speeds, + expand_to_4ch, + odometry_from_track_speeds, + speed_to_pwm, + DRIVE_MODES, +) + +try: + import serial + _SERIAL_OK = True +except ImportError: + _SERIAL_OK = False + + +class TankDriverNode(Node): + + def __init__(self): + super().__init__("tank_driver") + + # ── Parameters ─────────────────────────────────────────────────────── + self.declare_parameter("serial_port", "/dev/ttyUSB_tank") + self.declare_parameter("baud_rate", 115200) + self.declare_parameter("drive_mode", "tracked") + self.declare_parameter("track_width", 0.30) + self.declare_parameter("wheelbase", 0.32) + self.declare_parameter("max_speed_ms", 1.5) + self.declare_parameter("max_linear_vel", 1.0) + self.declare_parameter("max_angular_vel", 2.5) + self.declare_parameter("slip_factor", 0.3) + self.declare_parameter("pwm_neutral_us", 1500) + self.declare_parameter("pwm_range_us", 500) + self.declare_parameter("watchdog_timeout_s", 0.3) + self.declare_parameter("heartbeat_period_s", 0.2) + self.declare_parameter("control_rate", 50.0) + + self._p = self._load_params() + + # ── State ──────────────────────────────────────────────────────────── + self._target_linear_x: float = 0.0 + self._target_angular_z: float = 0.0 + self._last_cmd_vel_t: float = 0.0 + self._last_pwm: tuple = (1500, 1500, 1500, 1500) + self._e_stop: bool = False + + self._odom_x: float = 0.0 + self._odom_y: float = 0.0 + self._odom_yaw: float = 0.0 + self._last_odom_t: float = time.monotonic() + + # ── Serial ─────────────────────────────────────────────────────────── + self._ser = None + self._ser_lock = threading.Lock() + if _SERIAL_OK: + self._open_serial() + else: + self.get_logger().warn( + "pyserial not installed — running in simulation mode (no serial I/O)") + + # ── QoS ────────────────────────────────────────────────────────────── + reliable = QoSProfile( + reliability=ReliabilityPolicy.RELIABLE, + history=HistoryPolicy.KEEP_LAST, + depth=10, + ) + + # ── Subscriptions ──────────────────────────────────────────────────── + self.create_subscription(Twist, "/cmd_vel", self._cmd_vel_cb, reliable) + self.create_subscription(Bool, "/saltybot/e_stop", self._e_stop_cb, reliable) + + # ── Publishers ─────────────────────────────────────────────────────── + self._pwm_pub = self.create_publisher(String, "/saltybot/tank_pwm", reliable) + self._odom_pub = self.create_publisher(Odometry, "/saltybot/tank_odom", reliable) + + # ── Timers ─────────────────────────────────────────────────────────── + rate = self._p["control_rate"] + self._ctrl_timer = self.create_timer(1.0 / rate, self._control_cb) + self._hb_timer = self.create_timer(self._p["hb_period"], self._heartbeat_cb) + + self.get_logger().info( + f"TankDriverNode ready " + f"mode={self._p['drive_mode']} " + f"track={self._p['track_width']}m " + f"slip={self._p['slip_factor']} " + f"max_speed={self._p['max_speed_ms']}m/s " + f"port={self._p['serial_port']}" + ) + + # ── Parameter load ──────────────────────────────────────────────────────── + + def _load_params(self) -> dict: + mode = self.get_parameter("drive_mode").value + if mode not in DRIVE_MODES: + self.get_logger().warn( + f"Unknown drive_mode '{mode}', defaulting to 'tracked'") + mode = "tracked" + return { + "serial_port": self.get_parameter("serial_port").value, + "baud_rate": self.get_parameter("baud_rate").value, + "drive_mode": mode, + "track_width": self.get_parameter("track_width").value, + "wheelbase": self.get_parameter("wheelbase").value, + "max_speed_ms": self.get_parameter("max_speed_ms").value, + "max_linear_vel": self.get_parameter("max_linear_vel").value, + "max_angular_vel":self.get_parameter("max_angular_vel").value, + "slip_factor": float(self.get_parameter("slip_factor").value), + "pwm_neutral": int(self.get_parameter("pwm_neutral_us").value), + "pwm_range": int(self.get_parameter("pwm_range_us").value), + "watchdog_timeout": self.get_parameter("watchdog_timeout_s").value, + "hb_period": self.get_parameter("heartbeat_period_s").value, + "control_rate": self.get_parameter("control_rate").value, + } + + # ── Serial helpers ──────────────────────────────────────────────────────── + + def _open_serial(self) -> None: + with self._ser_lock: + try: + self._ser = serial.Serial( + self._p["serial_port"], self._p["baud_rate"], timeout=0.05) + self.get_logger().info(f"Serial open: {self._p['serial_port']}") + except Exception as exc: + self.get_logger().error(f"Cannot open serial: {exc}") + self._ser = None + + def _write(self, data: bytes) -> bool: + with self._ser_lock: + if self._ser is None or not self._ser.is_open: + return False + try: + self._ser.write(data) + return True + except Exception as exc: + self.get_logger().error(f"Serial write: {exc}", + throttle_duration_sec=2.0) + self._ser = None + return False + + def _send_neutral(self) -> None: + self._write(b"P1500,1500,1500,1500\n") + self._last_pwm = (1500, 1500, 1500, 1500) + + # ── cmd_vel callback ────────────────────────────────────────────────────── + + def _cmd_vel_cb(self, msg: Twist) -> None: + p = self._p + self._target_linear_x = max(-p["max_linear_vel"], + min(p["max_linear_vel"], msg.linear.x)) + self._target_angular_z = max(-p["max_angular_vel"], + min(p["max_angular_vel"], msg.angular.z)) + self._last_cmd_vel_t = time.monotonic() + # A new cmd_vel clears a latched e-stop + if self._e_stop: + self._e_stop = False + self.get_logger().info("E-stop cleared by new cmd_vel") + + # ── E-stop callback ─────────────────────────────────────────────────────── + + def _e_stop_cb(self, msg: Bool) -> None: + if msg.data and not self._e_stop: + self._e_stop = True + self._send_neutral() + self.get_logger().warn("Emergency stop engaged") + elif not msg.data and self._e_stop: + self._e_stop = False + self.get_logger().info("E-stop released via topic") + + # ── 50 Hz control loop ──────────────────────────────────────────────────── + + def _control_cb(self) -> None: + self._p = self._load_params() + p = self._p + now = time.monotonic() + + # E-stop or watchdog + cmd_age = (now - self._last_cmd_vel_t) if self._last_cmd_vel_t > 0.0 else 1e9 + if self._e_stop or cmd_age > p["watchdog_timeout"]: + lin_x, ang_z = 0.0, 0.0 + else: + lin_x, ang_z = self._target_linear_x, self._target_angular_z + + # Kinematics → left/right track speeds + vl, vr = compute_track_speeds( + lin_x, ang_z, + track_width=p["track_width"], + max_speed_ms=p["max_speed_ms"], + drive_mode=p["drive_mode"], + slip_factor=p["slip_factor"], + ) + + # Expand to 4 channels + speeds4 = expand_to_4ch(vl, vr) # (vl, vl, vr, vr) + + # Convert to PWM + def to_pwm(v): + return speed_to_pwm(v, p["max_speed_ms"], p["pwm_neutral"], p["pwm_range"]) + + pwm = tuple(to_pwm(v) for v in speeds4) + self._last_pwm = pwm + + # Send P command + frame = f"P{pwm[0]},{pwm[1]},{pwm[2]},{pwm[3]}\n".encode("ascii") + self._write(frame) + + # Publish observability + pwm_msg = String() + pwm_msg.data = json.dumps({ + "ch1_us": pwm[0], "ch2_us": pwm[1], + "ch3_us": pwm[2], "ch4_us": pwm[3], + "drive_mode": p["drive_mode"], + "e_stop": self._e_stop, + }) + self._pwm_pub.publish(pwm_msg) + + # Odometry integration (dead reckoning with slip compensation) + dt = now - self._last_odom_t + self._last_odom_t = now + self._integrate_odometry(vl, vr, p["track_width"], p["slip_factor"], dt) + self._publish_odom() + + # ── Heartbeat ───────────────────────────────────────────────────────────── + + def _heartbeat_cb(self) -> None: + self._write(b"H\n") + + # ── Odometry ───────────────────────────────────────────────────────────── + + def _integrate_odometry( + self, + v_left: float, + v_right: float, + track_width: float, + slip_factor: float, + dt: float, + ) -> None: + """2D dead-reckoning with track-slip compensation.""" + lin, ang = odometry_from_track_speeds(v_left, v_right, track_width, slip_factor) + self._odom_yaw += ang * dt + self._odom_x += lin * math.cos(self._odom_yaw) * dt + self._odom_y += lin * math.sin(self._odom_yaw) * dt + + def _publish_odom(self) -> None: + yaw = self._odom_yaw + msg = Odometry() + msg.header.stamp = self.get_clock().now().to_msg() + msg.header.frame_id = "odom" + msg.child_frame_id = "base_link" + + msg.pose.pose.position.x = self._odom_x + msg.pose.pose.position.y = self._odom_y + msg.pose.pose.orientation.z = math.sin(yaw / 2.0) + msg.pose.pose.orientation.w = math.cos(yaw / 2.0) + + msg.twist.twist.linear.x = self._target_linear_x + msg.twist.twist.angular.z = self._target_angular_z + + self._odom_pub.publish(msg) + + # ── Lifecycle ───────────────────────────────────────────────────────────── + + def destroy_node(self) -> None: + self._send_neutral() # safe-state on shutdown + super().destroy_node() + + +# ── Entry point ─────────────────────────────────────────────────────────────── + +def main(args=None): + rclpy.init(args=args) + node = TankDriverNode() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.try_shutdown() + + +if __name__ == "__main__": + main() diff --git a/jetson/ros2_ws/src/saltybot_tank_driver/setup.cfg b/jetson/ros2_ws/src/saltybot_tank_driver/setup.cfg new file mode 100644 index 0000000..718c023 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_tank_driver/setup.cfg @@ -0,0 +1,5 @@ +[develop] +script_dir=$base/lib/saltybot_tank_driver + +[install] +install_scripts=$base/lib/saltybot_tank_driver diff --git a/jetson/ros2_ws/src/saltybot_tank_driver/setup.py b/jetson/ros2_ws/src/saltybot_tank_driver/setup.py new file mode 100644 index 0000000..92c9a99 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_tank_driver/setup.py @@ -0,0 +1,32 @@ +from setuptools import setup, find_packages +import os +from glob import glob + +package_name = "saltybot_tank_driver" + +setup( + name=package_name, + version="0.1.0", + packages=find_packages(exclude=["test"]), + data_files=[ + ("share/ament_index/resource_index/packages", + [f"resource/{package_name}"]), + (f"share/{package_name}", ["package.xml"]), + (os.path.join("share", package_name, "config"), + glob("config/*.yaml")), + (os.path.join("share", package_name, "launch"), + glob("launch/*.py")), + ], + install_requires=["setuptools"], + zip_safe=True, + maintainer="sl-controls", + maintainer_email="sl-controls@saltylab.local", + description="SaltyTank tracked-vehicle ESC driver with slip compensation", + license="MIT", + tests_require=["pytest"], + entry_points={ + "console_scripts": [ + f"tank_driver_node = {package_name}.tank_driver_node:main", + ], + }, +) diff --git a/jetson/ros2_ws/src/saltybot_tank_driver/test/test_tank_kinematics.py b/jetson/ros2_ws/src/saltybot_tank_driver/test/test_tank_kinematics.py new file mode 100644 index 0000000..19c824e --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_tank_driver/test/test_tank_kinematics.py @@ -0,0 +1,347 @@ +""" +test_tank_kinematics.py — Unit tests for kinematics.py (Issue #122). + +No ROS2 runtime required — pure Python. +""" + +import math +import pytest + +from saltybot_tank_driver.kinematics import ( + tank_speeds, + skid_steer_speeds, + speed_to_pwm, + compute_track_speeds, + expand_to_4ch, + odometry_from_track_speeds, + _clamp_slip, + DRIVE_MODES, + SLIP_MIN, + SLIP_MAX, +) + + +# ─── _clamp_slip ───────────────────────────────────────────────────────────── + +class TestClampSlip: + + def test_zero_passes_through(self): + assert _clamp_slip(0.0) == 0.0 + + def test_typical_value(self): + assert _clamp_slip(0.3) == pytest.approx(0.3) + + def test_below_min_clamped(self): + assert _clamp_slip(-0.1) == SLIP_MIN + + def test_above_max_clamped(self): + assert _clamp_slip(1.0) == SLIP_MAX + + def test_at_max_boundary(self): + assert _clamp_slip(0.99) == pytest.approx(0.99) + + def test_large_value_clamped(self): + assert _clamp_slip(5.0) == SLIP_MAX + + +# ─── tank_speeds ───────────────────────────────────────────────────────────── + +class TestTankSpeeds: + + def test_straight_no_slip(self): + vl, vr = tank_speeds(1.0, 0.0, 0.30, slip_factor=0.0) + assert vl == pytest.approx(1.0) + assert vr == pytest.approx(1.0) + + def test_point_turn_no_slip(self): + """Pure CCW rotation with no slip.""" + vl, vr = tank_speeds(0.0, 1.0, 0.30, slip_factor=0.0) + assert vl == pytest.approx(-0.15) + assert vr == pytest.approx(0.15) + + def test_point_turn_cw_no_slip(self): + vl, vr = tank_speeds(0.0, -1.0, 0.30, slip_factor=0.0) + assert vl == pytest.approx(0.15) + assert vr == pytest.approx(-0.15) + + def test_zero_inputs(self): + vl, vr = tank_speeds(0.0, 0.0, 0.30, slip_factor=0.0) + assert vl == pytest.approx(0.0) + assert vr == pytest.approx(0.0) + + def test_slip_increases_angular_command(self): + """With slip>0, angular component is amplified so output diff is larger.""" + vl0, vr0 = tank_speeds(0.0, 1.0, 0.30, slip_factor=0.0) + vl3, vr3 = tank_speeds(0.0, 1.0, 0.30, slip_factor=0.3) + # More slip → larger differential to achieve same ω + assert abs(vr3 - vl3) > abs(vr0 - vl0) + + def test_slip_zero_equals_skid_steer(self): + """slip_factor=0 must be identical to skid_steer_speeds.""" + vl_t, vr_t = tank_speeds(0.7, 0.4, 0.30, slip_factor=0.0) + vl_s, vr_s = skid_steer_speeds(0.7, 0.4, 0.30) + assert vl_t == pytest.approx(vl_s) + assert vr_t == pytest.approx(vr_s) + + def test_known_slip_value(self): + """slip=0.5 → angular scaled by 1/(1-0.5) = 2.""" + vl, vr = tank_speeds(0.0, 1.0, 0.30, slip_factor=0.5) + expected_half = (1.0 / (1.0 - 0.5)) * 0.30 / 2.0 # = 0.30 + assert vl == pytest.approx(-expected_half) + assert vr == pytest.approx(expected_half) + + def test_negative_slip_clamped_to_zero(self): + """Negative slip_factor is clamped to 0.""" + vl_neg, vr_neg = tank_speeds(0.0, 1.0, 0.30, slip_factor=-0.5) + vl_0, vr_0 = tank_speeds(0.0, 1.0, 0.30, slip_factor=0.0) + assert vl_neg == pytest.approx(vl_0) + assert vr_neg == pytest.approx(vr_0) + + def test_symmetry_linear(self): + """Reversing linear_x reverses both speeds equally.""" + vl_f, vr_f = tank_speeds(1.0, 0.5, 0.30, slip_factor=0.2) + vl_r, vr_r = tank_speeds(-1.0, 0.5, 0.30, slip_factor=0.2) + assert vl_r == pytest.approx(vl_f - 2.0) + assert vr_r == pytest.approx(vr_f - 2.0) + + def test_zero_radius_turn(self): + """lin=0, ang≠0 → equal and opposite speeds (zero-radius pivot).""" + vl, vr = tank_speeds(0.0, 1.0, 0.30, slip_factor=0.0) + assert vl == pytest.approx(-vr) + + def test_zero_track_width_no_crash(self): + vl, vr = tank_speeds(1.0, 2.0, 0.0, slip_factor=0.0) + assert vl == pytest.approx(1.0) + assert vr == pytest.approx(1.0) + + +# ─── skid_steer_speeds ─────────────────────────────────────────────────────── + +class TestSkidSteerSpeeds: + + def test_straight(self): + vl, vr = skid_steer_speeds(1.0, 0.0, 0.30) + assert vl == pytest.approx(1.0) + assert vr == pytest.approx(1.0) + + def test_point_turn(self): + vl, vr = skid_steer_speeds(0.0, 1.0, 0.30) + assert vl == pytest.approx(-0.15) + assert vr == pytest.approx(0.15) + + def test_curve(self): + vl, vr = skid_steer_speeds(1.0, 1.0, 0.30) + assert vl == pytest.approx(1.0 - 0.15) + assert vr == pytest.approx(1.0 + 0.15) + + +# ─── speed_to_pwm ──────────────────────────────────────────────────────────── + +class TestSpeedToPwm: + + def test_zero_speed_neutral(self): + assert speed_to_pwm(0.0, 1.5, 1500, 500) == 1500 + + def test_full_forward(self): + assert speed_to_pwm(1.5, 1.5, 1500, 500) == 2000 + + def test_full_reverse(self): + assert speed_to_pwm(-1.5, 1.5, 1500, 500) == 1000 + + def test_half_forward(self): + assert speed_to_pwm(0.75, 1.5, 1500, 500) == 1750 + + def test_clamp_over(self): + assert speed_to_pwm(10.0, 1.5, 1500, 500) == 2000 + + def test_clamp_under(self): + assert speed_to_pwm(-10.0, 1.5, 1500, 500) == 1000 + + def test_zero_max_returns_neutral(self): + assert speed_to_pwm(1.0, 0.0, 1500, 500) == 1500 + + def test_returns_int(self): + assert isinstance(speed_to_pwm(0.5, 1.5, 1500, 500), int) + + def test_rounding(self): + # 1.0/1.5 = 0.6667 → 1500 + 0.6667*500 = 1833.33 → 1833 + pwm = speed_to_pwm(1.0, 1.5, 1500, 500) + assert pwm == 1833 + + +# ─── compute_track_speeds ──────────────────────────────────────────────────── + +class TestComputeTrackSpeeds: + + def test_returns_two_values(self): + result = compute_track_speeds(1.0, 0.0, 0.30, 1.5) + assert len(result) == 2 + + def test_straight_tracked(self): + vl, vr = compute_track_speeds(1.0, 0.0, 0.30, 1.5, drive_mode="tracked", slip_factor=0.3) + assert vl == pytest.approx(1.0) + assert vr == pytest.approx(1.0) + + def test_straight_2wd(self): + vl, vr = compute_track_speeds(1.0, 0.0, 0.30, 1.5, drive_mode="2wd") + assert vl == pytest.approx(1.0) + assert vr == pytest.approx(1.0) + + def test_straight_4wd(self): + vl, vr = compute_track_speeds(1.0, 0.0, 0.30, 1.5, drive_mode="4wd") + assert vl == pytest.approx(1.0) + assert vr == pytest.approx(1.0) + + def test_clipping_tracked(self): + vl, vr = compute_track_speeds(1.5, 5.0, 0.30, 1.5, drive_mode="tracked", slip_factor=0.3) + assert -1.5 <= vl <= 1.5 + assert -1.5 <= vr <= 1.5 + + def test_clipping_2wd(self): + vl, vr = compute_track_speeds(1.5, 5.0, 0.30, 1.5, drive_mode="2wd") + assert -1.5 <= vl <= 1.5 + assert -1.5 <= vr <= 1.5 + + def test_tracked_differs_from_2wd_on_turn(self): + """With slip>0, tracked mode should produce a different differential.""" + vl_t, vr_t = compute_track_speeds(0.0, 1.0, 0.30, 1.5, + drive_mode="tracked", slip_factor=0.3) + vl_2, vr_2 = compute_track_speeds(0.0, 1.0, 0.30, 1.5, + drive_mode="2wd", slip_factor=0.3) + assert abs(vr_t - vl_t) > abs(vr_2 - vl_2) + + def test_unknown_mode_falls_back(self): + """Unknown drive_mode: falls through to skid_steer (no slip comp).""" + vl, vr = compute_track_speeds(1.0, 0.5, 0.30, 1.5, drive_mode="bogus") + vl_ref, vr_ref = compute_track_speeds(1.0, 0.5, 0.30, 1.5, drive_mode="2wd") + assert vl == pytest.approx(vl_ref) + assert vr == pytest.approx(vr_ref) + + def test_zero_pivot_tracked(self): + vl, vr = compute_track_speeds(0.0, 1.0, 0.30, 1.5, drive_mode="tracked", slip_factor=0.0) + assert vl == pytest.approx(-vr) + + def test_reverse_straight(self): + vl, vr = compute_track_speeds(-1.0, 0.0, 0.30, 1.5, drive_mode="tracked") + assert vl == pytest.approx(-1.0) + assert vr == pytest.approx(-1.0) + + +# ─── expand_to_4ch ─────────────────────────────────────────────────────────── + +class TestExpandTo4ch: + + def test_returns_four_values(self): + result = expand_to_4ch(1.0, -1.0) + assert len(result) == 4 + + def test_left_pair(self): + vfl, vlr, vrf, vrr = expand_to_4ch(0.5, 1.0) + assert vfl == pytest.approx(0.5) + assert vlr == pytest.approx(0.5) + + def test_right_pair(self): + vfl, vlr, vrf, vrr = expand_to_4ch(0.5, 1.0) + assert vrf == pytest.approx(1.0) + assert vrr == pytest.approx(1.0) + + def test_zero_inputs(self): + assert expand_to_4ch(0.0, 0.0) == (0.0, 0.0, 0.0, 0.0) + + def test_negative(self): + vfl, vlr, vrf, vrr = expand_to_4ch(-0.5, -0.5) + assert all(v == pytest.approx(-0.5) for v in (vfl, vlr, vrf, vrr)) + + +# ─── odometry_from_track_speeds ────────────────────────────────────────────── + +class TestOdometryFromTrackSpeeds: + + def test_straight_no_slip(self): + lin, ang = odometry_from_track_speeds(1.0, 1.0, 0.30, slip_factor=0.0) + assert lin == pytest.approx(1.0) + assert ang == pytest.approx(0.0) + + def test_point_turn_ccw_no_slip(self): + vl, vr = -0.15, 0.15 + lin, ang = odometry_from_track_speeds(vl, vr, 0.30, slip_factor=0.0) + assert lin == pytest.approx(0.0) + assert ang == pytest.approx(1.0) + + def test_slip_reduces_angular_estimate(self): + """With slip, measured angular is scaled down.""" + lin0, ang0 = odometry_from_track_speeds(-0.15, 0.15, 0.30, slip_factor=0.0) + lin3, ang3 = odometry_from_track_speeds(-0.15, 0.15, 0.30, slip_factor=0.3) + assert abs(ang3) < abs(ang0) + + def test_known_slip_value(self): + """slip=0.5 → angular halved.""" + lin, ang = odometry_from_track_speeds(-0.15, 0.15, 0.30, slip_factor=0.5) + # (0.15 - -0.15) / 0.30 * (1 - 0.5) = 1.0 * 0.5 + assert ang == pytest.approx(0.5) + + def test_zero_track_width_no_crash(self): + lin, ang = odometry_from_track_speeds(1.0, 1.0, 0.0, slip_factor=0.0) + assert lin == pytest.approx(1.0) + assert ang == pytest.approx(0.0) + + def test_zero_inputs(self): + lin, ang = odometry_from_track_speeds(0.0, 0.0, 0.30, slip_factor=0.0) + assert lin == pytest.approx(0.0) + assert ang == pytest.approx(0.0) + + def test_reverse_straight(self): + lin, ang = odometry_from_track_speeds(-1.0, -1.0, 0.30, slip_factor=0.0) + assert lin == pytest.approx(-1.0) + assert ang == pytest.approx(0.0) + + +# ─── Integration: command ↔ odometry round-trip ────────────────────────────── + +class TestRoundTrip: + + @pytest.mark.parametrize("slip", [0.0, 0.1, 0.3, 0.5]) + @pytest.mark.parametrize("lin,ang", [ + (1.0, 0.0), + (0.0, 1.0), + (0.5, 0.5), + (-0.5, 0.3), + ]) + def test_roundtrip_tracked(self, lin, ang, slip): + """ + Commanding (lin, ang) then estimating back via odometry should recover + (lin, ang) for all slip values, provided the speeds are unclipped. + """ + track = 0.30 + max_speed = 5.0 # generous to avoid clipping in test + vl, vr = compute_track_speeds(lin, ang, track, max_speed, + drive_mode="tracked", slip_factor=slip) + lin_out, ang_out = odometry_from_track_speeds(vl, vr, track, slip_factor=slip) + assert lin_out == pytest.approx(lin, abs=1e-9) + assert ang_out == pytest.approx(ang, abs=1e-9) + + def test_zero_pivot_pwm_symmetric(self): + """Zero-radius pivot: left and right PWMs are symmetric around neutral.""" + vl, vr = compute_track_speeds(0.0, 1.0, 0.30, 1.5, + drive_mode="tracked", slip_factor=0.0) + pwm_l = speed_to_pwm(vl, 1.5, 1500, 500) + pwm_r = speed_to_pwm(vr, 1.5, 1500, 500) + assert pwm_l + pwm_r == 3000 # symmetric: 1500 - x + 1500 + x + + def test_all_modes_forward_equal(self): + """Straight forward: all drive modes produce the same speeds.""" + for mode in DRIVE_MODES: + vl, vr = compute_track_speeds(1.0, 0.0, 0.30, 1.5, drive_mode=mode) + assert vl == pytest.approx(1.0) + assert vr == pytest.approx(1.0) + + def test_4ch_expansion_pwm(self): + """4-channel expansion: all channels driven during straight motion.""" + vl, vr = compute_track_speeds(1.0, 0.0, 0.30, 1.5, drive_mode="4wd") + pwms = [speed_to_pwm(v, 1.5, 1500, 500) for v in expand_to_4ch(vl, vr)] + assert all(p > 1500 for p in pwms) + + def test_pwm_neutral_when_stopped(self): + vl, vr = compute_track_speeds(0.0, 0.0, 0.30, 1.5, drive_mode="tracked") + for v in (vl, vr): + assert speed_to_pwm(v, 1.5, 1500, 500) == 1500