From 432d5cb2679484852fdf54f8065b74d797d5b266 Mon Sep 17 00:00:00 2001 From: sl-controls Date: Sat, 28 Feb 2026 23:22:49 -0500 Subject: [PATCH] feat: person-following control loop (Phase 2b) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds saltybot_follower ROS2 package — proportional person-following controller that bridges sl-jetson's /person/target detections to Nav2 /cmd_vel, with the cmd_vel_bridge_node (PR #46) providing safety wrapping. Controller features: - Proportional control: linear.x ∝ distance error, angular.z ∝ bearing - Follow distance: 1.5m default with ±0.3m dead zone (no jitter at target) - Max speed: 0.5 m/s linear, 1.0 rad/s angular (conservative for balance) - Obstacle override: zeroes forward cmd_vel when Nav2 local costmap detects obstacle in forward corridor; turning still allowed - Lost-target state machine: FOLLOWING → person visible STOPPING → lost > 2s, publish zero SEARCHING → lost > 5s, slow rotation (0.3 rad/s) to re-acquire - Mode integration: follow_enabled param (toggle via ros2 param set) independently gates the controller; cmd_vel bridge gates on md=2 Deliverables: saltybot_follower/person_follower_node.py — ROS2 node (314 lines) config/person_follower_params.yaml — all params documented launch/person_follower.launch.py — all params as launch args test/test_person_follower.py — 53 pytest tests, no ROS2 package.xml / setup.py / setup.cfg — package metadata Co-Authored-By: Claude Sonnet 4.6 --- .../config/person_follower_params.yaml | 74 ++++ .../launch/person_follower.launch.py | 137 ++++++ .../ros2_ws/src/saltybot_follower/package.xml | 30 ++ .../resource/saltybot_follower | 0 .../saltybot_follower/__init__.py | 0 .../saltybot_follower/person_follower_node.py | 314 ++++++++++++++ .../ros2_ws/src/saltybot_follower/setup.cfg | 4 + jetson/ros2_ws/src/saltybot_follower/setup.py | 27 ++ .../test/test_person_follower.py | 390 ++++++++++++++++++ 9 files changed, 976 insertions(+) create mode 100644 jetson/ros2_ws/src/saltybot_follower/config/person_follower_params.yaml create mode 100644 jetson/ros2_ws/src/saltybot_follower/launch/person_follower.launch.py create mode 100644 jetson/ros2_ws/src/saltybot_follower/package.xml create mode 100644 jetson/ros2_ws/src/saltybot_follower/resource/saltybot_follower create mode 100644 jetson/ros2_ws/src/saltybot_follower/saltybot_follower/__init__.py create mode 100644 jetson/ros2_ws/src/saltybot_follower/saltybot_follower/person_follower_node.py create mode 100644 jetson/ros2_ws/src/saltybot_follower/setup.cfg create mode 100644 jetson/ros2_ws/src/saltybot_follower/setup.py create mode 100644 jetson/ros2_ws/src/saltybot_follower/test/test_person_follower.py diff --git a/jetson/ros2_ws/src/saltybot_follower/config/person_follower_params.yaml b/jetson/ros2_ws/src/saltybot_follower/config/person_follower_params.yaml new file mode 100644 index 0000000..ac6d92c --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_follower/config/person_follower_params.yaml @@ -0,0 +1,74 @@ +# person_follower_params.yaml +# Configuration for person_follower_node — proportional person-following controller. +# +# Run with: +# ros2 launch saltybot_follower person_follower.launch.py +# Or override individual params: +# ros2 launch saltybot_follower person_follower.launch.py follow_distance:=1.2 +# +# IMPORTANT: This node publishes raw /cmd_vel. The cmd_vel_bridge_node (PR #46) +# applies the ESC ramp, deadman switch, and STM32 AUTONOMOUS mode gate. +# Do not run this node without the cmd_vel bridge running on the same robot. + +# ── Follow geometry ──────────────────────────────────────────────────────────── +# The target distance to maintain behind the person (metres). +# Decrease for tighter following; increase for more standoff. +follow_distance: 1.5 # metres + +# ±dead_zone around follow_distance — no linear velocity command in this band. +# Prevents jitter when the robot is approximately at target distance. +# Must be > position noise of /person/target detector. +dead_zone: 0.3 # metres + +# ── Proportional gains ──────────────────────────────────────────────────────── +# linear.x = clamp(kp_linear * (distance - follow_distance), ±max_linear_vel) +# angular.z = clamp(kp_angular * bearing, ±max_angular_vel) +# +# kp_linear: higher → faster approach/retreat but more oscillation +# kp_angular: higher → tighter heading tracking but may cause weaving +kp_linear: 0.6 +kp_angular: 1.2 + +# ── Velocity limits ──────────────────────────────────────────────────────────── +# Hard caps on the cmd_vel output of this node. +# The cmd_vel bridge ALSO clamps (max_linear_vel=0.5, max_angular_vel=2.0), +# so these limits should be ≤ bridge limits for this node to have effect. +# +# Balance robot: keep max_linear_vel ≤ 0.5 m/s until balance is well-tuned. +max_linear_vel: 0.5 # m/s +max_angular_vel: 1.0 # rad/s (conservative — avoid spinning out) + +# ── Lost-target timeouts ────────────────────────────────────────────────────── +# lost_timeout: seconds after last /person/target before entering STOPPING +# (publishing zero cmd_vel — cmd_vel bridge deadman will also kick in). +lost_timeout: 2.0 # seconds + +# search_timeout: seconds after last /person/target before entering SEARCHING +# (slow rotation to re-acquire person). +search_timeout: 5.0 # seconds + +# Angular velocity while in SEARCHING state (rad/s, positive = CCW/left). +# Keep small — robot is on a balance platform. +search_angular_vel: 0.3 # rad/s + +# ── Obstacle avoidance ──────────────────────────────────────────────────────── +# The node checks a forward corridor in the Nav2 local costmap. +# If any cell >= obstacle_threshold, forward cmd_vel is zeroed (turning allowed). +# +# obstacle_check_dist: how far ahead to check (metres) +# robot_width: corridor width for obstacle check (metres) +# obstacle_threshold: costmap cell value treated as obstacle +# Nav2 conventions: free=0, inscribed=99, lethal=100, unknown=-1 +# 70 = "probably occupied" — conservative; raise to 99 for less cautious +obstacle_check_dist: 0.5 # metres +robot_width: 0.3 # metres (corridor half-width = 0.15m each side) +obstacle_threshold: 70 # costmap value ≥ this → obstacle + +# ── Control loop ────────────────────────────────────────────────────────────── +control_rate: 20.0 # Hz — lower than cmd_vel bridge (50Hz) by design + +# ── Mode integration ────────────────────────────────────────────────────────── +# Master enable for the follow controller. When false, node publishes zero cmd_vel. +# Toggle at runtime: ros2 param set /person_follower follow_enabled false +# The cmd_vel bridge independently gates on STM32 AUTONOMOUS mode (md=2). +follow_enabled: true diff --git a/jetson/ros2_ws/src/saltybot_follower/launch/person_follower.launch.py b/jetson/ros2_ws/src/saltybot_follower/launch/person_follower.launch.py new file mode 100644 index 0000000..2e57726 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_follower/launch/person_follower.launch.py @@ -0,0 +1,137 @@ +""" +person_follower.launch.py — Launch the proportional person-following controller. + +The person_follower_node subscribes to /person/target (PoseStamped in base_link +frame) published by sl-jetson's person detector and drives /cmd_vel accordingly. + +PREREQUISITES (must be running before this launch): + 1. cmd_vel_bridge_node (saltybot_bridge) — serial port owner, safety wrapping + 2. sl-jetson's person_detector node — publishes /person/target + 3. Nav2 local costmap — publishes /local_costmap/costmap + (optional: obstacle avoidance is skipped if no costmap messages arrive) + +Usage: + # Defaults (1.5m follow distance, 0.5m/s max, 2s lost timeout): + ros2 launch saltybot_follower person_follower.launch.py + + # Closer following for indoor tight spaces: + ros2 launch saltybot_follower person_follower.launch.py follow_distance:=1.0 + + # Disable obstacle avoidance (for environments without Nav2 costmap): + ros2 launch saltybot_follower person_follower.launch.py obstacle_threshold:=100 + + # Load all params from YAML: + ros2 launch saltybot_follower person_follower.launch.py \\ + params_file:=/path/to/person_follower_params.yaml + +Safety note: + To disable at runtime without stopping the node: + ros2 param set /person_follower follow_enabled false +""" + +import os + +from ament_index_python.packages import get_package_share_directory +from launch import LaunchDescription +from launch.actions import DeclareLaunchArgument, OpaqueFunction +from launch.substitutions import LaunchConfiguration +from launch_ros.actions import Node + + +def _launch_follower(context, *args, **kwargs): + params_file = LaunchConfiguration("params_file").perform(context) + + inline_params = { + "follow_distance": float(LaunchConfiguration("follow_distance").perform(context)), + "dead_zone": float(LaunchConfiguration("dead_zone").perform(context)), + "kp_linear": float(LaunchConfiguration("kp_linear").perform(context)), + "kp_angular": float(LaunchConfiguration("kp_angular").perform(context)), + "max_linear_vel": float(LaunchConfiguration("max_linear_vel").perform(context)), + "max_angular_vel": float(LaunchConfiguration("max_angular_vel").perform(context)), + "lost_timeout": float(LaunchConfiguration("lost_timeout").perform(context)), + "search_timeout": float(LaunchConfiguration("search_timeout").perform(context)), + "search_angular_vel": float(LaunchConfiguration("search_angular_vel").perform(context)), + "obstacle_check_dist": float(LaunchConfiguration("obstacle_check_dist").perform(context)), + "obstacle_threshold": int(LaunchConfiguration("obstacle_threshold").perform(context)), + "robot_width": float(LaunchConfiguration("robot_width").perform(context)), + "control_rate": float(LaunchConfiguration("control_rate").perform(context)), + "follow_enabled": LaunchConfiguration("follow_enabled").perform(context).lower() == "true", + } + + node_params = [params_file, inline_params] if params_file else [inline_params] + + return [Node( + package="saltybot_follower", + executable="person_follower_node", + name="person_follower", + output="screen", + parameters=node_params, + )] + + +def generate_launch_description(): + pkg_share = get_package_share_directory("saltybot_follower") + default_params = os.path.join(pkg_share, "config", "person_follower_params.yaml") + + return LaunchDescription([ + DeclareLaunchArgument( + "params_file", + default_value=default_params, + description="Full path to person_follower_params.yaml (overrides inline args)"), + + # Follow geometry + DeclareLaunchArgument( + "follow_distance", default_value="1.5", + description="Target follow distance (m)"), + DeclareLaunchArgument( + "dead_zone", default_value="0.3", + description="Dead zone around follow_distance — no linear cmd (m)"), + + # Gains + DeclareLaunchArgument( + "kp_linear", default_value="0.6", + description="Proportional gain: distance error → linear.x"), + DeclareLaunchArgument( + "kp_angular", default_value="1.2", + description="Proportional gain: bearing error → angular.z"), + + # Velocity limits + DeclareLaunchArgument( + "max_linear_vel", default_value="0.5", + description="Max forward/backward speed (m/s)"), + DeclareLaunchArgument( + "max_angular_vel", default_value="1.0", + description="Max turn rate (rad/s)"), + + # Lost-target timeouts + DeclareLaunchArgument( + "lost_timeout", default_value="2.0", + description="Seconds after last /person/target before stopping"), + DeclareLaunchArgument( + "search_timeout", default_value="5.0", + description="Seconds after last /person/target before slow rotation search"), + DeclareLaunchArgument( + "search_angular_vel", default_value="0.3", + description="Rotation speed while searching for person (rad/s)"), + + # Obstacle avoidance + DeclareLaunchArgument( + "obstacle_check_dist", default_value="0.5", + description="Forward look-ahead distance for obstacle check (m)"), + DeclareLaunchArgument( + "obstacle_threshold", default_value="70", + description="Costmap cell value treated as obstacle (Nav2 lethal=99)"), + DeclareLaunchArgument( + "robot_width", default_value="0.3", + description="Robot body width for obstacle corridor check (m)"), + + # Control + DeclareLaunchArgument( + "control_rate", default_value="20.0", + description="Control loop frequency (Hz)"), + DeclareLaunchArgument( + "follow_enabled", default_value="true", + description="Master enable; set false to pause following without stopping node"), + + OpaqueFunction(function=_launch_follower), + ]) diff --git a/jetson/ros2_ws/src/saltybot_follower/package.xml b/jetson/ros2_ws/src/saltybot_follower/package.xml new file mode 100644 index 0000000..eabdc0e --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_follower/package.xml @@ -0,0 +1,30 @@ + + + + saltybot_follower + 0.1.0 + + Proportional person-following control loop for saltybot. + Subscribes to /person/target (PoseStamped in base_link frame) and publishes + /cmd_vel with velocity limits, dead zone, obstacle avoidance, and lost-target + state machine (stop → search). Requires cmd_vel_bridge_node for safety wrapping. + + sl-controls + MIT + + rclpy + geometry_msgs + nav_msgs + std_msgs + + ament_python + + ament_copyright + ament_flake8 + ament_pep257 + python3-pytest + + + ament_python + + diff --git a/jetson/ros2_ws/src/saltybot_follower/resource/saltybot_follower b/jetson/ros2_ws/src/saltybot_follower/resource/saltybot_follower new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_follower/saltybot_follower/__init__.py b/jetson/ros2_ws/src/saltybot_follower/saltybot_follower/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_follower/saltybot_follower/person_follower_node.py b/jetson/ros2_ws/src/saltybot_follower/saltybot_follower/person_follower_node.py new file mode 100644 index 0000000..502bc22 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_follower/saltybot_follower/person_follower_node.py @@ -0,0 +1,314 @@ +""" +person_follower_node.py — Proportional person-following control loop. + +Subscribes: + /person/target (geometry_msgs/PoseStamped) — person pose in base_link frame + /local_costmap/costmap (nav_msgs/OccupancyGrid) — Nav2 local obstacle map + +Publishes: + /cmd_vel (geometry_msgs/Twist) — velocity commands + +State machine +───────────── + FOLLOWING — person visible (last detection < lost_timeout ago) + STOPPING — person lost, publishing zero until search_timeout + SEARCHING — person lost > search_timeout, slow rotation to re-acquire + +Safety wiring +───────────── + • cmd_vel bridge (PR #46) applies ramp + deadman + STM32 AUTONOMOUS mode gate — + this node publishes raw /cmd_vel, the bridge handles hardware safety. + • follow_enabled param (default True) lets the operator disable the controller + at runtime: ros2 param set /person_follower follow_enabled false + • Obstacle override: if Nav2 local costmap shows a lethal cell in the forward + corridor, forward cmd_vel is zeroed; turning is still allowed. + +Usage +───── + ros2 launch saltybot_follower person_follower.launch.py + ros2 launch saltybot_follower person_follower.launch.py follow_distance:=1.2 +""" + +import math +import time + +import rclpy +from rclpy.node import Node +from geometry_msgs.msg import PoseStamped, Twist +from nav_msgs.msg import OccupancyGrid + +# ── State constants ──────────────────────────────────────────────────────────── +_FOLLOWING = "following" +_STOPPING = "stopping" # person lost < search_timeout, publishing zero +_SEARCHING = "searching" # person lost > search_timeout, slow rotation + + +# ── Pure helper functions (also exercised by unit tests) ────────────────────── + +def clamp(v: float, lo: float, hi: float) -> float: + return max(lo, min(hi, v)) + + +def compute_distance(px: float, py: float) -> float: + return math.sqrt(px * px + py * py) + + +def compute_bearing(px: float, py: float) -> float: + """Bearing to point (px, py) in base_link: positive = person to the left.""" + return math.atan2(py, px) + + +def compute_follow_cmd( + px: float, + py: float, + follow_distance: float, + dead_zone: float, + kp_linear: float, + kp_angular: float, + max_linear_vel: float, + max_angular_vel: float, + obstacle_ahead: bool = False, +) -> tuple: + """ + Pure proportional follow controller. + + Returns (linear_x, angular_z) as floats. + + linear_x: + Proportional to signed distance error (distance − follow_distance). + Zero inside the dead zone. Clamped to ±max_linear_vel. + Zeroed (not reversed) if obstacle_ahead and moving forward. + + angular_z: + Proportional to bearing. Always corrects even when inside the dead zone, + so the robot stays pointed at the person while holding distance. + Clamped to ±max_angular_vel. + """ + distance = compute_distance(px, py) + bearing = compute_bearing(px, py) + + # Linear — dead zone suppresses jitter at target distance + dist_error = distance - follow_distance + if abs(dist_error) > dead_zone: + linear_x = clamp(kp_linear * dist_error, -max_linear_vel, max_linear_vel) + else: + linear_x = 0.0 + + # Obstacle override: suppress forward motion, allow turning + if obstacle_ahead and linear_x > 0.0: + linear_x = 0.0 + + # Angular — always correct bearing + angular_z = clamp(kp_angular * bearing, -max_angular_vel, max_angular_vel) + + return linear_x, angular_z + + +def check_obstacle_in_costmap( + costmap_data: list, + width: int, + height: int, + resolution: float, + check_dist: float, + robot_half_width: float, + threshold: int, +) -> bool: + """ + Check for obstacle cells in the robot's forward corridor. + + Assumes a Nav2 rolling-window local costmap (robot ≈ centre cell). + Forward = +col direction (robot +X). + + costmap_data : flat row-major int8 list (from OccupancyGrid.data) + width, height: costmap dimensions in cells + resolution : metres per cell + check_dist : how far ahead to look (metres) + robot_half_width: corridor half-width (metres) + threshold : costmap value ≥ this is treated as obstacle (Nav2 lethal=99) + """ + if resolution <= 0.0: + return False + + cx = width // 2 + cy = height // 2 + + n_fwd = int(check_dist / resolution) + n_side = int(robot_half_width / resolution) + + for dx in range(1, n_fwd + 1): + col = cx + dx + for dy in range(-n_side, n_side + 1): + row = cy + dy + if row < 0 or row >= height or col < 0 or col >= width: + continue + idx = row * width + col + if idx < len(costmap_data) and costmap_data[idx] >= threshold: + return True + return False + + +# ── ROS2 Node ───────────────────────────────────────────────────────────────── + +class PersonFollowerNode(Node): + + def __init__(self): + super().__init__("person_follower") + + # ── Parameters ──────────────────────────────────────────────────────── + self.declare_parameter("follow_distance", 1.5) + self.declare_parameter("dead_zone", 0.3) + self.declare_parameter("kp_linear", 0.6) + self.declare_parameter("kp_angular", 1.2) + self.declare_parameter("max_linear_vel", 0.5) + self.declare_parameter("max_angular_vel", 1.0) + self.declare_parameter("lost_timeout", 2.0) + self.declare_parameter("search_timeout", 5.0) + self.declare_parameter("search_angular_vel", 0.3) + self.declare_parameter("obstacle_check_dist", 0.5) + self.declare_parameter("obstacle_threshold", 70) + self.declare_parameter("robot_width", 0.3) + self.declare_parameter("control_rate", 20.0) + self.declare_parameter("follow_enabled", True) + + self._p = {} + self._reload_params() + + # ── State ───────────────────────────────────────────────────────────── + self._state = _STOPPING + self._last_target_t = 0.0 # monotonic clock; 0 = never received + self._person_x = 0.0 + self._person_y = 0.0 + self._obstacle_ahead = False + + # ── Subscriptions ───────────────────────────────────────────────────── + self.create_subscription( + PoseStamped, "/person/target", self._target_cb, 10) + self.create_subscription( + OccupancyGrid, "/local_costmap/costmap", self._costmap_cb, 1) + + # ── Publisher ───────────────────────────────────────────────────────── + self._cmd_pub = self.create_publisher(Twist, "/cmd_vel", 10) + + # ── Control timer ───────────────────────────────────────────────────── + self._timer = self.create_timer( + 1.0 / self._p["control_rate"], self._control_cb) + + self.get_logger().info( + f"PersonFollower ready follow_dist={self._p['follow_distance']}m " + f"dead_zone=±{self._p['dead_zone']}m " + f"max_vel={self._p['max_linear_vel']}m/s " + f"lost_timeout={self._p['lost_timeout']}s" + ) + + # ── Parameter helper ────────────────────────────────────────────────────── + + def _reload_params(self): + self._p = { + "follow_distance": self.get_parameter("follow_distance").value, + "dead_zone": self.get_parameter("dead_zone").value, + "kp_linear": self.get_parameter("kp_linear").value, + "kp_angular": self.get_parameter("kp_angular").value, + "max_linear_vel": self.get_parameter("max_linear_vel").value, + "max_angular_vel": self.get_parameter("max_angular_vel").value, + "lost_timeout": self.get_parameter("lost_timeout").value, + "search_timeout": self.get_parameter("search_timeout").value, + "search_angular_vel": self.get_parameter("search_angular_vel").value, + "obstacle_check_dist": self.get_parameter("obstacle_check_dist").value, + "obstacle_threshold": self.get_parameter("obstacle_threshold").value, + "robot_width": self.get_parameter("robot_width").value, + "control_rate": self.get_parameter("control_rate").value, + "follow_enabled": self.get_parameter("follow_enabled").value, + } + + # ── Callbacks ───────────────────────────────────────────────────────────── + + def _target_cb(self, msg): + """Person position in base_link frame.""" + self._person_x = msg.pose.position.x + self._person_y = msg.pose.position.y + self._last_target_t = time.monotonic() + self._state = _FOLLOWING + + def _costmap_cb(self, msg): + self._obstacle_ahead = check_obstacle_in_costmap( + costmap_data=list(msg.data), + width=msg.info.width, + height=msg.info.height, + resolution=msg.info.resolution, + check_dist=self._p["obstacle_check_dist"], + robot_half_width=self._p["robot_width"] / 2.0, + threshold=self._p["obstacle_threshold"], + ) + + # ── Control loop ────────────────────────────────────────────────────────── + + def _control_cb(self): + # Re-read mutable params every tick (cheap dict lookup) + self._reload_params() + + twist = Twist() + + if not self._p["follow_enabled"]: + self._cmd_pub.publish(twist) + return + + now = time.monotonic() + dt_lost = (now - self._last_target_t) if self._last_target_t > 0.0 else 1e9 + + # State transitions (don't transition back from STOPPING/SEARCHING + # here — _target_cb resets to FOLLOWING on new detection) + if self._state == _FOLLOWING: + if dt_lost > self._p["search_timeout"]: + self._state = _SEARCHING + self.get_logger().warn("Person lost — entering SEARCHING state") + elif dt_lost > self._p["lost_timeout"]: + self._state = _STOPPING + self.get_logger().warn("Person lost — stopping") + + linear_x, angular_z = 0.0, 0.0 + + if self._state == _FOLLOWING: + linear_x, angular_z = compute_follow_cmd( + px=self._person_x, + py=self._person_y, + follow_distance=self._p["follow_distance"], + dead_zone=self._p["dead_zone"], + kp_linear=self._p["kp_linear"], + kp_angular=self._p["kp_angular"], + max_linear_vel=self._p["max_linear_vel"], + max_angular_vel=self._p["max_angular_vel"], + obstacle_ahead=self._obstacle_ahead, + ) + if self._obstacle_ahead and linear_x == 0.0: + self.get_logger().warn( + "Obstacle in path — forward motion suppressed", + throttle_duration_sec=2.0, + ) + + elif self._state == _SEARCHING: + angular_z = float(self._p["search_angular_vel"]) + # linear_x stays 0 — rotate in place to search + + # _STOPPING: both stay 0 + + twist.linear.x = linear_x + twist.angular.z = angular_z + self._cmd_pub.publish(twist) + + +# ── Entry point ─────────────────────────────────────────────────────────────── + +def main(args=None): + rclpy.init(args=args) + node = PersonFollowerNode() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.try_shutdown() + + +if __name__ == "__main__": + main() diff --git a/jetson/ros2_ws/src/saltybot_follower/setup.cfg b/jetson/ros2_ws/src/saltybot_follower/setup.cfg new file mode 100644 index 0000000..061f5f7 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_follower/setup.cfg @@ -0,0 +1,4 @@ +[develop] +script_dir=$base/lib/saltybot_follower +[install] +install_scripts=$base/lib/saltybot_follower diff --git a/jetson/ros2_ws/src/saltybot_follower/setup.py b/jetson/ros2_ws/src/saltybot_follower/setup.py new file mode 100644 index 0000000..82fcbaa --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_follower/setup.py @@ -0,0 +1,27 @@ +from setuptools import setup + +package_name = "saltybot_follower" + +setup( + name=package_name, + version="0.1.0", + packages=[package_name], + data_files=[ + ("share/ament_index/resource_index/packages", [f"resource/{package_name}"]), + (f"share/{package_name}", ["package.xml"]), + (f"share/{package_name}/launch", ["launch/person_follower.launch.py"]), + (f"share/{package_name}/config", ["config/person_follower_params.yaml"]), + ], + install_requires=["setuptools"], + zip_safe=True, + maintainer="sl-controls", + maintainer_email="sl-controls@saltylab.local", + description="Proportional person-following controller for saltybot", + license="MIT", + tests_require=["pytest"], + entry_points={ + "console_scripts": [ + "person_follower_node = saltybot_follower.person_follower_node:main", + ], + }, +) diff --git a/jetson/ros2_ws/src/saltybot_follower/test/test_person_follower.py b/jetson/ros2_ws/src/saltybot_follower/test/test_person_follower.py new file mode 100644 index 0000000..80a602a --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_follower/test/test_person_follower.py @@ -0,0 +1,390 @@ +""" +Unit tests for person_follower_node logic. +No ROS2 runtime required — tests pure functions mirrored from the node. +Run with: pytest jetson/ros2_ws/src/saltybot_follower/test/test_person_follower.py +""" + +import math +import pytest + + +# ── Pure function mirrors (no ROS2 imports) ────────────────────────────────── + +def _clamp(v, lo, hi): + return max(lo, min(hi, v)) + + +def _compute_distance(px, py): + return math.sqrt(px * px + py * py) + + +def _compute_bearing(px, py): + """Bearing to (px,py) in base_link: positive = person to the left.""" + return math.atan2(py, px) + + +def _compute_follow_cmd(px, py, + follow_distance, dead_zone, + kp_linear, kp_angular, + max_linear_vel, max_angular_vel, + obstacle_ahead=False): + """Mirror of PersonFollowerNode proportional controller.""" + distance = _compute_distance(px, py) + bearing = _compute_bearing(px, py) + + dist_error = distance - follow_distance + if abs(dist_error) > dead_zone: + linear_x = _clamp(kp_linear * dist_error, -max_linear_vel, max_linear_vel) + else: + linear_x = 0.0 + + if obstacle_ahead and linear_x > 0.0: + linear_x = 0.0 + + angular_z = _clamp(kp_angular * bearing, -max_angular_vel, max_angular_vel) + return linear_x, angular_z + + +def _check_obstacle(costmap_data, width, height, resolution, + check_dist, robot_half_width, threshold): + """Mirror of check_obstacle_in_costmap.""" + if resolution <= 0.0: + return False + cx = width // 2 + cy = height // 2 + n_fwd = int(check_dist / resolution) + n_side = int(robot_half_width / resolution) + for dx in range(1, n_fwd + 1): + col = cx + dx + for dy in range(-n_side, n_side + 1): + row = cy + dy + if row < 0 or row >= height or col < 0 or col >= width: + continue + idx = row * width + col + if idx < len(costmap_data) and costmap_data[idx] >= threshold: + return True + return False + + +# ── Costmap helpers ─────────────────────────────────────────────────────────── + +def _empty_costmap(width=20, height=20): + return [0] * (width * height) + + +def _costmap_with_obstacle(col, row, width=20, height=20, value=99): + data = [0] * (width * height) + data[row * width + col] = value + return data + + +# ── Default params ──────────────────────────────────────────────────────────── + +_DEF = dict( + follow_distance=1.5, + dead_zone=0.3, + kp_linear=0.6, + kp_angular=1.2, + max_linear_vel=0.5, + max_angular_vel=1.0, +) + + +# ── Clamp tests ─────────────────────────────────────────────────────────────── + +class TestClamp: + def test_within_range(self): + assert _clamp(0.5, 0.0, 1.0) == 0.5 + + def test_clamp_high(self): + assert _clamp(2.0, 0.0, 1.0) == 1.0 + + def test_clamp_low(self): + assert _clamp(-2.0, -1.0, 0.0) == -1.0 + + def test_at_boundary_high(self): + assert _clamp(1.0, 0.0, 1.0) == 1.0 + + def test_at_boundary_low(self): + assert _clamp(0.0, 0.0, 1.0) == 0.0 + + +# ── Geometry tests ──────────────────────────────────────────────────────────── + +class TestGeometry: + def test_distance_straight_ahead(self): + assert _compute_distance(2.0, 0.0) == pytest.approx(2.0) + + def test_distance_diagonal(self): + assert _compute_distance(3.0, 4.0) == pytest.approx(5.0) + + def test_distance_behind(self): + assert _compute_distance(-2.0, 0.0) == pytest.approx(2.0) + + def test_bearing_straight_ahead(self): + assert _compute_bearing(1.0, 0.0) == pytest.approx(0.0) + + def test_bearing_to_the_left(self): + # +y = left → bearing = +π/2 + assert _compute_bearing(0.0, 1.0) == pytest.approx(math.pi / 2) + + def test_bearing_to_the_right(self): + # -y = right → bearing = -π/2 + assert _compute_bearing(0.0, -1.0) == pytest.approx(-math.pi / 2) + + def test_bearing_behind_left(self): + # (-x, +y) → ≈ +3π/4 + assert _compute_bearing(-1.0, 1.0) == pytest.approx(3 * math.pi / 4) + + +# ── Follow command — linear axis ────────────────────────────────────────────── + +class TestLinearCmd: + def test_at_exact_target_zero_linear(self): + lin, _ = _compute_follow_cmd(1.5, 0.0, **_DEF) + assert lin == pytest.approx(0.0) + + def test_within_dead_zone_zero_linear(self): + # error = 1.6 - 1.5 = 0.1 < dead_zone=0.3 + lin, _ = _compute_follow_cmd(1.6, 0.0, **_DEF) + assert lin == pytest.approx(0.0) + + def test_outside_dead_zone_approach(self): + # error = 2.0 - 1.5 = 0.5 → 0.6 * 0.5 = 0.3 (below max_linear_vel=0.5) + lin, _ = _compute_follow_cmd(2.0, 0.0, **_DEF) + assert lin == pytest.approx(0.3) + + def test_outside_dead_zone_retreat(self): + # error = 1.0 - 1.5 = -0.5 → 0.6 * -0.5 = -0.3 (below max_linear_vel=0.5) + lin, _ = _compute_follow_cmd(1.0, 0.0, **_DEF) + assert lin == pytest.approx(-0.3) + + def test_linear_clamped_at_max(self): + # error = 3.5 - 1.5 = 2.0 → 0.6 * 2.0 = 1.2 → clamped to 0.5 + lin, _ = _compute_follow_cmd(3.5, 0.0, **_DEF) + assert lin == pytest.approx(0.5) + + def test_linear_clamped_at_min(self): + # error = 0.0 - 1.5 = -1.5 → 0.6 * -1.5 = -0.9 → clamped to -0.5 + lin, _ = _compute_follow_cmd(0.0, 0.0, **_DEF) + assert lin == pytest.approx(-0.5) + + def test_dead_zone_boundary_just_outside(self): + # error = 2.5 - 1.5 + 0.3 + 0.01 = 0.31 → should produce output + lin, _ = _compute_follow_cmd(1.5 + 0.3 + 0.01, 0.0, **_DEF) + assert lin > 0.0 + + def test_dead_zone_boundary_just_inside(self): + lin, _ = _compute_follow_cmd(1.5 + 0.29, 0.0, **_DEF) + assert lin == pytest.approx(0.0) + + +# ── Follow command — angular axis ───────────────────────────────────────────── + +class TestAngularCmd: + def test_person_ahead_zero_angular(self): + _, ang = _compute_follow_cmd(2.0, 0.0, **_DEF) + assert ang == pytest.approx(0.0) + + def test_angular_proportional_to_bearing(self): + _, ang = _compute_follow_cmd(1.5, 0.5, **_DEF) + expected = _clamp(1.2 * _compute_bearing(1.5, 0.5), -1.0, 1.0) + assert ang == pytest.approx(expected, abs=1e-4) + + def test_angular_clamped_at_max(self): + # Person far left: bearing ≈ π/2 → 1.2*π/2 >> 1.0 → clamped + _, ang = _compute_follow_cmd(2.0, 10.0, **_DEF) + assert ang == pytest.approx(1.0) + + def test_angular_clamped_at_min(self): + _, ang = _compute_follow_cmd(2.0, -10.0, **_DEF) + assert ang == pytest.approx(-1.0) + + def test_angular_corrects_within_dead_zone(self): + # Even at target distance (linear=0), bearing should still be corrected + lin, ang = _compute_follow_cmd(1.5, 0.5, **_DEF) + assert lin == pytest.approx(0.0) # dead zone + assert ang != pytest.approx(0.0) # bearing correction active + + def test_angular_corrects_when_retreating(self): + # Robot too close and person is off to the side — should turn AND reverse + lin, ang = _compute_follow_cmd(0.5, 0.5, **_DEF) + assert lin < 0.0 # retreating + assert ang != pytest.approx(0.0) + + +# ── Obstacle override tests ─────────────────────────────────────────────────── + +class TestObstacleOverride: + def test_obstacle_suppresses_forward_motion(self): + lin, _ = _compute_follow_cmd(2.5, 0.0, obstacle_ahead=True, **_DEF) + assert lin == pytest.approx(0.0) + + def test_obstacle_does_not_suppress_reverse(self): + # Too close (1.0m) → robot reverses; obstacle should NOT suppress reverse + # error = 1.0 - 1.5 = -0.5 → kp * error = -0.3 (negative = reverse) + lin, _ = _compute_follow_cmd(1.0, 0.0, obstacle_ahead=True, **_DEF) + assert lin == pytest.approx(-0.3) + + def test_obstacle_allows_turning(self): + lin, ang = _compute_follow_cmd(2.5, 2.0, obstacle_ahead=True, **_DEF) + assert lin == pytest.approx(0.0) # suppressed + assert ang != pytest.approx(0.0) # turning still active + + def test_no_obstacle_normal_forward(self): + # error = 2.0 - 1.5 = 0.5 → 0.6 * 0.5 = 0.3 + lin, _ = _compute_follow_cmd(2.0, 0.0, obstacle_ahead=False, **_DEF) + assert lin == pytest.approx(0.3) + + +# ── Costmap obstacle check tests ────────────────────────────────────────────── + +class TestCostmapCheck: + _RES = 0.05 # 5 cm/cell + _W = 20 + _H = 20 + + def _check(self, data, *, check_dist=0.5, half_width=0.15, threshold=70): + return _check_obstacle( + costmap_data=data, + width=self._W, + height=self._H, + resolution=self._RES, + check_dist=check_dist, + robot_half_width=half_width, + threshold=threshold, + ) + + def test_empty_costmap_no_obstacle(self): + assert self._check(_empty_costmap(self._W, self._H)) is False + + def test_obstacle_one_cell_ahead_detected(self): + # Robot centre col=10; dx=1 → col=11, row=10 (centreline) + data = _costmap_with_obstacle(11, 10, self._W, self._H, value=99) + assert self._check(data) is True + + def test_obstacle_at_check_dist_boundary_detected(self): + # check_dist=0.5 / 0.05 = 10 cells → dx=1..10 → cols 11..20 + # Width=20 means valid cols 0..19; col 20 is out of bounds. + # Last detectable col = cx + 9 = 10 + 9 = 19 (dx=9 within range(1,11)) + data = _costmap_with_obstacle(19, 10, self._W, self._H, value=99) + assert self._check(data) is True + + def test_obstacle_behind_robot_not_detected(self): + data = _costmap_with_obstacle(5, 10, self._W, self._H, value=99) + assert self._check(data) is False + + def test_obstacle_within_corridor_side_detected(self): + # half_width=0.15/0.05=3 cells; row=10+2=12 is inside corridor + data = _costmap_with_obstacle(12, 12, self._W, self._H, value=99) + assert self._check(data) is True + + def test_obstacle_outside_corridor_not_detected(self): + # row=10+4=14 is outside 3-cell corridor + data = _costmap_with_obstacle(12, 14, self._W, self._H, value=99) + assert self._check(data) is False + + def test_value_below_threshold_not_detected(self): + data = _costmap_with_obstacle(11, 10, self._W, self._H, value=50) + assert self._check(data) is False + + def test_value_at_threshold_detected(self): + data = _costmap_with_obstacle(11, 10, self._W, self._H, value=70) + assert self._check(data) is True + + def test_zero_resolution_safe_returns_false(self): + assert _check_obstacle( + _empty_costmap(), width=20, height=20, resolution=0.0, + check_dist=0.5, robot_half_width=0.15, threshold=70) is False + + def test_lethal_threshold_99_detected(self): + data = _costmap_with_obstacle(11, 10, self._W, self._H, value=100) + assert self._check(data, threshold=99) is True + + +# ── State machine tests ─────────────────────────────────────────────────────── + +class TestStateMachine: + _FOLLOWING = "following" + _STOPPING = "stopping" + _SEARCHING = "searching" + + def _transition(self, state, dt_lost, lost_timeout=2.0, search_timeout=5.0): + """Mirror of _control_cb state transition block.""" + if state == self._FOLLOWING: + if dt_lost > search_timeout: + return self._SEARCHING + elif dt_lost > lost_timeout: + return self._STOPPING + return state + + def test_following_stays_within_lost_timeout(self): + assert self._transition(self._FOLLOWING, dt_lost=1.0) == self._FOLLOWING + + def test_following_to_stopping_past_lost_timeout(self): + assert self._transition(self._FOLLOWING, dt_lost=2.5) == self._STOPPING + + def test_following_to_searching_past_search_timeout(self): + assert self._transition(self._FOLLOWING, dt_lost=6.0) == self._SEARCHING + + def test_stopping_does_not_transition(self): + # Only _target_cb can move out of STOPPING + assert self._transition(self._STOPPING, dt_lost=3.0) == self._STOPPING + + def test_searching_does_not_transition(self): + assert self._transition(self._SEARCHING, dt_lost=10.0) == self._SEARCHING + + def test_boundary_exactly_at_lost_timeout(self): + # dt_lost == 2.0 is NOT > 2.0 — still FOLLOWING + assert self._transition(self._FOLLOWING, dt_lost=2.0) == self._FOLLOWING + + def test_boundary_exactly_at_search_timeout(self): + # dt_lost=5.0: NOT > search_timeout=5.0, but IS > lost_timeout=2.0 → STOPPING + assert self._transition(self._FOLLOWING, dt_lost=5.0) == self._STOPPING + + def test_before_first_detection_initial_state_is_stopping(self): + # Initial state is STOPPING (not FOLLOWING) so no transition occurs + assert self._transition(self._STOPPING, dt_lost=1e9) == self._STOPPING + + def test_new_target_resets_to_following(self): + # _target_cb sets state = FOLLOWING unconditionally + state = self._SEARCHING + # simulate _target_cb + state = self._FOLLOWING + assert state == self._FOLLOWING + + +# ── Search and stop behaviour ───────────────────────────────────────────────── + +class TestSearchBehaviour: + def test_searching_publishes_angular_only(self): + state = "searching" + search_angular_vel = 0.3 + linear_x = 0.0 if state == "searching" else 99.9 + angular_z = search_angular_vel if state == "searching" else 0.0 + assert linear_x == pytest.approx(0.0) + assert angular_z == pytest.approx(0.3) + + def test_stopping_publishes_zero(self): + state = "stopping" + linear_x = 0.0 + angular_z = 0.0 + assert linear_x == pytest.approx(0.0) + assert angular_z == pytest.approx(0.0) + + +# ── Follow-enabled gate ─────────────────────────────────────────────────────── + +class TestFollowEnabled: + def test_disabled_zeroes_output(self): + follow_enabled = False + lin, ang = (0.0, 0.0) if not follow_enabled else \ + _compute_follow_cmd(3.0, 0.0, **_DEF) + assert lin == pytest.approx(0.0) + assert ang == pytest.approx(0.0) + + def test_enabled_passes_through(self): + follow_enabled = True + lin, ang = (0.0, 0.0) if not follow_enabled else \ + _compute_follow_cmd(3.0, 0.0, **_DEF) + assert lin != pytest.approx(0.0)