diff --git a/jetson/ros2_ws/src/saltybot_tank_driver/config/tank_params.yaml b/jetson/ros2_ws/src/saltybot_tank_driver/config/tank_params.yaml
new file mode 100644
index 0000000..f5bca53
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_tank_driver/config/tank_params.yaml
@@ -0,0 +1,36 @@
+tank_driver:
+ ros__parameters:
+ # Serial interface
+ serial_port: /dev/ttyUSB_tank
+ baud_rate: 115200
+
+ # Drive mode: "tracked" | "4wd" | "2wd"
+ # tracked — tank kinematics with track-slip compensation (default)
+ # 4wd — skid-steer, 4 ESC channels, no slip compensation
+ # 2wd — standard diff-drive, 2 ESC channels, no slip compensation
+ drive_mode: tracked
+
+ # Physical parameters
+ track_width: 0.30 # left↔right track centre distance (m)
+ wheelbase: 0.32 # front↔rear sprocket distance (m) — for reference
+
+ # Speed limits
+ max_speed_ms: 1.5 # m/s at full ESC deflection
+ max_linear_vel: 1.0 # clamp on cmd_vel linear.x (m/s)
+ max_angular_vel: 2.5 # clamp on cmd_vel angular.z (rad/s)
+
+ # Track slip compensation (tracked mode only)
+ # 0.0 = no slip (rigid wheel / high-traction surface)
+ # 0.1 = light slip (rubber tracks on concrete)
+ # 0.3 = moderate slip (rubber tracks on grass/gravel) ← default
+ # 0.5 = heavy slip (metal tracks on mud)
+ slip_factor: 0.3
+
+ # ESC PWM settings
+ pwm_neutral_us: 1500 # neutral/brake pulse width (µs)
+ pwm_range_us: 500 # half-range from neutral → [1000..2000] µs
+
+ # Timing
+ watchdog_timeout_s: 0.3 # deadman: shorter than rover for tracked safety
+ heartbeat_period_s: 0.2
+ control_rate: 50.0 # Hz
diff --git a/jetson/ros2_ws/src/saltybot_tank_driver/launch/tank_driver.launch.py b/jetson/ros2_ws/src/saltybot_tank_driver/launch/tank_driver.launch.py
new file mode 100644
index 0000000..629593a
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_tank_driver/launch/tank_driver.launch.py
@@ -0,0 +1,65 @@
+"""
+tank_driver.launch.py — Launch the SaltyTank ESC motor driver node.
+
+Usage:
+ ros2 launch saltybot_tank_driver tank_driver.launch.py
+ ros2 launch saltybot_tank_driver tank_driver.launch.py serial_port:=/dev/ttyACM0
+ ros2 launch saltybot_tank_driver tank_driver.launch.py drive_mode:=4wd
+ ros2 launch saltybot_tank_driver tank_driver.launch.py slip_factor:=0.1
+
+Emergency stop:
+ ros2 topic pub /saltybot/e_stop std_msgs/Bool '{data: true}' --once
+ ros2 topic pub /saltybot/e_stop std_msgs/Bool '{data: false}' --once
+"""
+
+import os
+
+from ament_index_python.packages import get_package_share_directory
+from launch import LaunchDescription
+from launch.actions import DeclareLaunchArgument
+from launch.substitutions import LaunchConfiguration
+from launch_ros.actions import Node
+
+
+def generate_launch_description():
+ pkg_share = get_package_share_directory("saltybot_tank_driver")
+ params_file = os.path.join(pkg_share, "config", "tank_params.yaml")
+
+ serial_port_arg = DeclareLaunchArgument(
+ "serial_port",
+ default_value="/dev/ttyUSB_tank",
+ description="Serial port for ESC controller",
+ )
+ drive_mode_arg = DeclareLaunchArgument(
+ "drive_mode",
+ default_value="tracked",
+ description="Drive mode: tracked | 4wd | 2wd",
+ )
+ slip_factor_arg = DeclareLaunchArgument(
+ "slip_factor",
+ default_value="0.3",
+ description="Track slip coefficient [0.0, 0.99]",
+ )
+
+ tank_driver = Node(
+ package="saltybot_tank_driver",
+ executable="tank_driver_node",
+ name="tank_driver",
+ parameters=[
+ params_file,
+ {
+ "serial_port": LaunchConfiguration("serial_port"),
+ "drive_mode": LaunchConfiguration("drive_mode"),
+ "slip_factor": LaunchConfiguration("slip_factor"),
+ },
+ ],
+ output="screen",
+ emulate_tty=True,
+ )
+
+ return LaunchDescription([
+ serial_port_arg,
+ drive_mode_arg,
+ slip_factor_arg,
+ tank_driver,
+ ])
diff --git a/jetson/ros2_ws/src/saltybot_tank_driver/package.xml b/jetson/ros2_ws/src/saltybot_tank_driver/package.xml
new file mode 100644
index 0000000..0446742
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_tank_driver/package.xml
@@ -0,0 +1,23 @@
+
+
+
+ saltybot_tank_driver
+ 0.1.0
+ SaltyTank tracked-vehicle ESC driver with slip compensation (Issue #122)
+ sl-controls
+ MIT
+
+ rclpy
+ geometry_msgs
+ nav_msgs
+ std_msgs
+
+ ament_copyright
+ ament_flake8
+ ament_pep257
+ pytest
+
+
+ ament_python
+
+
diff --git a/jetson/ros2_ws/src/saltybot_tank_driver/resource/saltybot_tank_driver b/jetson/ros2_ws/src/saltybot_tank_driver/resource/saltybot_tank_driver
new file mode 100644
index 0000000..e69de29
diff --git a/jetson/ros2_ws/src/saltybot_tank_driver/saltybot_tank_driver/__init__.py b/jetson/ros2_ws/src/saltybot_tank_driver/saltybot_tank_driver/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/jetson/ros2_ws/src/saltybot_tank_driver/saltybot_tank_driver/kinematics.py b/jetson/ros2_ws/src/saltybot_tank_driver/saltybot_tank_driver/kinematics.py
new file mode 100644
index 0000000..58f760f
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_tank_driver/saltybot_tank_driver/kinematics.py
@@ -0,0 +1,203 @@
+"""
+kinematics.py — Tank / tracked-vehicle kinematics for SaltyTank (Issue #122).
+
+Drive modes
+───────────
+ "2wd" — standard differential drive, 2 ESC channels (CH1=left, CH3=right).
+ Mirrored to 4 channels by the caller (CH1=CH2=left, CH3=CH4=right).
+
+ "4wd" — skid-steer 4-channel. Left pair (CH1+CH2) share one speed;
+ right pair (CH3+CH4) share one speed. No slip compensation.
+
+ "tracked" — tank kinematics with track-slip compensation.
+ Commanded angular is scaled up by 1/(1-slip_factor) so the
+ vehicle achieves the desired turn rate despite track slip.
+ Odometry angular is scaled down by (1-slip_factor) to reflect
+ the fact that wheel-speed difference overestimates actual yaw rate.
+
+Slip compensation model
+───────────────────────
+ slip_factor ∈ [0.0, 0.99]
+
+ 0.0 → rigid wheel / no slip (identical to standard diff-drive)
+ 0.1 → light slip (rubber tracks on concrete)
+ 0.3 → moderate slip (rubber tracks on grass / gravel) ← default
+ 0.5 → heavy slip (metal tracks on mud)
+
+ Command path : angular_cmd = angular_z / (1 - slip_factor)
+ Odometry path : angular_odom = (v_right - v_left) / B × (1 - slip_factor)
+
+ The two corrections are complementary: a vehicle commanded at ω rad/s and
+ estimated by odometry will yield consistent trajectories regardless of the
+ chosen slip_factor value (as long as it is consistent between command and odom).
+
+Zero-radius turns
+─────────────────
+ Setting linear_x=0 and angular_z≠0 produces equal-and-opposite track speeds,
+ which is a valid zero-radius pivot for tracked vehicles. The slip factor still
+ applies; set slip_factor=0 for a surface with known high traction.
+
+Pure module — no ROS2 dependency; fully unit-testable.
+"""
+
+from __future__ import annotations
+
+import math
+
+
+# ── Constants ────────────────────────────────────────────────────────────────
+
+SLIP_MIN = 0.0
+SLIP_MAX = 0.99 # must stay < 1.0 to avoid division-by-zero
+DRIVE_MODES = ("2wd", "4wd", "tracked")
+
+
+# ── Low-level helpers ────────────────────────────────────────────────────────
+
+def _clamp_slip(slip_factor: float) -> float:
+ return max(SLIP_MIN, min(SLIP_MAX, slip_factor))
+
+
+# ── Track-speed computation ──────────────────────────────────────────────────
+
+def tank_speeds(
+ linear_x: float,
+ angular_z: float,
+ track_width: float,
+ slip_factor: float = 0.0,
+) -> tuple[float, float]:
+ """
+ Unicycle → tank (tracked) differential drive with optional slip compensation.
+
+ Parameters
+ ----------
+ linear_x : forward velocity (m/s)
+ angular_z : desired yaw rate (rad/s, +ve = CCW / left turn)
+ track_width : centre-to-centre track separation (m)
+ slip_factor : track slip coefficient ∈ [0.0, 0.99]; 0 = no slip
+
+ Returns
+ -------
+ (v_left, v_right) in m/s
+ """
+ slip = _clamp_slip(slip_factor)
+ # Scale angular command up to achieve desired turn rate despite slip
+ angular_cmd = angular_z / (1.0 - slip) if slip > 0.0 else angular_z
+ half = angular_cmd * track_width / 2.0
+ return linear_x - half, linear_x + half
+
+
+def skid_steer_speeds(
+ linear_x: float,
+ angular_z: float,
+ track_width: float,
+) -> tuple[float, float]:
+ """
+ Standard unicycle → differential drive (no slip compensation).
+ Used for 2WD and 4WD modes.
+ """
+ half = angular_z * track_width / 2.0
+ return linear_x - half, linear_x + half
+
+
+# ── m/s → ESC PWM ────────────────────────────────────────────────────────────
+
+def speed_to_pwm(
+ speed_ms: float,
+ max_speed_ms: float,
+ pwm_neutral_us: int,
+ pwm_range_us: int,
+) -> int:
+ """
+ Convert wheel/track speed (m/s) to ESC PWM pulse width (µs).
+
+ Linear mapping:
+ pwm = neutral + (speed / max_speed) * range
+
+ Clamped to [neutral - range, neutral + range].
+ """
+ if max_speed_ms <= 0.0:
+ return pwm_neutral_us
+ normalised = max(-1.0, min(1.0, speed_ms / max_speed_ms))
+ pwm = pwm_neutral_us + normalised * pwm_range_us
+ lo = pwm_neutral_us - pwm_range_us
+ hi = pwm_neutral_us + pwm_range_us
+ return int(round(max(lo, min(hi, pwm))))
+
+
+# ── Variant-aware speed computation ─────────────────────────────────────────
+
+def compute_track_speeds(
+ linear_x: float,
+ angular_z: float,
+ track_width: float,
+ max_speed_ms: float,
+ drive_mode: str = "tracked",
+ slip_factor: float = 0.3,
+) -> tuple[float, float]:
+ """
+ Compute and clip left/right track/wheel speeds for the selected mode.
+
+ Returns
+ -------
+ (v_left, v_right) clipped to ±max_speed_ms.
+
+ All modes produce a 2-element tuple; the caller expands to 4 channels
+ for 4WD mode by mirroring (vl, vl, vr, vr).
+ """
+ def clip(v: float) -> float:
+ return max(-max_speed_ms, min(max_speed_ms, v))
+
+ if drive_mode == "tracked":
+ vl, vr = tank_speeds(linear_x, angular_z, track_width, slip_factor)
+ else:
+ # "2wd" or "4wd" — standard diff-drive, no slip compensation
+ vl, vr = skid_steer_speeds(linear_x, angular_z, track_width)
+
+ return clip(vl), clip(vr)
+
+
+def expand_to_4ch(v_left: float, v_right: float) -> tuple[float, float, float, float]:
+ """
+ Expand left/right speeds to 4-channel ESC layout.
+
+ Channel assignment:
+ CH1 = left-front CH2 = left-rear
+ CH3 = right-front CH4 = right-rear
+ """
+ return v_left, v_left, v_right, v_right
+
+
+# ── Odometry helper ──────────────────────────────────────────────────────────
+
+def odometry_from_track_speeds(
+ v_left: float,
+ v_right: float,
+ track_width: float,
+ slip_factor: float = 0.0,
+) -> tuple[float, float]:
+ """
+ Estimate (linear_x, angular_z) from left/right track speeds.
+
+ Applies slip compensation to the angular estimate:
+ angular_z_odom = (v_right - v_left) / B × (1 - slip_factor)
+
+ This is the inverse of tank_speeds for consistent dead-reckoning.
+
+ Parameters
+ ----------
+ v_left, v_right : measured track speeds (m/s)
+ track_width : centre-to-centre track separation (m)
+ slip_factor : same value used in the command path
+
+ Returns
+ -------
+ (linear_x, angular_z) in m/s and rad/s
+ """
+ linear_x = (v_left + v_right) / 2.0
+ if track_width > 0.0:
+ slip = _clamp_slip(slip_factor)
+ angular_z = (v_right - v_left) / track_width * (1.0 - slip)
+ else:
+ angular_z = 0.0
+ return linear_x, angular_z
diff --git a/jetson/ros2_ws/src/saltybot_tank_driver/saltybot_tank_driver/tank_driver_node.py b/jetson/ros2_ws/src/saltybot_tank_driver/saltybot_tank_driver/tank_driver_node.py
new file mode 100644
index 0000000..3f9024e
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_tank_driver/saltybot_tank_driver/tank_driver_node.py
@@ -0,0 +1,383 @@
+"""
+tank_driver_node.py — SaltyTank tracked-vehicle ESC driver (Issue #122).
+
+Hardware
+────────
+ SaltyTank: tracked robot with left/right independent brushless ESCs.
+ ESCs controlled via PWM (servo-style 1000–2000 µs pulses).
+ Communication: USB CDC serial to STM32 or Raspberry Pi Pico GPIO PWM bridge.
+
+ ESC channel assignments (configurable):
+ CH1 = left-front (or left-track in 2WD/tracked mode)
+ CH2 = left-rear (mirrored in 2WD/tracked mode)
+ CH3 = right-front (or right-track in 2WD/tracked mode)
+ CH4 = right-rear (mirrored in 2WD/tracked mode)
+
+Drive modes
+───────────
+ Selectable at runtime via `drive_mode` parameter (default "tracked").
+
+ "2wd" — standard differential drive; CH1+CH2 = left, CH3+CH4 = right.
+ No slip compensation.
+
+ "4wd" — skid-steer; all four ESCs commanded. Left pair same speed,
+ right pair same speed. No slip compensation.
+
+ "tracked" — tank kinematics with slip compensation. Angular command is
+ pre-scaled to achieve the desired yaw rate despite track slip.
+ Odometry angular is de-scaled consistently.
+
+ Switch live: ros2 param set /tank_driver drive_mode tracked
+
+Serial protocol
+────────────────
+ Command frame (ASCII, newline-terminated):
+ P,,,\\n
+ Values: 1000–2000 µs (1500 = neutral/brake)
+
+ Emergency stop frame:
+ E\\n — forces all channels to neutral immediately
+
+ Heartbeat:
+ H\\n every heartbeat_period_s (ESC controller zeros PWM after timeout)
+
+Emergency stop
+──────────────
+ Two stop paths:
+ 1. Software watchdog — /cmd_vel silence > watchdog_timeout_s → neutral
+ 2. E-stop topic — /saltybot/e_stop (std_msgs/Bool, True = stop)
+
+ Both paths send P1500,1500,1500,1500\\n and latch until a new /cmd_vel arrives
+ (e-stop topic) or new commands come in (watchdog).
+
+Subscribes
+──────────
+ /cmd_vel (geometry_msgs/Twist) — Nav2-compatible velocity command
+ linear.x = forward (m/s)
+ angular.z = yaw rate (rad/s, +CCW)
+ /saltybot/e_stop (std_msgs/Bool) — True = emergency stop
+
+Publishes
+─────────
+ /saltybot/tank_pwm (std_msgs/String JSON) — current PWM values
+ /saltybot/tank_odom (nav_msgs/Odometry) — dead-reckoning odometry
+
+Parameters
+──────────
+ serial_port /dev/ttyUSB_tank (or /dev/ttyACM0, etc.)
+ baud_rate 115200
+ drive_mode "tracked" — "2wd" | "4wd" | "tracked"
+ track_width 0.30 — left↔right track centre distance (m)
+ wheelbase 0.32 — front↔rear axle distance (m)
+ max_speed_ms 1.5 — m/s at full ESC deflection
+ max_linear_vel 1.0 — clamp on cmd_vel linear.x (m/s)
+ max_angular_vel 2.5 — clamp on cmd_vel angular.z (rad/s)
+ slip_factor 0.3 — track slip coefficient [0.0, 0.99]
+ pwm_neutral_us 1500
+ pwm_range_us 500
+ watchdog_timeout_s 0.3 — tighter than rover (0.5 s) for safety
+ heartbeat_period_s 0.2
+ control_rate 50.0 — Hz
+
+Usage
+─────
+ ros2 launch saltybot_tank_driver tank_driver.launch.py
+ ros2 param set /tank_driver drive_mode 4wd
+ ros2 param set /tank_driver slip_factor 0.1
+ ros2 topic pub /saltybot/e_stop std_msgs/Bool '{data: true}' --once
+"""
+
+import json
+import math
+import threading
+import time
+
+import rclpy
+from rclpy.node import Node
+from rclpy.qos import HistoryPolicy, QoSProfile, ReliabilityPolicy
+
+from geometry_msgs.msg import Twist
+from nav_msgs.msg import Odometry
+from std_msgs.msg import Bool, String
+
+from saltybot_tank_driver.kinematics import (
+ compute_track_speeds,
+ expand_to_4ch,
+ odometry_from_track_speeds,
+ speed_to_pwm,
+ DRIVE_MODES,
+)
+
+try:
+ import serial
+ _SERIAL_OK = True
+except ImportError:
+ _SERIAL_OK = False
+
+
+class TankDriverNode(Node):
+
+ def __init__(self):
+ super().__init__("tank_driver")
+
+ # ── Parameters ───────────────────────────────────────────────────────
+ self.declare_parameter("serial_port", "/dev/ttyUSB_tank")
+ self.declare_parameter("baud_rate", 115200)
+ self.declare_parameter("drive_mode", "tracked")
+ self.declare_parameter("track_width", 0.30)
+ self.declare_parameter("wheelbase", 0.32)
+ self.declare_parameter("max_speed_ms", 1.5)
+ self.declare_parameter("max_linear_vel", 1.0)
+ self.declare_parameter("max_angular_vel", 2.5)
+ self.declare_parameter("slip_factor", 0.3)
+ self.declare_parameter("pwm_neutral_us", 1500)
+ self.declare_parameter("pwm_range_us", 500)
+ self.declare_parameter("watchdog_timeout_s", 0.3)
+ self.declare_parameter("heartbeat_period_s", 0.2)
+ self.declare_parameter("control_rate", 50.0)
+
+ self._p = self._load_params()
+
+ # ── State ────────────────────────────────────────────────────────────
+ self._target_linear_x: float = 0.0
+ self._target_angular_z: float = 0.0
+ self._last_cmd_vel_t: float = 0.0
+ self._last_pwm: tuple = (1500, 1500, 1500, 1500)
+ self._e_stop: bool = False
+
+ self._odom_x: float = 0.0
+ self._odom_y: float = 0.0
+ self._odom_yaw: float = 0.0
+ self._last_odom_t: float = time.monotonic()
+
+ # ── Serial ───────────────────────────────────────────────────────────
+ self._ser = None
+ self._ser_lock = threading.Lock()
+ if _SERIAL_OK:
+ self._open_serial()
+ else:
+ self.get_logger().warn(
+ "pyserial not installed — running in simulation mode (no serial I/O)")
+
+ # ── QoS ──────────────────────────────────────────────────────────────
+ reliable = QoSProfile(
+ reliability=ReliabilityPolicy.RELIABLE,
+ history=HistoryPolicy.KEEP_LAST,
+ depth=10,
+ )
+
+ # ── Subscriptions ────────────────────────────────────────────────────
+ self.create_subscription(Twist, "/cmd_vel", self._cmd_vel_cb, reliable)
+ self.create_subscription(Bool, "/saltybot/e_stop", self._e_stop_cb, reliable)
+
+ # ── Publishers ───────────────────────────────────────────────────────
+ self._pwm_pub = self.create_publisher(String, "/saltybot/tank_pwm", reliable)
+ self._odom_pub = self.create_publisher(Odometry, "/saltybot/tank_odom", reliable)
+
+ # ── Timers ───────────────────────────────────────────────────────────
+ rate = self._p["control_rate"]
+ self._ctrl_timer = self.create_timer(1.0 / rate, self._control_cb)
+ self._hb_timer = self.create_timer(self._p["hb_period"], self._heartbeat_cb)
+
+ self.get_logger().info(
+ f"TankDriverNode ready "
+ f"mode={self._p['drive_mode']} "
+ f"track={self._p['track_width']}m "
+ f"slip={self._p['slip_factor']} "
+ f"max_speed={self._p['max_speed_ms']}m/s "
+ f"port={self._p['serial_port']}"
+ )
+
+ # ── Parameter load ────────────────────────────────────────────────────────
+
+ def _load_params(self) -> dict:
+ mode = self.get_parameter("drive_mode").value
+ if mode not in DRIVE_MODES:
+ self.get_logger().warn(
+ f"Unknown drive_mode '{mode}', defaulting to 'tracked'")
+ mode = "tracked"
+ return {
+ "serial_port": self.get_parameter("serial_port").value,
+ "baud_rate": self.get_parameter("baud_rate").value,
+ "drive_mode": mode,
+ "track_width": self.get_parameter("track_width").value,
+ "wheelbase": self.get_parameter("wheelbase").value,
+ "max_speed_ms": self.get_parameter("max_speed_ms").value,
+ "max_linear_vel": self.get_parameter("max_linear_vel").value,
+ "max_angular_vel":self.get_parameter("max_angular_vel").value,
+ "slip_factor": float(self.get_parameter("slip_factor").value),
+ "pwm_neutral": int(self.get_parameter("pwm_neutral_us").value),
+ "pwm_range": int(self.get_parameter("pwm_range_us").value),
+ "watchdog_timeout": self.get_parameter("watchdog_timeout_s").value,
+ "hb_period": self.get_parameter("heartbeat_period_s").value,
+ "control_rate": self.get_parameter("control_rate").value,
+ }
+
+ # ── Serial helpers ────────────────────────────────────────────────────────
+
+ def _open_serial(self) -> None:
+ with self._ser_lock:
+ try:
+ self._ser = serial.Serial(
+ self._p["serial_port"], self._p["baud_rate"], timeout=0.05)
+ self.get_logger().info(f"Serial open: {self._p['serial_port']}")
+ except Exception as exc:
+ self.get_logger().error(f"Cannot open serial: {exc}")
+ self._ser = None
+
+ def _write(self, data: bytes) -> bool:
+ with self._ser_lock:
+ if self._ser is None or not self._ser.is_open:
+ return False
+ try:
+ self._ser.write(data)
+ return True
+ except Exception as exc:
+ self.get_logger().error(f"Serial write: {exc}",
+ throttle_duration_sec=2.0)
+ self._ser = None
+ return False
+
+ def _send_neutral(self) -> None:
+ self._write(b"P1500,1500,1500,1500\n")
+ self._last_pwm = (1500, 1500, 1500, 1500)
+
+ # ── cmd_vel callback ──────────────────────────────────────────────────────
+
+ def _cmd_vel_cb(self, msg: Twist) -> None:
+ p = self._p
+ self._target_linear_x = max(-p["max_linear_vel"],
+ min(p["max_linear_vel"], msg.linear.x))
+ self._target_angular_z = max(-p["max_angular_vel"],
+ min(p["max_angular_vel"], msg.angular.z))
+ self._last_cmd_vel_t = time.monotonic()
+ # A new cmd_vel clears a latched e-stop
+ if self._e_stop:
+ self._e_stop = False
+ self.get_logger().info("E-stop cleared by new cmd_vel")
+
+ # ── E-stop callback ───────────────────────────────────────────────────────
+
+ def _e_stop_cb(self, msg: Bool) -> None:
+ if msg.data and not self._e_stop:
+ self._e_stop = True
+ self._send_neutral()
+ self.get_logger().warn("Emergency stop engaged")
+ elif not msg.data and self._e_stop:
+ self._e_stop = False
+ self.get_logger().info("E-stop released via topic")
+
+ # ── 50 Hz control loop ────────────────────────────────────────────────────
+
+ def _control_cb(self) -> None:
+ self._p = self._load_params()
+ p = self._p
+ now = time.monotonic()
+
+ # E-stop or watchdog
+ cmd_age = (now - self._last_cmd_vel_t) if self._last_cmd_vel_t > 0.0 else 1e9
+ if self._e_stop or cmd_age > p["watchdog_timeout"]:
+ lin_x, ang_z = 0.0, 0.0
+ else:
+ lin_x, ang_z = self._target_linear_x, self._target_angular_z
+
+ # Kinematics → left/right track speeds
+ vl, vr = compute_track_speeds(
+ lin_x, ang_z,
+ track_width=p["track_width"],
+ max_speed_ms=p["max_speed_ms"],
+ drive_mode=p["drive_mode"],
+ slip_factor=p["slip_factor"],
+ )
+
+ # Expand to 4 channels
+ speeds4 = expand_to_4ch(vl, vr) # (vl, vl, vr, vr)
+
+ # Convert to PWM
+ def to_pwm(v):
+ return speed_to_pwm(v, p["max_speed_ms"], p["pwm_neutral"], p["pwm_range"])
+
+ pwm = tuple(to_pwm(v) for v in speeds4)
+ self._last_pwm = pwm
+
+ # Send P command
+ frame = f"P{pwm[0]},{pwm[1]},{pwm[2]},{pwm[3]}\n".encode("ascii")
+ self._write(frame)
+
+ # Publish observability
+ pwm_msg = String()
+ pwm_msg.data = json.dumps({
+ "ch1_us": pwm[0], "ch2_us": pwm[1],
+ "ch3_us": pwm[2], "ch4_us": pwm[3],
+ "drive_mode": p["drive_mode"],
+ "e_stop": self._e_stop,
+ })
+ self._pwm_pub.publish(pwm_msg)
+
+ # Odometry integration (dead reckoning with slip compensation)
+ dt = now - self._last_odom_t
+ self._last_odom_t = now
+ self._integrate_odometry(vl, vr, p["track_width"], p["slip_factor"], dt)
+ self._publish_odom()
+
+ # ── Heartbeat ─────────────────────────────────────────────────────────────
+
+ def _heartbeat_cb(self) -> None:
+ self._write(b"H\n")
+
+ # ── Odometry ─────────────────────────────────────────────────────────────
+
+ def _integrate_odometry(
+ self,
+ v_left: float,
+ v_right: float,
+ track_width: float,
+ slip_factor: float,
+ dt: float,
+ ) -> None:
+ """2D dead-reckoning with track-slip compensation."""
+ lin, ang = odometry_from_track_speeds(v_left, v_right, track_width, slip_factor)
+ self._odom_yaw += ang * dt
+ self._odom_x += lin * math.cos(self._odom_yaw) * dt
+ self._odom_y += lin * math.sin(self._odom_yaw) * dt
+
+ def _publish_odom(self) -> None:
+ yaw = self._odom_yaw
+ msg = Odometry()
+ msg.header.stamp = self.get_clock().now().to_msg()
+ msg.header.frame_id = "odom"
+ msg.child_frame_id = "base_link"
+
+ msg.pose.pose.position.x = self._odom_x
+ msg.pose.pose.position.y = self._odom_y
+ msg.pose.pose.orientation.z = math.sin(yaw / 2.0)
+ msg.pose.pose.orientation.w = math.cos(yaw / 2.0)
+
+ msg.twist.twist.linear.x = self._target_linear_x
+ msg.twist.twist.angular.z = self._target_angular_z
+
+ self._odom_pub.publish(msg)
+
+ # ── Lifecycle ─────────────────────────────────────────────────────────────
+
+ def destroy_node(self) -> None:
+ self._send_neutral() # safe-state on shutdown
+ super().destroy_node()
+
+
+# ── Entry point ───────────────────────────────────────────────────────────────
+
+def main(args=None):
+ rclpy.init(args=args)
+ node = TankDriverNode()
+ try:
+ rclpy.spin(node)
+ except KeyboardInterrupt:
+ pass
+ finally:
+ node.destroy_node()
+ rclpy.try_shutdown()
+
+
+if __name__ == "__main__":
+ main()
diff --git a/jetson/ros2_ws/src/saltybot_tank_driver/setup.cfg b/jetson/ros2_ws/src/saltybot_tank_driver/setup.cfg
new file mode 100644
index 0000000..718c023
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_tank_driver/setup.cfg
@@ -0,0 +1,5 @@
+[develop]
+script_dir=$base/lib/saltybot_tank_driver
+
+[install]
+install_scripts=$base/lib/saltybot_tank_driver
diff --git a/jetson/ros2_ws/src/saltybot_tank_driver/setup.py b/jetson/ros2_ws/src/saltybot_tank_driver/setup.py
new file mode 100644
index 0000000..92c9a99
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_tank_driver/setup.py
@@ -0,0 +1,32 @@
+from setuptools import setup, find_packages
+import os
+from glob import glob
+
+package_name = "saltybot_tank_driver"
+
+setup(
+ name=package_name,
+ version="0.1.0",
+ packages=find_packages(exclude=["test"]),
+ data_files=[
+ ("share/ament_index/resource_index/packages",
+ [f"resource/{package_name}"]),
+ (f"share/{package_name}", ["package.xml"]),
+ (os.path.join("share", package_name, "config"),
+ glob("config/*.yaml")),
+ (os.path.join("share", package_name, "launch"),
+ glob("launch/*.py")),
+ ],
+ install_requires=["setuptools"],
+ zip_safe=True,
+ maintainer="sl-controls",
+ maintainer_email="sl-controls@saltylab.local",
+ description="SaltyTank tracked-vehicle ESC driver with slip compensation",
+ license="MIT",
+ tests_require=["pytest"],
+ entry_points={
+ "console_scripts": [
+ f"tank_driver_node = {package_name}.tank_driver_node:main",
+ ],
+ },
+)
diff --git a/jetson/ros2_ws/src/saltybot_tank_driver/test/test_tank_kinematics.py b/jetson/ros2_ws/src/saltybot_tank_driver/test/test_tank_kinematics.py
new file mode 100644
index 0000000..19c824e
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_tank_driver/test/test_tank_kinematics.py
@@ -0,0 +1,347 @@
+"""
+test_tank_kinematics.py — Unit tests for kinematics.py (Issue #122).
+
+No ROS2 runtime required — pure Python.
+"""
+
+import math
+import pytest
+
+from saltybot_tank_driver.kinematics import (
+ tank_speeds,
+ skid_steer_speeds,
+ speed_to_pwm,
+ compute_track_speeds,
+ expand_to_4ch,
+ odometry_from_track_speeds,
+ _clamp_slip,
+ DRIVE_MODES,
+ SLIP_MIN,
+ SLIP_MAX,
+)
+
+
+# ─── _clamp_slip ─────────────────────────────────────────────────────────────
+
+class TestClampSlip:
+
+ def test_zero_passes_through(self):
+ assert _clamp_slip(0.0) == 0.0
+
+ def test_typical_value(self):
+ assert _clamp_slip(0.3) == pytest.approx(0.3)
+
+ def test_below_min_clamped(self):
+ assert _clamp_slip(-0.1) == SLIP_MIN
+
+ def test_above_max_clamped(self):
+ assert _clamp_slip(1.0) == SLIP_MAX
+
+ def test_at_max_boundary(self):
+ assert _clamp_slip(0.99) == pytest.approx(0.99)
+
+ def test_large_value_clamped(self):
+ assert _clamp_slip(5.0) == SLIP_MAX
+
+
+# ─── tank_speeds ─────────────────────────────────────────────────────────────
+
+class TestTankSpeeds:
+
+ def test_straight_no_slip(self):
+ vl, vr = tank_speeds(1.0, 0.0, 0.30, slip_factor=0.0)
+ assert vl == pytest.approx(1.0)
+ assert vr == pytest.approx(1.0)
+
+ def test_point_turn_no_slip(self):
+ """Pure CCW rotation with no slip."""
+ vl, vr = tank_speeds(0.0, 1.0, 0.30, slip_factor=0.0)
+ assert vl == pytest.approx(-0.15)
+ assert vr == pytest.approx(0.15)
+
+ def test_point_turn_cw_no_slip(self):
+ vl, vr = tank_speeds(0.0, -1.0, 0.30, slip_factor=0.0)
+ assert vl == pytest.approx(0.15)
+ assert vr == pytest.approx(-0.15)
+
+ def test_zero_inputs(self):
+ vl, vr = tank_speeds(0.0, 0.0, 0.30, slip_factor=0.0)
+ assert vl == pytest.approx(0.0)
+ assert vr == pytest.approx(0.0)
+
+ def test_slip_increases_angular_command(self):
+ """With slip>0, angular component is amplified so output diff is larger."""
+ vl0, vr0 = tank_speeds(0.0, 1.0, 0.30, slip_factor=0.0)
+ vl3, vr3 = tank_speeds(0.0, 1.0, 0.30, slip_factor=0.3)
+ # More slip → larger differential to achieve same ω
+ assert abs(vr3 - vl3) > abs(vr0 - vl0)
+
+ def test_slip_zero_equals_skid_steer(self):
+ """slip_factor=0 must be identical to skid_steer_speeds."""
+ vl_t, vr_t = tank_speeds(0.7, 0.4, 0.30, slip_factor=0.0)
+ vl_s, vr_s = skid_steer_speeds(0.7, 0.4, 0.30)
+ assert vl_t == pytest.approx(vl_s)
+ assert vr_t == pytest.approx(vr_s)
+
+ def test_known_slip_value(self):
+ """slip=0.5 → angular scaled by 1/(1-0.5) = 2."""
+ vl, vr = tank_speeds(0.0, 1.0, 0.30, slip_factor=0.5)
+ expected_half = (1.0 / (1.0 - 0.5)) * 0.30 / 2.0 # = 0.30
+ assert vl == pytest.approx(-expected_half)
+ assert vr == pytest.approx(expected_half)
+
+ def test_negative_slip_clamped_to_zero(self):
+ """Negative slip_factor is clamped to 0."""
+ vl_neg, vr_neg = tank_speeds(0.0, 1.0, 0.30, slip_factor=-0.5)
+ vl_0, vr_0 = tank_speeds(0.0, 1.0, 0.30, slip_factor=0.0)
+ assert vl_neg == pytest.approx(vl_0)
+ assert vr_neg == pytest.approx(vr_0)
+
+ def test_symmetry_linear(self):
+ """Reversing linear_x reverses both speeds equally."""
+ vl_f, vr_f = tank_speeds(1.0, 0.5, 0.30, slip_factor=0.2)
+ vl_r, vr_r = tank_speeds(-1.0, 0.5, 0.30, slip_factor=0.2)
+ assert vl_r == pytest.approx(vl_f - 2.0)
+ assert vr_r == pytest.approx(vr_f - 2.0)
+
+ def test_zero_radius_turn(self):
+ """lin=0, ang≠0 → equal and opposite speeds (zero-radius pivot)."""
+ vl, vr = tank_speeds(0.0, 1.0, 0.30, slip_factor=0.0)
+ assert vl == pytest.approx(-vr)
+
+ def test_zero_track_width_no_crash(self):
+ vl, vr = tank_speeds(1.0, 2.0, 0.0, slip_factor=0.0)
+ assert vl == pytest.approx(1.0)
+ assert vr == pytest.approx(1.0)
+
+
+# ─── skid_steer_speeds ───────────────────────────────────────────────────────
+
+class TestSkidSteerSpeeds:
+
+ def test_straight(self):
+ vl, vr = skid_steer_speeds(1.0, 0.0, 0.30)
+ assert vl == pytest.approx(1.0)
+ assert vr == pytest.approx(1.0)
+
+ def test_point_turn(self):
+ vl, vr = skid_steer_speeds(0.0, 1.0, 0.30)
+ assert vl == pytest.approx(-0.15)
+ assert vr == pytest.approx(0.15)
+
+ def test_curve(self):
+ vl, vr = skid_steer_speeds(1.0, 1.0, 0.30)
+ assert vl == pytest.approx(1.0 - 0.15)
+ assert vr == pytest.approx(1.0 + 0.15)
+
+
+# ─── speed_to_pwm ────────────────────────────────────────────────────────────
+
+class TestSpeedToPwm:
+
+ def test_zero_speed_neutral(self):
+ assert speed_to_pwm(0.0, 1.5, 1500, 500) == 1500
+
+ def test_full_forward(self):
+ assert speed_to_pwm(1.5, 1.5, 1500, 500) == 2000
+
+ def test_full_reverse(self):
+ assert speed_to_pwm(-1.5, 1.5, 1500, 500) == 1000
+
+ def test_half_forward(self):
+ assert speed_to_pwm(0.75, 1.5, 1500, 500) == 1750
+
+ def test_clamp_over(self):
+ assert speed_to_pwm(10.0, 1.5, 1500, 500) == 2000
+
+ def test_clamp_under(self):
+ assert speed_to_pwm(-10.0, 1.5, 1500, 500) == 1000
+
+ def test_zero_max_returns_neutral(self):
+ assert speed_to_pwm(1.0, 0.0, 1500, 500) == 1500
+
+ def test_returns_int(self):
+ assert isinstance(speed_to_pwm(0.5, 1.5, 1500, 500), int)
+
+ def test_rounding(self):
+ # 1.0/1.5 = 0.6667 → 1500 + 0.6667*500 = 1833.33 → 1833
+ pwm = speed_to_pwm(1.0, 1.5, 1500, 500)
+ assert pwm == 1833
+
+
+# ─── compute_track_speeds ────────────────────────────────────────────────────
+
+class TestComputeTrackSpeeds:
+
+ def test_returns_two_values(self):
+ result = compute_track_speeds(1.0, 0.0, 0.30, 1.5)
+ assert len(result) == 2
+
+ def test_straight_tracked(self):
+ vl, vr = compute_track_speeds(1.0, 0.0, 0.30, 1.5, drive_mode="tracked", slip_factor=0.3)
+ assert vl == pytest.approx(1.0)
+ assert vr == pytest.approx(1.0)
+
+ def test_straight_2wd(self):
+ vl, vr = compute_track_speeds(1.0, 0.0, 0.30, 1.5, drive_mode="2wd")
+ assert vl == pytest.approx(1.0)
+ assert vr == pytest.approx(1.0)
+
+ def test_straight_4wd(self):
+ vl, vr = compute_track_speeds(1.0, 0.0, 0.30, 1.5, drive_mode="4wd")
+ assert vl == pytest.approx(1.0)
+ assert vr == pytest.approx(1.0)
+
+ def test_clipping_tracked(self):
+ vl, vr = compute_track_speeds(1.5, 5.0, 0.30, 1.5, drive_mode="tracked", slip_factor=0.3)
+ assert -1.5 <= vl <= 1.5
+ assert -1.5 <= vr <= 1.5
+
+ def test_clipping_2wd(self):
+ vl, vr = compute_track_speeds(1.5, 5.0, 0.30, 1.5, drive_mode="2wd")
+ assert -1.5 <= vl <= 1.5
+ assert -1.5 <= vr <= 1.5
+
+ def test_tracked_differs_from_2wd_on_turn(self):
+ """With slip>0, tracked mode should produce a different differential."""
+ vl_t, vr_t = compute_track_speeds(0.0, 1.0, 0.30, 1.5,
+ drive_mode="tracked", slip_factor=0.3)
+ vl_2, vr_2 = compute_track_speeds(0.0, 1.0, 0.30, 1.5,
+ drive_mode="2wd", slip_factor=0.3)
+ assert abs(vr_t - vl_t) > abs(vr_2 - vl_2)
+
+ def test_unknown_mode_falls_back(self):
+ """Unknown drive_mode: falls through to skid_steer (no slip comp)."""
+ vl, vr = compute_track_speeds(1.0, 0.5, 0.30, 1.5, drive_mode="bogus")
+ vl_ref, vr_ref = compute_track_speeds(1.0, 0.5, 0.30, 1.5, drive_mode="2wd")
+ assert vl == pytest.approx(vl_ref)
+ assert vr == pytest.approx(vr_ref)
+
+ def test_zero_pivot_tracked(self):
+ vl, vr = compute_track_speeds(0.0, 1.0, 0.30, 1.5, drive_mode="tracked", slip_factor=0.0)
+ assert vl == pytest.approx(-vr)
+
+ def test_reverse_straight(self):
+ vl, vr = compute_track_speeds(-1.0, 0.0, 0.30, 1.5, drive_mode="tracked")
+ assert vl == pytest.approx(-1.0)
+ assert vr == pytest.approx(-1.0)
+
+
+# ─── expand_to_4ch ───────────────────────────────────────────────────────────
+
+class TestExpandTo4ch:
+
+ def test_returns_four_values(self):
+ result = expand_to_4ch(1.0, -1.0)
+ assert len(result) == 4
+
+ def test_left_pair(self):
+ vfl, vlr, vrf, vrr = expand_to_4ch(0.5, 1.0)
+ assert vfl == pytest.approx(0.5)
+ assert vlr == pytest.approx(0.5)
+
+ def test_right_pair(self):
+ vfl, vlr, vrf, vrr = expand_to_4ch(0.5, 1.0)
+ assert vrf == pytest.approx(1.0)
+ assert vrr == pytest.approx(1.0)
+
+ def test_zero_inputs(self):
+ assert expand_to_4ch(0.0, 0.0) == (0.0, 0.0, 0.0, 0.0)
+
+ def test_negative(self):
+ vfl, vlr, vrf, vrr = expand_to_4ch(-0.5, -0.5)
+ assert all(v == pytest.approx(-0.5) for v in (vfl, vlr, vrf, vrr))
+
+
+# ─── odometry_from_track_speeds ──────────────────────────────────────────────
+
+class TestOdometryFromTrackSpeeds:
+
+ def test_straight_no_slip(self):
+ lin, ang = odometry_from_track_speeds(1.0, 1.0, 0.30, slip_factor=0.0)
+ assert lin == pytest.approx(1.0)
+ assert ang == pytest.approx(0.0)
+
+ def test_point_turn_ccw_no_slip(self):
+ vl, vr = -0.15, 0.15
+ lin, ang = odometry_from_track_speeds(vl, vr, 0.30, slip_factor=0.0)
+ assert lin == pytest.approx(0.0)
+ assert ang == pytest.approx(1.0)
+
+ def test_slip_reduces_angular_estimate(self):
+ """With slip, measured angular is scaled down."""
+ lin0, ang0 = odometry_from_track_speeds(-0.15, 0.15, 0.30, slip_factor=0.0)
+ lin3, ang3 = odometry_from_track_speeds(-0.15, 0.15, 0.30, slip_factor=0.3)
+ assert abs(ang3) < abs(ang0)
+
+ def test_known_slip_value(self):
+ """slip=0.5 → angular halved."""
+ lin, ang = odometry_from_track_speeds(-0.15, 0.15, 0.30, slip_factor=0.5)
+ # (0.15 - -0.15) / 0.30 * (1 - 0.5) = 1.0 * 0.5
+ assert ang == pytest.approx(0.5)
+
+ def test_zero_track_width_no_crash(self):
+ lin, ang = odometry_from_track_speeds(1.0, 1.0, 0.0, slip_factor=0.0)
+ assert lin == pytest.approx(1.0)
+ assert ang == pytest.approx(0.0)
+
+ def test_zero_inputs(self):
+ lin, ang = odometry_from_track_speeds(0.0, 0.0, 0.30, slip_factor=0.0)
+ assert lin == pytest.approx(0.0)
+ assert ang == pytest.approx(0.0)
+
+ def test_reverse_straight(self):
+ lin, ang = odometry_from_track_speeds(-1.0, -1.0, 0.30, slip_factor=0.0)
+ assert lin == pytest.approx(-1.0)
+ assert ang == pytest.approx(0.0)
+
+
+# ─── Integration: command ↔ odometry round-trip ──────────────────────────────
+
+class TestRoundTrip:
+
+ @pytest.mark.parametrize("slip", [0.0, 0.1, 0.3, 0.5])
+ @pytest.mark.parametrize("lin,ang", [
+ (1.0, 0.0),
+ (0.0, 1.0),
+ (0.5, 0.5),
+ (-0.5, 0.3),
+ ])
+ def test_roundtrip_tracked(self, lin, ang, slip):
+ """
+ Commanding (lin, ang) then estimating back via odometry should recover
+ (lin, ang) for all slip values, provided the speeds are unclipped.
+ """
+ track = 0.30
+ max_speed = 5.0 # generous to avoid clipping in test
+ vl, vr = compute_track_speeds(lin, ang, track, max_speed,
+ drive_mode="tracked", slip_factor=slip)
+ lin_out, ang_out = odometry_from_track_speeds(vl, vr, track, slip_factor=slip)
+ assert lin_out == pytest.approx(lin, abs=1e-9)
+ assert ang_out == pytest.approx(ang, abs=1e-9)
+
+ def test_zero_pivot_pwm_symmetric(self):
+ """Zero-radius pivot: left and right PWMs are symmetric around neutral."""
+ vl, vr = compute_track_speeds(0.0, 1.0, 0.30, 1.5,
+ drive_mode="tracked", slip_factor=0.0)
+ pwm_l = speed_to_pwm(vl, 1.5, 1500, 500)
+ pwm_r = speed_to_pwm(vr, 1.5, 1500, 500)
+ assert pwm_l + pwm_r == 3000 # symmetric: 1500 - x + 1500 + x
+
+ def test_all_modes_forward_equal(self):
+ """Straight forward: all drive modes produce the same speeds."""
+ for mode in DRIVE_MODES:
+ vl, vr = compute_track_speeds(1.0, 0.0, 0.30, 1.5, drive_mode=mode)
+ assert vl == pytest.approx(1.0)
+ assert vr == pytest.approx(1.0)
+
+ def test_4ch_expansion_pwm(self):
+ """4-channel expansion: all channels driven during straight motion."""
+ vl, vr = compute_track_speeds(1.0, 0.0, 0.30, 1.5, drive_mode="4wd")
+ pwms = [speed_to_pwm(v, 1.5, 1500, 500) for v in expand_to_4ch(vl, vr)]
+ assert all(p > 1500 for p in pwms)
+
+ def test_pwm_neutral_when_stopped(self):
+ vl, vr = compute_track_speeds(0.0, 0.0, 0.30, 1.5, drive_mode="tracked")
+ for v in (vl, vr):
+ assert speed_to_pwm(v, 1.5, 1500, 500) == 1500