From 4fed80f36d402c1750acbba23017734e968027c6 Mon Sep 17 00:00:00 2001 From: sl-controls Date: Sun, 1 Mar 2026 01:14:27 -0500 Subject: [PATCH] feat: SaltyRover 4-wheel diff-drive control loop (#74) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds saltyrover_drive ROS2 package — 4-wheel differential drive controller for the stable SaltyRover chassis (no balance PID required). rover_drive_node: - Subscribes /cmd_vel (Twist), converts to left/right wheel speeds - Tank-style kinematics: v_left = linear.x - angular.z × sep/2 - FL/RL share left speed, FR/RR share right speed (paired axles) - Publishes /rover/wheel_speeds (JSON: {"cmd":"drive4","fl":N,"fr":N,"rl":N,"rr":N}) - Trapezoidal ramp: accel 2.0 m/s², decel 4.0 m/s² (stable chassis allows faster ramps) - cmd_vel watchdog (0.5s timeout → zero speeds) - max_speed 3.0 m/s (±1000 STM32 ticks) rover_odom_node: - Dead-reckoning odometry from /cmd_vel (encoder-ready for future HW) - Publishes /odom (nav_msgs/Odometry) + TF odom→base_link - Forward Euler pose integration at 50 Hz 52/52 unit tests: kinematics, int scaling, JSON builder, full pipeline, trapezoidal ramp, odometry integration, quaternion conversion. Co-Authored-By: Claude Sonnet 4.6 --- .../saltyrover_drive/config/rover_params.yaml | 47 ++ .../launch/rover_drive.launch.py | 120 +++++ .../ros2_ws/src/saltyrover_drive/package.xml | 30 ++ .../resource/saltyrover_drive | 0 .../saltyrover_drive/__init__.py | 0 .../saltyrover_drive/rover_drive_node.py | 266 +++++++++++ .../saltyrover_drive/rover_odom_node.py | 197 +++++++++ jetson/ros2_ws/src/saltyrover_drive/setup.cfg | 4 + jetson/ros2_ws/src/saltyrover_drive/setup.py | 30 ++ .../saltyrover_drive/test/test_rover_drive.py | 414 ++++++++++++++++++ 10 files changed, 1108 insertions(+) create mode 100644 jetson/ros2_ws/src/saltyrover_drive/config/rover_params.yaml create mode 100644 jetson/ros2_ws/src/saltyrover_drive/launch/rover_drive.launch.py create mode 100644 jetson/ros2_ws/src/saltyrover_drive/package.xml create mode 100644 jetson/ros2_ws/src/saltyrover_drive/resource/saltyrover_drive create mode 100644 jetson/ros2_ws/src/saltyrover_drive/saltyrover_drive/__init__.py create mode 100644 jetson/ros2_ws/src/saltyrover_drive/saltyrover_drive/rover_drive_node.py create mode 100644 jetson/ros2_ws/src/saltyrover_drive/saltyrover_drive/rover_odom_node.py create mode 100644 jetson/ros2_ws/src/saltyrover_drive/setup.cfg create mode 100644 jetson/ros2_ws/src/saltyrover_drive/setup.py create mode 100644 jetson/ros2_ws/src/saltyrover_drive/test/test_rover_drive.py diff --git a/jetson/ros2_ws/src/saltyrover_drive/config/rover_params.yaml b/jetson/ros2_ws/src/saltyrover_drive/config/rover_params.yaml new file mode 100644 index 0000000..000ed46 --- /dev/null +++ b/jetson/ros2_ws/src/saltyrover_drive/config/rover_params.yaml @@ -0,0 +1,47 @@ +# rover_params.yaml +# SaltyRover 4-wheel differential drive parameters. +# +# SaltyRover is a stable 4-wheel chassis (no balance PID). +# Left side (FL + RL) and right side (FR + RR) run at the same speed per side. +# STM32 command: {"cmd":"drive4","fl":N,"fr":N,"rl":N,"rr":N} N ∈ [-1000, 1000] +# +# Run with: +# ros2 launch saltyrover_drive rover_drive.launch.py +# +# Data flow: +# /cmd_vel → [rover_drive_node] → /rover/wheel_speeds → STM32 + +# ── Chassis geometry ─────────────────────────────────────────────────────────── +# Measure wheel_separation centre-to-centre between L and R drive wheels. +# wheel_base is front-to-rear axle distance (used for future encoder odometry). +wheel_separation: 0.45 # m (adjustable per chassis build) +wheel_base: 0.40 # m +wheel_radius: 0.075 # m (75 mm hub motor wheels) + +# ── Speed ───────────────────────────────────────────────────────────────────── +# max_speed is the m/s value that maps to ±1000 STM32 ticks. +# Higher than SaltyLab (no tipping risk from balance). +max_speed: 3.0 # m/s (= 10.8 km/h) — maps to ±1000 +max_linear_vel: 3.0 # m/s — cmd_vel hard clamp (same as max_speed initially) +max_angular_vel: 2.5 # rad/s + +# ── Acceleration limits ──────────────────────────────────────────────────────── +# 4-wheel stable chassis can handle faster ramps than SaltyLab balance robot. +accel_limit: 2.0 # m/s² — still gentle for person-following smoothness +decel_limit: 4.0 # m/s² — braking can be faster than accelerating + +# ── Control ─────────────────────────────────────────────────────────────────── +control_rate: 50.0 # Hz — 20ms tick + +# ── Watchdog ────────────────────────────────────────────────────────────────── +# Zero wheel speeds if no cmd_vel received within cmd_timeout seconds. +cmd_timeout: 0.5 # s + +# ── Safety ──────────────────────────────────────────────────────────────────── +drive_enabled: true # runtime on/off: ros2 param set /rover_drive drive_enabled false + +# ── Odometry ────────────────────────────────────────────────────────────────── +publish_tf: true # broadcast odom → base_link TF +odom_frame_id: odom +base_frame_id: base_link +publish_rate: 50.0 # Hz diff --git a/jetson/ros2_ws/src/saltyrover_drive/launch/rover_drive.launch.py b/jetson/ros2_ws/src/saltyrover_drive/launch/rover_drive.launch.py new file mode 100644 index 0000000..9b88744 --- /dev/null +++ b/jetson/ros2_ws/src/saltyrover_drive/launch/rover_drive.launch.py @@ -0,0 +1,120 @@ +""" +rover_drive.launch.py — SaltyRover 4-wheel differential drive launch. + +Launches: + rover_drive_node : /cmd_vel → diff-drive kinematics → /rover/wheel_speeds (STM32 JSON) + rover_odom_node : /cmd_vel → dead-reckoning pose → /odom + TF odom→base_link + +Usage: + # Defaults from rover_params.yaml: + ros2 launch saltyrover_drive rover_drive.launch.py + + # Override wheel separation (wider chassis build): + ros2 launch saltyrover_drive rover_drive.launch.py wheel_separation:=0.50 + + # Higher speed for open-area testing: + ros2 launch saltyrover_drive rover_drive.launch.py max_speed:=4.0 max_linear_vel:=4.0 + + # Disable TF (if using robot_localization EKF instead): + ros2 launch saltyrover_drive rover_drive.launch.py publish_tf:=false + +Topics: + /cmd_vel — Input (from person_follower, Nav2, or teleop) + /rover/wheel_speeds — Output to STM32 bridge (JSON drive4 command) + /rover/drive_status — Speed + enabled status (JSON String) + /odom — Odometry (nav_msgs/Odometry) +""" + +import os + +from ament_index_python.packages import get_package_share_directory +from launch import LaunchDescription +from launch.actions import DeclareLaunchArgument, OpaqueFunction +from launch.substitutions import LaunchConfiguration +from launch_ros.actions import Node + + +def _launch_nodes(context, *args, **kwargs): + params_file = LaunchConfiguration("params_file").perform(context) + + def _s(k): return LaunchConfiguration(k).perform(context) + def _f(k): return float(LaunchConfiguration(k).perform(context)) + def _b(k): return LaunchConfiguration(k).perform(context).lower() == "true" + + drive_params = { + "wheel_separation": _f("wheel_separation"), + "wheel_radius": _f("wheel_radius"), + "max_speed": _f("max_speed"), + "max_linear_vel": _f("max_linear_vel"), + "max_angular_vel": _f("max_angular_vel"), + "accel_limit": _f("accel_limit"), + "decel_limit": _f("decel_limit"), + "control_rate": _f("control_rate"), + "cmd_timeout": _f("cmd_timeout"), + "drive_enabled": _b("drive_enabled"), + } + odom_params = { + "wheel_separation": _f("wheel_separation"), + "publish_tf": _b("publish_tf"), + "odom_frame_id": _s("odom_frame_id"), + "base_frame_id": _s("base_frame_id"), + "publish_rate": _f("control_rate"), + } + + node_params_drive = [params_file, drive_params] if params_file else [drive_params] + node_params_odom = [params_file, odom_params] if params_file else [odom_params] + + return [ + Node( + package="saltyrover_drive", + executable="rover_drive_node", + name="rover_drive", + output="screen", + parameters=node_params_drive, + ), + Node( + package="saltyrover_drive", + executable="rover_odom_node", + name="rover_odom", + output="screen", + parameters=node_params_odom, + ), + ] + + +def generate_launch_description(): + pkg_share = get_package_share_directory("saltyrover_drive") + default_yaml = os.path.join(pkg_share, "config", "rover_params.yaml") + + return LaunchDescription([ + DeclareLaunchArgument( + "params_file", default_value=default_yaml, + description="Full path to rover_params.yaml"), + + # Chassis geometry + DeclareLaunchArgument("wheel_separation", default_value="0.45", + description="L/R wheel centre-to-centre distance (m)"), + DeclareLaunchArgument("wheel_radius", default_value="0.075"), + + # Speed + DeclareLaunchArgument("max_speed", default_value="3.0", + description="m/s corresponding to ±1000 STM32 ticks"), + DeclareLaunchArgument("max_linear_vel", default_value="3.0"), + DeclareLaunchArgument("max_angular_vel", default_value="2.5"), + + # Accel + DeclareLaunchArgument("accel_limit", default_value="2.0"), + DeclareLaunchArgument("decel_limit", default_value="4.0"), + + # Control + DeclareLaunchArgument("control_rate", default_value="50.0"), + DeclareLaunchArgument("cmd_timeout", default_value="0.5"), + DeclareLaunchArgument("drive_enabled", default_value="true"), + + # Odometry + DeclareLaunchArgument("publish_tf", default_value="true"), + DeclareLaunchArgument("odom_frame_id", default_value="odom"), + DeclareLaunchArgument("base_frame_id", default_value="base_link"), + + OpaqueFunction(function=_launch_nodes), + ]) diff --git a/jetson/ros2_ws/src/saltyrover_drive/package.xml b/jetson/ros2_ws/src/saltyrover_drive/package.xml new file mode 100644 index 0000000..da5e5e9 --- /dev/null +++ b/jetson/ros2_ws/src/saltyrover_drive/package.xml @@ -0,0 +1,30 @@ + + + + saltyrover_drive + 0.1.0 + + 4-wheel differential drive controller for SaltyRover variant. + Converts /cmd_vel (Twist) to per-wheel speeds for STM32 via JSON drive4 command. + Includes dead-reckoning odometry node. No balance PID required — stable chassis. + + sl-controls + MIT + + rclpy + geometry_msgs + nav_msgs + std_msgs + tf2_ros + + ament_python + + ament_copyright + ament_flake8 + ament_pep257 + python3-pytest + + + ament_python + + diff --git a/jetson/ros2_ws/src/saltyrover_drive/resource/saltyrover_drive b/jetson/ros2_ws/src/saltyrover_drive/resource/saltyrover_drive new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltyrover_drive/saltyrover_drive/__init__.py b/jetson/ros2_ws/src/saltyrover_drive/saltyrover_drive/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltyrover_drive/saltyrover_drive/rover_drive_node.py b/jetson/ros2_ws/src/saltyrover_drive/saltyrover_drive/rover_drive_node.py new file mode 100644 index 0000000..73f5e11 --- /dev/null +++ b/jetson/ros2_ws/src/saltyrover_drive/saltyrover_drive/rover_drive_node.py @@ -0,0 +1,266 @@ +""" +rover_drive_node.py — 4-wheel differential drive controller for SaltyRover. + +SaltyRover is a stable 4-wheel chassis (no balance PID needed). +Two independently-driven axles: left (FL + RL) and right (FR + RR) run +at the same speed per side — classic tank/skid-steer kinematics. + +Subscribes: + /cmd_vel (geometry_msgs/Twist) — standard ROS2 velocity command + +Publishes: + /rover/wheel_speeds (std_msgs/String) — JSON drive command for STM32: + {"cmd":"drive4","fl":N,"fr":N,"rl":N,"rr":N} N ∈ [-1000, 1000] + /rover/drive_status (std_msgs/String) — JSON status: speeds + profile + +Kinematics +---------- + v_left = linear.x - angular.z × (wheel_separation / 2) + v_right = linear.x + angular.z × (wheel_separation / 2) + + Speed is scaled to integer ±1000 for STM32: + wheel_int = clamp(v / max_speed, -1.0, 1.0) × 1000 + +Acceleration limiting +--------------------- + Each wheel speed ramps toward the target at accel_limit m/s² (50 Hz tick). + Deceleration uses a higher limit for responsive braking. + +Safety +------ + * cmd_vel input is hard-clamped to max_linear_vel / max_angular_vel. + * A watchdog zeroes wheel speeds if no cmd_vel received in cmd_timeout s. + * follow_enabled param for runtime on/off. + +Usage +----- + ros2 launch saltyrover_drive rover_drive.launch.py + ros2 launch saltyrover_drive rover_drive.launch.py max_speed:=3.0 +""" + +import json +import math +import time + +import rclpy +from rclpy.node import Node +from geometry_msgs.msg import Twist +from std_msgs.msg import String + + +# ── Pure kinematics functions ───────────────────────────────────────────────── + +def clamp(v: float, lo: float, hi: float) -> float: + return max(lo, min(hi, v)) + + +def diff_drive_kinematics(linear_x: float, angular_z: float, + wheel_separation: float) -> tuple: + """ + Convert Twist to left/right wheel velocities (m/s). + + wheel_separation: distance between left and right wheels (m) + + Returns (v_left, v_right). + """ + half_sep = wheel_separation / 2.0 + v_left = linear_x - angular_z * half_sep + v_right = linear_x + angular_z * half_sep + return v_left, v_right + + +def velocity_to_wheel_int(v: float, max_speed: float) -> int: + """ + Scale velocity (m/s) to integer wheel speed [-1000, +1000] for STM32. + + max_speed : m/s corresponding to ±1000 + Returns int clamped to [-1000, +1000]. + """ + if max_speed <= 0: + return 0 + scaled = clamp(v / max_speed, -1.0, 1.0) + return int(round(scaled * 1000)) + + +def build_drive4_cmd(fl: int, fr: int, rl: int, rr: int) -> str: + """Build STM32 JSON drive command for 4-wheel rover.""" + return json.dumps({"cmd": "drive4", "fl": fl, "fr": fr, "rl": rl, "rr": rr}) + + +def apply_ramp(current: float, target: float, + accel_limit: float, decel_limit: float, + dt: float) -> float: + """ + Trapezoidal ramp toward target. + + Uses decel_limit when reducing magnitude (moving toward zero), + accel_limit when increasing magnitude. + """ + delta = target - current + decelerating = (current > 0 and delta < 0) or (current < 0 and delta > 0) + limit = decel_limit if decelerating else accel_limit + if delta > 0: + step = min(delta, limit * dt) + else: + step = max(delta, -limit * dt) + return current + step + + +def compute_wheel_speeds(linear_x: float, angular_z: float, + wheel_separation: float, + max_linear_vel: float, max_angular_vel: float, + max_speed: float) -> tuple: + """ + Full pipeline: Twist → clamped → kinematics → int wheel speeds. + + Returns (fl, fr, rl, rr) as ints in [-1000, +1000]. + """ + # Clamp input + lx = clamp(linear_x, -max_linear_vel, max_linear_vel) + az = clamp(angular_z, -max_angular_vel, max_angular_vel) + + v_left, v_right = diff_drive_kinematics(lx, az, wheel_separation) + + fl = velocity_to_wheel_int(v_left, max_speed) + fr = velocity_to_wheel_int(v_right, max_speed) + rl = fl # rear mirrors front (paired L/R) + rr = fr + + return fl, fr, rl, rr + + +# ── ROS2 Node ───────────────────────────────────────────────────────────────── + +class RoverDriveNode(Node): + + def __init__(self): + super().__init__("rover_drive") + + # -- Parameters -------------------------------------------------------- + self.declare_parameter("wheel_separation", 0.45) # m — L/R wheel distance + self.declare_parameter("wheel_radius", 0.075) # m + self.declare_parameter("max_speed", 2.0) # m/s → maps to ±1000 + self.declare_parameter("max_linear_vel", 2.0) # cmd_vel clamp (m/s) + self.declare_parameter("max_angular_vel", 2.0) # cmd_vel clamp (rad/s) + self.declare_parameter("accel_limit", 1.5) # m/s² + self.declare_parameter("decel_limit", 3.0) # m/s² (braking faster) + self.declare_parameter("control_rate", 50.0) # Hz + self.declare_parameter("cmd_timeout", 0.5) # s — watchdog + self.declare_parameter("drive_enabled", True) + + self._reload_params() + + # -- State ------------------------------------------------------------- + self._target_vel_left = 0.0 + self._target_vel_right = 0.0 + self._current_vel_left = 0.0 + self._current_vel_right = 0.0 + self._last_cmd_t = 0.0 # monotonic; 0 = never received + + # -- Subscriptions ----------------------------------------------------- + self.create_subscription(Twist, "/cmd_vel", self._cmd_vel_cb, 10) + + # -- Publishers -------------------------------------------------------- + self._drive_pub = self.create_publisher(String, "/rover/wheel_speeds", 10) + self._status_pub = self.create_publisher(String, "/rover/drive_status", 10) + + # -- Control timer ----------------------------------------------------- + dt = 1.0 / self._p["control_rate"] + self._timer = self.create_timer(dt, self._control_cb) + + self.get_logger().info( + f"RoverDrive ready sep={self._p['wheel_separation']}m " + f"max={self._p['max_speed']}m/s " + f"rate={self._p['control_rate']}Hz" + ) + + def _reload_params(self): + self._p = { + "wheel_separation": self.get_parameter("wheel_separation").value, + "wheel_radius": self.get_parameter("wheel_radius").value, + "max_speed": self.get_parameter("max_speed").value, + "max_linear_vel": self.get_parameter("max_linear_vel").value, + "max_angular_vel": self.get_parameter("max_angular_vel").value, + "accel_limit": self.get_parameter("accel_limit").value, + "decel_limit": self.get_parameter("decel_limit").value, + "control_rate": self.get_parameter("control_rate").value, + "cmd_timeout": self.get_parameter("cmd_timeout").value, + "drive_enabled": self.get_parameter("drive_enabled").value, + } + + def _cmd_vel_cb(self, msg: Twist): + self._reload_params() + p = self._p + if not p["drive_enabled"]: + self._target_vel_left = 0.0 + self._target_vel_right = 0.0 + return + + lx = clamp(msg.linear.x, -p["max_linear_vel"], p["max_linear_vel"]) + az = clamp(msg.angular.z, -p["max_angular_vel"], p["max_angular_vel"]) + v_left, v_right = diff_drive_kinematics(lx, az, p["wheel_separation"]) + + # Clamp wheel velocities to max_speed + self._target_vel_left = clamp(v_left, -p["max_speed"], p["max_speed"]) + self._target_vel_right = clamp(v_right, -p["max_speed"], p["max_speed"]) + self._last_cmd_t = time.monotonic() + + def _control_cb(self): + self._reload_params() + p = self._p + dt = 1.0 / p["control_rate"] + + # Watchdog: zero target if no cmd_vel received recently + if self._last_cmd_t > 0.0: + age = time.monotonic() - self._last_cmd_t + if age > p["cmd_timeout"]: + self._target_vel_left = 0.0 + self._target_vel_right = 0.0 + + if not p["drive_enabled"]: + self._target_vel_left = 0.0 + self._target_vel_right = 0.0 + + # Ramp toward target + self._current_vel_left = apply_ramp( + self._current_vel_left, self._target_vel_left, + p["accel_limit"], p["decel_limit"], dt) + self._current_vel_right = apply_ramp( + self._current_vel_right, self._target_vel_right, + p["accel_limit"], p["decel_limit"], dt) + + # Convert to int wheel speeds + fl = velocity_to_wheel_int(self._current_vel_left, p["max_speed"]) + fr = velocity_to_wheel_int(self._current_vel_right, p["max_speed"]) + rl, rr = fl, fr # rear mirrors front + + # Publish STM32 command + cmd_str = build_drive4_cmd(fl, fr, rl, rr) + self._drive_pub.publish(String(data=cmd_str)) + + # Publish status + status = { + "fl": fl, "fr": fr, "rl": rl, "rr": rr, + "v_left": round(self._current_vel_left, 3), + "v_right": round(self._current_vel_right, 3), + "enabled": p["drive_enabled"], + } + self._status_pub.publish(String(data=json.dumps(status))) + + +# ── Entry point ─────────────────────────────────────────────────────────────── + +def main(args=None): + rclpy.init(args=args) + node = RoverDriveNode() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.try_shutdown() + + +if __name__ == "__main__": + main() diff --git a/jetson/ros2_ws/src/saltyrover_drive/saltyrover_drive/rover_odom_node.py b/jetson/ros2_ws/src/saltyrover_drive/saltyrover_drive/rover_odom_node.py new file mode 100644 index 0000000..eb437ef --- /dev/null +++ b/jetson/ros2_ws/src/saltyrover_drive/saltyrover_drive/rover_odom_node.py @@ -0,0 +1,197 @@ +""" +rover_odom_node.py — Wheel odometry for SaltyRover (dead-reckoning from cmd_vel). + +Uses the commanded velocity (dead-reckoning) since the STM32 encoder feedback +may not always be available at launch. When encoder feedback is wired up, +replace the cmd_vel subscription with actual wheel tick messages. + +Subscribes: + /cmd_vel (geometry_msgs/Twist) — commanded velocities (proxy for actual) + +Publishes: + /odom (nav_msgs/Odometry) — accumulated pose + velocity estimate + TF: odom → base_link + +Kinematics (differential drive): + v = (v_right + v_left) / 2 = linear.x + omega = (v_right - v_left) / wheel_sep = angular.z + x' = x + v × cos(θ) × dt + y' = y + v × sin(θ) × dt + θ' = θ + omega × dt +""" + +import math +import time + +import rclpy +from rclpy.node import Node +from geometry_msgs.msg import Twist, TransformStamped +from nav_msgs.msg import Odometry + +try: + from tf2_ros import TransformBroadcaster + _HAVE_TF2 = True +except ImportError: + _HAVE_TF2 = False + + +# ── Pure odometry functions ─────────────────────────────────────────────────── + +def integrate_pose(x: float, y: float, theta: float, + linear_x: float, angular_z: float, + dt: float) -> tuple: + """ + Integrate pose using forward Euler kinematics. + + x, y : position (m) + theta : heading (rad), 0 = +X axis + linear_x: forward velocity (m/s) + angular_z: yaw rate (rad/s) + dt : time step (s) + + Returns (x', y', theta'). + """ + x_new = x + linear_x * math.cos(theta) * dt + y_new = y + linear_x * math.sin(theta) * dt + theta_new = theta + angular_z * dt + return x_new, y_new, theta_new + + +def euler_to_quaternion_z(theta: float) -> tuple: + """ + Convert a yaw angle to a quaternion (rotation around Z only). + + Returns (qx, qy, qz, qw). + """ + half = theta / 2.0 + return 0.0, 0.0, math.sin(half), math.cos(half) + + +# ── ROS2 Node ───────────────────────────────────────────────────────────────── + +class RoverOdomNode(Node): + + def __init__(self): + super().__init__("rover_odom") + + # -- Parameters -------------------------------------------------------- + self.declare_parameter("wheel_separation", 0.45) # m + self.declare_parameter("publish_tf", True) + self.declare_parameter("odom_frame_id", "odom") + self.declare_parameter("base_frame_id", "base_link") + self.declare_parameter("publish_rate", 50.0) # Hz + + p = self._params() + self._odom_frame = p["odom_frame_id"] + self._base_frame = p["base_frame_id"] + self._publish_tf = p["publish_tf"] and _HAVE_TF2 + + # -- Pose state -------------------------------------------------------- + self._x = 0.0 + self._y = 0.0 + self._theta = 0.0 + self._vx = 0.0 + self._omega = 0.0 + self._last_t = None # time.monotonic() of last cmd_vel + + # -- TF2 broadcaster --------------------------------------------------- + if self._publish_tf: + self._tf_broadcaster = TransformBroadcaster(self) + + # -- Subscriptions ----------------------------------------------------- + self.create_subscription(Twist, "/cmd_vel", self._cmd_vel_cb, 10) + + # -- Publishers -------------------------------------------------------- + self._odom_pub = self.create_publisher(Odometry, "/odom", 10) + + # -- Timer ------------------------------------------------------------- + self._timer = self.create_timer( + 1.0 / p["publish_rate"], self._publish_cb) + + self.get_logger().info("RoverOdom ready (dead-reckoning from /cmd_vel)") + + def _params(self) -> dict: + return { + "wheel_separation": self.get_parameter("wheel_separation").value, + "publish_tf": self.get_parameter("publish_tf").value, + "odom_frame_id": self.get_parameter("odom_frame_id").value, + "base_frame_id": self.get_parameter("base_frame_id").value, + "publish_rate": self.get_parameter("publish_rate").value, + } + + def _cmd_vel_cb(self, msg: Twist): + now = time.monotonic() + if self._last_t is not None: + dt = now - self._last_t + if 0 < dt < 1.0: # sanity: ignore if gap > 1s + self._x, self._y, self._theta = integrate_pose( + self._x, self._y, self._theta, + msg.linear.x, msg.angular.z, dt) + self._last_t = now + self._vx = msg.linear.x + self._omega = msg.angular.z + + def _publish_cb(self): + now_msg = self.get_clock().now().to_msg() + qx, qy, qz, qw = euler_to_quaternion_z(self._theta) + + # -- Odometry message -------------------------------------------------- + odom = Odometry() + odom.header.stamp = now_msg + odom.header.frame_id = self._odom_frame + odom.child_frame_id = self._base_frame + + odom.pose.pose.position.x = self._x + odom.pose.pose.position.y = self._y + odom.pose.pose.position.z = 0.0 + odom.pose.pose.orientation.x = qx + odom.pose.pose.orientation.y = qy + odom.pose.pose.orientation.z = qz + odom.pose.pose.orientation.w = qw + + odom.twist.twist.linear.x = self._vx + odom.twist.twist.angular.z = self._omega + + # Covariance: dead-reckoning, moderate confidence + odom.pose.covariance[0] = 0.1 # x + odom.pose.covariance[7] = 0.1 # y + odom.pose.covariance[35] = 0.05 # yaw + odom.twist.covariance[0] = 0.01 + odom.twist.covariance[35] = 0.01 + + self._odom_pub.publish(odom) + + # -- TF broadcast ------------------------------------------------------ + if self._publish_tf: + tf = TransformStamped() + tf.header.stamp = now_msg + tf.header.frame_id = self._odom_frame + tf.child_frame_id = self._base_frame + + tf.transform.translation.x = self._x + tf.transform.translation.y = self._y + tf.transform.translation.z = 0.0 + tf.transform.rotation.x = qx + tf.transform.rotation.y = qy + tf.transform.rotation.z = qz + tf.transform.rotation.w = qw + + self._tf_broadcaster.sendTransform(tf) + + +# ── Entry point ─────────────────────────────────────────────────────────────── + +def main(args=None): + rclpy.init(args=args) + node = RoverOdomNode() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.try_shutdown() + + +if __name__ == "__main__": + main() diff --git a/jetson/ros2_ws/src/saltyrover_drive/setup.cfg b/jetson/ros2_ws/src/saltyrover_drive/setup.cfg new file mode 100644 index 0000000..67192d7 --- /dev/null +++ b/jetson/ros2_ws/src/saltyrover_drive/setup.cfg @@ -0,0 +1,4 @@ +[develop] +script_dir=$base/lib/saltyrover_drive +[install] +install_scripts=$base/lib/saltyrover_drive diff --git a/jetson/ros2_ws/src/saltyrover_drive/setup.py b/jetson/ros2_ws/src/saltyrover_drive/setup.py new file mode 100644 index 0000000..a14580a --- /dev/null +++ b/jetson/ros2_ws/src/saltyrover_drive/setup.py @@ -0,0 +1,30 @@ +from setuptools import setup + +package_name = "saltyrover_drive" + +setup( + name=package_name, + version="0.1.0", + packages=[package_name], + data_files=[ + ("share/ament_index/resource_index/packages", [f"resource/{package_name}"]), + (f"share/{package_name}", ["package.xml"]), + (f"share/{package_name}/launch", ["launch/rover_drive.launch.py"]), + (f"share/{package_name}/config", ["config/rover_params.yaml"]), + ], + install_requires=["setuptools"], + zip_safe=True, + maintainer="sl-controls", + maintainer_email="sl-controls@saltylab.local", + description="4-wheel diff-drive controller for SaltyRover (tank kinematics + dead-reckoning odom)", + license="MIT", + tests_require=["pytest"], + entry_points={ + "console_scripts": [ + # /cmd_vel → diff-drive kinematics → /rover/wheel_speeds (STM32 JSON) + "rover_drive_node = saltyrover_drive.rover_drive_node:main", + # /cmd_vel → dead-reckoning pose → /odom + TF + "rover_odom_node = saltyrover_drive.rover_odom_node:main", + ], + }, +) diff --git a/jetson/ros2_ws/src/saltyrover_drive/test/test_rover_drive.py b/jetson/ros2_ws/src/saltyrover_drive/test/test_rover_drive.py new file mode 100644 index 0000000..427ac10 --- /dev/null +++ b/jetson/ros2_ws/src/saltyrover_drive/test/test_rover_drive.py @@ -0,0 +1,414 @@ +""" +Unit tests for saltyrover_drive pure functions. +No ROS2 runtime required. +Run with: pytest jetson/ros2_ws/src/saltyrover_drive/test/test_rover_drive.py +""" + +import json +import math + +import pytest + + +# ── Pure function mirrors ───────────────────────────────────────────────────── + +def _clamp(v, lo, hi): + return max(lo, min(hi, v)) + + +def _diff_drive_kinematics(linear_x, angular_z, wheel_separation): + half_sep = wheel_separation / 2.0 + v_left = linear_x - angular_z * half_sep + v_right = linear_x + angular_z * half_sep + return v_left, v_right + + +def _velocity_to_wheel_int(v, max_speed): + if max_speed <= 0: + return 0 + scaled = _clamp(v / max_speed, -1.0, 1.0) + return int(round(scaled * 1000)) + + +def _build_drive4_cmd(fl, fr, rl, rr): + return json.dumps({"cmd": "drive4", "fl": fl, "fr": fr, "rl": rl, "rr": rr}) + + +def _apply_ramp(current, target, accel_limit, decel_limit, dt): + delta = target - current + decelerating = (current > 0 and delta < 0) or (current < 0 and delta > 0) + limit = decel_limit if decelerating else accel_limit + if delta > 0: + step = min(delta, limit * dt) + else: + step = max(delta, -limit * dt) + return current + step + + +def _compute_wheel_speeds(linear_x, angular_z, wheel_separation, + max_linear_vel, max_angular_vel, max_speed): + lx = _clamp(linear_x, -max_linear_vel, max_linear_vel) + az = _clamp(angular_z, -max_angular_vel, max_angular_vel) + v_left, v_right = _diff_drive_kinematics(lx, az, wheel_separation) + fl = _velocity_to_wheel_int(v_left, max_speed) + fr = _velocity_to_wheel_int(v_right, max_speed) + rl, rr = fl, fr + return fl, fr, rl, rr + + +def _integrate_pose(x, y, theta, linear_x, angular_z, dt): + x_new = x + linear_x * math.cos(theta) * dt + y_new = y + linear_x * math.sin(theta) * dt + theta_new = theta + angular_z * dt + return x_new, y_new, theta_new + + +def _euler_to_quaternion_z(theta): + half = theta / 2.0 + return 0.0, 0.0, math.sin(half), math.cos(half) + + +# ── Differential drive kinematics ──────────────────────────────────────────── + +class TestDiffDriveKinematics: + _SEP = 0.45 # m — standard rover wheel separation + + def test_straight_equal_speeds(self): + """Straight ahead: both wheels same speed.""" + vl, vr = _diff_drive_kinematics(1.0, 0.0, self._SEP) + assert vl == pytest.approx(1.0) + assert vr == pytest.approx(1.0) + + def test_turn_left_right_faster(self): + """Turn left (positive angular_z): right wheel faster.""" + vl, vr = _diff_drive_kinematics(1.0, 1.0, self._SEP) + assert vr > vl + + def test_turn_right_left_faster(self): + """Turn right (negative angular_z): left wheel faster.""" + vl, vr = _diff_drive_kinematics(1.0, -1.0, self._SEP) + assert vl > vr + + def test_pivot_left_in_place(self): + """Pivot turn left in place (linear=0): right forward, left backward.""" + vl, vr = _diff_drive_kinematics(0.0, 1.0, self._SEP) + assert vl < 0 + assert vr > 0 + assert vl == pytest.approx(-vr) # symmetric + + def test_pivot_speed_proportional_to_separation(self): + """Pivot speed = angular_z × sep/2.""" + angular_z = 2.0 + sep = 0.45 + vl, vr = _diff_drive_kinematics(0.0, angular_z, sep) + expected_half = angular_z * sep / 2.0 + assert vr == pytest.approx( expected_half) + assert vl == pytest.approx(-expected_half) + + def test_backward_straight(self): + vl, vr = _diff_drive_kinematics(-1.0, 0.0, self._SEP) + assert vl == pytest.approx(-1.0) + assert vr == pytest.approx(-1.0) + + def test_zero_cmd(self): + vl, vr = _diff_drive_kinematics(0.0, 0.0, self._SEP) + assert vl == pytest.approx(0.0) + assert vr == pytest.approx(0.0) + + def test_turning_radius(self): + """ + Turning radius R = linear_x / angular_z. + Also: v_right = omega × (R + sep/2), v_left = omega × (R - sep/2) + Verify ratio: v_right/v_left = (R + sep/2)/(R - sep/2). + """ + lx, az, sep = 1.0, 0.5, 0.45 + vl, vr = _diff_drive_kinematics(lx, az, sep) + R = lx / az # = 2.0 m + expected_ratio = (R + sep / 2) / (R - sep / 2) + assert vr / vl == pytest.approx(expected_ratio, rel=1e-5) + + def test_wider_separation_higher_speed_diff(self): + """Wider chassis → larger speed difference for same angular command.""" + vl_narrow, vr_narrow = _diff_drive_kinematics(1.0, 1.0, 0.30) + vl_wide, vr_wide = _diff_drive_kinematics(1.0, 1.0, 0.60) + assert (vr_wide - vl_wide) > (vr_narrow - vl_narrow) + + +# ── Velocity to wheel integer ───────────────────────────────────────────────── + +class TestVelocityToWheelInt: + + def test_full_forward(self): + assert _velocity_to_wheel_int(3.0, 3.0) == 1000 + + def test_full_reverse(self): + assert _velocity_to_wheel_int(-3.0, 3.0) == -1000 + + def test_half_speed(self): + assert _velocity_to_wheel_int(1.5, 3.0) == 500 + + def test_stopped(self): + assert _velocity_to_wheel_int(0.0, 3.0) == 0 + + def test_clamp_above_max(self): + assert _velocity_to_wheel_int(5.0, 3.0) == 1000 + + def test_clamp_below_min(self): + assert _velocity_to_wheel_int(-5.0, 3.0) == -1000 + + def test_zero_max_speed_safe(self): + """max_speed=0 must not divide by zero.""" + assert _velocity_to_wheel_int(1.0, 0.0) == 0 + + def test_rounding(self): + """0.5005 * 1000 = 500.5 → rounds to 501.""" + # 1.5015 / 3.0 = 0.5005 + result = _velocity_to_wheel_int(1.5015, 3.0) + assert result in (500, 501) # rounding may differ slightly + + def test_one_third(self): + """1.0 / 3.0 m/s → ~333.""" + result = _velocity_to_wheel_int(1.0, 3.0) + assert result == 333 + + +# ── Drive command JSON builder ──────────────────────────────────────────────── + +class TestBuildDrive4Cmd: + + def test_valid_json(self): + cmd = _build_drive4_cmd(100, 200, 100, 200) + parsed = json.loads(cmd) + assert parsed["cmd"] == "drive4" + + def test_fields_present(self): + cmd = json.loads(_build_drive4_cmd(10, 20, 30, 40)) + assert cmd["fl"] == 10 + assert cmd["fr"] == 20 + assert cmd["rl"] == 30 + assert cmd["rr"] == 40 + + def test_zero_speeds(self): + cmd = json.loads(_build_drive4_cmd(0, 0, 0, 0)) + assert all(cmd[k] == 0 for k in ("fl", "fr", "rl", "rr")) + + def test_negative_values(self): + cmd = json.loads(_build_drive4_cmd(-500, -500, -500, -500)) + assert cmd["fl"] == -500 + + def test_rear_mirrors_front_in_pipeline(self): + """In compute_wheel_speeds, rl=fl and rr=fr.""" + fl, fr, rl, rr = _compute_wheel_speeds(1.0, 0.0, 0.45, 3.0, 2.5, 3.0) + assert rl == fl + assert rr == fr + + +# ── Full pipeline (compute_wheel_speeds) ───────────────────────────────────── + +class TestComputeWheelSpeeds: + _SEP = 0.45 + _MAX_LIN = 3.0 + _MAX_ANG = 2.5 + _MAX_SPD = 3.0 + + def test_straight_full_speed(self): + fl, fr, rl, rr = _compute_wheel_speeds( + 3.0, 0.0, self._SEP, self._MAX_LIN, self._MAX_ANG, self._MAX_SPD) + assert fl == fr == rl == rr == 1000 + + def test_stopped(self): + fl, fr, rl, rr = _compute_wheel_speeds( + 0.0, 0.0, self._SEP, self._MAX_LIN, self._MAX_ANG, self._MAX_SPD) + assert fl == fr == rl == rr == 0 + + def test_turn_left_fr_greater(self): + fl, fr, rl, rr = _compute_wheel_speeds( + 1.0, 1.0, self._SEP, self._MAX_LIN, self._MAX_ANG, self._MAX_SPD) + assert fr > fl + + def test_clamp_linear_vel(self): + """cmd > max_linear_vel is clamped.""" + fl1, fr1, _, _ = _compute_wheel_speeds( + 3.0, 0.0, self._SEP, self._MAX_LIN, self._MAX_ANG, self._MAX_SPD) + fl2, fr2, _, _ = _compute_wheel_speeds( + 10.0, 0.0, self._SEP, self._MAX_LIN, self._MAX_ANG, self._MAX_SPD) + assert fl1 == fl2 == 1000 + + def test_reverse(self): + fl, fr, rl, rr = _compute_wheel_speeds( + -3.0, 0.0, self._SEP, self._MAX_LIN, self._MAX_ANG, self._MAX_SPD) + assert fl == fr == rl == rr == -1000 + + def test_pivot_turn_symmetric(self): + """Pure rotation: FL/RL negative, FR/RR positive, symmetric.""" + fl, fr, rl, rr = _compute_wheel_speeds( + 0.0, 2.0, self._SEP, self._MAX_LIN, self._MAX_ANG, self._MAX_SPD) + assert fl < 0 + assert fr > 0 + assert fl == rl + assert fr == rr + assert fl == -fr + + +# ── Ramp / acceleration limiter ────────────────────────────────────────────── + +class TestApplyRamp: + _DT = 0.02 # 50 Hz + + def test_accel_step(self): + v = _apply_ramp(0.0, 3.0, 2.0, 4.0, self._DT) + assert v == pytest.approx(2.0 * self._DT) # 0.04 m/s + + def test_decel_step(self): + v = _apply_ramp(3.0, 0.0, 2.0, 4.0, self._DT) + assert v == pytest.approx(3.0 - 4.0 * self._DT) # 2.92 m/s + + def test_decel_faster_than_accel(self): + """Decel limit (4.0) > accel limit (2.0) → braking step larger.""" + accel_step = _apply_ramp(0.0, 1.0, 2.0, 4.0, self._DT) - 0.0 + decel_step = 3.0 - _apply_ramp(3.0, 0.0, 2.0, 4.0, self._DT) + assert decel_step > accel_step + + def test_does_not_overshoot(self): + v = _apply_ramp(2.99, 3.0, 2.0, 4.0, self._DT) + assert v == pytest.approx(3.0) + + def test_does_not_undershoot(self): + v = _apply_ramp(0.01, 0.0, 2.0, 4.0, self._DT) + assert v == pytest.approx(0.0) + + def test_negative_to_zero_uses_decel_limit(self): + """From -1.0 toward 0: decelerating → decel_limit applies.""" + v = _apply_ramp(-1.0, 0.0, 2.0, 4.0, self._DT) + assert v == pytest.approx(-1.0 + 4.0 * self._DT) # -0.92 + + def test_zero_target(self): + assert _apply_ramp(0.0, 0.0, 2.0, 4.0, self._DT) == pytest.approx(0.0) + + def test_time_to_reach_max(self): + """At 2.0 m/s², 3.0 m/s target takes ≈ 3/2/0.02 = 75 ticks.""" + v = 0.0 + ticks = 0 + for _ in range(200): + v = _apply_ramp(v, 3.0, 2.0, 4.0, 0.02) + ticks += 1 + if v >= 3.0 - 1e-6: + break + assert v == pytest.approx(3.0, abs=1e-4) + assert 70 <= ticks <= 80 + + +# ── Odometry integration ────────────────────────────────────────────────────── + +class TestIntegratePose: + + def test_straight_along_x(self): + """Moving forward at 1 m/s for 1s → x increases by 1m.""" + x, y, theta = _integrate_pose(0.0, 0.0, 0.0, 1.0, 0.0, 1.0) + assert x == pytest.approx(1.0) + assert y == pytest.approx(0.0) + assert theta == pytest.approx(0.0) + + def test_straight_along_y(self): + """Facing +Y (theta=pi/2), moving forward 1m → y increases.""" + x, y, theta = _integrate_pose(0.0, 0.0, math.pi / 2, 1.0, 0.0, 1.0) + assert x == pytest.approx(0.0, abs=1e-9) + assert y == pytest.approx(1.0) + + def test_pure_rotation(self): + """No linear vel, angular=pi/2 rad/s for 1s → theta = pi/2.""" + x, y, theta = _integrate_pose(0.0, 0.0, 0.0, 0.0, math.pi / 2, 1.0) + assert x == pytest.approx(0.0) + assert y == pytest.approx(0.0) + assert theta == pytest.approx(math.pi / 2) + + def test_backward(self): + """Reverse (linear=-1) → x decreases.""" + x, y, theta = _integrate_pose(0.0, 0.0, 0.0, -1.0, 0.0, 1.0) + assert x == pytest.approx(-1.0) + + def test_accumulated_over_ticks(self): + """5m at 1m/s with dt=0.02 → 250 ticks.""" + x, y, theta = 0.0, 0.0, 0.0 + for _ in range(250): + x, y, theta = _integrate_pose(x, y, theta, 1.0, 0.0, 0.02) + assert x == pytest.approx(5.0, abs=1e-6) + + def test_small_dt_arc(self): + """Short arc: forward + small turn. Position should shift appropriately.""" + x, y, theta = _integrate_pose(0.0, 0.0, 0.0, 1.0, 0.1, 0.1) + # theta changes by 0.01 rad; x ≈ 0.1, y ≈ 0 (small angle) + assert x == pytest.approx(1.0 * 0.1, abs=0.01) + assert theta == pytest.approx(0.01) + + +# ── Quaternion from yaw ─────────────────────────────────────────────────────── + +class TestEulerToQuaternionZ: + + def test_zero_yaw_identity(self): + qx, qy, qz, qw = _euler_to_quaternion_z(0.0) + assert qx == pytest.approx(0.0) + assert qy == pytest.approx(0.0) + assert qz == pytest.approx(0.0) + assert qw == pytest.approx(1.0) + + def test_pi_yaw(self): + """180° rotation: qw ≈ 0, qz ≈ 1.""" + qx, qy, qz, qw = _euler_to_quaternion_z(math.pi) + assert qz == pytest.approx(1.0, abs=1e-6) + assert abs(qw) < 1e-6 + + def test_half_pi_yaw(self): + """90° rotation: qz = qw = 1/sqrt(2).""" + qx, qy, qz, qw = _euler_to_quaternion_z(math.pi / 2) + assert qz == pytest.approx(math.sqrt(2) / 2, rel=1e-5) + assert qw == pytest.approx(math.sqrt(2) / 2, rel=1e-5) + + def test_unit_quaternion(self): + """All quaternions must be unit length.""" + for theta in [0, 0.5, 1.0, math.pi / 2, math.pi, -math.pi / 4]: + qx, qy, qz, qw = _euler_to_quaternion_z(theta) + norm = math.sqrt(qx**2 + qy**2 + qz**2 + qw**2) + assert norm == pytest.approx(1.0, rel=1e-6) + + def test_negative_yaw(self): + """Negative yaw → negative qz.""" + qx, qy, qz, qw = _euler_to_quaternion_z(-math.pi / 2) + assert qz < 0 + + +# ── Integration: kinematics + scaling ──────────────────────────────────────── + +class TestIntegration: + + def test_full_pipeline_straight(self): + """1 m/s forward → both wheels 333 int (1/3 of max_speed=3).""" + fl, fr, rl, rr = _compute_wheel_speeds( + 1.0, 0.0, 0.45, 3.0, 2.5, 3.0) + assert fl == fr == rl == rr == 333 + + def test_drive_cmd_round_trip(self): + """Compute speeds → build JSON → parse → verify values.""" + fl, fr, rl, rr = _compute_wheel_speeds( + 1.5, 0.5, 0.45, 3.0, 2.5, 3.0) + cmd = json.loads(_build_drive4_cmd(fl, fr, rl, rr)) + assert cmd["fl"] == fl + assert cmd["fr"] == fr + assert cmd["cmd"] == "drive4" + + def test_ramp_reaches_target(self): + """Starting from rest, ramp reaches commanded speed.""" + target_vl, _ = _diff_drive_kinematics(2.0, 0.0, 0.45) + v = 0.0 + for _ in range(200): + v = _apply_ramp(v, target_vl, 2.0, 4.0, 0.02) + assert v == pytest.approx(target_vl, abs=1e-4) + + def test_turn_cmd_produces_different_sides(self): + """Turning command: left and right wheel commands must differ.""" + fl, fr, rl, rr = _compute_wheel_speeds( + 1.0, 1.0, 0.45, 3.0, 2.5, 3.0) + assert fl != fr # turn → different wheel speeds + assert fl == rl # front and rear same per side + assert fr == rr