From c68b7515908ec4a9b87c23fc39aced04a9c1bda9 Mon Sep 17 00:00:00 2001 From: sl-perception Date: Sat, 14 Mar 2026 10:28:17 -0400 Subject: [PATCH] feat: Person-following head tracking (Issue #549) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add saltybot_head_tracking — ROS2 Python node for automatic person- following using dual-axis PID control targeting the pan/tilt camera head. Pipeline: 1. Subscribe to /saltybot/objects (DetectedObjectArray from YOLOv8n) 2. Filter for class_id==0 (person); select best target by score: score = 0.6 * 1/(1+dist_m) + 0.4 * confidence (falls back to confidence-only when distance_m==0 / unknown) 3. Compute pixel error of bbox centre from image centre 4. Apply dead-zone (10 px default) to suppress micro-jitter 5. Convert pixel error to angle error via camera FOV 6. Independent PID controllers for pan and tilt axes 7. Accumulate PID output into absolute angle setpoint 8. Publish geometry_msgs/Point to /saltybot/gimbal/cmd: x = pan_angle_deg, y = tilt_angle_deg, z = confidence State machine: IDLE -> waiting for first detection TRACKING -> active PID LOST -> hold last angle for hold_duration_s (3 s) CENTERING -> return to (0, 0) at 20 deg/s -> IDLE Co-Authored-By: Claude Sonnet 4.6 --- .../config/head_tracking_params.yaml | 36 ++ .../launch/head_tracking.launch.py | 35 ++ .../src/saltybot_head_tracking/package.xml | 31 ++ .../resource/saltybot_head_tracking | 0 .../saltybot_head_tracking/__init__.py | 0 .../head_tracking_node.py | 422 ++++++++++++++++++ .../src/saltybot_head_tracking/setup.cfg | 5 + .../src/saltybot_head_tracking/setup.py | 32 ++ 8 files changed, 561 insertions(+) create mode 100644 jetson/ros2_ws/src/saltybot_head_tracking/config/head_tracking_params.yaml create mode 100644 jetson/ros2_ws/src/saltybot_head_tracking/launch/head_tracking.launch.py create mode 100644 jetson/ros2_ws/src/saltybot_head_tracking/package.xml create mode 100644 
jetson/ros2_ws/src/saltybot_head_tracking/resource/saltybot_head_tracking create mode 100644 jetson/ros2_ws/src/saltybot_head_tracking/saltybot_head_tracking/__init__.py create mode 100644 jetson/ros2_ws/src/saltybot_head_tracking/saltybot_head_tracking/head_tracking_node.py create mode 100644 jetson/ros2_ws/src/saltybot_head_tracking/setup.cfg create mode 100644 jetson/ros2_ws/src/saltybot_head_tracking/setup.py diff --git a/jetson/ros2_ws/src/saltybot_head_tracking/config/head_tracking_params.yaml b/jetson/ros2_ws/src/saltybot_head_tracking/config/head_tracking_params.yaml new file mode 100644 index 0000000..d7bac5c --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_head_tracking/config/head_tracking_params.yaml @@ -0,0 +1,36 @@ +head_tracking: + ros__parameters: + # ── Camera (Intel RealSense D435i) ────────────────────────────────────── + image_width: 848 # pixels + image_height: 480 # pixels + fov_h_deg: 87.0 # horizontal field of view (degrees) + fov_v_deg: 58.0 # vertical field of view (degrees) + + # ── PID gains — pan axis ──────────────────────────────────────────────── + # Output: deg/step. Start conservative; increase kp for faster tracking. 
+ pan_kp: 0.08 # proportional gain (deg per deg-of-error per step) + pan_ki: 0.002 # integral gain + pan_kd: 0.015 # derivative gain + + # ── PID gains — tilt axis ─────────────────────────────────────────────── + tilt_kp: 0.08 + tilt_ki: 0.002 + tilt_kd: 0.015 + + # ── Servo limits (degrees from centre) ────────────────────────────────── + pan_min_deg: -60.0 # max right (negative pan = right, per node docstring) + pan_max_deg: 60.0 # max left (positive pan = left) + tilt_min_deg: -30.0 # max down + tilt_max_deg: 30.0 # max up + + # ── Tracking behaviour ────────────────────────────────────────────────── + hold_duration_s: 3.0 # hold last position this long after target loss + center_speed_deg_s: 20.0 # degrees/sec to return to centre after hold + dead_zone_px: 10.0 # pixel error smaller than this is treated as zero + control_hz: 20.0 # PID loop rate + + # ── Target selection ──────────────────────────────────────────────────── + # score = distance_weight * (1/(1+dist_m)) + confidence_weight * confidence + # Highest score wins. If distance_m == 0 (unknown), confidence only. 
+ distance_weight: 0.6 + confidence_weight: 0.4 diff --git a/jetson/ros2_ws/src/saltybot_head_tracking/launch/head_tracking.launch.py b/jetson/ros2_ws/src/saltybot_head_tracking/launch/head_tracking.launch.py new file mode 100644 index 0000000..6e5aa6b --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_head_tracking/launch/head_tracking.launch.py @@ -0,0 +1,35 @@ +"""Launch file for saltybot_head_tracking (Issue #549).""" + +import os +from ament_index_python.packages import get_package_share_directory +from launch import LaunchDescription +from launch_ros.actions import Node + + +def generate_launch_description() -> LaunchDescription: + config = os.path.join( + get_package_share_directory("saltybot_head_tracking"), + "config", + "head_tracking_params.yaml", + ) + + head_tracking_node = Node( + package="saltybot_head_tracking", + executable="head_tracking", + name="head_tracking", + parameters=[config], + remappings=[ + # Remap if object detection publishes on a different topic + # ("/saltybot/objects", "/saltybot/objects"), + ], + output="screen", + ) + + # Note: pair with saltybot_pan_tilt node. + # pan_tilt_node subscribes to /saltybot/target_track (Point, pixel coords). + # head_tracking publishes to /saltybot/gimbal/cmd (Point, angle degrees). + # To bridge, remap pan_tilt's subscription: + # remappings=[("/saltybot/target_track", "/saltybot/gimbal/cmd")] + # in the pan_tilt launch — or update pan_tilt to accept angle setpoints. + + return LaunchDescription([head_tracking_node]) diff --git a/jetson/ros2_ws/src/saltybot_head_tracking/package.xml b/jetson/ros2_ws/src/saltybot_head_tracking/package.xml new file mode 100644 index 0000000..872a5ea --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_head_tracking/package.xml @@ -0,0 +1,31 @@ + + + + saltybot_head_tracking + 0.1.0 + + Person-following head tracking (Issue #549). 
+ Subscribes to YOLOv8n person detections, selects the best target + (closest / highest confidence), runs dual-axis PID controllers, and + publishes pan/tilt angle setpoints to /saltybot/gimbal/cmd. + Lost-target behaviour: hold 3 s then centre. + + sl-perception + MIT + + rclpy + geometry_msgs + std_msgs + saltybot_object_detection_msgs + + ament_python + + ament_copyright + ament_flake8 + ament_pep257 + python3-pytest + + + ament_python + + diff --git a/jetson/ros2_ws/src/saltybot_head_tracking/resource/saltybot_head_tracking b/jetson/ros2_ws/src/saltybot_head_tracking/resource/saltybot_head_tracking new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_head_tracking/saltybot_head_tracking/__init__.py b/jetson/ros2_ws/src/saltybot_head_tracking/saltybot_head_tracking/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_head_tracking/saltybot_head_tracking/head_tracking_node.py b/jetson/ros2_ws/src/saltybot_head_tracking/saltybot_head_tracking/head_tracking_node.py new file mode 100644 index 0000000..0fe6980 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_head_tracking/saltybot_head_tracking/head_tracking_node.py @@ -0,0 +1,422 @@ +#!/usr/bin/env python3 +"""Person-following head tracking node (Issue #549). + +Subscribes to person detection bounding boxes, selects the best target +(closest / highest confidence), runs independent PID controllers for pan +and tilt axes, and publishes angle setpoints to /saltybot/gimbal/cmd. + +Subscribed topics: + /saltybot/objects (saltybot_object_detection_msgs/DetectedObjectArray) + — YOLOv8n detections; this node filters for class_id==0 (person). 
+ +Published topics: + /saltybot/gimbal/cmd (geometry_msgs/Point) + — x = desired pan angle (deg, +left / −right from centre) + — y = desired tilt angle (deg, +up / −down from centre) + — z = tracking confidence (0.0 = no target / centering) + + /saltybot/head_tracking/state (std_msgs/String) + — JSON status string: state, pan_deg, tilt_deg, error_pan_px, + error_tilt_px, confidence, distance_m, time_since_seen_s + +State machine: + IDLE → waiting for first detection + TRACKING → person visible, PID active + LOST → person gone; hold last angle for hold_duration_s seconds + CENTERING → returning to (0°, 0°) after hold expires +""" + +import json +import math +from dataclasses import dataclass, field +from enum import Enum, auto +from typing import Optional + +import rclpy +from rclpy.node import Node +from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy + +from geometry_msgs.msg import Point +from std_msgs.msg import String + +from saltybot_object_detection_msgs.msg import DetectedObject, DetectedObjectArray + + +# ── PID controller ──────────────────────────────────────────────────────────── + +class PIDController: + """Simple discrete-time PID with anti-windup integral clamp.""" + + def __init__( + self, + kp: float, + ki: float, + kd: float, + min_output: float, + max_output: float, + integral_clamp: float = 30.0, + ) -> None: + self.kp = kp + self.ki = ki + self.kd = kd + self.min_output = min_output + self.max_output = max_output + self.integral_clamp = integral_clamp + + self._integral: float = 0.0 + self._prev_error: float = 0.0 + + def update(self, error: float, dt: float) -> float: + """Compute PID output for the given error and timestep.""" + if dt <= 0.0: + return 0.0 + + self._integral += error * dt + # Anti-windup: clamp integral term + self._integral = max( + -self.integral_clamp, min(self.integral_clamp, self._integral) + ) + + derivative = (error - self._prev_error) / dt + self._prev_error = error + + output = ( + self.kp * error + + self.ki * 
self._integral + + self.kd * derivative + ) + return max(self.min_output, min(self.max_output, output)) + + def reset(self) -> None: + self._integral = 0.0 + self._prev_error = 0.0 + + +# ── Target dataclass ────────────────────────────────────────────────────────── + +@dataclass +class Target: + """Best-candidate person in the current detection frame.""" + track_id: int = -1 + center_x: float = 0.0 # bbox centre, pixels + center_y: float = 0.0 + confidence: float = 0.0 + distance_m: float = 0.0 # 0 = unknown + bbox_area: float = 0.0 + stamp: float = 0.0 # rclpy.clock time (seconds) + + +# ── State machine ───────────────────────────────────────────────────────────── + +class TrackState(Enum): + IDLE = auto() + TRACKING = auto() + LOST = auto() + CENTERING = auto() + + +# ── Node ────────────────────────────────────────────────────────────────────── + +class HeadTrackingNode(Node): + """Person-following head tracking with PID control.""" + + _PERSON_CLASS_ID = 0 # COCO class 0 = "person" + + def __init__(self) -> None: + super().__init__("head_tracking") + + # ── Parameters ──────────────────────────────────────────────────────── + self.declare_parameter("image_width", 848) # D435i default + self.declare_parameter("image_height", 480) + self.declare_parameter("fov_h_deg", 87.0) # D435i horizontal FOV + self.declare_parameter("fov_v_deg", 58.0) # D435i vertical FOV + + # PID gains — pan axis + self.declare_parameter("pan_kp", 0.08) # deg per deg-of-error per step + self.declare_parameter("pan_ki", 0.002) + self.declare_parameter("pan_kd", 0.015) + + # PID gains — tilt axis + self.declare_parameter("tilt_kp", 0.08) + self.declare_parameter("tilt_ki", 0.002) + self.declare_parameter("tilt_kd", 0.015) + + # Servo limits + self.declare_parameter("pan_min_deg", -60.0) + self.declare_parameter("pan_max_deg", 60.0) + self.declare_parameter("tilt_min_deg", -30.0) + self.declare_parameter("tilt_max_deg", 30.0) + + # Tracking behaviour + self.declare_parameter("hold_duration_s", 3.0) # hold after 
loss + self.declare_parameter("center_speed_deg_s", 20.0) # centering speed + self.declare_parameter("dead_zone_px", 10.0) # ignore small errors + self.declare_parameter("control_hz", 20.0) # PID loop rate + + # Target selection weights + self.declare_parameter("distance_weight", 0.6) # vs confidence + self.declare_parameter("confidence_weight", 0.4) + + self._img_w = self.get_parameter("image_width").value + self._img_h = self.get_parameter("image_height").value + self._fov_h = self.get_parameter("fov_h_deg").value + self._fov_v = self.get_parameter("fov_v_deg").value + + pan_kp = self.get_parameter("pan_kp").value + pan_ki = self.get_parameter("pan_ki").value + pan_kd = self.get_parameter("pan_kd").value + tilt_kp = self.get_parameter("tilt_kp").value + tilt_ki = self.get_parameter("tilt_ki").value + tilt_kd = self.get_parameter("tilt_kd").value + + pan_min = self.get_parameter("pan_min_deg").value + pan_max = self.get_parameter("pan_max_deg").value + tilt_min = self.get_parameter("tilt_min_deg").value + tilt_max = self.get_parameter("tilt_max_deg").value + + self._hold_duration = self.get_parameter("hold_duration_s").value + self._center_speed = self.get_parameter("center_speed_deg_s").value + self._dead_zone = self.get_parameter("dead_zone_px").value + self._control_hz = self.get_parameter("control_hz").value + self._dist_weight = self.get_parameter("distance_weight").value + self._conf_weight = self.get_parameter("confidence_weight").value + + # ── PID controllers ─────────────────────────────────────────────────── + self._pid_pan = PIDController(pan_kp, pan_ki, pan_kd, pan_min, pan_max) + self._pid_tilt = PIDController(tilt_kp, tilt_ki, tilt_kd, tilt_min, tilt_max) + + # ── State ───────────────────────────────────────────────────────────── + self._state: TrackState = TrackState.IDLE + self._target: Optional[Target] = None + self._last_seen: float = 0.0 # wall-clock time of last detection + + # Desired angle setpoint (absolute, degrees from centre) + 
self._pan_deg: float = 0.0 + self._tilt_deg: float = 0.0 + + # ── Subscriptions ───────────────────────────────────────────────────── + sensor_qos = QoSProfile( + reliability=ReliabilityPolicy.BEST_EFFORT, + history=HistoryPolicy.KEEP_LAST, + depth=1, + ) + self._sub_objects = self.create_subscription( + DetectedObjectArray, + "/saltybot/objects", + self._on_detections, + sensor_qos, + ) + + # ── Publishers ──────────────────────────────────────────────────────── + self._pub_cmd = self.create_publisher( + Point, "/saltybot/gimbal/cmd", 10 + ) + self._pub_state = self.create_publisher( + String, "/saltybot/head_tracking/state", 10 + ) + + # ── Control loop timer ──────────────────────────────────────────────── + dt = 1.0 / self._control_hz + self._prev_time: float = self.get_clock().now().nanoseconds * 1e-9 + self.create_timer(dt, self._control_loop) + + self.get_logger().info( + f"HeadTrackingNode ready — " + f"image={self._img_w}×{self._img_h} " + f"FOV={self._fov_h:.0f}°×{self._fov_v:.0f}° " + f"hold={self._hold_duration:.1f}s " + f"rate={self._control_hz:.0f}Hz" + ) + + # ── Detection callback ──────────────────────────────────────────────────── + + def _on_detections(self, msg: DetectedObjectArray) -> None: + """Process incoming object detections and select best person target.""" + persons = [ + obj for obj in msg.objects + if obj.class_id == self._PERSON_CLASS_ID + ] + if not persons: + return + + best = self._select_target(persons) + if best is None: + return + + now = self.get_clock().now().nanoseconds * 1e-9 + self._target = best + self._target.stamp = now + self._last_seen = now + + if self._state in (TrackState.IDLE, TrackState.LOST, TrackState.CENTERING): + self._state = TrackState.TRACKING + # Don't reset PIDs on re-acquisition — avoids jerk + self.get_logger().info( + f"Target acquired: id={best.track_id} " + f"conf={best.confidence:.2f} dist={best.distance_m:.1f}m" + ) + + def _select_target(self, persons: list) -> Optional[Target]: + """Select 
highest-priority person: closest first, then confidence. + + Scoring: + score = dist_weight * dist_score + conf_weight * confidence + dist_score = 1 / (1 + distance_m) — higher = closer + If distance_m == 0 (unknown), use confidence only. + """ + best_score = -1.0 + best_obj: Optional[DetectedObject] = None + + for obj in persons: + dist_m = obj.distance_m + conf = obj.confidence + + if dist_m > 0.0: + dist_score = 1.0 / (1.0 + dist_m) + score = self._dist_weight * dist_score + self._conf_weight * conf + else: + score = conf # distance unknown — use confidence only + + if score > best_score: + best_score = score + best_obj = obj + + if best_obj is None: + return None + + # Extract bbox centre in pixel coordinates + cx = best_obj.bbox.center.position.x + cy = best_obj.bbox.center.position.y + area = best_obj.bbox.size_x * best_obj.bbox.size_y + + return Target( + track_id=0, # DetectedObject has no track_id; use 0 + center_x=cx, + center_y=cy, + confidence=best_obj.confidence, + distance_m=best_obj.distance_m, + bbox_area=area, + ) + + # ── Control loop ────────────────────────────────────────────────────────── + + def _control_loop(self) -> None: + """20 Hz PID update and gimbal command publish.""" + now = self.get_clock().now().nanoseconds * 1e-9 + dt = now - self._prev_time + self._prev_time = now + + if dt <= 0.0: + return + + time_since_seen = now - self._last_seen + + # ── State transitions ────────────────────────────────────────────── + if self._state == TrackState.TRACKING: + if time_since_seen > (2.0 / self._control_hz): + # No new detection in the last ~2 control cycles → lost + self._state = TrackState.LOST + self._last_seen = now # reset hold timer + self.get_logger().info("Target lost — holding position") + + elif self._state == TrackState.LOST: + if time_since_seen >= self._hold_duration: + self._state = TrackState.CENTERING + self._pid_pan.reset() + self._pid_tilt.reset() + self.get_logger().info("Hold expired — centering") + + # ── PID update 
──────────────────────────────────────────────────── + error_pan_px = 0.0 + error_tilt_px = 0.0 + confidence = 0.0 + + if self._state == TrackState.TRACKING and self._target is not None: + cx = self._target.center_x + cy = self._target.center_y + confidence = self._target.confidence + + # Pixel error: positive = target right of centre (→ pan right = negative pan) + error_pan_px = cx - self._img_w / 2.0 + error_tilt_px = cy - self._img_h / 2.0 + + # Dead-zone: don't chase tiny errors + if abs(error_pan_px) < self._dead_zone: + error_pan_px = 0.0 + if abs(error_tilt_px) < self._dead_zone: + error_tilt_px = 0.0 + + # Convert pixel error → angle error (degrees) + # px_per_deg = image_dim / fov_deg + err_pan_deg = error_pan_px / (self._img_w / self._fov_h) + err_tilt_deg = error_tilt_px / (self._img_h / self._fov_v) + + # PID output: delta angle to apply this timestep + # Pan: positive error (target right) → decrease pan (turn right) + # Tilt: positive error (target below) → decrease tilt (look down) + pan_delta = self._pid_pan.update(-err_pan_deg, dt) + tilt_delta = self._pid_tilt.update(-err_tilt_deg, dt) + + pan_min = self.get_parameter("pan_min_deg").value + pan_max = self.get_parameter("pan_max_deg").value + tilt_min = self.get_parameter("tilt_min_deg").value + tilt_max = self.get_parameter("tilt_max_deg").value + + self._pan_deg = max(pan_min, min(pan_max, self._pan_deg + pan_delta)) + self._tilt_deg = max(tilt_min, min(tilt_max, self._tilt_deg + tilt_delta)) + + elif self._state == TrackState.CENTERING: + # Drive angles back toward zero at bounded speed + center_step = self._center_speed * dt + + if abs(self._pan_deg) < center_step: + self._pan_deg = 0.0 + else: + self._pan_deg -= math.copysign(center_step, self._pan_deg) + + if abs(self._tilt_deg) < center_step: + self._tilt_deg = 0.0 + else: + self._tilt_deg -= math.copysign(center_step, self._tilt_deg) + + if self._pan_deg == 0.0 and self._tilt_deg == 0.0: + self._state = TrackState.IDLE + 
self.get_logger().info("Centered — IDLE") + + # ── Publish gimbal command ───────────────────────────────────────── + cmd = Point() + cmd.x = self._pan_deg + cmd.y = self._tilt_deg + cmd.z = confidence + self._pub_cmd.publish(cmd) + + # ── Publish state ───────────────────────────────────────────────── + state_data = { + "state": self._state.name, + "pan_deg": round(self._pan_deg, 2), + "tilt_deg": round(self._tilt_deg, 2), + "error_pan_px": round(error_pan_px, 1), + "error_tilt_px": round(error_tilt_px, 1), + "confidence": round(confidence, 3), + "distance_m": round(self._target.distance_m, 2) if self._target else 0.0, + "time_since_seen_s": round(now - self._last_seen, 2), + } + self._pub_state.publish(String(data=json.dumps(state_data))) + + +# ── Entry point ─────────────────────────────────────────────────────────────── + +def main(args=None) -> None: + rclpy.init(args=args) + node = HeadTrackingNode() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.shutdown() + + +if __name__ == "__main__": + main() diff --git a/jetson/ros2_ws/src/saltybot_head_tracking/setup.cfg b/jetson/ros2_ws/src/saltybot_head_tracking/setup.cfg new file mode 100644 index 0000000..c2ff741 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_head_tracking/setup.cfg @@ -0,0 +1,5 @@ +[develop] +script_dir=$base/lib/saltybot_head_tracking + +[install] +install_scripts=$base/lib/saltybot_head_tracking diff --git a/jetson/ros2_ws/src/saltybot_head_tracking/setup.py b/jetson/ros2_ws/src/saltybot_head_tracking/setup.py new file mode 100644 index 0000000..c70eabd --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_head_tracking/setup.py @@ -0,0 +1,32 @@ +from setuptools import setup, find_packages + +package_name = 'saltybot_head_tracking' + +setup( + name=package_name, + version='0.1.0', + packages=find_packages(exclude=['test']), + data_files=[ + ('share/ament_index/resource_index/packages', + ['resource/' + package_name]), + ('share/' + package_name, 
['package.xml']), + ('share/' + package_name + '/launch', [ + 'launch/head_tracking.launch.py', + ]), + ('share/' + package_name + '/config', [ + 'config/head_tracking_params.yaml', + ]), + ], + install_requires=['setuptools'], + zip_safe=True, + maintainer='sl-perception', + maintainer_email='sl-perception@saltylab.local', + description='Person-following head tracking with PID control (Issue #549)', + license='MIT', + tests_require=['pytest'], + entry_points={ + 'console_scripts': [ + 'head_tracking = saltybot_head_tracking.head_tracking_node:main', + ], + }, +) -- 2.47.2