Merge pull request 'feat: person-following control loop' (#55) from sl-controls/person-follower into main

This commit is contained in:
seb 2026-02-28 23:25:40 -05:00
commit fcd59ead80
9 changed files with 976 additions and 0 deletions

View File

@ -0,0 +1,74 @@
# person_follower_params.yaml
# Configuration for person_follower_node — proportional person-following controller.
#
# Run with:
# ros2 launch saltybot_follower person_follower.launch.py
# Or override individual params:
# ros2 launch saltybot_follower person_follower.launch.py follow_distance:=1.2
#
# IMPORTANT: This node publishes raw /cmd_vel. The cmd_vel_bridge_node (PR #46)
# applies the ESC ramp, deadman switch, and STM32 AUTONOMOUS mode gate.
# Do not run this node without the cmd_vel bridge running on the same robot.
# ── Follow geometry ────────────────────────────────────────────────────────────
# The target distance to maintain behind the person (metres).
# Decrease for tighter following; increase for more standoff.
follow_distance: 1.5 # metres
# ±dead_zone around follow_distance — no linear velocity command in this band.
# Prevents jitter when the robot is approximately at target distance.
# Must be > position noise of /person/target detector.
dead_zone: 0.3 # metres
# ── Proportional gains ────────────────────────────────────────────────────────
# linear.x = clamp(kp_linear * (distance - follow_distance), ±max_linear_vel)
# angular.z = clamp(kp_angular * bearing, ±max_angular_vel)
#
# kp_linear: higher → faster approach/retreat but more oscillation
# kp_angular: higher → tighter heading tracking but may cause weaving
kp_linear: 0.6
kp_angular: 1.2
# ── Velocity limits ────────────────────────────────────────────────────────────
# Hard caps on the cmd_vel output of this node.
# The cmd_vel bridge ALSO clamps (max_linear_vel=0.5, max_angular_vel=2.0),
# so these limits should be ≤ bridge limits for this node to have effect.
#
# Balance robot: keep max_linear_vel ≤ 0.5 m/s until balance is well-tuned.
max_linear_vel: 0.5 # m/s
max_angular_vel: 1.0 # rad/s (conservative — avoid spinning out)
# ── Lost-target timeouts ──────────────────────────────────────────────────────
# lost_timeout: seconds after last /person/target before entering STOPPING
# (publishing zero cmd_vel — cmd_vel bridge deadman will also kick in).
lost_timeout: 2.0 # seconds
# search_timeout: seconds after last /person/target before entering SEARCHING
# (slow rotation to re-acquire person).
search_timeout: 5.0 # seconds
# Angular velocity while in SEARCHING state (rad/s, positive = CCW/left).
# Keep small — robot is on a balance platform.
search_angular_vel: 0.3 # rad/s
# ── Obstacle avoidance ────────────────────────────────────────────────────────
# The node checks a forward corridor in the Nav2 local costmap.
# If any cell >= obstacle_threshold, forward cmd_vel is zeroed (turning allowed).
#
# obstacle_check_dist: how far ahead to check (metres)
# robot_width: corridor width for obstacle check (metres)
# obstacle_threshold: costmap cell value treated as obstacle
# Nav2 conventions: free=0, inscribed=99, lethal=100, unknown=-1
# 70 = "probably occupied" — conservative; raise to 99 for less cautious
obstacle_check_dist: 0.5 # metres
robot_width: 0.3 # metres (corridor half-width = 0.15m each side)
obstacle_threshold: 70 # costmap value ≥ this → obstacle
# ── Control loop ──────────────────────────────────────────────────────────────
control_rate: 20.0 # Hz — lower than cmd_vel bridge (50Hz) by design
# ── Mode integration ──────────────────────────────────────────────────────────
# Master enable for the follow controller. When false, node publishes zero cmd_vel.
# Toggle at runtime: ros2 param set /person_follower follow_enabled false
# The cmd_vel bridge independently gates on STM32 AUTONOMOUS mode (md=2).
follow_enabled: true

View File

@ -0,0 +1,137 @@
"""
person_follower.launch.py Launch the proportional person-following controller.
The person_follower_node subscribes to /person/target (PoseStamped in base_link
frame) published by sl-jetson's person detector and drives /cmd_vel accordingly.
PREREQUISITES (must be running before this launch):
1. cmd_vel_bridge_node (saltybot_bridge) serial port owner, safety wrapping
2. sl-jetson's person_detector node — publishes /person/target
3. Nav2 local costmap publishes /local_costmap/costmap
(optional: obstacle avoidance is skipped if no costmap messages arrive)
Usage:
# Defaults (1.5m follow distance, 0.5m/s max, 2s lost timeout):
ros2 launch saltybot_follower person_follower.launch.py
# Closer following for indoor tight spaces:
ros2 launch saltybot_follower person_follower.launch.py follow_distance:=1.0
# Disable obstacle avoidance (for environments without Nav2 costmap):
ros2 launch saltybot_follower person_follower.launch.py obstacle_threshold:=100
# Load all params from YAML:
ros2 launch saltybot_follower person_follower.launch.py \\
params_file:=/path/to/person_follower_params.yaml
Safety note:
To disable at runtime without stopping the node:
ros2 param set /person_follower follow_enabled false
"""
import os
from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument, OpaqueFunction
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def _launch_follower(context, *args, **kwargs):
params_file = LaunchConfiguration("params_file").perform(context)
inline_params = {
"follow_distance": float(LaunchConfiguration("follow_distance").perform(context)),
"dead_zone": float(LaunchConfiguration("dead_zone").perform(context)),
"kp_linear": float(LaunchConfiguration("kp_linear").perform(context)),
"kp_angular": float(LaunchConfiguration("kp_angular").perform(context)),
"max_linear_vel": float(LaunchConfiguration("max_linear_vel").perform(context)),
"max_angular_vel": float(LaunchConfiguration("max_angular_vel").perform(context)),
"lost_timeout": float(LaunchConfiguration("lost_timeout").perform(context)),
"search_timeout": float(LaunchConfiguration("search_timeout").perform(context)),
"search_angular_vel": float(LaunchConfiguration("search_angular_vel").perform(context)),
"obstacle_check_dist": float(LaunchConfiguration("obstacle_check_dist").perform(context)),
"obstacle_threshold": int(LaunchConfiguration("obstacle_threshold").perform(context)),
"robot_width": float(LaunchConfiguration("robot_width").perform(context)),
"control_rate": float(LaunchConfiguration("control_rate").perform(context)),
"follow_enabled": LaunchConfiguration("follow_enabled").perform(context).lower() == "true",
}
node_params = [params_file, inline_params] if params_file else [inline_params]
return [Node(
package="saltybot_follower",
executable="person_follower_node",
name="person_follower",
output="screen",
parameters=node_params,
)]
def generate_launch_description():
pkg_share = get_package_share_directory("saltybot_follower")
default_params = os.path.join(pkg_share, "config", "person_follower_params.yaml")
return LaunchDescription([
DeclareLaunchArgument(
"params_file",
default_value=default_params,
description="Full path to person_follower_params.yaml (overrides inline args)"),
# Follow geometry
DeclareLaunchArgument(
"follow_distance", default_value="1.5",
description="Target follow distance (m)"),
DeclareLaunchArgument(
"dead_zone", default_value="0.3",
description="Dead zone around follow_distance — no linear cmd (m)"),
# Gains
DeclareLaunchArgument(
"kp_linear", default_value="0.6",
description="Proportional gain: distance error → linear.x"),
DeclareLaunchArgument(
"kp_angular", default_value="1.2",
description="Proportional gain: bearing error → angular.z"),
# Velocity limits
DeclareLaunchArgument(
"max_linear_vel", default_value="0.5",
description="Max forward/backward speed (m/s)"),
DeclareLaunchArgument(
"max_angular_vel", default_value="1.0",
description="Max turn rate (rad/s)"),
# Lost-target timeouts
DeclareLaunchArgument(
"lost_timeout", default_value="2.0",
description="Seconds after last /person/target before stopping"),
DeclareLaunchArgument(
"search_timeout", default_value="5.0",
description="Seconds after last /person/target before slow rotation search"),
DeclareLaunchArgument(
"search_angular_vel", default_value="0.3",
description="Rotation speed while searching for person (rad/s)"),
# Obstacle avoidance
DeclareLaunchArgument(
"obstacle_check_dist", default_value="0.5",
description="Forward look-ahead distance for obstacle check (m)"),
DeclareLaunchArgument(
"obstacle_threshold", default_value="70",
description="Costmap cell value treated as obstacle (Nav2 lethal=99)"),
DeclareLaunchArgument(
"robot_width", default_value="0.3",
description="Robot body width for obstacle corridor check (m)"),
# Control
DeclareLaunchArgument(
"control_rate", default_value="20.0",
description="Control loop frequency (Hz)"),
DeclareLaunchArgument(
"follow_enabled", default_value="true",
description="Master enable; set false to pause following without stopping node"),
OpaqueFunction(function=_launch_follower),
])

View File

@ -0,0 +1,30 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_follower</name>
<version>0.1.0</version>
<description>
Proportional person-following control loop for saltybot.
Subscribes to /person/target (PoseStamped in base_link frame) and publishes
/cmd_vel with velocity limits, dead zone, obstacle avoidance, and lost-target
state machine (stop → search). Requires cmd_vel_bridge_node for safety wrapping.
</description>
<maintainer email="sl-controls@saltylab.local">sl-controls</maintainer>
<license>MIT</license>
<depend>rclpy</depend>
<depend>geometry_msgs</depend>
<depend>nav_msgs</depend>
<depend>std_msgs</depend>
<buildtool_depend>ament_python</buildtool_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,314 @@
"""
person_follower_node.py Proportional person-following control loop.
Subscribes:
/person/target (geometry_msgs/PoseStamped) person pose in base_link frame
/local_costmap/costmap (nav_msgs/OccupancyGrid) Nav2 local obstacle map
Publishes:
/cmd_vel (geometry_msgs/Twist) velocity commands
State machine
FOLLOWING person visible (last detection < lost_timeout ago)
STOPPING person lost, publishing zero until search_timeout
SEARCHING person lost > search_timeout, slow rotation to re-acquire
Safety wiring
cmd_vel bridge (PR #46) applies ramp + deadman + STM32 AUTONOMOUS mode gate —
this node publishes raw /cmd_vel, the bridge handles hardware safety.
follow_enabled param (default True) lets the operator disable the controller
at runtime: ros2 param set /person_follower follow_enabled false
Obstacle override: if Nav2 local costmap shows a lethal cell in the forward
corridor, forward cmd_vel is zeroed; turning is still allowed.
Usage
ros2 launch saltybot_follower person_follower.launch.py
ros2 launch saltybot_follower person_follower.launch.py follow_distance:=1.2
"""
import math
import time
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import PoseStamped, Twist
from nav_msgs.msg import OccupancyGrid
# ── State constants ────────────────────────────────────────────────────────────
_FOLLOWING = "following"
_STOPPING = "stopping" # person lost < search_timeout, publishing zero
_SEARCHING = "searching" # person lost > search_timeout, slow rotation
# ── Pure helper functions (also exercised by unit tests) ──────────────────────
def clamp(v: float, lo: float, hi: float) -> float:
return max(lo, min(hi, v))
def compute_distance(px: float, py: float) -> float:
return math.sqrt(px * px + py * py)
def compute_bearing(px: float, py: float) -> float:
"""Bearing to point (px, py) in base_link: positive = person to the left."""
return math.atan2(py, px)
def compute_follow_cmd(
px: float,
py: float,
follow_distance: float,
dead_zone: float,
kp_linear: float,
kp_angular: float,
max_linear_vel: float,
max_angular_vel: float,
obstacle_ahead: bool = False,
) -> tuple:
"""
Pure proportional follow controller.
Returns (linear_x, angular_z) as floats.
linear_x:
Proportional to signed distance error (distance follow_distance).
Zero inside the dead zone. Clamped to ±max_linear_vel.
Zeroed (not reversed) if obstacle_ahead and moving forward.
angular_z:
Proportional to bearing. Always corrects even when inside the dead zone,
so the robot stays pointed at the person while holding distance.
Clamped to ±max_angular_vel.
"""
distance = compute_distance(px, py)
bearing = compute_bearing(px, py)
# Linear — dead zone suppresses jitter at target distance
dist_error = distance - follow_distance
if abs(dist_error) > dead_zone:
linear_x = clamp(kp_linear * dist_error, -max_linear_vel, max_linear_vel)
else:
linear_x = 0.0
# Obstacle override: suppress forward motion, allow turning
if obstacle_ahead and linear_x > 0.0:
linear_x = 0.0
# Angular — always correct bearing
angular_z = clamp(kp_angular * bearing, -max_angular_vel, max_angular_vel)
return linear_x, angular_z
def check_obstacle_in_costmap(
costmap_data: list,
width: int,
height: int,
resolution: float,
check_dist: float,
robot_half_width: float,
threshold: int,
) -> bool:
"""
Check for obstacle cells in the robot's forward corridor.
Assumes a Nav2 rolling-window local costmap (robot centre cell).
Forward = +col direction (robot +X).
costmap_data : flat row-major int8 list (from OccupancyGrid.data)
width, height: costmap dimensions in cells
resolution : metres per cell
check_dist : how far ahead to look (metres)
robot_half_width: corridor half-width (metres)
threshold : costmap value this is treated as obstacle (Nav2 lethal=99)
"""
if resolution <= 0.0:
return False
cx = width // 2
cy = height // 2
n_fwd = int(check_dist / resolution)
n_side = int(robot_half_width / resolution)
for dx in range(1, n_fwd + 1):
col = cx + dx
for dy in range(-n_side, n_side + 1):
row = cy + dy
if row < 0 or row >= height or col < 0 or col >= width:
continue
idx = row * width + col
if idx < len(costmap_data) and costmap_data[idx] >= threshold:
return True
return False
# ── ROS2 Node ─────────────────────────────────────────────────────────────────
class PersonFollowerNode(Node):
def __init__(self):
super().__init__("person_follower")
# ── Parameters ────────────────────────────────────────────────────────
self.declare_parameter("follow_distance", 1.5)
self.declare_parameter("dead_zone", 0.3)
self.declare_parameter("kp_linear", 0.6)
self.declare_parameter("kp_angular", 1.2)
self.declare_parameter("max_linear_vel", 0.5)
self.declare_parameter("max_angular_vel", 1.0)
self.declare_parameter("lost_timeout", 2.0)
self.declare_parameter("search_timeout", 5.0)
self.declare_parameter("search_angular_vel", 0.3)
self.declare_parameter("obstacle_check_dist", 0.5)
self.declare_parameter("obstacle_threshold", 70)
self.declare_parameter("robot_width", 0.3)
self.declare_parameter("control_rate", 20.0)
self.declare_parameter("follow_enabled", True)
self._p = {}
self._reload_params()
# ── State ─────────────────────────────────────────────────────────────
self._state = _STOPPING
self._last_target_t = 0.0 # monotonic clock; 0 = never received
self._person_x = 0.0
self._person_y = 0.0
self._obstacle_ahead = False
# ── Subscriptions ─────────────────────────────────────────────────────
self.create_subscription(
PoseStamped, "/person/target", self._target_cb, 10)
self.create_subscription(
OccupancyGrid, "/local_costmap/costmap", self._costmap_cb, 1)
# ── Publisher ─────────────────────────────────────────────────────────
self._cmd_pub = self.create_publisher(Twist, "/cmd_vel", 10)
# ── Control timer ─────────────────────────────────────────────────────
self._timer = self.create_timer(
1.0 / self._p["control_rate"], self._control_cb)
self.get_logger().info(
f"PersonFollower ready follow_dist={self._p['follow_distance']}m "
f"dead_zone=±{self._p['dead_zone']}m "
f"max_vel={self._p['max_linear_vel']}m/s "
f"lost_timeout={self._p['lost_timeout']}s"
)
# ── Parameter helper ──────────────────────────────────────────────────────
def _reload_params(self):
self._p = {
"follow_distance": self.get_parameter("follow_distance").value,
"dead_zone": self.get_parameter("dead_zone").value,
"kp_linear": self.get_parameter("kp_linear").value,
"kp_angular": self.get_parameter("kp_angular").value,
"max_linear_vel": self.get_parameter("max_linear_vel").value,
"max_angular_vel": self.get_parameter("max_angular_vel").value,
"lost_timeout": self.get_parameter("lost_timeout").value,
"search_timeout": self.get_parameter("search_timeout").value,
"search_angular_vel": self.get_parameter("search_angular_vel").value,
"obstacle_check_dist": self.get_parameter("obstacle_check_dist").value,
"obstacle_threshold": self.get_parameter("obstacle_threshold").value,
"robot_width": self.get_parameter("robot_width").value,
"control_rate": self.get_parameter("control_rate").value,
"follow_enabled": self.get_parameter("follow_enabled").value,
}
# ── Callbacks ─────────────────────────────────────────────────────────────
def _target_cb(self, msg):
"""Person position in base_link frame."""
self._person_x = msg.pose.position.x
self._person_y = msg.pose.position.y
self._last_target_t = time.monotonic()
self._state = _FOLLOWING
def _costmap_cb(self, msg):
self._obstacle_ahead = check_obstacle_in_costmap(
costmap_data=list(msg.data),
width=msg.info.width,
height=msg.info.height,
resolution=msg.info.resolution,
check_dist=self._p["obstacle_check_dist"],
robot_half_width=self._p["robot_width"] / 2.0,
threshold=self._p["obstacle_threshold"],
)
# ── Control loop ──────────────────────────────────────────────────────────
def _control_cb(self):
# Re-read mutable params every tick (cheap dict lookup)
self._reload_params()
twist = Twist()
if not self._p["follow_enabled"]:
self._cmd_pub.publish(twist)
return
now = time.monotonic()
dt_lost = (now - self._last_target_t) if self._last_target_t > 0.0 else 1e9
# State transitions (don't transition back from STOPPING/SEARCHING
# here — _target_cb resets to FOLLOWING on new detection)
if self._state == _FOLLOWING:
if dt_lost > self._p["search_timeout"]:
self._state = _SEARCHING
self.get_logger().warn("Person lost — entering SEARCHING state")
elif dt_lost > self._p["lost_timeout"]:
self._state = _STOPPING
self.get_logger().warn("Person lost — stopping")
linear_x, angular_z = 0.0, 0.0
if self._state == _FOLLOWING:
linear_x, angular_z = compute_follow_cmd(
px=self._person_x,
py=self._person_y,
follow_distance=self._p["follow_distance"],
dead_zone=self._p["dead_zone"],
kp_linear=self._p["kp_linear"],
kp_angular=self._p["kp_angular"],
max_linear_vel=self._p["max_linear_vel"],
max_angular_vel=self._p["max_angular_vel"],
obstacle_ahead=self._obstacle_ahead,
)
if self._obstacle_ahead and linear_x == 0.0:
self.get_logger().warn(
"Obstacle in path — forward motion suppressed",
throttle_duration_sec=2.0,
)
elif self._state == _SEARCHING:
angular_z = float(self._p["search_angular_vel"])
# linear_x stays 0 — rotate in place to search
# _STOPPING: both stay 0
twist.linear.x = linear_x
twist.angular.z = angular_z
self._cmd_pub.publish(twist)
# ── Entry point ───────────────────────────────────────────────────────────────
def main(args=None):
rclpy.init(args=args)
node = PersonFollowerNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.try_shutdown()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/saltybot_follower
[install]
install_scripts=$base/lib/saltybot_follower

View File

@ -0,0 +1,27 @@
from setuptools import setup
package_name = "saltybot_follower"
setup(
name=package_name,
version="0.1.0",
packages=[package_name],
data_files=[
("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
(f"share/{package_name}", ["package.xml"]),
(f"share/{package_name}/launch", ["launch/person_follower.launch.py"]),
(f"share/{package_name}/config", ["config/person_follower_params.yaml"]),
],
install_requires=["setuptools"],
zip_safe=True,
maintainer="sl-controls",
maintainer_email="sl-controls@saltylab.local",
description="Proportional person-following controller for saltybot",
license="MIT",
tests_require=["pytest"],
entry_points={
"console_scripts": [
"person_follower_node = saltybot_follower.person_follower_node:main",
],
},
)

View File

@ -0,0 +1,390 @@
"""
Unit tests for person_follower_node logic.
No ROS2 runtime required tests pure functions mirrored from the node.
Run with: pytest jetson/ros2_ws/src/saltybot_follower/test/test_person_follower.py
"""
import math
import pytest
# ── Pure function mirrors (no ROS2 imports) ──────────────────────────────────
def _clamp(v, lo, hi):
return max(lo, min(hi, v))
def _compute_distance(px, py):
return math.sqrt(px * px + py * py)
def _compute_bearing(px, py):
"""Bearing to (px,py) in base_link: positive = person to the left."""
return math.atan2(py, px)
def _compute_follow_cmd(px, py,
follow_distance, dead_zone,
kp_linear, kp_angular,
max_linear_vel, max_angular_vel,
obstacle_ahead=False):
"""Mirror of PersonFollowerNode proportional controller."""
distance = _compute_distance(px, py)
bearing = _compute_bearing(px, py)
dist_error = distance - follow_distance
if abs(dist_error) > dead_zone:
linear_x = _clamp(kp_linear * dist_error, -max_linear_vel, max_linear_vel)
else:
linear_x = 0.0
if obstacle_ahead and linear_x > 0.0:
linear_x = 0.0
angular_z = _clamp(kp_angular * bearing, -max_angular_vel, max_angular_vel)
return linear_x, angular_z
def _check_obstacle(costmap_data, width, height, resolution,
check_dist, robot_half_width, threshold):
"""Mirror of check_obstacle_in_costmap."""
if resolution <= 0.0:
return False
cx = width // 2
cy = height // 2
n_fwd = int(check_dist / resolution)
n_side = int(robot_half_width / resolution)
for dx in range(1, n_fwd + 1):
col = cx + dx
for dy in range(-n_side, n_side + 1):
row = cy + dy
if row < 0 or row >= height or col < 0 or col >= width:
continue
idx = row * width + col
if idx < len(costmap_data) and costmap_data[idx] >= threshold:
return True
return False
# ── Costmap helpers ───────────────────────────────────────────────────────────
def _empty_costmap(width=20, height=20):
return [0] * (width * height)
def _costmap_with_obstacle(col, row, width=20, height=20, value=99):
data = [0] * (width * height)
data[row * width + col] = value
return data
# ── Default params ────────────────────────────────────────────────────────────
_DEF = dict(
follow_distance=1.5,
dead_zone=0.3,
kp_linear=0.6,
kp_angular=1.2,
max_linear_vel=0.5,
max_angular_vel=1.0,
)
# ── Clamp tests ───────────────────────────────────────────────────────────────
class TestClamp:
def test_within_range(self):
assert _clamp(0.5, 0.0, 1.0) == 0.5
def test_clamp_high(self):
assert _clamp(2.0, 0.0, 1.0) == 1.0
def test_clamp_low(self):
assert _clamp(-2.0, -1.0, 0.0) == -1.0
def test_at_boundary_high(self):
assert _clamp(1.0, 0.0, 1.0) == 1.0
def test_at_boundary_low(self):
assert _clamp(0.0, 0.0, 1.0) == 0.0
# ── Geometry tests ────────────────────────────────────────────────────────────
class TestGeometry:
def test_distance_straight_ahead(self):
assert _compute_distance(2.0, 0.0) == pytest.approx(2.0)
def test_distance_diagonal(self):
assert _compute_distance(3.0, 4.0) == pytest.approx(5.0)
def test_distance_behind(self):
assert _compute_distance(-2.0, 0.0) == pytest.approx(2.0)
def test_bearing_straight_ahead(self):
assert _compute_bearing(1.0, 0.0) == pytest.approx(0.0)
def test_bearing_to_the_left(self):
# +y = left → bearing = +π/2
assert _compute_bearing(0.0, 1.0) == pytest.approx(math.pi / 2)
def test_bearing_to_the_right(self):
# -y = right → bearing = -π/2
assert _compute_bearing(0.0, -1.0) == pytest.approx(-math.pi / 2)
def test_bearing_behind_left(self):
# (-x, +y) → ≈ +3π/4
assert _compute_bearing(-1.0, 1.0) == pytest.approx(3 * math.pi / 4)
# ── Follow command — linear axis ──────────────────────────────────────────────
class TestLinearCmd:
def test_at_exact_target_zero_linear(self):
lin, _ = _compute_follow_cmd(1.5, 0.0, **_DEF)
assert lin == pytest.approx(0.0)
def test_within_dead_zone_zero_linear(self):
# error = 1.6 - 1.5 = 0.1 < dead_zone=0.3
lin, _ = _compute_follow_cmd(1.6, 0.0, **_DEF)
assert lin == pytest.approx(0.0)
def test_outside_dead_zone_approach(self):
# error = 2.0 - 1.5 = 0.5 → 0.6 * 0.5 = 0.3 (below max_linear_vel=0.5)
lin, _ = _compute_follow_cmd(2.0, 0.0, **_DEF)
assert lin == pytest.approx(0.3)
def test_outside_dead_zone_retreat(self):
# error = 1.0 - 1.5 = -0.5 → 0.6 * -0.5 = -0.3 (below max_linear_vel=0.5)
lin, _ = _compute_follow_cmd(1.0, 0.0, **_DEF)
assert lin == pytest.approx(-0.3)
def test_linear_clamped_at_max(self):
# error = 3.5 - 1.5 = 2.0 → 0.6 * 2.0 = 1.2 → clamped to 0.5
lin, _ = _compute_follow_cmd(3.5, 0.0, **_DEF)
assert lin == pytest.approx(0.5)
def test_linear_clamped_at_min(self):
# error = 0.0 - 1.5 = -1.5 → 0.6 * -1.5 = -0.9 → clamped to -0.5
lin, _ = _compute_follow_cmd(0.0, 0.0, **_DEF)
assert lin == pytest.approx(-0.5)
def test_dead_zone_boundary_just_outside(self):
# error = 2.5 - 1.5 + 0.3 + 0.01 = 0.31 → should produce output
lin, _ = _compute_follow_cmd(1.5 + 0.3 + 0.01, 0.0, **_DEF)
assert lin > 0.0
def test_dead_zone_boundary_just_inside(self):
lin, _ = _compute_follow_cmd(1.5 + 0.29, 0.0, **_DEF)
assert lin == pytest.approx(0.0)
# ── Follow command — angular axis ─────────────────────────────────────────────
class TestAngularCmd:
def test_person_ahead_zero_angular(self):
_, ang = _compute_follow_cmd(2.0, 0.0, **_DEF)
assert ang == pytest.approx(0.0)
def test_angular_proportional_to_bearing(self):
_, ang = _compute_follow_cmd(1.5, 0.5, **_DEF)
expected = _clamp(1.2 * _compute_bearing(1.5, 0.5), -1.0, 1.0)
assert ang == pytest.approx(expected, abs=1e-4)
def test_angular_clamped_at_max(self):
# Person far left: bearing ≈ π/2 → 1.2*π/2 >> 1.0 → clamped
_, ang = _compute_follow_cmd(2.0, 10.0, **_DEF)
assert ang == pytest.approx(1.0)
def test_angular_clamped_at_min(self):
_, ang = _compute_follow_cmd(2.0, -10.0, **_DEF)
assert ang == pytest.approx(-1.0)
def test_angular_corrects_within_dead_zone(self):
# Even at target distance (linear=0), bearing should still be corrected
lin, ang = _compute_follow_cmd(1.5, 0.5, **_DEF)
assert lin == pytest.approx(0.0) # dead zone
assert ang != pytest.approx(0.0) # bearing correction active
def test_angular_corrects_when_retreating(self):
# Robot too close and person is off to the side — should turn AND reverse
lin, ang = _compute_follow_cmd(0.5, 0.5, **_DEF)
assert lin < 0.0 # retreating
assert ang != pytest.approx(0.0)
# ── Obstacle override tests ───────────────────────────────────────────────────
class TestObstacleOverride:
def test_obstacle_suppresses_forward_motion(self):
lin, _ = _compute_follow_cmd(2.5, 0.0, obstacle_ahead=True, **_DEF)
assert lin == pytest.approx(0.0)
def test_obstacle_does_not_suppress_reverse(self):
# Too close (1.0m) → robot reverses; obstacle should NOT suppress reverse
# error = 1.0 - 1.5 = -0.5 → kp * error = -0.3 (negative = reverse)
lin, _ = _compute_follow_cmd(1.0, 0.0, obstacle_ahead=True, **_DEF)
assert lin == pytest.approx(-0.3)
def test_obstacle_allows_turning(self):
lin, ang = _compute_follow_cmd(2.5, 2.0, obstacle_ahead=True, **_DEF)
assert lin == pytest.approx(0.0) # suppressed
assert ang != pytest.approx(0.0) # turning still active
def test_no_obstacle_normal_forward(self):
# error = 2.0 - 1.5 = 0.5 → 0.6 * 0.5 = 0.3
lin, _ = _compute_follow_cmd(2.0, 0.0, obstacle_ahead=False, **_DEF)
assert lin == pytest.approx(0.3)
# ── Costmap obstacle check tests ──────────────────────────────────────────────
class TestCostmapCheck:
_RES = 0.05 # 5 cm/cell
_W = 20
_H = 20
def _check(self, data, *, check_dist=0.5, half_width=0.15, threshold=70):
return _check_obstacle(
costmap_data=data,
width=self._W,
height=self._H,
resolution=self._RES,
check_dist=check_dist,
robot_half_width=half_width,
threshold=threshold,
)
def test_empty_costmap_no_obstacle(self):
assert self._check(_empty_costmap(self._W, self._H)) is False
def test_obstacle_one_cell_ahead_detected(self):
# Robot centre col=10; dx=1 → col=11, row=10 (centreline)
data = _costmap_with_obstacle(11, 10, self._W, self._H, value=99)
assert self._check(data) is True
def test_obstacle_at_check_dist_boundary_detected(self):
# check_dist=0.5 / 0.05 = 10 cells → dx=1..10 → cols 11..20
# Width=20 means valid cols 0..19; col 20 is out of bounds.
# Last detectable col = cx + 9 = 10 + 9 = 19 (dx=9 within range(1,11))
data = _costmap_with_obstacle(19, 10, self._W, self._H, value=99)
assert self._check(data) is True
def test_obstacle_behind_robot_not_detected(self):
data = _costmap_with_obstacle(5, 10, self._W, self._H, value=99)
assert self._check(data) is False
def test_obstacle_within_corridor_side_detected(self):
# half_width=0.15/0.05=3 cells; row=10+2=12 is inside corridor
data = _costmap_with_obstacle(12, 12, self._W, self._H, value=99)
assert self._check(data) is True
def test_obstacle_outside_corridor_not_detected(self):
# row=10+4=14 is outside 3-cell corridor
data = _costmap_with_obstacle(12, 14, self._W, self._H, value=99)
assert self._check(data) is False
def test_value_below_threshold_not_detected(self):
data = _costmap_with_obstacle(11, 10, self._W, self._H, value=50)
assert self._check(data) is False
def test_value_at_threshold_detected(self):
data = _costmap_with_obstacle(11, 10, self._W, self._H, value=70)
assert self._check(data) is True
def test_zero_resolution_safe_returns_false(self):
assert _check_obstacle(
_empty_costmap(), width=20, height=20, resolution=0.0,
check_dist=0.5, robot_half_width=0.15, threshold=70) is False
def test_lethal_threshold_99_detected(self):
data = _costmap_with_obstacle(11, 10, self._W, self._H, value=100)
assert self._check(data, threshold=99) is True
# ── State machine tests ───────────────────────────────────────────────────────
class TestStateMachine:
_FOLLOWING = "following"
_STOPPING = "stopping"
_SEARCHING = "searching"
def _transition(self, state, dt_lost, lost_timeout=2.0, search_timeout=5.0):
"""Mirror of _control_cb state transition block."""
if state == self._FOLLOWING:
if dt_lost > search_timeout:
return self._SEARCHING
elif dt_lost > lost_timeout:
return self._STOPPING
return state
def test_following_stays_within_lost_timeout(self):
assert self._transition(self._FOLLOWING, dt_lost=1.0) == self._FOLLOWING
def test_following_to_stopping_past_lost_timeout(self):
assert self._transition(self._FOLLOWING, dt_lost=2.5) == self._STOPPING
def test_following_to_searching_past_search_timeout(self):
assert self._transition(self._FOLLOWING, dt_lost=6.0) == self._SEARCHING
def test_stopping_does_not_transition(self):
# Only _target_cb can move out of STOPPING
assert self._transition(self._STOPPING, dt_lost=3.0) == self._STOPPING
def test_searching_does_not_transition(self):
assert self._transition(self._SEARCHING, dt_lost=10.0) == self._SEARCHING
def test_boundary_exactly_at_lost_timeout(self):
# dt_lost == 2.0 is NOT > 2.0 — still FOLLOWING
assert self._transition(self._FOLLOWING, dt_lost=2.0) == self._FOLLOWING
def test_boundary_exactly_at_search_timeout(self):
# dt_lost=5.0: NOT > search_timeout=5.0, but IS > lost_timeout=2.0 → STOPPING
assert self._transition(self._FOLLOWING, dt_lost=5.0) == self._STOPPING
def test_before_first_detection_initial_state_is_stopping(self):
# Initial state is STOPPING (not FOLLOWING) so no transition occurs
assert self._transition(self._STOPPING, dt_lost=1e9) == self._STOPPING
def test_new_target_resets_to_following(self):
# _target_cb sets state = FOLLOWING unconditionally
state = self._SEARCHING
# simulate _target_cb
state = self._FOLLOWING
assert state == self._FOLLOWING
# ── Search and stop behaviour ─────────────────────────────────────────────────
class TestSearchBehaviour:
def test_searching_publishes_angular_only(self):
state = "searching"
search_angular_vel = 0.3
linear_x = 0.0 if state == "searching" else 99.9
angular_z = search_angular_vel if state == "searching" else 0.0
assert linear_x == pytest.approx(0.0)
assert angular_z == pytest.approx(0.3)
def test_stopping_publishes_zero(self):
state = "stopping"
linear_x = 0.0
angular_z = 0.0
assert linear_x == pytest.approx(0.0)
assert angular_z == pytest.approx(0.0)
# ── Follow-enabled gate ───────────────────────────────────────────────────────
class TestFollowEnabled:
def test_disabled_zeroes_output(self):
follow_enabled = False
lin, ang = (0.0, 0.0) if not follow_enabled else \
_compute_follow_cmd(3.0, 0.0, **_DEF)
assert lin == pytest.approx(0.0)
assert ang == pytest.approx(0.0)
def test_enabled_passes_through(self):
follow_enabled = True
lin, ang = (0.0, 0.0) if not follow_enabled else \
_compute_follow_cmd(3.0, 0.0, **_DEF)
assert lin != pytest.approx(0.0)