Merge pull request 'feat: Person-following head tracking (Issue #549)' (#555) from sl-perception/issue-549-head-tracking into main

This commit is contained in:
sl-jetson 2026-03-14 11:36:29 -04:00
commit 8e21201dd4
8 changed files with 561 additions and 0 deletions

View File

@ -0,0 +1,36 @@
# Parameters for the head_tracking node (saltybot_head_tracking).
head_tracking:
  ros__parameters:
    # ── Camera (Intel RealSense D435i) ──────────────────────────────────────
    image_width: 848      # pixels
    image_height: 480     # pixels
    fov_h_deg: 87.0       # horizontal field of view (degrees)
    fov_v_deg: 58.0       # vertical field of view (degrees)
    # ── PID gains — pan axis ────────────────────────────────────────────────
    # Output: deg/step. Start conservative; increase kp for faster tracking.
    pan_kp: 0.08          # proportional gain (deg per deg-of-error per step)
    pan_ki: 0.002         # integral gain
    pan_kd: 0.015         # derivative gain
    # ── PID gains — tilt axis ───────────────────────────────────────────────
    tilt_kp: 0.08
    tilt_ki: 0.002
    tilt_kd: 0.015
    # ── Servo limits (degrees from centre) ──────────────────────────────────
    pan_min_deg: -60.0    # max left
    pan_max_deg: 60.0     # max right
    tilt_min_deg: -30.0   # max down
    tilt_max_deg: 30.0    # max up
    # ── Tracking behaviour ──────────────────────────────────────────────────
    hold_duration_s: 3.0      # hold last position this long after target loss
    center_speed_deg_s: 20.0  # degrees/sec to return to centre after hold
    dead_zone_px: 10.0        # pixel error smaller than this is treated as zero
    control_hz: 20.0          # PID loop rate
    # NOTE: the node declares a target "lost" after ~2 control cycles
    # (2 / control_hz seconds) without a fresh detection, so the detector
    # should publish at no less than control_hz / 2 — confirm detector rate.
    # ── Target selection ────────────────────────────────────────────────────
    # score = distance_weight * (1/(1+dist_m)) + confidence_weight * confidence
    # Highest score wins. If distance_m == 0 (unknown), confidence only.
    distance_weight: 0.6
    confidence_weight: 0.4

View File

@ -0,0 +1,35 @@
"""Launch file for saltybot_head_tracking (Issue #549)."""
import os
from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch_ros.actions import Node
def generate_launch_description() -> LaunchDescription:
    """Build the launch description for the head-tracking node.

    Loads the installed parameter file from the package share directory
    and starts the ``head_tracking`` executable with it.
    """
    params_file = os.path.join(
        get_package_share_directory("saltybot_head_tracking"),
        "config",
        "head_tracking_params.yaml",
    )
    # Note: pair with saltybot_pan_tilt node.
    # pan_tilt_node subscribes to /saltybot/target_track (Point, pixel coords).
    # head_tracking publishes to /saltybot/gimbal/cmd (Point, angle degrees).
    # To bridge, remap pan_tilt's subscription:
    #   remappings=[("/saltybot/target_track", "/saltybot/gimbal/cmd")]
    # in the pan_tilt launch — or update pan_tilt to accept angle setpoints.
    return LaunchDescription([
        Node(
            package="saltybot_head_tracking",
            executable="head_tracking",
            name="head_tracking",
            parameters=[params_file],
            remappings=[
                # Remap if object detection publishes on a different topic
                # ("/saltybot/objects", "/saltybot/objects"),
            ],
            output="screen",
        ),
    ])

View File

@ -0,0 +1,31 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_head_tracking</name>
<version>0.1.0</version>
<description>
Person-following head tracking (Issue #549).
Subscribes to YOLOv8n person detections, selects the best target
(closest / highest confidence), runs dual-axis PID controllers, and
publishes pan/tilt angle setpoints to /saltybot/gimbal/cmd.
Lost-target behaviour: hold 3 s then centre.
</description>
<maintainer email="sl-perception@saltylab.local">sl-perception</maintainer>
<license>MIT</license>
<depend>rclpy</depend>
<depend>geometry_msgs</depend>
<depend>std_msgs</depend>
<depend>saltybot_object_detection_msgs</depend>
<buildtool_depend>ament_python</buildtool_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,422 @@
#!/usr/bin/env python3
"""Person-following head tracking node (Issue #549).
Subscribes to person detection bounding boxes, selects the best target
(closest / highest confidence), runs independent PID controllers for pan
and tilt axes, and publishes angle setpoints to /saltybot/gimbal/cmd.
Subscribed topics:
/saltybot/objects (saltybot_object_detection_msgs/DetectedObjectArray)
YOLOv8n detections; this node filters for class_id==0 (person).
Published topics:
/saltybot/gimbal/cmd (geometry_msgs/Point)
x = desired pan angle (deg, +left / right from centre)
y = desired tilt angle (deg, +up / down from centre)
z = tracking confidence (0.0 = no target / centering)
/saltybot/head_tracking/state (std_msgs/String)
JSON status string: state, target_id, pan_deg, tilt_deg, error_pan_px,
error_tilt_px, confidence, distance_m
State machine:
IDLE waiting for first detection
TRACKING person visible, PID active
LOST person gone; hold last angle for hold_duration_s seconds
CENTERING returning to (0°, 0°) after hold expires
"""
import json
import math
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Optional
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
from geometry_msgs.msg import Point
from std_msgs.msg import String
from saltybot_object_detection_msgs.msg import DetectedObject, DetectedObjectArray
# ── PID controller ────────────────────────────────────────────────────────────
class PIDController:
    """Discrete-time PID controller with output clamping and anti-windup.

    The raw integral accumulator is clamped to ``±integral_clamp`` before
    being scaled by ``ki``, which bounds windup while the output saturates.
    The derivative term is suppressed on the very first :meth:`update`
    after construction or :meth:`reset` — with no valid previous error,
    ``(error - 0) / dt`` would otherwise produce a spurious "derivative
    kick" proportional to the initial error.
    """

    def __init__(
        self,
        kp: float,
        ki: float,
        kd: float,
        min_output: float,
        max_output: float,
        integral_clamp: float = 30.0,
    ) -> None:
        """Store gains, output limits, and the integral clamp.

        Args:
            kp / ki / kd: proportional / integral / derivative gains.
            min_output / max_output: hard clamp applied to every output.
            integral_clamp: bound on the raw (pre-``ki``) integral term.
        """
        self.kp = kp
        self.ki = ki
        self.kd = kd
        self.min_output = min_output
        self.max_output = max_output
        self.integral_clamp = integral_clamp
        self._integral: float = 0.0
        self._prev_error: float = 0.0
        # True once we have a previous error sample to differentiate against.
        self._has_prev: bool = False

    def update(self, error: float, dt: float) -> float:
        """Compute the clamped PID output for the given error and timestep.

        Returns 0.0 for non-positive ``dt`` (clock went backwards or the
        timer fired twice at the same stamp) rather than dividing by zero.
        """
        if dt <= 0.0:
            return 0.0
        self._integral += error * dt
        # Anti-windup: clamp integral term
        self._integral = max(
            -self.integral_clamp, min(self.integral_clamp, self._integral)
        )
        if self._has_prev:
            derivative = (error - self._prev_error) / dt
        else:
            # First sample since construction/reset: no previous error,
            # so skip the derivative to avoid a kick.
            derivative = 0.0
            self._has_prev = True
        self._prev_error = error
        output = (
            self.kp * error
            + self.ki * self._integral
            + self.kd * derivative
        )
        return max(self.min_output, min(self.max_output, output))

    def reset(self) -> None:
        """Clear accumulated state (integral and derivative history)."""
        self._integral = 0.0
        self._prev_error = 0.0
        self._has_prev = False
# ── Target dataclass ──────────────────────────────────────────────────────────
@dataclass
class Target:
    """Best-candidate person in the current detection frame.

    Built by ``HeadTrackingNode._select_target``; ``stamp`` is filled in
    afterwards by the detection callback, not by the selector.
    """
    track_id: int = -1     # -1 = unset; selector always emits 0 (msg has no id)
    center_x: float = 0.0  # bbox centre, pixels
    center_y: float = 0.0  # bbox centre, pixels
    confidence: float = 0.0  # detector confidence (presumably in [0, 1] — confirm)
    distance_m: float = 0.0  # metres; 0 = unknown
    bbox_area: float = 0.0   # bbox size_x * size_y, square pixels
    stamp: float = 0.0       # rclpy.clock time (seconds) when last updated
# ── State machine ─────────────────────────────────────────────────────────────
class TrackState(Enum):
    """Head-tracking state machine (driven by HeadTrackingNode._control_loop)."""
    IDLE = auto()       # no target yet; gimbal at rest
    TRACKING = auto()   # person visible, PID active
    LOST = auto()       # target gone; hold last angles for hold_duration_s
    CENTERING = auto()  # ramping back to (0°, 0°) at center_speed_deg_s
# ── Node ──────────────────────────────────────────────────────────────────────
class HeadTrackingNode(Node):
    """Person-following head tracking with PID control.

    Consumes DetectedObjectArray messages, keeps the best person target,
    and runs a fixed-rate control loop that integrates per-axis PID deltas
    into absolute pan/tilt setpoints published on /saltybot/gimbal/cmd.
    """

    _PERSON_CLASS_ID = 0  # COCO class 0 = "person"

    def __init__(self) -> None:
        super().__init__("head_tracking")
        # ── Parameters ────────────────────────────────────────────────────────
        self.declare_parameter("image_width", 848)   # D435i default
        self.declare_parameter("image_height", 480)
        self.declare_parameter("fov_h_deg", 87.0)    # D435i horizontal FOV
        self.declare_parameter("fov_v_deg", 58.0)    # D435i vertical FOV
        # PID gains — pan axis.  Pixel error is converted to degrees before
        # it enters the PID (see _control_loop), so kp is effectively
        # output-deg per degree of error per step, not deg per pixel.
        self.declare_parameter("pan_kp", 0.08)
        self.declare_parameter("pan_ki", 0.002)
        self.declare_parameter("pan_kd", 0.015)
        # PID gains — tilt axis
        self.declare_parameter("tilt_kp", 0.08)
        self.declare_parameter("tilt_ki", 0.002)
        self.declare_parameter("tilt_kd", 0.015)
        # Servo limits
        self.declare_parameter("pan_min_deg", -60.0)
        self.declare_parameter("pan_max_deg", 60.0)
        self.declare_parameter("tilt_min_deg", -30.0)
        self.declare_parameter("tilt_max_deg", 30.0)
        # Tracking behaviour
        self.declare_parameter("hold_duration_s", 3.0)      # hold after loss
        self.declare_parameter("center_speed_deg_s", 20.0)  # centering speed
        self.declare_parameter("dead_zone_px", 10.0)        # ignore small errors
        self.declare_parameter("control_hz", 20.0)          # PID loop rate
        # Target selection weights
        self.declare_parameter("distance_weight", 0.6)      # vs confidence
        self.declare_parameter("confidence_weight", 0.4)
        self._img_w = self.get_parameter("image_width").value
        self._img_h = self.get_parameter("image_height").value
        self._fov_h = self.get_parameter("fov_h_deg").value
        self._fov_v = self.get_parameter("fov_v_deg").value
        pan_kp = self.get_parameter("pan_kp").value
        pan_ki = self.get_parameter("pan_ki").value
        pan_kd = self.get_parameter("pan_kd").value
        tilt_kp = self.get_parameter("tilt_kp").value
        tilt_ki = self.get_parameter("tilt_ki").value
        tilt_kd = self.get_parameter("tilt_kd").value
        pan_min = self.get_parameter("pan_min_deg").value
        pan_max = self.get_parameter("pan_max_deg").value
        tilt_min = self.get_parameter("tilt_min_deg").value
        tilt_max = self.get_parameter("tilt_max_deg").value
        self._hold_duration = self.get_parameter("hold_duration_s").value
        self._center_speed = self.get_parameter("center_speed_deg_s").value
        self._dead_zone = self.get_parameter("dead_zone_px").value
        self._control_hz = self.get_parameter("control_hz").value
        self._dist_weight = self.get_parameter("distance_weight").value
        self._conf_weight = self.get_parameter("confidence_weight").value
        # ── PID controllers ───────────────────────────────────────────────────
        # The servo range doubles as the per-step PID output clamp; the
        # accumulated setpoint is clamped to the same range again in
        # _control_loop.
        self._pid_pan = PIDController(pan_kp, pan_ki, pan_kd, pan_min, pan_max)
        self._pid_tilt = PIDController(tilt_kp, tilt_ki, tilt_kd, tilt_min, tilt_max)
        # ── State ─────────────────────────────────────────────────────────────
        self._state: TrackState = TrackState.IDLE
        self._target: Optional[Target] = None  # never cleared; keeps last target
        self._last_seen: float = 0.0  # node-clock time of last detection
        # Desired angle setpoint (absolute, degrees from centre)
        self._pan_deg: float = 0.0
        self._tilt_deg: float = 0.0
        # ── Subscriptions ─────────────────────────────────────────────────────
        # Sensor-style QoS: best-effort delivery, keep only the newest frame.
        sensor_qos = QoSProfile(
            reliability=ReliabilityPolicy.BEST_EFFORT,
            history=HistoryPolicy.KEEP_LAST,
            depth=1,
        )
        self._sub_objects = self.create_subscription(
            DetectedObjectArray,
            "/saltybot/objects",
            self._on_detections,
            sensor_qos,
        )
        # ── Publishers ────────────────────────────────────────────────────────
        self._pub_cmd = self.create_publisher(
            Point, "/saltybot/gimbal/cmd", 10
        )
        self._pub_state = self.create_publisher(
            String, "/saltybot/head_tracking/state", 10
        )
        # ── Control loop timer ────────────────────────────────────────────────
        dt = 1.0 / self._control_hz
        self._prev_time: float = self.get_clock().now().nanoseconds * 1e-9
        self.create_timer(dt, self._control_loop)
        self.get_logger().info(
            f"HeadTrackingNode ready — "
            f"image={self._img_w}×{self._img_h} "
            f"FOV={self._fov_h:.0f}°×{self._fov_v:.0f}° "
            f"hold={self._hold_duration:.1f}s "
            f"rate={self._control_hz:.0f}Hz"
        )

    # ── Detection callback ────────────────────────────────────────────────────
    def _on_detections(self, msg: DetectedObjectArray) -> None:
        """Process incoming object detections and select best person target.

        A frame with no person detections is ignored here; loss of target
        is detected by the control-loop timeout, not by this callback.
        """
        persons = [
            obj for obj in msg.objects
            if obj.class_id == self._PERSON_CLASS_ID
        ]
        if not persons:
            return
        best = self._select_target(persons)
        if best is None:
            return
        now = self.get_clock().now().nanoseconds * 1e-9
        self._target = best
        self._target.stamp = now
        self._last_seen = now
        if self._state in (TrackState.IDLE, TrackState.LOST, TrackState.CENTERING):
            self._state = TrackState.TRACKING
            # Don't reset PIDs on re-acquisition — avoids jerk
            self.get_logger().info(
                f"Target acquired: id={best.track_id} "
                f"conf={best.confidence:.2f} dist={best.distance_m:.1f}m"
            )

    def _select_target(self, persons: list) -> Optional[Target]:
        """Select highest-priority person: closest first, then confidence.

        Scoring:
            score = dist_weight * dist_score + conf_weight * confidence
            dist_score = 1 / (1 + distance_m)    higher = closer
        If distance_m == 0 (unknown), use confidence only.

        NOTE(review): a raw-confidence score (unknown distance) lives on a
        different scale than the weighted score, so an unknown-distance
        detection can outrank a known nearby one — confirm this is intended.
        """
        best_score = -1.0
        best_obj: Optional[DetectedObject] = None
        for obj in persons:
            dist_m = obj.distance_m
            conf = obj.confidence
            if dist_m > 0.0:
                dist_score = 1.0 / (1.0 + dist_m)
                score = self._dist_weight * dist_score + self._conf_weight * conf
            else:
                score = conf  # distance unknown — use confidence only
            if score > best_score:
                best_score = score
                best_obj = obj
        if best_obj is None:
            return None
        # Extract bbox centre in pixel coordinates
        cx = best_obj.bbox.center.position.x
        cy = best_obj.bbox.center.position.y
        area = best_obj.bbox.size_x * best_obj.bbox.size_y
        return Target(
            track_id=0,  # DetectedObject has no track_id; use 0
            center_x=cx,
            center_y=cy,
            confidence=best_obj.confidence,
            distance_m=best_obj.distance_m,
            bbox_area=area,
        )

    # ── Control loop ──────────────────────────────────────────────────────────
    def _control_loop(self) -> None:
        """Fixed-rate (control_hz, default 20 Hz) PID update and command publish."""
        now = self.get_clock().now().nanoseconds * 1e-9
        dt = now - self._prev_time
        self._prev_time = now
        if dt <= 0.0:
            return
        time_since_seen = now - self._last_seen
        # ── State transitions ──────────────────────────────────────────────
        if self._state == TrackState.TRACKING:
            if time_since_seen > (2.0 / self._control_hz):
                # No new detection in the last ~2 control cycles → lost.
                # NOTE(review): this assumes detections arrive at no less
                # than control_hz / 2 Hz; a slower detector would flap
                # TRACKING↔LOST every cycle — confirm detector frame rate.
                self._state = TrackState.LOST
                self._last_seen = now  # reuse _last_seen as the hold-timer start
                self.get_logger().info("Target lost — holding position")
        elif self._state == TrackState.LOST:
            if time_since_seen >= self._hold_duration:
                self._state = TrackState.CENTERING
                self._pid_pan.reset()
                self._pid_tilt.reset()
                self.get_logger().info("Hold expired — centering")
        # ── PID update ────────────────────────────────────────────────────
        error_pan_px = 0.0
        error_tilt_px = 0.0
        confidence = 0.0  # published as cmd.z; stays 0.0 unless TRACKING
        if self._state == TrackState.TRACKING and self._target is not None:
            cx = self._target.center_x
            cy = self._target.center_y
            confidence = self._target.confidence
            # Pixel error: positive = target right of centre (→ pan right = negative pan)
            error_pan_px = cx - self._img_w / 2.0
            error_tilt_px = cy - self._img_h / 2.0
            # Dead-zone: don't chase tiny errors
            if abs(error_pan_px) < self._dead_zone:
                error_pan_px = 0.0
            if abs(error_tilt_px) < self._dead_zone:
                error_tilt_px = 0.0
            # Convert pixel error → angle error (degrees)
            # px_per_deg = image_dim / fov_deg
            err_pan_deg = error_pan_px / (self._img_w / self._fov_h)
            err_tilt_deg = error_tilt_px / (self._img_h / self._fov_v)
            # PID output: delta angle to apply this timestep
            # Pan: positive error (target right) → decrease pan (turn right)
            # Tilt: positive error (target below) → decrease tilt (look down)
            pan_delta = self._pid_pan.update(-err_pan_deg, dt)
            tilt_delta = self._pid_tilt.update(-err_tilt_deg, dt)
            # Servo limits are re-read from the parameter server every cycle,
            # so they can be adjusted at runtime via `ros2 param set`.
            pan_min = self.get_parameter("pan_min_deg").value
            pan_max = self.get_parameter("pan_max_deg").value
            tilt_min = self.get_parameter("tilt_min_deg").value
            tilt_max = self.get_parameter("tilt_max_deg").value
            self._pan_deg = max(pan_min, min(pan_max, self._pan_deg + pan_delta))
            self._tilt_deg = max(tilt_min, min(tilt_max, self._tilt_deg + tilt_delta))
        elif self._state == TrackState.CENTERING:
            # Drive angles back toward zero at bounded speed
            center_step = self._center_speed * dt
            if abs(self._pan_deg) < center_step:
                self._pan_deg = 0.0
            else:
                self._pan_deg -= math.copysign(center_step, self._pan_deg)
            if abs(self._tilt_deg) < center_step:
                self._tilt_deg = 0.0
            else:
                self._tilt_deg -= math.copysign(center_step, self._tilt_deg)
            if self._pan_deg == 0.0 and self._tilt_deg == 0.0:
                self._state = TrackState.IDLE
                self.get_logger().info("Centered — IDLE")
        # ── Publish gimbal command ─────────────────────────────────────────
        # In LOST (and IDLE) neither branch above runs, so the previous
        # setpoint is re-published unchanged — this IS the "hold" behaviour.
        cmd = Point()
        cmd.x = self._pan_deg
        cmd.y = self._tilt_deg
        cmd.z = confidence
        self._pub_cmd.publish(cmd)
        # ── Publish state ─────────────────────────────────────────────────
        state_data = {
            "state": self._state.name,
            "pan_deg": round(self._pan_deg, 2),
            "tilt_deg": round(self._tilt_deg, 2),
            "error_pan_px": round(error_pan_px, 1),
            "error_tilt_px": round(error_tilt_px, 1),
            "confidence": round(confidence, 3),
            # _target is never cleared, so this reports the LAST target's
            # distance even in LOST / CENTERING / IDLE.
            "distance_m": round(self._target.distance_m, 2) if self._target else 0.0,
            "time_since_seen_s": round(now - self._last_seen, 2),
        }
        self._pub_state.publish(String(data=json.dumps(state_data)))
# ── Entry point ───────────────────────────────────────────────────────────────
def main(args=None) -> None:
    """Entry point: spin the head-tracking node until shutdown."""
    rclpy.init(args=args)
    tracker = HeadTrackingNode()
    try:
        rclpy.spin(tracker)
    except KeyboardInterrupt:
        # Ctrl-C is a normal shutdown path, not an error.
        pass
    finally:
        # Always release the node and the rclpy context, even on error.
        tracker.destroy_node()
        rclpy.shutdown()


if __name__ == "__main__":
    main()

View File

@ -0,0 +1,5 @@
[develop]
script_dir=$base/lib/saltybot_head_tracking
[install]
install_scripts=$base/lib/saltybot_head_tracking

View File

@ -0,0 +1,32 @@
"""setuptools packaging for saltybot_head_tracking (ament_python build)."""
from setuptools import setup, find_packages

package_name = 'saltybot_head_tracking'

setup(
    name=package_name,
    version='0.1.0',
    packages=find_packages(exclude=['test']),
    # Install the ament resource marker, manifest, launch and config files
    # into the package's share directory.
    data_files=[
        ('share/ament_index/resource_index/packages',
         ['resource/' + package_name]),
        ('share/' + package_name, ['package.xml']),
        ('share/' + package_name + '/launch', [
            'launch/head_tracking.launch.py',
        ]),
        ('share/' + package_name + '/config', [
            'config/head_tracking_params.yaml',
        ]),
    ],
    install_requires=['setuptools'],
    zip_safe=True,
    maintainer='sl-perception',
    maintainer_email='sl-perception@saltylab.local',
    description='Person-following head tracking with PID control (Issue #549)',
    license='MIT',
    # NOTE(review): tests_require is deprecated in modern setuptools —
    # presumably kept for ament tooling compatibility; confirm.
    tests_require=['pytest'],
    entry_points={
        'console_scripts': [
            # Invoked as: ros2 run saltybot_head_tracking head_tracking
            'head_tracking = saltybot_head_tracking.head_tracking_node:main',
        ],
    },
)