feat: SaltyRover 4-wheel diff-drive control loop (#74) #77

Merged
sl-jetson merged 1 commits from sl-controls/rover-drive into saltyrover-dev 2026-03-01 01:20:52 -05:00
10 changed files with 1108 additions and 0 deletions
Showing only changes of commit 4fed80f36d - Show all commits

View File

@ -0,0 +1,47 @@
# rover_params.yaml
# SaltyRover 4-wheel differential drive parameters.
#
# SaltyRover is a stable 4-wheel chassis (no balance PID).
# Left side (FL + RL) and right side (FR + RR) run at the same speed per side.
# STM32 command: {"cmd":"drive4","fl":N,"fr":N,"rl":N,"rr":N} N ∈ [-1000, 1000]
#
# Run with:
# ros2 launch saltyrover_drive rover_drive.launch.py
#
# Data flow:
# /cmd_vel → [rover_drive_node] → /rover/wheel_speeds → STM32
# ── Chassis geometry ───────────────────────────────────────────────────────────
# Measure wheel_separation centre-to-centre between L and R drive wheels.
# wheel_base is front-to-rear axle distance (used for future encoder odometry).
wheel_separation: 0.45 # m (adjustable per chassis build)
wheel_base: 0.40 # m
wheel_radius: 0.075 # m (75 mm hub motor wheels)
# ── Speed ─────────────────────────────────────────────────────────────────────
# max_speed is the m/s value that maps to ±1000 STM32 ticks.
# Higher than SaltyLab (no tipping risk from balance).
max_speed: 3.0 # m/s (= 10.8 km/h) — maps to ±1000
max_linear_vel: 3.0 # m/s — cmd_vel hard clamp (same as max_speed initially)
max_angular_vel: 2.5 # rad/s
# ── Acceleration limits ────────────────────────────────────────────────────────
# 4-wheel stable chassis can handle faster ramps than SaltyLab balance robot.
accel_limit: 2.0 # m/s² — still gentle for person-following smoothness
decel_limit: 4.0 # m/s² — braking can be faster than accelerating
# ── Control ───────────────────────────────────────────────────────────────────
control_rate: 50.0 # Hz — 20ms tick
# ── Watchdog ──────────────────────────────────────────────────────────────────
# Zero wheel speeds if no cmd_vel received within cmd_timeout seconds.
cmd_timeout: 0.5 # s
# ── Safety ────────────────────────────────────────────────────────────────────
drive_enabled: true # runtime on/off: ros2 param set /rover_drive drive_enabled false
# ── Odometry ──────────────────────────────────────────────────────────────────
publish_tf: true # broadcast odom → base_link TF
odom_frame_id: odom
base_frame_id: base_link
publish_rate: 50.0 # Hz

View File

@ -0,0 +1,120 @@
"""
rover_drive.launch.py SaltyRover 4-wheel differential drive launch.
Launches:
rover_drive_node : /cmd_vel diff-drive kinematics /rover/wheel_speeds (STM32 JSON)
rover_odom_node : /cmd_vel dead-reckoning pose /odom + TF odombase_link
Usage:
# Defaults from rover_params.yaml:
ros2 launch saltyrover_drive rover_drive.launch.py
# Override wheel separation (wider chassis build):
ros2 launch saltyrover_drive rover_drive.launch.py wheel_separation:=0.50
# Higher speed for open-area testing:
ros2 launch saltyrover_drive rover_drive.launch.py max_speed:=4.0 max_linear_vel:=4.0
# Disable TF (if using robot_localization EKF instead):
ros2 launch saltyrover_drive rover_drive.launch.py publish_tf:=false
Topics:
/cmd_vel Input (from person_follower, Nav2, or teleop)
/rover/wheel_speeds Output to STM32 bridge (JSON drive4 command)
/rover/drive_status Speed + enabled status (JSON String)
/odom Odometry (nav_msgs/Odometry)
"""
import os
from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument, OpaqueFunction
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def _launch_nodes(context, *args, **kwargs):
params_file = LaunchConfiguration("params_file").perform(context)
def _s(k): return LaunchConfiguration(k).perform(context)
def _f(k): return float(LaunchConfiguration(k).perform(context))
def _b(k): return LaunchConfiguration(k).perform(context).lower() == "true"
drive_params = {
"wheel_separation": _f("wheel_separation"),
"wheel_radius": _f("wheel_radius"),
"max_speed": _f("max_speed"),
"max_linear_vel": _f("max_linear_vel"),
"max_angular_vel": _f("max_angular_vel"),
"accel_limit": _f("accel_limit"),
"decel_limit": _f("decel_limit"),
"control_rate": _f("control_rate"),
"cmd_timeout": _f("cmd_timeout"),
"drive_enabled": _b("drive_enabled"),
}
odom_params = {
"wheel_separation": _f("wheel_separation"),
"publish_tf": _b("publish_tf"),
"odom_frame_id": _s("odom_frame_id"),
"base_frame_id": _s("base_frame_id"),
"publish_rate": _f("control_rate"),
}
node_params_drive = [params_file, drive_params] if params_file else [drive_params]
node_params_odom = [params_file, odom_params] if params_file else [odom_params]
return [
Node(
package="saltyrover_drive",
executable="rover_drive_node",
name="rover_drive",
output="screen",
parameters=node_params_drive,
),
Node(
package="saltyrover_drive",
executable="rover_odom_node",
name="rover_odom",
output="screen",
parameters=node_params_odom,
),
]
def generate_launch_description():
pkg_share = get_package_share_directory("saltyrover_drive")
default_yaml = os.path.join(pkg_share, "config", "rover_params.yaml")
return LaunchDescription([
DeclareLaunchArgument(
"params_file", default_value=default_yaml,
description="Full path to rover_params.yaml"),
# Chassis geometry
DeclareLaunchArgument("wheel_separation", default_value="0.45",
description="L/R wheel centre-to-centre distance (m)"),
DeclareLaunchArgument("wheel_radius", default_value="0.075"),
# Speed
DeclareLaunchArgument("max_speed", default_value="3.0",
description="m/s corresponding to ±1000 STM32 ticks"),
DeclareLaunchArgument("max_linear_vel", default_value="3.0"),
DeclareLaunchArgument("max_angular_vel", default_value="2.5"),
# Accel
DeclareLaunchArgument("accel_limit", default_value="2.0"),
DeclareLaunchArgument("decel_limit", default_value="4.0"),
# Control
DeclareLaunchArgument("control_rate", default_value="50.0"),
DeclareLaunchArgument("cmd_timeout", default_value="0.5"),
DeclareLaunchArgument("drive_enabled", default_value="true"),
# Odometry
DeclareLaunchArgument("publish_tf", default_value="true"),
DeclareLaunchArgument("odom_frame_id", default_value="odom"),
DeclareLaunchArgument("base_frame_id", default_value="base_link"),
OpaqueFunction(function=_launch_nodes),
])

View File

@ -0,0 +1,30 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltyrover_drive</name>
<version>0.1.0</version>
<description>
4-wheel differential drive controller for SaltyRover variant.
Converts /cmd_vel (Twist) to per-wheel speeds for STM32 via JSON drive4 command.
Includes dead-reckoning odometry node. No balance PID required — stable chassis.
</description>
<maintainer email="sl-controls@saltylab.local">sl-controls</maintainer>
<license>MIT</license>
<depend>rclpy</depend>
<depend>geometry_msgs</depend>
<depend>nav_msgs</depend>
<depend>std_msgs</depend>
<depend>tf2_ros</depend>
<buildtool_depend>ament_python</buildtool_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,266 @@
"""
rover_drive_node.py 4-wheel differential drive controller for SaltyRover.
SaltyRover is a stable 4-wheel chassis (no balance PID needed).
Two independently-driven axles: left (FL + RL) and right (FR + RR) run
at the same speed per side classic tank/skid-steer kinematics.
Subscribes:
/cmd_vel (geometry_msgs/Twist) standard ROS2 velocity command
Publishes:
/rover/wheel_speeds (std_msgs/String) JSON drive command for STM32:
{"cmd":"drive4","fl":N,"fr":N,"rl":N,"rr":N} N [-1000, 1000]
/rover/drive_status (std_msgs/String) JSON status: speeds + profile
Kinematics
----------
v_left = linear.x - angular.z × (wheel_separation / 2)
v_right = linear.x + angular.z × (wheel_separation / 2)
Speed is scaled to integer ±1000 for STM32:
wheel_int = clamp(v / max_speed, -1.0, 1.0) × 1000
Acceleration limiting
---------------------
Each wheel speed ramps toward the target at accel_limit m/ (50 Hz tick).
Deceleration uses a higher limit for responsive braking.
Safety
------
* cmd_vel input is hard-clamped to max_linear_vel / max_angular_vel.
* A watchdog zeroes wheel speeds if no cmd_vel received in cmd_timeout s.
* follow_enabled param for runtime on/off.
Usage
-----
ros2 launch saltyrover_drive rover_drive.launch.py
ros2 launch saltyrover_drive rover_drive.launch.py max_speed:=3.0
"""
import json
import math
import time
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
from std_msgs.msg import String
# ── Pure kinematics functions ─────────────────────────────────────────────────
def clamp(v: float, lo: float, hi: float) -> float:
return max(lo, min(hi, v))
def diff_drive_kinematics(linear_x: float, angular_z: float,
wheel_separation: float) -> tuple:
"""
Convert Twist to left/right wheel velocities (m/s).
wheel_separation: distance between left and right wheels (m)
Returns (v_left, v_right).
"""
half_sep = wheel_separation / 2.0
v_left = linear_x - angular_z * half_sep
v_right = linear_x + angular_z * half_sep
return v_left, v_right
def velocity_to_wheel_int(v: float, max_speed: float) -> int:
"""
Scale velocity (m/s) to integer wheel speed [-1000, +1000] for STM32.
max_speed : m/s corresponding to ±1000
Returns int clamped to [-1000, +1000].
"""
if max_speed <= 0:
return 0
scaled = clamp(v / max_speed, -1.0, 1.0)
return int(round(scaled * 1000))
def build_drive4_cmd(fl: int, fr: int, rl: int, rr: int) -> str:
"""Build STM32 JSON drive command for 4-wheel rover."""
return json.dumps({"cmd": "drive4", "fl": fl, "fr": fr, "rl": rl, "rr": rr})
def apply_ramp(current: float, target: float,
accel_limit: float, decel_limit: float,
dt: float) -> float:
"""
Trapezoidal ramp toward target.
Uses decel_limit when reducing magnitude (moving toward zero),
accel_limit when increasing magnitude.
"""
delta = target - current
decelerating = (current > 0 and delta < 0) or (current < 0 and delta > 0)
limit = decel_limit if decelerating else accel_limit
if delta > 0:
step = min(delta, limit * dt)
else:
step = max(delta, -limit * dt)
return current + step
def compute_wheel_speeds(linear_x: float, angular_z: float,
wheel_separation: float,
max_linear_vel: float, max_angular_vel: float,
max_speed: float) -> tuple:
"""
Full pipeline: Twist clamped kinematics int wheel speeds.
Returns (fl, fr, rl, rr) as ints in [-1000, +1000].
"""
# Clamp input
lx = clamp(linear_x, -max_linear_vel, max_linear_vel)
az = clamp(angular_z, -max_angular_vel, max_angular_vel)
v_left, v_right = diff_drive_kinematics(lx, az, wheel_separation)
fl = velocity_to_wheel_int(v_left, max_speed)
fr = velocity_to_wheel_int(v_right, max_speed)
rl = fl # rear mirrors front (paired L/R)
rr = fr
return fl, fr, rl, rr
# ── ROS2 Node ─────────────────────────────────────────────────────────────────
class RoverDriveNode(Node):
def __init__(self):
super().__init__("rover_drive")
# -- Parameters --------------------------------------------------------
self.declare_parameter("wheel_separation", 0.45) # m — L/R wheel distance
self.declare_parameter("wheel_radius", 0.075) # m
self.declare_parameter("max_speed", 2.0) # m/s → maps to ±1000
self.declare_parameter("max_linear_vel", 2.0) # cmd_vel clamp (m/s)
self.declare_parameter("max_angular_vel", 2.0) # cmd_vel clamp (rad/s)
self.declare_parameter("accel_limit", 1.5) # m/s²
self.declare_parameter("decel_limit", 3.0) # m/s² (braking faster)
self.declare_parameter("control_rate", 50.0) # Hz
self.declare_parameter("cmd_timeout", 0.5) # s — watchdog
self.declare_parameter("drive_enabled", True)
self._reload_params()
# -- State -------------------------------------------------------------
self._target_vel_left = 0.0
self._target_vel_right = 0.0
self._current_vel_left = 0.0
self._current_vel_right = 0.0
self._last_cmd_t = 0.0 # monotonic; 0 = never received
# -- Subscriptions -----------------------------------------------------
self.create_subscription(Twist, "/cmd_vel", self._cmd_vel_cb, 10)
# -- Publishers --------------------------------------------------------
self._drive_pub = self.create_publisher(String, "/rover/wheel_speeds", 10)
self._status_pub = self.create_publisher(String, "/rover/drive_status", 10)
# -- Control timer -----------------------------------------------------
dt = 1.0 / self._p["control_rate"]
self._timer = self.create_timer(dt, self._control_cb)
self.get_logger().info(
f"RoverDrive ready sep={self._p['wheel_separation']}m "
f"max={self._p['max_speed']}m/s "
f"rate={self._p['control_rate']}Hz"
)
def _reload_params(self):
self._p = {
"wheel_separation": self.get_parameter("wheel_separation").value,
"wheel_radius": self.get_parameter("wheel_radius").value,
"max_speed": self.get_parameter("max_speed").value,
"max_linear_vel": self.get_parameter("max_linear_vel").value,
"max_angular_vel": self.get_parameter("max_angular_vel").value,
"accel_limit": self.get_parameter("accel_limit").value,
"decel_limit": self.get_parameter("decel_limit").value,
"control_rate": self.get_parameter("control_rate").value,
"cmd_timeout": self.get_parameter("cmd_timeout").value,
"drive_enabled": self.get_parameter("drive_enabled").value,
}
def _cmd_vel_cb(self, msg: Twist):
self._reload_params()
p = self._p
if not p["drive_enabled"]:
self._target_vel_left = 0.0
self._target_vel_right = 0.0
return
lx = clamp(msg.linear.x, -p["max_linear_vel"], p["max_linear_vel"])
az = clamp(msg.angular.z, -p["max_angular_vel"], p["max_angular_vel"])
v_left, v_right = diff_drive_kinematics(lx, az, p["wheel_separation"])
# Clamp wheel velocities to max_speed
self._target_vel_left = clamp(v_left, -p["max_speed"], p["max_speed"])
self._target_vel_right = clamp(v_right, -p["max_speed"], p["max_speed"])
self._last_cmd_t = time.monotonic()
def _control_cb(self):
self._reload_params()
p = self._p
dt = 1.0 / p["control_rate"]
# Watchdog: zero target if no cmd_vel received recently
if self._last_cmd_t > 0.0:
age = time.monotonic() - self._last_cmd_t
if age > p["cmd_timeout"]:
self._target_vel_left = 0.0
self._target_vel_right = 0.0
if not p["drive_enabled"]:
self._target_vel_left = 0.0
self._target_vel_right = 0.0
# Ramp toward target
self._current_vel_left = apply_ramp(
self._current_vel_left, self._target_vel_left,
p["accel_limit"], p["decel_limit"], dt)
self._current_vel_right = apply_ramp(
self._current_vel_right, self._target_vel_right,
p["accel_limit"], p["decel_limit"], dt)
# Convert to int wheel speeds
fl = velocity_to_wheel_int(self._current_vel_left, p["max_speed"])
fr = velocity_to_wheel_int(self._current_vel_right, p["max_speed"])
rl, rr = fl, fr # rear mirrors front
# Publish STM32 command
cmd_str = build_drive4_cmd(fl, fr, rl, rr)
self._drive_pub.publish(String(data=cmd_str))
# Publish status
status = {
"fl": fl, "fr": fr, "rl": rl, "rr": rr,
"v_left": round(self._current_vel_left, 3),
"v_right": round(self._current_vel_right, 3),
"enabled": p["drive_enabled"],
}
self._status_pub.publish(String(data=json.dumps(status)))
# ── Entry point ───────────────────────────────────────────────────────────────
def main(args=None):
rclpy.init(args=args)
node = RoverDriveNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.try_shutdown()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,197 @@
"""
rover_odom_node.py Wheel odometry for SaltyRover (dead-reckoning from cmd_vel).
Uses the commanded velocity (dead-reckoning) since the STM32 encoder feedback
may not always be available at launch. When encoder feedback is wired up,
replace the cmd_vel subscription with actual wheel tick messages.
Subscribes:
/cmd_vel (geometry_msgs/Twist) commanded velocities (proxy for actual)
Publishes:
/odom (nav_msgs/Odometry) accumulated pose + velocity estimate
TF: odom base_link
Kinematics (differential drive):
v = (v_right + v_left) / 2 = linear.x
omega = (v_right - v_left) / wheel_sep = angular.z
x' = x + v × cos(θ) × dt
y' = y + v × sin(θ) × dt
θ' = θ + omega × dt
"""
import math
import time
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist, TransformStamped
from nav_msgs.msg import Odometry
try:
from tf2_ros import TransformBroadcaster
_HAVE_TF2 = True
except ImportError:
_HAVE_TF2 = False
# ── Pure odometry functions ───────────────────────────────────────────────────
def integrate_pose(x: float, y: float, theta: float,
linear_x: float, angular_z: float,
dt: float) -> tuple:
"""
Integrate pose using forward Euler kinematics.
x, y : position (m)
theta : heading (rad), 0 = +X axis
linear_x: forward velocity (m/s)
angular_z: yaw rate (rad/s)
dt : time step (s)
Returns (x', y', theta').
"""
x_new = x + linear_x * math.cos(theta) * dt
y_new = y + linear_x * math.sin(theta) * dt
theta_new = theta + angular_z * dt
return x_new, y_new, theta_new
def euler_to_quaternion_z(theta: float) -> tuple:
"""
Convert a yaw angle to a quaternion (rotation around Z only).
Returns (qx, qy, qz, qw).
"""
half = theta / 2.0
return 0.0, 0.0, math.sin(half), math.cos(half)
# ── ROS2 Node ─────────────────────────────────────────────────────────────────
class RoverOdomNode(Node):
def __init__(self):
super().__init__("rover_odom")
# -- Parameters --------------------------------------------------------
self.declare_parameter("wheel_separation", 0.45) # m
self.declare_parameter("publish_tf", True)
self.declare_parameter("odom_frame_id", "odom")
self.declare_parameter("base_frame_id", "base_link")
self.declare_parameter("publish_rate", 50.0) # Hz
p = self._params()
self._odom_frame = p["odom_frame_id"]
self._base_frame = p["base_frame_id"]
self._publish_tf = p["publish_tf"] and _HAVE_TF2
# -- Pose state --------------------------------------------------------
self._x = 0.0
self._y = 0.0
self._theta = 0.0
self._vx = 0.0
self._omega = 0.0
self._last_t = None # time.monotonic() of last cmd_vel
# -- TF2 broadcaster ---------------------------------------------------
if self._publish_tf:
self._tf_broadcaster = TransformBroadcaster(self)
# -- Subscriptions -----------------------------------------------------
self.create_subscription(Twist, "/cmd_vel", self._cmd_vel_cb, 10)
# -- Publishers --------------------------------------------------------
self._odom_pub = self.create_publisher(Odometry, "/odom", 10)
# -- Timer -------------------------------------------------------------
self._timer = self.create_timer(
1.0 / p["publish_rate"], self._publish_cb)
self.get_logger().info("RoverOdom ready (dead-reckoning from /cmd_vel)")
def _params(self) -> dict:
return {
"wheel_separation": self.get_parameter("wheel_separation").value,
"publish_tf": self.get_parameter("publish_tf").value,
"odom_frame_id": self.get_parameter("odom_frame_id").value,
"base_frame_id": self.get_parameter("base_frame_id").value,
"publish_rate": self.get_parameter("publish_rate").value,
}
def _cmd_vel_cb(self, msg: Twist):
now = time.monotonic()
if self._last_t is not None:
dt = now - self._last_t
if 0 < dt < 1.0: # sanity: ignore if gap > 1s
self._x, self._y, self._theta = integrate_pose(
self._x, self._y, self._theta,
msg.linear.x, msg.angular.z, dt)
self._last_t = now
self._vx = msg.linear.x
self._omega = msg.angular.z
def _publish_cb(self):
now_msg = self.get_clock().now().to_msg()
qx, qy, qz, qw = euler_to_quaternion_z(self._theta)
# -- Odometry message --------------------------------------------------
odom = Odometry()
odom.header.stamp = now_msg
odom.header.frame_id = self._odom_frame
odom.child_frame_id = self._base_frame
odom.pose.pose.position.x = self._x
odom.pose.pose.position.y = self._y
odom.pose.pose.position.z = 0.0
odom.pose.pose.orientation.x = qx
odom.pose.pose.orientation.y = qy
odom.pose.pose.orientation.z = qz
odom.pose.pose.orientation.w = qw
odom.twist.twist.linear.x = self._vx
odom.twist.twist.angular.z = self._omega
# Covariance: dead-reckoning, moderate confidence
odom.pose.covariance[0] = 0.1 # x
odom.pose.covariance[7] = 0.1 # y
odom.pose.covariance[35] = 0.05 # yaw
odom.twist.covariance[0] = 0.01
odom.twist.covariance[35] = 0.01
self._odom_pub.publish(odom)
# -- TF broadcast ------------------------------------------------------
if self._publish_tf:
tf = TransformStamped()
tf.header.stamp = now_msg
tf.header.frame_id = self._odom_frame
tf.child_frame_id = self._base_frame
tf.transform.translation.x = self._x
tf.transform.translation.y = self._y
tf.transform.translation.z = 0.0
tf.transform.rotation.x = qx
tf.transform.rotation.y = qy
tf.transform.rotation.z = qz
tf.transform.rotation.w = qw
self._tf_broadcaster.sendTransform(tf)
# ── Entry point ───────────────────────────────────────────────────────────────
def main(args=None):
rclpy.init(args=args)
node = RoverOdomNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.try_shutdown()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/saltyrover_drive
[install]
install_scripts=$base/lib/saltyrover_drive

View File

@ -0,0 +1,30 @@
from setuptools import setup
package_name = "saltyrover_drive"
setup(
name=package_name,
version="0.1.0",
packages=[package_name],
data_files=[
("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
(f"share/{package_name}", ["package.xml"]),
(f"share/{package_name}/launch", ["launch/rover_drive.launch.py"]),
(f"share/{package_name}/config", ["config/rover_params.yaml"]),
],
install_requires=["setuptools"],
zip_safe=True,
maintainer="sl-controls",
maintainer_email="sl-controls@saltylab.local",
description="4-wheel diff-drive controller for SaltyRover (tank kinematics + dead-reckoning odom)",
license="MIT",
tests_require=["pytest"],
entry_points={
"console_scripts": [
# /cmd_vel → diff-drive kinematics → /rover/wheel_speeds (STM32 JSON)
"rover_drive_node = saltyrover_drive.rover_drive_node:main",
# /cmd_vel → dead-reckoning pose → /odom + TF
"rover_odom_node = saltyrover_drive.rover_odom_node:main",
],
},
)

View File

@ -0,0 +1,414 @@
"""
Unit tests for saltyrover_drive pure functions.
No ROS2 runtime required.
Run with: pytest jetson/ros2_ws/src/saltyrover_drive/test/test_rover_drive.py
"""
import json
import math
import pytest
# ── Pure function mirrors ─────────────────────────────────────────────────────
def _clamp(v, lo, hi):
return max(lo, min(hi, v))
def _diff_drive_kinematics(linear_x, angular_z, wheel_separation):
half_sep = wheel_separation / 2.0
v_left = linear_x - angular_z * half_sep
v_right = linear_x + angular_z * half_sep
return v_left, v_right
def _velocity_to_wheel_int(v, max_speed):
if max_speed <= 0:
return 0
scaled = _clamp(v / max_speed, -1.0, 1.0)
return int(round(scaled * 1000))
def _build_drive4_cmd(fl, fr, rl, rr):
return json.dumps({"cmd": "drive4", "fl": fl, "fr": fr, "rl": rl, "rr": rr})
def _apply_ramp(current, target, accel_limit, decel_limit, dt):
delta = target - current
decelerating = (current > 0 and delta < 0) or (current < 0 and delta > 0)
limit = decel_limit if decelerating else accel_limit
if delta > 0:
step = min(delta, limit * dt)
else:
step = max(delta, -limit * dt)
return current + step
def _compute_wheel_speeds(linear_x, angular_z, wheel_separation,
max_linear_vel, max_angular_vel, max_speed):
lx = _clamp(linear_x, -max_linear_vel, max_linear_vel)
az = _clamp(angular_z, -max_angular_vel, max_angular_vel)
v_left, v_right = _diff_drive_kinematics(lx, az, wheel_separation)
fl = _velocity_to_wheel_int(v_left, max_speed)
fr = _velocity_to_wheel_int(v_right, max_speed)
rl, rr = fl, fr
return fl, fr, rl, rr
def _integrate_pose(x, y, theta, linear_x, angular_z, dt):
x_new = x + linear_x * math.cos(theta) * dt
y_new = y + linear_x * math.sin(theta) * dt
theta_new = theta + angular_z * dt
return x_new, y_new, theta_new
def _euler_to_quaternion_z(theta):
half = theta / 2.0
return 0.0, 0.0, math.sin(half), math.cos(half)
# ── Differential drive kinematics ────────────────────────────────────────────
class TestDiffDriveKinematics:
_SEP = 0.45 # m — standard rover wheel separation
def test_straight_equal_speeds(self):
"""Straight ahead: both wheels same speed."""
vl, vr = _diff_drive_kinematics(1.0, 0.0, self._SEP)
assert vl == pytest.approx(1.0)
assert vr == pytest.approx(1.0)
def test_turn_left_right_faster(self):
"""Turn left (positive angular_z): right wheel faster."""
vl, vr = _diff_drive_kinematics(1.0, 1.0, self._SEP)
assert vr > vl
def test_turn_right_left_faster(self):
"""Turn right (negative angular_z): left wheel faster."""
vl, vr = _diff_drive_kinematics(1.0, -1.0, self._SEP)
assert vl > vr
def test_pivot_left_in_place(self):
"""Pivot turn left in place (linear=0): right forward, left backward."""
vl, vr = _diff_drive_kinematics(0.0, 1.0, self._SEP)
assert vl < 0
assert vr > 0
assert vl == pytest.approx(-vr) # symmetric
def test_pivot_speed_proportional_to_separation(self):
"""Pivot speed = angular_z × sep/2."""
angular_z = 2.0
sep = 0.45
vl, vr = _diff_drive_kinematics(0.0, angular_z, sep)
expected_half = angular_z * sep / 2.0
assert vr == pytest.approx( expected_half)
assert vl == pytest.approx(-expected_half)
def test_backward_straight(self):
vl, vr = _diff_drive_kinematics(-1.0, 0.0, self._SEP)
assert vl == pytest.approx(-1.0)
assert vr == pytest.approx(-1.0)
def test_zero_cmd(self):
vl, vr = _diff_drive_kinematics(0.0, 0.0, self._SEP)
assert vl == pytest.approx(0.0)
assert vr == pytest.approx(0.0)
def test_turning_radius(self):
"""
Turning radius R = linear_x / angular_z.
Also: v_right = omega × (R + sep/2), v_left = omega × (R - sep/2)
Verify ratio: v_right/v_left = (R + sep/2)/(R - sep/2).
"""
lx, az, sep = 1.0, 0.5, 0.45
vl, vr = _diff_drive_kinematics(lx, az, sep)
R = lx / az # = 2.0 m
expected_ratio = (R + sep / 2) / (R - sep / 2)
assert vr / vl == pytest.approx(expected_ratio, rel=1e-5)
def test_wider_separation_higher_speed_diff(self):
"""Wider chassis → larger speed difference for same angular command."""
vl_narrow, vr_narrow = _diff_drive_kinematics(1.0, 1.0, 0.30)
vl_wide, vr_wide = _diff_drive_kinematics(1.0, 1.0, 0.60)
assert (vr_wide - vl_wide) > (vr_narrow - vl_narrow)
# ── Velocity to wheel integer ─────────────────────────────────────────────────
class TestVelocityToWheelInt:
def test_full_forward(self):
assert _velocity_to_wheel_int(3.0, 3.0) == 1000
def test_full_reverse(self):
assert _velocity_to_wheel_int(-3.0, 3.0) == -1000
def test_half_speed(self):
assert _velocity_to_wheel_int(1.5, 3.0) == 500
def test_stopped(self):
assert _velocity_to_wheel_int(0.0, 3.0) == 0
def test_clamp_above_max(self):
assert _velocity_to_wheel_int(5.0, 3.0) == 1000
def test_clamp_below_min(self):
assert _velocity_to_wheel_int(-5.0, 3.0) == -1000
def test_zero_max_speed_safe(self):
"""max_speed=0 must not divide by zero."""
assert _velocity_to_wheel_int(1.0, 0.0) == 0
def test_rounding(self):
"""0.5005 * 1000 = 500.5 → rounds to 501."""
# 1.5015 / 3.0 = 0.5005
result = _velocity_to_wheel_int(1.5015, 3.0)
assert result in (500, 501) # rounding may differ slightly
def test_one_third(self):
"""1.0 / 3.0 m/s → ~333."""
result = _velocity_to_wheel_int(1.0, 3.0)
assert result == 333
# ── Drive command JSON builder ────────────────────────────────────────────────
class TestBuildDrive4Cmd:
def test_valid_json(self):
cmd = _build_drive4_cmd(100, 200, 100, 200)
parsed = json.loads(cmd)
assert parsed["cmd"] == "drive4"
def test_fields_present(self):
cmd = json.loads(_build_drive4_cmd(10, 20, 30, 40))
assert cmd["fl"] == 10
assert cmd["fr"] == 20
assert cmd["rl"] == 30
assert cmd["rr"] == 40
def test_zero_speeds(self):
cmd = json.loads(_build_drive4_cmd(0, 0, 0, 0))
assert all(cmd[k] == 0 for k in ("fl", "fr", "rl", "rr"))
def test_negative_values(self):
cmd = json.loads(_build_drive4_cmd(-500, -500, -500, -500))
assert cmd["fl"] == -500
def test_rear_mirrors_front_in_pipeline(self):
"""In compute_wheel_speeds, rl=fl and rr=fr."""
fl, fr, rl, rr = _compute_wheel_speeds(1.0, 0.0, 0.45, 3.0, 2.5, 3.0)
assert rl == fl
assert rr == fr
# ── Full pipeline (compute_wheel_speeds) ─────────────────────────────────────
class TestComputeWheelSpeeds:
_SEP = 0.45
_MAX_LIN = 3.0
_MAX_ANG = 2.5
_MAX_SPD = 3.0
def test_straight_full_speed(self):
fl, fr, rl, rr = _compute_wheel_speeds(
3.0, 0.0, self._SEP, self._MAX_LIN, self._MAX_ANG, self._MAX_SPD)
assert fl == fr == rl == rr == 1000
def test_stopped(self):
fl, fr, rl, rr = _compute_wheel_speeds(
0.0, 0.0, self._SEP, self._MAX_LIN, self._MAX_ANG, self._MAX_SPD)
assert fl == fr == rl == rr == 0
def test_turn_left_fr_greater(self):
fl, fr, rl, rr = _compute_wheel_speeds(
1.0, 1.0, self._SEP, self._MAX_LIN, self._MAX_ANG, self._MAX_SPD)
assert fr > fl
def test_clamp_linear_vel(self):
"""cmd > max_linear_vel is clamped."""
fl1, fr1, _, _ = _compute_wheel_speeds(
3.0, 0.0, self._SEP, self._MAX_LIN, self._MAX_ANG, self._MAX_SPD)
fl2, fr2, _, _ = _compute_wheel_speeds(
10.0, 0.0, self._SEP, self._MAX_LIN, self._MAX_ANG, self._MAX_SPD)
assert fl1 == fl2 == 1000
def test_reverse(self):
fl, fr, rl, rr = _compute_wheel_speeds(
-3.0, 0.0, self._SEP, self._MAX_LIN, self._MAX_ANG, self._MAX_SPD)
assert fl == fr == rl == rr == -1000
def test_pivot_turn_symmetric(self):
"""Pure rotation: FL/RL negative, FR/RR positive, symmetric."""
fl, fr, rl, rr = _compute_wheel_speeds(
0.0, 2.0, self._SEP, self._MAX_LIN, self._MAX_ANG, self._MAX_SPD)
assert fl < 0
assert fr > 0
assert fl == rl
assert fr == rr
assert fl == -fr
# ── Ramp / acceleration limiter ──────────────────────────────────────────────
class TestApplyRamp:
_DT = 0.02 # 50 Hz
def test_accel_step(self):
v = _apply_ramp(0.0, 3.0, 2.0, 4.0, self._DT)
assert v == pytest.approx(2.0 * self._DT) # 0.04 m/s
def test_decel_step(self):
v = _apply_ramp(3.0, 0.0, 2.0, 4.0, self._DT)
assert v == pytest.approx(3.0 - 4.0 * self._DT) # 2.92 m/s
def test_decel_faster_than_accel(self):
"""Decel limit (4.0) > accel limit (2.0) → braking step larger."""
accel_step = _apply_ramp(0.0, 1.0, 2.0, 4.0, self._DT) - 0.0
decel_step = 3.0 - _apply_ramp(3.0, 0.0, 2.0, 4.0, self._DT)
assert decel_step > accel_step
def test_does_not_overshoot(self):
v = _apply_ramp(2.99, 3.0, 2.0, 4.0, self._DT)
assert v == pytest.approx(3.0)
def test_does_not_undershoot(self):
v = _apply_ramp(0.01, 0.0, 2.0, 4.0, self._DT)
assert v == pytest.approx(0.0)
def test_negative_to_zero_uses_decel_limit(self):
"""From -1.0 toward 0: decelerating → decel_limit applies."""
v = _apply_ramp(-1.0, 0.0, 2.0, 4.0, self._DT)
assert v == pytest.approx(-1.0 + 4.0 * self._DT) # -0.92
def test_zero_target(self):
assert _apply_ramp(0.0, 0.0, 2.0, 4.0, self._DT) == pytest.approx(0.0)
def test_time_to_reach_max(self):
"""At 2.0 m/s², 3.0 m/s target takes ≈ 3/2/0.02 = 75 ticks."""
v = 0.0
ticks = 0
for _ in range(200):
v = _apply_ramp(v, 3.0, 2.0, 4.0, 0.02)
ticks += 1
if v >= 3.0 - 1e-6:
break
assert v == pytest.approx(3.0, abs=1e-4)
assert 70 <= ticks <= 80
# ── Odometry integration ──────────────────────────────────────────────────────
class TestIntegratePose:
def test_straight_along_x(self):
"""Moving forward at 1 m/s for 1s → x increases by 1m."""
x, y, theta = _integrate_pose(0.0, 0.0, 0.0, 1.0, 0.0, 1.0)
assert x == pytest.approx(1.0)
assert y == pytest.approx(0.0)
assert theta == pytest.approx(0.0)
def test_straight_along_y(self):
"""Facing +Y (theta=pi/2), moving forward 1m → y increases."""
x, y, theta = _integrate_pose(0.0, 0.0, math.pi / 2, 1.0, 0.0, 1.0)
assert x == pytest.approx(0.0, abs=1e-9)
assert y == pytest.approx(1.0)
def test_pure_rotation(self):
"""No linear vel, angular=pi/2 rad/s for 1s → theta = pi/2."""
x, y, theta = _integrate_pose(0.0, 0.0, 0.0, 0.0, math.pi / 2, 1.0)
assert x == pytest.approx(0.0)
assert y == pytest.approx(0.0)
assert theta == pytest.approx(math.pi / 2)
def test_backward(self):
"""Reverse (linear=-1) → x decreases."""
x, y, theta = _integrate_pose(0.0, 0.0, 0.0, -1.0, 0.0, 1.0)
assert x == pytest.approx(-1.0)
def test_accumulated_over_ticks(self):
"""5m at 1m/s with dt=0.02 → 250 ticks."""
x, y, theta = 0.0, 0.0, 0.0
for _ in range(250):
x, y, theta = _integrate_pose(x, y, theta, 1.0, 0.0, 0.02)
assert x == pytest.approx(5.0, abs=1e-6)
def test_small_dt_arc(self):
"""Short arc: forward + small turn. Position should shift appropriately."""
x, y, theta = _integrate_pose(0.0, 0.0, 0.0, 1.0, 0.1, 0.1)
# theta changes by 0.01 rad; x ≈ 0.1, y ≈ 0 (small angle)
assert x == pytest.approx(1.0 * 0.1, abs=0.01)
assert theta == pytest.approx(0.01)
# ── Quaternion from yaw ───────────────────────────────────────────────────────
class TestEulerToQuaternionZ:
def test_zero_yaw_identity(self):
qx, qy, qz, qw = _euler_to_quaternion_z(0.0)
assert qx == pytest.approx(0.0)
assert qy == pytest.approx(0.0)
assert qz == pytest.approx(0.0)
assert qw == pytest.approx(1.0)
def test_pi_yaw(self):
"""180° rotation: qw ≈ 0, qz ≈ 1."""
qx, qy, qz, qw = _euler_to_quaternion_z(math.pi)
assert qz == pytest.approx(1.0, abs=1e-6)
assert abs(qw) < 1e-6
def test_half_pi_yaw(self):
"""90° rotation: qz = qw = 1/sqrt(2)."""
qx, qy, qz, qw = _euler_to_quaternion_z(math.pi / 2)
assert qz == pytest.approx(math.sqrt(2) / 2, rel=1e-5)
assert qw == pytest.approx(math.sqrt(2) / 2, rel=1e-5)
def test_unit_quaternion(self):
"""All quaternions must be unit length."""
for theta in [0, 0.5, 1.0, math.pi / 2, math.pi, -math.pi / 4]:
qx, qy, qz, qw = _euler_to_quaternion_z(theta)
norm = math.sqrt(qx**2 + qy**2 + qz**2 + qw**2)
assert norm == pytest.approx(1.0, rel=1e-6)
def test_negative_yaw(self):
"""Negative yaw → negative qz."""
qx, qy, qz, qw = _euler_to_quaternion_z(-math.pi / 2)
assert qz < 0
# ── Integration: kinematics + scaling ────────────────────────────────────────
class TestIntegration:
def test_full_pipeline_straight(self):
"""1 m/s forward → both wheels 333 int (1/3 of max_speed=3)."""
fl, fr, rl, rr = _compute_wheel_speeds(
1.0, 0.0, 0.45, 3.0, 2.5, 3.0)
assert fl == fr == rl == rr == 333
def test_drive_cmd_round_trip(self):
"""Compute speeds → build JSON → parse → verify values."""
fl, fr, rl, rr = _compute_wheel_speeds(
1.5, 0.5, 0.45, 3.0, 2.5, 3.0)
cmd = json.loads(_build_drive4_cmd(fl, fr, rl, rr))
assert cmd["fl"] == fl
assert cmd["fr"] == fr
assert cmd["cmd"] == "drive4"
def test_ramp_reaches_target(self):
"""Starting from rest, ramp reaches commanded speed."""
target_vl, _ = _diff_drive_kinematics(2.0, 0.0, 0.45)
v = 0.0
for _ in range(200):
v = _apply_ramp(v, target_vl, 2.0, 4.0, 0.02)
assert v == pytest.approx(target_vl, abs=1e-4)
def test_turn_cmd_produces_different_sides(self):
"""Turning command: left and right wheel commands must differ."""
fl, fr, rl, rr = _compute_wheel_speeds(
1.0, 1.0, 0.45, 3.0, 2.5, 3.0)
assert fl != fr # turn → different wheel speeds
assert fl == rl # front and rear same per side
assert fr == rr