diff --git a/jetson/ros2_ws/src/saltybot_head_tracking/config/head_tracking_params.yaml b/jetson/ros2_ws/src/saltybot_head_tracking/config/head_tracking_params.yaml
new file mode 100644
index 0000000..d7bac5c
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_head_tracking/config/head_tracking_params.yaml
@@ -0,0 +1,36 @@
+head_tracking:
+  ros__parameters:
+    # ── Camera (Intel RealSense D435i) ──────────────────────────────────────
+    image_width: 848          # pixels
+    image_height: 480         # pixels
+    fov_h_deg: 87.0           # horizontal field of view (degrees)
+    fov_v_deg: 58.0           # vertical field of view (degrees)
+
+    # ── PID gains — pan axis ────────────────────────────────────────────────
+    # Output: deg/step. Start conservative; increase kp for faster tracking.
+    pan_kp: 0.08              # proportional gain (deg per deg-of-error per step)
+    pan_ki: 0.002             # integral gain
+    pan_kd: 0.015             # derivative gain
+
+    # ── PID gains — tilt axis ───────────────────────────────────────────────
+    tilt_kp: 0.08
+    tilt_ki: 0.002
+    tilt_kd: 0.015
+
+    # ── Servo limits (degrees from centre) ──────────────────────────────────
+    pan_min_deg: -60.0        # max left
+    pan_max_deg: 60.0         # max right
+    tilt_min_deg: -30.0       # max down
+    tilt_max_deg: 30.0        # max up
+
+    # ── Tracking behaviour ──────────────────────────────────────────────────
+    hold_duration_s: 3.0      # hold last position this long after target loss
+    center_speed_deg_s: 20.0  # degrees/sec to return to centre after hold
+    dead_zone_px: 10.0        # pixel error smaller than this is treated as zero
+    control_hz: 20.0          # PID loop rate
+
+    # ── Target selection ────────────────────────────────────────────────────
+    # score = distance_weight * (1/(1+dist_m)) + confidence_weight * confidence
+    # Highest score wins. If distance_m == 0 (unknown), confidence only.
+    distance_weight: 0.6
+    confidence_weight: 0.4
diff --git a/jetson/ros2_ws/src/saltybot_head_tracking/launch/head_tracking.launch.py b/jetson/ros2_ws/src/saltybot_head_tracking/launch/head_tracking.launch.py
new file mode 100644
index 0000000..6e5aa6b
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_head_tracking/launch/head_tracking.launch.py
@@ -0,0 +1,35 @@
+"""Launch file for saltybot_head_tracking (Issue #549)."""
+
+import os
+from ament_index_python.packages import get_package_share_directory
+from launch import LaunchDescription
+from launch_ros.actions import Node
+
+
+def generate_launch_description() -> LaunchDescription:
+    config = os.path.join(
+        get_package_share_directory("saltybot_head_tracking"),
+        "config",
+        "head_tracking_params.yaml",
+    )
+
+    head_tracking_node = Node(
+        package="saltybot_head_tracking",
+        executable="head_tracking",
+        name="head_tracking",
+        parameters=[config],
+        remappings=[
+            # Remap if object detection publishes on a different topic
+            # ("/saltybot/objects", "/saltybot/objects"),
+        ],
+        output="screen",
+    )
+
+    # Note: pair with saltybot_pan_tilt node.
+    # pan_tilt_node subscribes to /saltybot/target_track (Point, pixel coords).
+    # head_tracking publishes to /saltybot/gimbal/cmd (Point, angle degrees).
+    # To bridge, remap pan_tilt's subscription:
+    #     remappings=[("/saltybot/target_track", "/saltybot/gimbal/cmd")]
+    # in the pan_tilt launch — or update pan_tilt to accept angle setpoints.
+
+    return LaunchDescription([head_tracking_node])
diff --git a/jetson/ros2_ws/src/saltybot_head_tracking/package.xml b/jetson/ros2_ws/src/saltybot_head_tracking/package.xml
new file mode 100644
index 0000000..872a5ea
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_head_tracking/package.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0"?>
+<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
+<package format="3">
+  <name>saltybot_head_tracking</name>
+  <version>0.1.0</version>
+  <!-- NOTE(review): XML markup reconstructed from garbled extraction; text content preserved — verify tags against repo. -->
+  <description>Person-following head tracking (Issue #549).
+    Subscribes to YOLOv8n person detections, selects the best target
+    (closest / highest confidence), runs dual-axis PID controllers, and
+    publishes pan/tilt angle setpoints to /saltybot/gimbal/cmd.
+    Lost-target behaviour: hold 3 s then centre.</description>
+  <maintainer email="sl-perception@saltylab.local">sl-perception</maintainer>
+  <license>MIT</license>
+
+  <exec_depend>rclpy</exec_depend>
+  <exec_depend>geometry_msgs</exec_depend>
+  <exec_depend>std_msgs</exec_depend>
+  <exec_depend>saltybot_object_detection_msgs</exec_depend>
+
+  <buildtool_depend>ament_python</buildtool_depend>
+
+  <test_depend>ament_copyright</test_depend>
+  <test_depend>ament_flake8</test_depend>
+  <test_depend>ament_pep257</test_depend>
+  <test_depend>python3-pytest</test_depend>
+
+  <export>
+    <build_type>ament_python</build_type>
+  </export>
+</package>
diff --git a/jetson/ros2_ws/src/saltybot_head_tracking/resource/saltybot_head_tracking b/jetson/ros2_ws/src/saltybot_head_tracking/resource/saltybot_head_tracking
new file mode 100644
index 0000000..e69de29
diff --git a/jetson/ros2_ws/src/saltybot_head_tracking/saltybot_head_tracking/__init__.py b/jetson/ros2_ws/src/saltybot_head_tracking/saltybot_head_tracking/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/jetson/ros2_ws/src/saltybot_head_tracking/saltybot_head_tracking/head_tracking_node.py b/jetson/ros2_ws/src/saltybot_head_tracking/saltybot_head_tracking/head_tracking_node.py
new file mode 100644
index 0000000..0fe6980
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_head_tracking/saltybot_head_tracking/head_tracking_node.py
@@ -0,0 +1,425 @@
+#!/usr/bin/env python3
+"""Person-following head tracking node (Issue #549).
+
+Subscribes to person detection bounding boxes, selects the best target
+(closest / highest confidence), runs independent PID controllers for pan
+and tilt axes, and publishes angle setpoints to /saltybot/gimbal/cmd.
+
+Subscribed topics:
+    /saltybot/objects (saltybot_object_detection_msgs/DetectedObjectArray)
+        — YOLOv8n detections; this node filters for class_id==0 (person).
+
+Published topics:
+    /saltybot/gimbal/cmd (geometry_msgs/Point)
+        — x = desired pan angle (deg, +left / −right from centre)
+        — y = desired tilt angle (deg, +up / −down from centre)
+        — z = tracking confidence (0.0 = no target / centering)
+
+    /saltybot/head_tracking/state (std_msgs/String)
+        — JSON status string: state, pan_deg, tilt_deg, error_pan_px,
+          error_tilt_px, confidence, distance_m, time_since_seen_s
+
+State machine:
+    IDLE      → waiting for first detection
+    TRACKING  → person visible, PID active
+    LOST      → person gone; hold last angle for hold_duration_s seconds
+    CENTERING → returning to (0°, 0°) after hold expires
+"""
+
+import json
+import math
+from dataclasses import dataclass
+from enum import Enum, auto
+from typing import Optional
+
+import rclpy
+from rclpy.node import Node
+from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
+
+from geometry_msgs.msg import Point
+from std_msgs.msg import String
+
+from saltybot_object_detection_msgs.msg import DetectedObject, DetectedObjectArray
+
+
+# ── PID controller ────────────────────────────────────────────────────────────
+
+class PIDController:
+    """Simple discrete-time PID with anti-windup integral clamp."""
+
+    def __init__(
+        self,
+        kp: float,
+        ki: float,
+        kd: float,
+        min_output: float,
+        max_output: float,
+        integral_clamp: float = 30.0,
+    ) -> None:
+        self.kp = kp
+        self.ki = ki
+        self.kd = kd
+        self.min_output = min_output
+        self.max_output = max_output
+        self.integral_clamp = integral_clamp
+
+        self._integral: float = 0.0
+        # None = no sample yet; suppresses the derivative kick on the
+        # first update after construction or reset().
+        self._prev_error: Optional[float] = None
+
+    def update(self, error: float, dt: float) -> float:
+        """Compute PID output for the given error and timestep."""
+        if dt <= 0.0:
+            return 0.0
+
+        self._integral += error * dt
+        # Anti-windup: clamp integral term
+        self._integral = max(
+            -self.integral_clamp, min(self.integral_clamp, self._integral)
+        )
+
+        if self._prev_error is None:
+            derivative = 0.0  # first sample after reset — no derivative kick
+        else:
+            derivative = (error - self._prev_error) / dt
+        self._prev_error = error
+
+        output = (
+            self.kp * error
+            + self.ki * self._integral
+            + self.kd * derivative
+        )
+        return max(self.min_output, min(self.max_output, output))
+
+    def reset(self) -> None:
+        self._integral = 0.0
+        self._prev_error = None
+
+
+# ── Target dataclass ──────────────────────────────────────────────────────────
+
+@dataclass
+class Target:
+    """Best-candidate person in the current detection frame."""
+    track_id: int = -1
+    center_x: float = 0.0    # bbox centre, pixels
+    center_y: float = 0.0
+    confidence: float = 0.0
+    distance_m: float = 0.0  # 0 = unknown
+    bbox_area: float = 0.0
+    stamp: float = 0.0       # rclpy.clock time (seconds)
+
+
+# ── State machine ─────────────────────────────────────────────────────────────
+
+class TrackState(Enum):
+    IDLE = auto()
+    TRACKING = auto()
+    LOST = auto()
+    CENTERING = auto()
+
+
+# ── Node ──────────────────────────────────────────────────────────────────────
+
+class HeadTrackingNode(Node):
+    """Person-following head tracking with PID control."""
+
+    _PERSON_CLASS_ID = 0  # COCO class 0 = "person"
+
+    def __init__(self) -> None:
+        super().__init__("head_tracking")
+
+        # ── Parameters ────────────────────────────────────────────────────────
+        self.declare_parameter("image_width", 848)    # D435i default
+        self.declare_parameter("image_height", 480)
+        self.declare_parameter("fov_h_deg", 87.0)     # D435i horizontal FOV
+        self.declare_parameter("fov_v_deg", 58.0)     # D435i vertical FOV
+
+        # PID gains — pan axis
+        self.declare_parameter("pan_kp", 0.08)        # deg output per deg of error
+        self.declare_parameter("pan_ki", 0.002)
+        self.declare_parameter("pan_kd", 0.015)
+
+        # PID gains — tilt axis
+        self.declare_parameter("tilt_kp", 0.08)
+        self.declare_parameter("tilt_ki", 0.002)
+        self.declare_parameter("tilt_kd", 0.015)
+
+        # Servo limits
+        self.declare_parameter("pan_min_deg", -60.0)
+        self.declare_parameter("pan_max_deg", 60.0)
+        self.declare_parameter("tilt_min_deg", -30.0)
+        self.declare_parameter("tilt_max_deg", 30.0)
+
+        # Tracking behaviour
+        self.declare_parameter("hold_duration_s", 3.0)     # hold after loss
+        self.declare_parameter("center_speed_deg_s", 20.0) # centering speed
+        self.declare_parameter("dead_zone_px", 10.0)       # ignore small errors
+        self.declare_parameter("control_hz", 20.0)         # PID loop rate
+
+        # Target selection weights
+        self.declare_parameter("distance_weight", 0.6)     # vs confidence
+        self.declare_parameter("confidence_weight", 0.4)
+
+        self._img_w = self.get_parameter("image_width").value
+        self._img_h = self.get_parameter("image_height").value
+        self._fov_h = self.get_parameter("fov_h_deg").value
+        self._fov_v = self.get_parameter("fov_v_deg").value
+
+        pan_kp = self.get_parameter("pan_kp").value
+        pan_ki = self.get_parameter("pan_ki").value
+        pan_kd = self.get_parameter("pan_kd").value
+        tilt_kp = self.get_parameter("tilt_kp").value
+        tilt_ki = self.get_parameter("tilt_ki").value
+        tilt_kd = self.get_parameter("tilt_kd").value
+
+        pan_min = self.get_parameter("pan_min_deg").value
+        pan_max = self.get_parameter("pan_max_deg").value
+        tilt_min = self.get_parameter("tilt_min_deg").value
+        tilt_max = self.get_parameter("tilt_max_deg").value
+
+        self._hold_duration = self.get_parameter("hold_duration_s").value
+        self._center_speed = self.get_parameter("center_speed_deg_s").value
+        self._dead_zone = self.get_parameter("dead_zone_px").value
+        self._control_hz = self.get_parameter("control_hz").value
+        self._dist_weight = self.get_parameter("distance_weight").value
+        self._conf_weight = self.get_parameter("confidence_weight").value
+
+        # ── PID controllers ───────────────────────────────────────────────────
+        self._pid_pan = PIDController(pan_kp, pan_ki, pan_kd, pan_min, pan_max)
+        self._pid_tilt = PIDController(tilt_kp, tilt_ki, tilt_kd, tilt_min, tilt_max)
+
+        # ── State ─────────────────────────────────────────────────────────────
+        self._state: TrackState = TrackState.IDLE
+        self._target: Optional[Target] = None
+        self._last_seen: float = 0.0  # wall-clock time of last detection
+
+        # Desired angle setpoint (absolute, degrees from centre)
+        self._pan_deg: float = 0.0
+        self._tilt_deg: float = 0.0
+
+        # ── Subscriptions ─────────────────────────────────────────────────────
+        sensor_qos = QoSProfile(
+            reliability=ReliabilityPolicy.BEST_EFFORT,
+            history=HistoryPolicy.KEEP_LAST,
+            depth=1,
+        )
+        self._sub_objects = self.create_subscription(
+            DetectedObjectArray,
+            "/saltybot/objects",
+            self._on_detections,
+            sensor_qos,
+        )
+
+        # ── Publishers ────────────────────────────────────────────────────────
+        self._pub_cmd = self.create_publisher(
+            Point, "/saltybot/gimbal/cmd", 10
+        )
+        self._pub_state = self.create_publisher(
+            String, "/saltybot/head_tracking/state", 10
+        )
+
+        # ── Control loop timer ────────────────────────────────────────────────
+        dt = 1.0 / self._control_hz
+        self._prev_time: float = self.get_clock().now().nanoseconds * 1e-9
+        self.create_timer(dt, self._control_loop)
+
+        self.get_logger().info(
+            f"HeadTrackingNode ready — "
+            f"image={self._img_w}×{self._img_h} "
+            f"FOV={self._fov_h:.0f}°×{self._fov_v:.0f}° "
+            f"hold={self._hold_duration:.1f}s "
+            f"rate={self._control_hz:.0f}Hz"
+        )
+
+    # ── Detection callback ────────────────────────────────────────────────────
+
+    def _on_detections(self, msg: DetectedObjectArray) -> None:
+        """Process incoming object detections and select best person target."""
+        persons = [
+            obj for obj in msg.objects
+            if obj.class_id == self._PERSON_CLASS_ID
+        ]
+        if not persons:
+            return
+
+        best = self._select_target(persons)
+        if best is None:
+            return
+
+        now = self.get_clock().now().nanoseconds * 1e-9
+        self._target = best
+        self._target.stamp = now
+        self._last_seen = now
+
+        if self._state in (TrackState.IDLE, TrackState.LOST, TrackState.CENTERING):
+            self._state = TrackState.TRACKING
+            # Don't reset PIDs on re-acquisition — avoids jerk
+            self.get_logger().info(
+                f"Target acquired: id={best.track_id} "
+                f"conf={best.confidence:.2f} dist={best.distance_m:.1f}m"
+            )
+
+    def _select_target(self, persons: list) -> Optional[Target]:
+        """Select highest-priority person: closest first, then confidence.
+
+        Scoring:
+            score = dist_weight * dist_score + conf_weight * confidence
+            dist_score = 1 / (1 + distance_m)  — higher = closer
+        If distance_m == 0 (unknown), use confidence only.
+        """
+        best_score = -1.0
+        best_obj: Optional[DetectedObject] = None
+
+        for obj in persons:
+            dist_m = obj.distance_m
+            conf = obj.confidence
+
+            if dist_m > 0.0:
+                dist_score = 1.0 / (1.0 + dist_m)
+                score = self._dist_weight * dist_score + self._conf_weight * conf
+            else:
+                score = conf  # distance unknown — use confidence only
+
+            if score > best_score:
+                best_score = score
+                best_obj = obj
+
+        if best_obj is None:
+            return None
+
+        # Extract bbox centre in pixel coordinates (assumes center.position.{x,y})
+        cx = best_obj.bbox.center.position.x
+        cy = best_obj.bbox.center.position.y
+        area = best_obj.bbox.size_x * best_obj.bbox.size_y
+
+        return Target(
+            track_id=0,  # DetectedObject has no track_id; use 0
+            center_x=cx,
+            center_y=cy,
+            confidence=best_obj.confidence,
+            distance_m=best_obj.distance_m,
+            bbox_area=area,
+        )
+
+    # ── Control loop ──────────────────────────────────────────────────────────
+
+    def _control_loop(self) -> None:
+        """PID update at control_hz and gimbal command publish."""
+        now = self.get_clock().now().nanoseconds * 1e-9
+        dt = now - self._prev_time
+        self._prev_time = now
+
+        if dt <= 0.0:
+            return
+
+        time_since_seen = now - self._last_seen
+
+        # ── State transitions ──────────────────────────────────────────────
+        if self._state == TrackState.TRACKING:
+            if time_since_seen > (2.0 / self._control_hz):
+                # No detection in ~2 cycles → lost (assumes detector ≥ control_hz — confirm)
+                self._state = TrackState.LOST
+                self._last_seen = now  # reset hold timer
+                self.get_logger().info("Target lost — holding position")
+
+        elif self._state == TrackState.LOST:
+            if time_since_seen >= self._hold_duration:
+                self._state = TrackState.CENTERING
+                self._pid_pan.reset()
+                self._pid_tilt.reset()
+                self.get_logger().info("Hold expired — centering")
+
+        # ── PID update ─────────────────────────────────────────────────────
+        error_pan_px = 0.0
+        error_tilt_px = 0.0
+        confidence = 0.0
+
+        if self._state == TrackState.TRACKING and self._target is not None:
+            cx = self._target.center_x
+            cy = self._target.center_y
+            confidence = self._target.confidence
+
+            # Pixel error: positive = target right of centre (→ pan right = negative pan)
+            error_pan_px = cx - self._img_w / 2.0
+            error_tilt_px = cy - self._img_h / 2.0
+
+            # Dead-zone: don't chase tiny errors
+            if abs(error_pan_px) < self._dead_zone:
+                error_pan_px = 0.0
+            if abs(error_tilt_px) < self._dead_zone:
+                error_tilt_px = 0.0
+
+            # Convert pixel error → angle error (degrees)
+            # px_per_deg = image_dim / fov_deg
+            err_pan_deg = error_pan_px / (self._img_w / self._fov_h)
+            err_tilt_deg = error_tilt_px / (self._img_h / self._fov_v)
+
+            # PID output: delta angle to apply this timestep
+            # Pan: positive error (target right) → decrease pan (turn right)
+            # Tilt: positive error (target below) → decrease tilt (look down)
+            pan_delta = self._pid_pan.update(-err_pan_deg, dt)
+            tilt_delta = self._pid_tilt.update(-err_tilt_deg, dt)
+
+            pan_min = self.get_parameter("pan_min_deg").value  # re-read so runtime param updates apply
+            pan_max = self.get_parameter("pan_max_deg").value
+            tilt_min = self.get_parameter("tilt_min_deg").value
+            tilt_max = self.get_parameter("tilt_max_deg").value
+
+            self._pan_deg = max(pan_min, min(pan_max, self._pan_deg + pan_delta))
+            self._tilt_deg = max(tilt_min, min(tilt_max, self._tilt_deg + tilt_delta))
+
+        elif self._state == TrackState.CENTERING:
+            # Drive angles back toward zero at bounded speed
+            center_step = self._center_speed * dt
+
+            if abs(self._pan_deg) < center_step:
+                self._pan_deg = 0.0
+            else:
+                self._pan_deg -= math.copysign(center_step, self._pan_deg)
+
+            if abs(self._tilt_deg) < center_step:
+                self._tilt_deg = 0.0
+            else:
+                self._tilt_deg -= math.copysign(center_step, self._tilt_deg)
+
+            if self._pan_deg == 0.0 and self._tilt_deg == 0.0:
+                self._state = TrackState.IDLE
+                self.get_logger().info("Centered — IDLE")
+
+        # ── Publish gimbal command ─────────────────────────────────────────
+        cmd = Point()
+        cmd.x = self._pan_deg
+        cmd.y = self._tilt_deg
+        cmd.z = confidence
+        self._pub_cmd.publish(cmd)
+
+        # ── Publish state ─────────────────────────────────────────────────
+        state_data = {
+            "state": self._state.name,
+            "pan_deg": round(self._pan_deg, 2),
+            "tilt_deg": round(self._tilt_deg, 2),
+            "error_pan_px": round(error_pan_px, 1),
+            "error_tilt_px": round(error_tilt_px, 1),
+            "confidence": round(confidence, 3),
+            "distance_m": round(self._target.distance_m, 2) if self._target else 0.0,
+            "time_since_seen_s": round(now - self._last_seen, 2),
+        }
+        self._pub_state.publish(String(data=json.dumps(state_data)))
+
+
+# ── Entry point ───────────────────────────────────────────────────────────────
+
+def main(args=None) -> None:
+    rclpy.init(args=args)
+    node = HeadTrackingNode()
+    try:
+        rclpy.spin(node)
+    except KeyboardInterrupt:
+        pass
+    finally:
+        node.destroy_node()
+        rclpy.shutdown()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/jetson/ros2_ws/src/saltybot_head_tracking/setup.cfg b/jetson/ros2_ws/src/saltybot_head_tracking/setup.cfg
new file mode 100644
index 0000000..c2ff741
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_head_tracking/setup.cfg
@@ -0,0 +1,5 @@
+[develop]
+script_dir=$base/lib/saltybot_head_tracking
+
+[install]
+install_scripts=$base/lib/saltybot_head_tracking
diff --git a/jetson/ros2_ws/src/saltybot_head_tracking/setup.py b/jetson/ros2_ws/src/saltybot_head_tracking/setup.py
new file mode 100644
index 0000000..c70eabd
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_head_tracking/setup.py
@@ -0,0 +1,32 @@
+from setuptools import setup, find_packages
+
+package_name = 'saltybot_head_tracking'
+
+setup(
+    name=package_name,
+    version='0.1.0',
+    packages=find_packages(exclude=['test']),
+    data_files=[
+        ('share/ament_index/resource_index/packages',
+            ['resource/' + package_name]),
+        ('share/' + package_name, ['package.xml']),
+        ('share/' + package_name + '/launch', [
+            'launch/head_tracking.launch.py',
+        ]),
+        ('share/' + package_name + '/config', [
+            'config/head_tracking_params.yaml',
+        ]),
+    ],
+    install_requires=['setuptools'],
+    zip_safe=True,
+    maintainer='sl-perception',
+    maintainer_email='sl-perception@saltylab.local',
+    description='Person-following head tracking with PID control (Issue #549)',
+    license='MIT',
+    tests_require=['pytest'],
+    entry_points={
+        'console_scripts': [
+            'head_tracking = saltybot_head_tracking.head_tracking_node:main',
+        ],
+    },
+)