feat: person-following control loop #55
@ -0,0 +1,74 @@
|
||||
# person_follower_params.yaml
|
||||
# Configuration for person_follower_node — proportional person-following controller.
|
||||
#
|
||||
# Run with:
|
||||
# ros2 launch saltybot_follower person_follower.launch.py
|
||||
# Or override individual params:
|
||||
# ros2 launch saltybot_follower person_follower.launch.py follow_distance:=1.2
|
||||
#
|
||||
# IMPORTANT: This node publishes raw /cmd_vel. The cmd_vel_bridge_node (PR #46)
|
||||
# applies the ESC ramp, deadman switch, and STM32 AUTONOMOUS mode gate.
|
||||
# Do not run this node without the cmd_vel bridge running on the same robot.
|
||||
|
||||
# ── Follow geometry ────────────────────────────────────────────────────────────
|
||||
# The target distance to maintain behind the person (metres).
|
||||
# Decrease for tighter following; increase for more standoff.
|
||||
follow_distance: 1.5 # metres
|
||||
|
||||
# ±dead_zone around follow_distance — no linear velocity command in this band.
|
||||
# Prevents jitter when the robot is approximately at target distance.
|
||||
# Must be > position noise of /person/target detector.
|
||||
dead_zone: 0.3 # metres
|
||||
|
||||
# ── Proportional gains ────────────────────────────────────────────────────────
|
||||
# linear.x = clamp(kp_linear * (distance - follow_distance), ±max_linear_vel)
|
||||
# angular.z = clamp(kp_angular * bearing, ±max_angular_vel)
|
||||
#
|
||||
# kp_linear: higher → faster approach/retreat but more oscillation
|
||||
# kp_angular: higher → tighter heading tracking but may cause weaving
|
||||
kp_linear: 0.6
|
||||
kp_angular: 1.2
|
||||
|
||||
# ── Velocity limits ────────────────────────────────────────────────────────────
|
||||
# Hard caps on the cmd_vel output of this node.
|
||||
# The cmd_vel bridge ALSO clamps (max_linear_vel=0.5, max_angular_vel=2.0),
|
||||
# so these limits should be ≤ bridge limits for this node to have effect.
|
||||
#
|
||||
# Balance robot: keep max_linear_vel ≤ 0.5 m/s until balance is well-tuned.
|
||||
max_linear_vel: 0.5 # m/s
|
||||
max_angular_vel: 1.0 # rad/s (conservative — avoid spinning out)
|
||||
|
||||
# ── Lost-target timeouts ──────────────────────────────────────────────────────
|
||||
# lost_timeout: seconds after last /person/target before entering STOPPING
|
||||
# (publishing zero cmd_vel — cmd_vel bridge deadman will also kick in).
|
||||
lost_timeout: 2.0 # seconds
|
||||
|
||||
# search_timeout: seconds after last /person/target before entering SEARCHING
|
||||
# (slow rotation to re-acquire person).
|
||||
search_timeout: 5.0 # seconds
|
||||
|
||||
# Angular velocity while in SEARCHING state (rad/s, positive = CCW/left).
|
||||
# Keep small — robot is on a balance platform.
|
||||
search_angular_vel: 0.3 # rad/s
|
||||
|
||||
# ── Obstacle avoidance ────────────────────────────────────────────────────────
|
||||
# The node checks a forward corridor in the Nav2 local costmap.
|
||||
# If any cell >= obstacle_threshold, forward cmd_vel is zeroed (turning allowed).
|
||||
#
|
||||
# obstacle_check_dist: how far ahead to check (metres)
|
||||
# robot_width: corridor width for obstacle check (metres)
|
||||
# obstacle_threshold: costmap cell value treated as obstacle
|
||||
# Nav2 conventions: free=0, inscribed=99, lethal=100, unknown=-1
|
||||
# 70 = "probably occupied" — conservative; raise to 99 for less cautious
|
||||
obstacle_check_dist: 0.5 # metres
|
||||
robot_width: 0.3 # metres (corridor half-width = 0.15m each side)
|
||||
obstacle_threshold: 70 # costmap value ≥ this → obstacle
|
||||
|
||||
# ── Control loop ──────────────────────────────────────────────────────────────
|
||||
control_rate: 20.0 # Hz — lower than cmd_vel bridge (50Hz) by design
|
||||
|
||||
# ── Mode integration ──────────────────────────────────────────────────────────
|
||||
# Master enable for the follow controller. When false, node publishes zero cmd_vel.
|
||||
# Toggle at runtime: ros2 param set /person_follower follow_enabled false
|
||||
# The cmd_vel bridge independently gates on STM32 AUTONOMOUS mode (md=2).
|
||||
follow_enabled: true
|
||||
@ -0,0 +1,137 @@
|
||||
"""
|
||||
person_follower.launch.py — Launch the proportional person-following controller.
|
||||
|
||||
The person_follower_node subscribes to /person/target (PoseStamped in base_link
|
||||
frame) published by sl-jetson's person detector and drives /cmd_vel accordingly.
|
||||
|
||||
PREREQUISITES (must be running before this launch):
|
||||
1. cmd_vel_bridge_node (saltybot_bridge) — serial port owner, safety wrapping
|
||||
2. sl-jetson's person_detector node — publishes /person/target
|
||||
3. Nav2 local costmap — publishes /local_costmap/costmap
|
||||
(optional: obstacle avoidance is skipped if no costmap messages arrive)
|
||||
|
||||
Usage:
|
||||
# Defaults (1.5m follow distance, 0.5m/s max, 2s lost timeout):
|
||||
ros2 launch saltybot_follower person_follower.launch.py
|
||||
|
||||
# Closer following for indoor tight spaces:
|
||||
ros2 launch saltybot_follower person_follower.launch.py follow_distance:=1.0
|
||||
|
||||
# Disable obstacle avoidance (for environments without Nav2 costmap):
|
||||
ros2 launch saltybot_follower person_follower.launch.py obstacle_threshold:=100
|
||||
|
||||
# Load all params from YAML:
|
||||
ros2 launch saltybot_follower person_follower.launch.py \\
|
||||
params_file:=/path/to/person_follower_params.yaml
|
||||
|
||||
Safety note:
|
||||
To disable at runtime without stopping the node:
|
||||
ros2 param set /person_follower follow_enabled false
|
||||
"""
|
||||
|
||||
import os
|
||||
|
||||
from ament_index_python.packages import get_package_share_directory
|
||||
from launch import LaunchDescription
|
||||
from launch.actions import DeclareLaunchArgument, OpaqueFunction
|
||||
from launch.substitutions import LaunchConfiguration
|
||||
from launch_ros.actions import Node
|
||||
|
||||
|
||||
def _launch_follower(context, *args, **kwargs):
|
||||
params_file = LaunchConfiguration("params_file").perform(context)
|
||||
|
||||
inline_params = {
|
||||
"follow_distance": float(LaunchConfiguration("follow_distance").perform(context)),
|
||||
"dead_zone": float(LaunchConfiguration("dead_zone").perform(context)),
|
||||
"kp_linear": float(LaunchConfiguration("kp_linear").perform(context)),
|
||||
"kp_angular": float(LaunchConfiguration("kp_angular").perform(context)),
|
||||
"max_linear_vel": float(LaunchConfiguration("max_linear_vel").perform(context)),
|
||||
"max_angular_vel": float(LaunchConfiguration("max_angular_vel").perform(context)),
|
||||
"lost_timeout": float(LaunchConfiguration("lost_timeout").perform(context)),
|
||||
"search_timeout": float(LaunchConfiguration("search_timeout").perform(context)),
|
||||
"search_angular_vel": float(LaunchConfiguration("search_angular_vel").perform(context)),
|
||||
"obstacle_check_dist": float(LaunchConfiguration("obstacle_check_dist").perform(context)),
|
||||
"obstacle_threshold": int(LaunchConfiguration("obstacle_threshold").perform(context)),
|
||||
"robot_width": float(LaunchConfiguration("robot_width").perform(context)),
|
||||
"control_rate": float(LaunchConfiguration("control_rate").perform(context)),
|
||||
"follow_enabled": LaunchConfiguration("follow_enabled").perform(context).lower() == "true",
|
||||
}
|
||||
|
||||
node_params = [params_file, inline_params] if params_file else [inline_params]
|
||||
|
||||
return [Node(
|
||||
package="saltybot_follower",
|
||||
executable="person_follower_node",
|
||||
name="person_follower",
|
||||
output="screen",
|
||||
parameters=node_params,
|
||||
)]
|
||||
|
||||
|
||||
def generate_launch_description():
|
||||
pkg_share = get_package_share_directory("saltybot_follower")
|
||||
default_params = os.path.join(pkg_share, "config", "person_follower_params.yaml")
|
||||
|
||||
return LaunchDescription([
|
||||
DeclareLaunchArgument(
|
||||
"params_file",
|
||||
default_value=default_params,
|
||||
description="Full path to person_follower_params.yaml (overrides inline args)"),
|
||||
|
||||
# Follow geometry
|
||||
DeclareLaunchArgument(
|
||||
"follow_distance", default_value="1.5",
|
||||
description="Target follow distance (m)"),
|
||||
DeclareLaunchArgument(
|
||||
"dead_zone", default_value="0.3",
|
||||
description="Dead zone around follow_distance — no linear cmd (m)"),
|
||||
|
||||
# Gains
|
||||
DeclareLaunchArgument(
|
||||
"kp_linear", default_value="0.6",
|
||||
description="Proportional gain: distance error → linear.x"),
|
||||
DeclareLaunchArgument(
|
||||
"kp_angular", default_value="1.2",
|
||||
description="Proportional gain: bearing error → angular.z"),
|
||||
|
||||
# Velocity limits
|
||||
DeclareLaunchArgument(
|
||||
"max_linear_vel", default_value="0.5",
|
||||
description="Max forward/backward speed (m/s)"),
|
||||
DeclareLaunchArgument(
|
||||
"max_angular_vel", default_value="1.0",
|
||||
description="Max turn rate (rad/s)"),
|
||||
|
||||
# Lost-target timeouts
|
||||
DeclareLaunchArgument(
|
||||
"lost_timeout", default_value="2.0",
|
||||
description="Seconds after last /person/target before stopping"),
|
||||
DeclareLaunchArgument(
|
||||
"search_timeout", default_value="5.0",
|
||||
description="Seconds after last /person/target before slow rotation search"),
|
||||
DeclareLaunchArgument(
|
||||
"search_angular_vel", default_value="0.3",
|
||||
description="Rotation speed while searching for person (rad/s)"),
|
||||
|
||||
# Obstacle avoidance
|
||||
DeclareLaunchArgument(
|
||||
"obstacle_check_dist", default_value="0.5",
|
||||
description="Forward look-ahead distance for obstacle check (m)"),
|
||||
DeclareLaunchArgument(
|
||||
"obstacle_threshold", default_value="70",
|
||||
description="Costmap cell value treated as obstacle (Nav2 lethal=99)"),
|
||||
DeclareLaunchArgument(
|
||||
"robot_width", default_value="0.3",
|
||||
description="Robot body width for obstacle corridor check (m)"),
|
||||
|
||||
# Control
|
||||
DeclareLaunchArgument(
|
||||
"control_rate", default_value="20.0",
|
||||
description="Control loop frequency (Hz)"),
|
||||
DeclareLaunchArgument(
|
||||
"follow_enabled", default_value="true",
|
||||
description="Master enable; set false to pause following without stopping node"),
|
||||
|
||||
OpaqueFunction(function=_launch_follower),
|
||||
])
|
||||
30
jetson/ros2_ws/src/saltybot_follower/package.xml
Normal file
30
jetson/ros2_ws/src/saltybot_follower/package.xml
Normal file
@ -0,0 +1,30 @@
|
||||
<?xml version="1.0"?>
|
||||
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||
<package format="3">
|
||||
<name>saltybot_follower</name>
|
||||
<version>0.1.0</version>
|
||||
<description>
|
||||
Proportional person-following control loop for saltybot.
|
||||
Subscribes to /person/target (PoseStamped in base_link frame) and publishes
|
||||
/cmd_vel with velocity limits, dead zone, obstacle avoidance, and lost-target
|
||||
state machine (stop → search). Requires cmd_vel_bridge_node for safety wrapping.
|
||||
</description>
|
||||
<maintainer email="sl-controls@saltylab.local">sl-controls</maintainer>
|
||||
<license>MIT</license>
|
||||
|
||||
<depend>rclpy</depend>
|
||||
<depend>geometry_msgs</depend>
|
||||
<depend>nav_msgs</depend>
|
||||
<depend>std_msgs</depend>
|
||||
|
||||
<buildtool_depend>ament_python</buildtool_depend>
|
||||
|
||||
<test_depend>ament_copyright</test_depend>
|
||||
<test_depend>ament_flake8</test_depend>
|
||||
<test_depend>ament_pep257</test_depend>
|
||||
<test_depend>python3-pytest</test_depend>
|
||||
|
||||
<export>
|
||||
<build_type>ament_python</build_type>
|
||||
</export>
|
||||
</package>
|
||||
@ -0,0 +1,314 @@
|
||||
"""
|
||||
person_follower_node.py — Proportional person-following control loop.
|
||||
|
||||
Subscribes:
|
||||
/person/target (geometry_msgs/PoseStamped) — person pose in base_link frame
|
||||
/local_costmap/costmap (nav_msgs/OccupancyGrid) — Nav2 local obstacle map
|
||||
|
||||
Publishes:
|
||||
/cmd_vel (geometry_msgs/Twist) — velocity commands
|
||||
|
||||
State machine
|
||||
─────────────
|
||||
FOLLOWING — person visible (last detection < lost_timeout ago)
|
||||
STOPPING — person lost, publishing zero until search_timeout
|
||||
SEARCHING — person lost > search_timeout, slow rotation to re-acquire
|
||||
|
||||
Safety wiring
|
||||
─────────────
|
||||
• cmd_vel bridge (PR #46) applies ramp + deadman + STM32 AUTONOMOUS mode gate —
|
||||
this node publishes raw /cmd_vel, the bridge handles hardware safety.
|
||||
• follow_enabled param (default True) lets the operator disable the controller
|
||||
at runtime: ros2 param set /person_follower follow_enabled false
|
||||
• Obstacle override: if Nav2 local costmap shows a lethal cell in the forward
|
||||
corridor, forward cmd_vel is zeroed; turning is still allowed.
|
||||
|
||||
Usage
|
||||
─────
|
||||
ros2 launch saltybot_follower person_follower.launch.py
|
||||
ros2 launch saltybot_follower person_follower.launch.py follow_distance:=1.2
|
||||
"""
|
||||
|
||||
import math
|
||||
import time
|
||||
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from geometry_msgs.msg import PoseStamped, Twist
|
||||
from nav_msgs.msg import OccupancyGrid
|
||||
|
||||
# ── State constants ────────────────────────────────────────────────────────────
|
||||
_FOLLOWING = "following"
|
||||
_STOPPING = "stopping" # person lost < search_timeout, publishing zero
|
||||
_SEARCHING = "searching" # person lost > search_timeout, slow rotation
|
||||
|
||||
|
||||
# ── Pure helper functions (also exercised by unit tests) ──────────────────────
|
||||
|
||||
def clamp(v: float, lo: float, hi: float) -> float:
|
||||
return max(lo, min(hi, v))
|
||||
|
||||
|
||||
def compute_distance(px: float, py: float) -> float:
|
||||
return math.sqrt(px * px + py * py)
|
||||
|
||||
|
||||
def compute_bearing(px: float, py: float) -> float:
|
||||
"""Bearing to point (px, py) in base_link: positive = person to the left."""
|
||||
return math.atan2(py, px)
|
||||
|
||||
|
||||
def compute_follow_cmd(
|
||||
px: float,
|
||||
py: float,
|
||||
follow_distance: float,
|
||||
dead_zone: float,
|
||||
kp_linear: float,
|
||||
kp_angular: float,
|
||||
max_linear_vel: float,
|
||||
max_angular_vel: float,
|
||||
obstacle_ahead: bool = False,
|
||||
) -> tuple:
|
||||
"""
|
||||
Pure proportional follow controller.
|
||||
|
||||
Returns (linear_x, angular_z) as floats.
|
||||
|
||||
linear_x:
|
||||
Proportional to signed distance error (distance − follow_distance).
|
||||
Zero inside the dead zone. Clamped to ±max_linear_vel.
|
||||
Zeroed (not reversed) if obstacle_ahead and moving forward.
|
||||
|
||||
angular_z:
|
||||
Proportional to bearing. Always corrects even when inside the dead zone,
|
||||
so the robot stays pointed at the person while holding distance.
|
||||
Clamped to ±max_angular_vel.
|
||||
"""
|
||||
distance = compute_distance(px, py)
|
||||
bearing = compute_bearing(px, py)
|
||||
|
||||
# Linear — dead zone suppresses jitter at target distance
|
||||
dist_error = distance - follow_distance
|
||||
if abs(dist_error) > dead_zone:
|
||||
linear_x = clamp(kp_linear * dist_error, -max_linear_vel, max_linear_vel)
|
||||
else:
|
||||
linear_x = 0.0
|
||||
|
||||
# Obstacle override: suppress forward motion, allow turning
|
||||
if obstacle_ahead and linear_x > 0.0:
|
||||
linear_x = 0.0
|
||||
|
||||
# Angular — always correct bearing
|
||||
angular_z = clamp(kp_angular * bearing, -max_angular_vel, max_angular_vel)
|
||||
|
||||
return linear_x, angular_z
|
||||
|
||||
|
||||
def check_obstacle_in_costmap(
|
||||
costmap_data: list,
|
||||
width: int,
|
||||
height: int,
|
||||
resolution: float,
|
||||
check_dist: float,
|
||||
robot_half_width: float,
|
||||
threshold: int,
|
||||
) -> bool:
|
||||
"""
|
||||
Check for obstacle cells in the robot's forward corridor.
|
||||
|
||||
Assumes a Nav2 rolling-window local costmap (robot ≈ centre cell).
|
||||
Forward = +col direction (robot +X).
|
||||
|
||||
costmap_data : flat row-major int8 list (from OccupancyGrid.data)
|
||||
width, height: costmap dimensions in cells
|
||||
resolution : metres per cell
|
||||
check_dist : how far ahead to look (metres)
|
||||
robot_half_width: corridor half-width (metres)
|
||||
threshold : costmap value ≥ this is treated as obstacle (Nav2 lethal=99)
|
||||
"""
|
||||
if resolution <= 0.0:
|
||||
return False
|
||||
|
||||
cx = width // 2
|
||||
cy = height // 2
|
||||
|
||||
n_fwd = int(check_dist / resolution)
|
||||
n_side = int(robot_half_width / resolution)
|
||||
|
||||
for dx in range(1, n_fwd + 1):
|
||||
col = cx + dx
|
||||
for dy in range(-n_side, n_side + 1):
|
||||
row = cy + dy
|
||||
if row < 0 or row >= height or col < 0 or col >= width:
|
||||
continue
|
||||
idx = row * width + col
|
||||
if idx < len(costmap_data) and costmap_data[idx] >= threshold:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
# ── ROS2 Node ─────────────────────────────────────────────────────────────────
|
||||
|
||||
class PersonFollowerNode(Node):
|
||||
|
||||
def __init__(self):
|
||||
super().__init__("person_follower")
|
||||
|
||||
# ── Parameters ────────────────────────────────────────────────────────
|
||||
self.declare_parameter("follow_distance", 1.5)
|
||||
self.declare_parameter("dead_zone", 0.3)
|
||||
self.declare_parameter("kp_linear", 0.6)
|
||||
self.declare_parameter("kp_angular", 1.2)
|
||||
self.declare_parameter("max_linear_vel", 0.5)
|
||||
self.declare_parameter("max_angular_vel", 1.0)
|
||||
self.declare_parameter("lost_timeout", 2.0)
|
||||
self.declare_parameter("search_timeout", 5.0)
|
||||
self.declare_parameter("search_angular_vel", 0.3)
|
||||
self.declare_parameter("obstacle_check_dist", 0.5)
|
||||
self.declare_parameter("obstacle_threshold", 70)
|
||||
self.declare_parameter("robot_width", 0.3)
|
||||
self.declare_parameter("control_rate", 20.0)
|
||||
self.declare_parameter("follow_enabled", True)
|
||||
|
||||
self._p = {}
|
||||
self._reload_params()
|
||||
|
||||
# ── State ─────────────────────────────────────────────────────────────
|
||||
self._state = _STOPPING
|
||||
self._last_target_t = 0.0 # monotonic clock; 0 = never received
|
||||
self._person_x = 0.0
|
||||
self._person_y = 0.0
|
||||
self._obstacle_ahead = False
|
||||
|
||||
# ── Subscriptions ─────────────────────────────────────────────────────
|
||||
self.create_subscription(
|
||||
PoseStamped, "/person/target", self._target_cb, 10)
|
||||
self.create_subscription(
|
||||
OccupancyGrid, "/local_costmap/costmap", self._costmap_cb, 1)
|
||||
|
||||
# ── Publisher ─────────────────────────────────────────────────────────
|
||||
self._cmd_pub = self.create_publisher(Twist, "/cmd_vel", 10)
|
||||
|
||||
# ── Control timer ─────────────────────────────────────────────────────
|
||||
self._timer = self.create_timer(
|
||||
1.0 / self._p["control_rate"], self._control_cb)
|
||||
|
||||
self.get_logger().info(
|
||||
f"PersonFollower ready follow_dist={self._p['follow_distance']}m "
|
||||
f"dead_zone=±{self._p['dead_zone']}m "
|
||||
f"max_vel={self._p['max_linear_vel']}m/s "
|
||||
f"lost_timeout={self._p['lost_timeout']}s"
|
||||
)
|
||||
|
||||
# ── Parameter helper ──────────────────────────────────────────────────────
|
||||
|
||||
def _reload_params(self):
|
||||
self._p = {
|
||||
"follow_distance": self.get_parameter("follow_distance").value,
|
||||
"dead_zone": self.get_parameter("dead_zone").value,
|
||||
"kp_linear": self.get_parameter("kp_linear").value,
|
||||
"kp_angular": self.get_parameter("kp_angular").value,
|
||||
"max_linear_vel": self.get_parameter("max_linear_vel").value,
|
||||
"max_angular_vel": self.get_parameter("max_angular_vel").value,
|
||||
"lost_timeout": self.get_parameter("lost_timeout").value,
|
||||
"search_timeout": self.get_parameter("search_timeout").value,
|
||||
"search_angular_vel": self.get_parameter("search_angular_vel").value,
|
||||
"obstacle_check_dist": self.get_parameter("obstacle_check_dist").value,
|
||||
"obstacle_threshold": self.get_parameter("obstacle_threshold").value,
|
||||
"robot_width": self.get_parameter("robot_width").value,
|
||||
"control_rate": self.get_parameter("control_rate").value,
|
||||
"follow_enabled": self.get_parameter("follow_enabled").value,
|
||||
}
|
||||
|
||||
# ── Callbacks ─────────────────────────────────────────────────────────────
|
||||
|
||||
def _target_cb(self, msg):
|
||||
"""Person position in base_link frame."""
|
||||
self._person_x = msg.pose.position.x
|
||||
self._person_y = msg.pose.position.y
|
||||
self._last_target_t = time.monotonic()
|
||||
self._state = _FOLLOWING
|
||||
|
||||
def _costmap_cb(self, msg):
|
||||
self._obstacle_ahead = check_obstacle_in_costmap(
|
||||
costmap_data=list(msg.data),
|
||||
width=msg.info.width,
|
||||
height=msg.info.height,
|
||||
resolution=msg.info.resolution,
|
||||
check_dist=self._p["obstacle_check_dist"],
|
||||
robot_half_width=self._p["robot_width"] / 2.0,
|
||||
threshold=self._p["obstacle_threshold"],
|
||||
)
|
||||
|
||||
# ── Control loop ──────────────────────────────────────────────────────────
|
||||
|
||||
def _control_cb(self):
|
||||
# Re-read mutable params every tick (cheap dict lookup)
|
||||
self._reload_params()
|
||||
|
||||
twist = Twist()
|
||||
|
||||
if not self._p["follow_enabled"]:
|
||||
self._cmd_pub.publish(twist)
|
||||
return
|
||||
|
||||
now = time.monotonic()
|
||||
dt_lost = (now - self._last_target_t) if self._last_target_t > 0.0 else 1e9
|
||||
|
||||
# State transitions (don't transition back from STOPPING/SEARCHING
|
||||
# here — _target_cb resets to FOLLOWING on new detection)
|
||||
if self._state == _FOLLOWING:
|
||||
if dt_lost > self._p["search_timeout"]:
|
||||
self._state = _SEARCHING
|
||||
self.get_logger().warn("Person lost — entering SEARCHING state")
|
||||
elif dt_lost > self._p["lost_timeout"]:
|
||||
self._state = _STOPPING
|
||||
self.get_logger().warn("Person lost — stopping")
|
||||
|
||||
linear_x, angular_z = 0.0, 0.0
|
||||
|
||||
if self._state == _FOLLOWING:
|
||||
linear_x, angular_z = compute_follow_cmd(
|
||||
px=self._person_x,
|
||||
py=self._person_y,
|
||||
follow_distance=self._p["follow_distance"],
|
||||
dead_zone=self._p["dead_zone"],
|
||||
kp_linear=self._p["kp_linear"],
|
||||
kp_angular=self._p["kp_angular"],
|
||||
max_linear_vel=self._p["max_linear_vel"],
|
||||
max_angular_vel=self._p["max_angular_vel"],
|
||||
obstacle_ahead=self._obstacle_ahead,
|
||||
)
|
||||
if self._obstacle_ahead and linear_x == 0.0:
|
||||
self.get_logger().warn(
|
||||
"Obstacle in path — forward motion suppressed",
|
||||
throttle_duration_sec=2.0,
|
||||
)
|
||||
|
||||
elif self._state == _SEARCHING:
|
||||
angular_z = float(self._p["search_angular_vel"])
|
||||
# linear_x stays 0 — rotate in place to search
|
||||
|
||||
# _STOPPING: both stay 0
|
||||
|
||||
twist.linear.x = linear_x
|
||||
twist.angular.z = angular_z
|
||||
self._cmd_pub.publish(twist)
|
||||
|
||||
|
||||
# ── Entry point ───────────────────────────────────────────────────────────────
|
||||
|
||||
def main(args=None):
|
||||
rclpy.init(args=args)
|
||||
node = PersonFollowerNode()
|
||||
try:
|
||||
rclpy.spin(node)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
node.destroy_node()
|
||||
rclpy.try_shutdown()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
4
jetson/ros2_ws/src/saltybot_follower/setup.cfg
Normal file
4
jetson/ros2_ws/src/saltybot_follower/setup.cfg
Normal file
@ -0,0 +1,4 @@
|
||||
[develop]
|
||||
script_dir=$base/lib/saltybot_follower
|
||||
[install]
|
||||
install_scripts=$base/lib/saltybot_follower
|
||||
27
jetson/ros2_ws/src/saltybot_follower/setup.py
Normal file
27
jetson/ros2_ws/src/saltybot_follower/setup.py
Normal file
@ -0,0 +1,27 @@
|
||||
from setuptools import setup
|
||||
|
||||
package_name = "saltybot_follower"
|
||||
|
||||
setup(
|
||||
name=package_name,
|
||||
version="0.1.0",
|
||||
packages=[package_name],
|
||||
data_files=[
|
||||
("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
|
||||
(f"share/{package_name}", ["package.xml"]),
|
||||
(f"share/{package_name}/launch", ["launch/person_follower.launch.py"]),
|
||||
(f"share/{package_name}/config", ["config/person_follower_params.yaml"]),
|
||||
],
|
||||
install_requires=["setuptools"],
|
||||
zip_safe=True,
|
||||
maintainer="sl-controls",
|
||||
maintainer_email="sl-controls@saltylab.local",
|
||||
description="Proportional person-following controller for saltybot",
|
||||
license="MIT",
|
||||
tests_require=["pytest"],
|
||||
entry_points={
|
||||
"console_scripts": [
|
||||
"person_follower_node = saltybot_follower.person_follower_node:main",
|
||||
],
|
||||
},
|
||||
)
|
||||
@ -0,0 +1,390 @@
|
||||
"""
|
||||
Unit tests for person_follower_node logic.
|
||||
No ROS2 runtime required — tests pure functions mirrored from the node.
|
||||
Run with: pytest jetson/ros2_ws/src/saltybot_follower/test/test_person_follower.py
|
||||
"""
|
||||
|
||||
import math
|
||||
import pytest
|
||||
|
||||
|
||||
# ── Pure function mirrors (no ROS2 imports) ──────────────────────────────────
|
||||
|
||||
def _clamp(v, lo, hi):
|
||||
return max(lo, min(hi, v))
|
||||
|
||||
|
||||
def _compute_distance(px, py):
|
||||
return math.sqrt(px * px + py * py)
|
||||
|
||||
|
||||
def _compute_bearing(px, py):
|
||||
"""Bearing to (px,py) in base_link: positive = person to the left."""
|
||||
return math.atan2(py, px)
|
||||
|
||||
|
||||
def _compute_follow_cmd(px, py,
|
||||
follow_distance, dead_zone,
|
||||
kp_linear, kp_angular,
|
||||
max_linear_vel, max_angular_vel,
|
||||
obstacle_ahead=False):
|
||||
"""Mirror of PersonFollowerNode proportional controller."""
|
||||
distance = _compute_distance(px, py)
|
||||
bearing = _compute_bearing(px, py)
|
||||
|
||||
dist_error = distance - follow_distance
|
||||
if abs(dist_error) > dead_zone:
|
||||
linear_x = _clamp(kp_linear * dist_error, -max_linear_vel, max_linear_vel)
|
||||
else:
|
||||
linear_x = 0.0
|
||||
|
||||
if obstacle_ahead and linear_x > 0.0:
|
||||
linear_x = 0.0
|
||||
|
||||
angular_z = _clamp(kp_angular * bearing, -max_angular_vel, max_angular_vel)
|
||||
return linear_x, angular_z
|
||||
|
||||
|
||||
def _check_obstacle(costmap_data, width, height, resolution,
|
||||
check_dist, robot_half_width, threshold):
|
||||
"""Mirror of check_obstacle_in_costmap."""
|
||||
if resolution <= 0.0:
|
||||
return False
|
||||
cx = width // 2
|
||||
cy = height // 2
|
||||
n_fwd = int(check_dist / resolution)
|
||||
n_side = int(robot_half_width / resolution)
|
||||
for dx in range(1, n_fwd + 1):
|
||||
col = cx + dx
|
||||
for dy in range(-n_side, n_side + 1):
|
||||
row = cy + dy
|
||||
if row < 0 or row >= height or col < 0 or col >= width:
|
||||
continue
|
||||
idx = row * width + col
|
||||
if idx < len(costmap_data) and costmap_data[idx] >= threshold:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
# ── Costmap helpers ───────────────────────────────────────────────────────────
|
||||
|
||||
def _empty_costmap(width=20, height=20):
|
||||
return [0] * (width * height)
|
||||
|
||||
|
||||
def _costmap_with_obstacle(col, row, width=20, height=20, value=99):
|
||||
data = [0] * (width * height)
|
||||
data[row * width + col] = value
|
||||
return data
|
||||
|
||||
|
||||
# ── Default params ────────────────────────────────────────────────────────────
|
||||
|
||||
_DEF = dict(
|
||||
follow_distance=1.5,
|
||||
dead_zone=0.3,
|
||||
kp_linear=0.6,
|
||||
kp_angular=1.2,
|
||||
max_linear_vel=0.5,
|
||||
max_angular_vel=1.0,
|
||||
)
|
||||
|
||||
|
||||
# ── Clamp tests ───────────────────────────────────────────────────────────────
|
||||
|
||||
class TestClamp:
|
||||
def test_within_range(self):
|
||||
assert _clamp(0.5, 0.0, 1.0) == 0.5
|
||||
|
||||
def test_clamp_high(self):
|
||||
assert _clamp(2.0, 0.0, 1.0) == 1.0
|
||||
|
||||
def test_clamp_low(self):
|
||||
assert _clamp(-2.0, -1.0, 0.0) == -1.0
|
||||
|
||||
def test_at_boundary_high(self):
|
||||
assert _clamp(1.0, 0.0, 1.0) == 1.0
|
||||
|
||||
def test_at_boundary_low(self):
|
||||
assert _clamp(0.0, 0.0, 1.0) == 0.0
|
||||
|
||||
|
||||
# ── Geometry tests ────────────────────────────────────────────────────────────
|
||||
|
||||
class TestGeometry:
|
||||
def test_distance_straight_ahead(self):
|
||||
assert _compute_distance(2.0, 0.0) == pytest.approx(2.0)
|
||||
|
||||
def test_distance_diagonal(self):
|
||||
assert _compute_distance(3.0, 4.0) == pytest.approx(5.0)
|
||||
|
||||
def test_distance_behind(self):
|
||||
assert _compute_distance(-2.0, 0.0) == pytest.approx(2.0)
|
||||
|
||||
def test_bearing_straight_ahead(self):
|
||||
assert _compute_bearing(1.0, 0.0) == pytest.approx(0.0)
|
||||
|
||||
def test_bearing_to_the_left(self):
|
||||
# +y = left → bearing = +π/2
|
||||
assert _compute_bearing(0.0, 1.0) == pytest.approx(math.pi / 2)
|
||||
|
||||
def test_bearing_to_the_right(self):
|
||||
# -y = right → bearing = -π/2
|
||||
assert _compute_bearing(0.0, -1.0) == pytest.approx(-math.pi / 2)
|
||||
|
||||
def test_bearing_behind_left(self):
|
||||
# (-x, +y) → ≈ +3π/4
|
||||
assert _compute_bearing(-1.0, 1.0) == pytest.approx(3 * math.pi / 4)
|
||||
|
||||
|
||||
# ── Follow command — linear axis ──────────────────────────────────────────────
|
||||
|
||||
class TestLinearCmd:
|
||||
def test_at_exact_target_zero_linear(self):
|
||||
lin, _ = _compute_follow_cmd(1.5, 0.0, **_DEF)
|
||||
assert lin == pytest.approx(0.0)
|
||||
|
||||
def test_within_dead_zone_zero_linear(self):
|
||||
# error = 1.6 - 1.5 = 0.1 < dead_zone=0.3
|
||||
lin, _ = _compute_follow_cmd(1.6, 0.0, **_DEF)
|
||||
assert lin == pytest.approx(0.0)
|
||||
|
||||
def test_outside_dead_zone_approach(self):
|
||||
# error = 2.0 - 1.5 = 0.5 → 0.6 * 0.5 = 0.3 (below max_linear_vel=0.5)
|
||||
lin, _ = _compute_follow_cmd(2.0, 0.0, **_DEF)
|
||||
assert lin == pytest.approx(0.3)
|
||||
|
||||
def test_outside_dead_zone_retreat(self):
|
||||
# error = 1.0 - 1.5 = -0.5 → 0.6 * -0.5 = -0.3 (below max_linear_vel=0.5)
|
||||
lin, _ = _compute_follow_cmd(1.0, 0.0, **_DEF)
|
||||
assert lin == pytest.approx(-0.3)
|
||||
|
||||
def test_linear_clamped_at_max(self):
|
||||
# error = 3.5 - 1.5 = 2.0 → 0.6 * 2.0 = 1.2 → clamped to 0.5
|
||||
lin, _ = _compute_follow_cmd(3.5, 0.0, **_DEF)
|
||||
assert lin == pytest.approx(0.5)
|
||||
|
||||
def test_linear_clamped_at_min(self):
|
||||
# error = 0.0 - 1.5 = -1.5 → 0.6 * -1.5 = -0.9 → clamped to -0.5
|
||||
lin, _ = _compute_follow_cmd(0.0, 0.0, **_DEF)
|
||||
assert lin == pytest.approx(-0.5)
|
||||
|
||||
def test_dead_zone_boundary_just_outside(self):
|
||||
# error = 2.5 - 1.5 + 0.3 + 0.01 = 0.31 → should produce output
|
||||
lin, _ = _compute_follow_cmd(1.5 + 0.3 + 0.01, 0.0, **_DEF)
|
||||
assert lin > 0.0
|
||||
|
||||
def test_dead_zone_boundary_just_inside(self):
|
||||
lin, _ = _compute_follow_cmd(1.5 + 0.29, 0.0, **_DEF)
|
||||
assert lin == pytest.approx(0.0)
|
||||
|
||||
|
||||
# ── Follow command — angular axis ─────────────────────────────────────────────
|
||||
|
||||
class TestAngularCmd:
|
||||
def test_person_ahead_zero_angular(self):
|
||||
_, ang = _compute_follow_cmd(2.0, 0.0, **_DEF)
|
||||
assert ang == pytest.approx(0.0)
|
||||
|
||||
def test_angular_proportional_to_bearing(self):
|
||||
_, ang = _compute_follow_cmd(1.5, 0.5, **_DEF)
|
||||
expected = _clamp(1.2 * _compute_bearing(1.5, 0.5), -1.0, 1.0)
|
||||
assert ang == pytest.approx(expected, abs=1e-4)
|
||||
|
||||
def test_angular_clamped_at_max(self):
|
||||
# Person far left: bearing ≈ π/2 → 1.2*π/2 >> 1.0 → clamped
|
||||
_, ang = _compute_follow_cmd(2.0, 10.0, **_DEF)
|
||||
assert ang == pytest.approx(1.0)
|
||||
|
||||
def test_angular_clamped_at_min(self):
|
||||
_, ang = _compute_follow_cmd(2.0, -10.0, **_DEF)
|
||||
assert ang == pytest.approx(-1.0)
|
||||
|
||||
def test_angular_corrects_within_dead_zone(self):
|
||||
# Even at target distance (linear=0), bearing should still be corrected
|
||||
lin, ang = _compute_follow_cmd(1.5, 0.5, **_DEF)
|
||||
assert lin == pytest.approx(0.0) # dead zone
|
||||
assert ang != pytest.approx(0.0) # bearing correction active
|
||||
|
||||
def test_angular_corrects_when_retreating(self):
|
||||
# Robot too close and person is off to the side — should turn AND reverse
|
||||
lin, ang = _compute_follow_cmd(0.5, 0.5, **_DEF)
|
||||
assert lin < 0.0 # retreating
|
||||
assert ang != pytest.approx(0.0)
|
||||
|
||||
|
||||
# ── Obstacle override tests ───────────────────────────────────────────────────
|
||||
|
||||
class TestObstacleOverride:
|
||||
def test_obstacle_suppresses_forward_motion(self):
|
||||
lin, _ = _compute_follow_cmd(2.5, 0.0, obstacle_ahead=True, **_DEF)
|
||||
assert lin == pytest.approx(0.0)
|
||||
|
||||
def test_obstacle_does_not_suppress_reverse(self):
|
||||
# Too close (1.0m) → robot reverses; obstacle should NOT suppress reverse
|
||||
# error = 1.0 - 1.5 = -0.5 → kp * error = -0.3 (negative = reverse)
|
||||
lin, _ = _compute_follow_cmd(1.0, 0.0, obstacle_ahead=True, **_DEF)
|
||||
assert lin == pytest.approx(-0.3)
|
||||
|
||||
def test_obstacle_allows_turning(self):
|
||||
lin, ang = _compute_follow_cmd(2.5, 2.0, obstacle_ahead=True, **_DEF)
|
||||
assert lin == pytest.approx(0.0) # suppressed
|
||||
assert ang != pytest.approx(0.0) # turning still active
|
||||
|
||||
def test_no_obstacle_normal_forward(self):
|
||||
# error = 2.0 - 1.5 = 0.5 → 0.6 * 0.5 = 0.3
|
||||
lin, _ = _compute_follow_cmd(2.0, 0.0, obstacle_ahead=False, **_DEF)
|
||||
assert lin == pytest.approx(0.3)
|
||||
|
||||
|
||||
# ── Costmap obstacle check tests ──────────────────────────────────────────────
|
||||
|
||||
class TestCostmapCheck:
|
||||
_RES = 0.05 # 5 cm/cell
|
||||
_W = 20
|
||||
_H = 20
|
||||
|
||||
def _check(self, data, *, check_dist=0.5, half_width=0.15, threshold=70):
|
||||
return _check_obstacle(
|
||||
costmap_data=data,
|
||||
width=self._W,
|
||||
height=self._H,
|
||||
resolution=self._RES,
|
||||
check_dist=check_dist,
|
||||
robot_half_width=half_width,
|
||||
threshold=threshold,
|
||||
)
|
||||
|
||||
def test_empty_costmap_no_obstacle(self):
|
||||
assert self._check(_empty_costmap(self._W, self._H)) is False
|
||||
|
||||
def test_obstacle_one_cell_ahead_detected(self):
|
||||
# Robot centre col=10; dx=1 → col=11, row=10 (centreline)
|
||||
data = _costmap_with_obstacle(11, 10, self._W, self._H, value=99)
|
||||
assert self._check(data) is True
|
||||
|
||||
def test_obstacle_at_check_dist_boundary_detected(self):
|
||||
# check_dist=0.5 / 0.05 = 10 cells → dx=1..10 → cols 11..20
|
||||
# Width=20 means valid cols 0..19; col 20 is out of bounds.
|
||||
# Last detectable col = cx + 9 = 10 + 9 = 19 (dx=9 within range(1,11))
|
||||
data = _costmap_with_obstacle(19, 10, self._W, self._H, value=99)
|
||||
assert self._check(data) is True
|
||||
|
||||
def test_obstacle_behind_robot_not_detected(self):
|
||||
data = _costmap_with_obstacle(5, 10, self._W, self._H, value=99)
|
||||
assert self._check(data) is False
|
||||
|
||||
def test_obstacle_within_corridor_side_detected(self):
|
||||
# half_width=0.15/0.05=3 cells; row=10+2=12 is inside corridor
|
||||
data = _costmap_with_obstacle(12, 12, self._W, self._H, value=99)
|
||||
assert self._check(data) is True
|
||||
|
||||
def test_obstacle_outside_corridor_not_detected(self):
|
||||
# row=10+4=14 is outside 3-cell corridor
|
||||
data = _costmap_with_obstacle(12, 14, self._W, self._H, value=99)
|
||||
assert self._check(data) is False
|
||||
|
||||
def test_value_below_threshold_not_detected(self):
|
||||
data = _costmap_with_obstacle(11, 10, self._W, self._H, value=50)
|
||||
assert self._check(data) is False
|
||||
|
||||
def test_value_at_threshold_detected(self):
|
||||
data = _costmap_with_obstacle(11, 10, self._W, self._H, value=70)
|
||||
assert self._check(data) is True
|
||||
|
||||
def test_zero_resolution_safe_returns_false(self):
|
||||
assert _check_obstacle(
|
||||
_empty_costmap(), width=20, height=20, resolution=0.0,
|
||||
check_dist=0.5, robot_half_width=0.15, threshold=70) is False
|
||||
|
||||
def test_lethal_threshold_99_detected(self):
|
||||
data = _costmap_with_obstacle(11, 10, self._W, self._H, value=100)
|
||||
assert self._check(data, threshold=99) is True
|
||||
|
||||
|
||||
# ── State machine tests ───────────────────────────────────────────────────────
|
||||
|
||||
class TestStateMachine:
|
||||
_FOLLOWING = "following"
|
||||
_STOPPING = "stopping"
|
||||
_SEARCHING = "searching"
|
||||
|
||||
def _transition(self, state, dt_lost, lost_timeout=2.0, search_timeout=5.0):
|
||||
"""Mirror of _control_cb state transition block."""
|
||||
if state == self._FOLLOWING:
|
||||
if dt_lost > search_timeout:
|
||||
return self._SEARCHING
|
||||
elif dt_lost > lost_timeout:
|
||||
return self._STOPPING
|
||||
return state
|
||||
|
||||
def test_following_stays_within_lost_timeout(self):
|
||||
assert self._transition(self._FOLLOWING, dt_lost=1.0) == self._FOLLOWING
|
||||
|
||||
def test_following_to_stopping_past_lost_timeout(self):
|
||||
assert self._transition(self._FOLLOWING, dt_lost=2.5) == self._STOPPING
|
||||
|
||||
def test_following_to_searching_past_search_timeout(self):
|
||||
assert self._transition(self._FOLLOWING, dt_lost=6.0) == self._SEARCHING
|
||||
|
||||
def test_stopping_does_not_transition(self):
|
||||
# Only _target_cb can move out of STOPPING
|
||||
assert self._transition(self._STOPPING, dt_lost=3.0) == self._STOPPING
|
||||
|
||||
def test_searching_does_not_transition(self):
|
||||
assert self._transition(self._SEARCHING, dt_lost=10.0) == self._SEARCHING
|
||||
|
||||
def test_boundary_exactly_at_lost_timeout(self):
|
||||
# dt_lost == 2.0 is NOT > 2.0 — still FOLLOWING
|
||||
assert self._transition(self._FOLLOWING, dt_lost=2.0) == self._FOLLOWING
|
||||
|
||||
def test_boundary_exactly_at_search_timeout(self):
|
||||
# dt_lost=5.0: NOT > search_timeout=5.0, but IS > lost_timeout=2.0 → STOPPING
|
||||
assert self._transition(self._FOLLOWING, dt_lost=5.0) == self._STOPPING
|
||||
|
||||
def test_before_first_detection_initial_state_is_stopping(self):
|
||||
# Initial state is STOPPING (not FOLLOWING) so no transition occurs
|
||||
assert self._transition(self._STOPPING, dt_lost=1e9) == self._STOPPING
|
||||
|
||||
def test_new_target_resets_to_following(self):
|
||||
# _target_cb sets state = FOLLOWING unconditionally
|
||||
state = self._SEARCHING
|
||||
# simulate _target_cb
|
||||
state = self._FOLLOWING
|
||||
assert state == self._FOLLOWING
|
||||
|
||||
|
||||
# ── Search and stop behaviour ─────────────────────────────────────────────────
|
||||
|
||||
class TestSearchBehaviour:
|
||||
def test_searching_publishes_angular_only(self):
|
||||
state = "searching"
|
||||
search_angular_vel = 0.3
|
||||
linear_x = 0.0 if state == "searching" else 99.9
|
||||
angular_z = search_angular_vel if state == "searching" else 0.0
|
||||
assert linear_x == pytest.approx(0.0)
|
||||
assert angular_z == pytest.approx(0.3)
|
||||
|
||||
def test_stopping_publishes_zero(self):
|
||||
state = "stopping"
|
||||
linear_x = 0.0
|
||||
angular_z = 0.0
|
||||
assert linear_x == pytest.approx(0.0)
|
||||
assert angular_z == pytest.approx(0.0)
|
||||
|
||||
|
||||
# ── Follow-enabled gate ───────────────────────────────────────────────────────
|
||||
|
||||
class TestFollowEnabled:
|
||||
def test_disabled_zeroes_output(self):
|
||||
follow_enabled = False
|
||||
lin, ang = (0.0, 0.0) if not follow_enabled else \
|
||||
_compute_follow_cmd(3.0, 0.0, **_DEF)
|
||||
assert lin == pytest.approx(0.0)
|
||||
assert ang == pytest.approx(0.0)
|
||||
|
||||
def test_enabled_passes_through(self):
|
||||
follow_enabled = True
|
||||
lin, ang = (0.0, 0.0) if not follow_enabled else \
|
||||
_compute_follow_cmd(3.0, 0.0, **_DEF)
|
||||
assert lin != pytest.approx(0.0)
|
||||
Loading…
x
Reference in New Issue
Block a user