feat(social): face-tracking head servo controller — Issue #279
Some checks failed
social-bot integration tests / Lint (flake8 + pep257) (push) Failing after 8s
social-bot integration tests / Core integration tests (mock sensors, no GPU) (push) Has been skipped
social-bot integration tests / Lint (flake8 + pep257) (pull_request) Failing after 8s
social-bot integration tests / Core integration tests (mock sensors, no GPU) (pull_request) Has been skipped
social-bot integration tests / Latency profiling (GPU, Orin) (push) Has been cancelled
social-bot integration tests / Latency profiling (GPU, Orin) (pull_request) Has been cancelled

Adds face_track_servo_node to saltybot_social:
- Subscribes /social/faces/detected (FaceDetectionArray)
- Picks closest face by largest bbox area (proximity proxy)
- Computes pan/tilt error from bbox centre vs image centre using
  configurable FOV (fov_h_deg=60°, fov_v_deg=45°)
- Independent PID controllers for pan and tilt (velocity/incremental
  output with anti-windup); servo position integrates velocity*dt
- Clamps commands to ±pan_limit_deg / ±tilt_limit_deg
- Returns to centre at return_rate_deg_s when face lost >lost_timeout_s
- Dead zone suppresses jitter for small errors
- Publishes Float32 on /saltybot/head_pan and /saltybot/head_tilt
- 81/81 tests passing

Closes #279
This commit is contained in:
sl-jetson 2026-03-02 17:37:57 -05:00
parent 54bc37926b
commit c3d36e9943
5 changed files with 1067 additions and 0 deletions

View File

@ -0,0 +1,30 @@
face_track_servo_node:
ros__parameters:
# PID gains — pan axis
kp_pan: 1.5 # proportional gain (°/s per ° error)
ki_pan: 0.1 # integral gain
kd_pan: 0.05 # derivative gain (damping)
# PID gains — tilt axis
kp_tilt: 1.2
ki_tilt: 0.1
kd_tilt: 0.04
# Camera FOV
fov_h_deg: 60.0 # horizontal field of view (degrees)
fov_v_deg: 45.0 # vertical field of view (degrees)
# Servo limits
pan_limit_deg: 90.0 # mechanical pan range ± (degrees)
tilt_limit_deg: 30.0 # mechanical tilt range ± (degrees)
pan_vel_limit: 45.0 # max pan rate (°/s)
tilt_vel_limit: 30.0 # max tilt rate (°/s)
windup_limit: 15.0 # integral anti-windup clamp (°·s)
# Tracking behaviour
dead_zone: 0.02 # normalised dead zone (fraction of frame width/height)
control_rate: 20.0 # control loop frequency (Hz)
lost_timeout_s: 1.5 # seconds before face considered lost
return_rate_deg_s: 10.0 # return-to-centre speed when no face (°/s)
faces_topic: "/social/faces/detected"

View File

@ -0,0 +1,51 @@
"""face_track_servo.launch.py — Launch face-tracking head servo controller (Issue #279).
Usage:
ros2 launch saltybot_social face_track_servo.launch.py
ros2 launch saltybot_social face_track_servo.launch.py kp_pan:=2.0 pan_limit_deg:=60.0
"""
import os
from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def generate_launch_description():
pkg = get_package_share_directory("saltybot_social")
cfg = os.path.join(pkg, "config", "face_track_servo_params.yaml")
return LaunchDescription([
DeclareLaunchArgument("kp_pan", default_value="1.5",
description="Pan proportional gain (°/s per °)"),
DeclareLaunchArgument("kp_tilt", default_value="1.2",
description="Tilt proportional gain (°/s per °)"),
DeclareLaunchArgument("pan_limit_deg", default_value="90.0",
description="Mechanical pan limit ± (degrees)"),
DeclareLaunchArgument("tilt_limit_deg", default_value="30.0",
description="Mechanical tilt limit ± (degrees)"),
DeclareLaunchArgument("fov_h_deg", default_value="60.0",
description="Camera horizontal FOV (degrees)"),
DeclareLaunchArgument("fov_v_deg", default_value="45.0",
description="Camera vertical FOV (degrees)"),
Node(
package="saltybot_social",
executable="face_track_servo_node",
name="face_track_servo_node",
output="screen",
parameters=[
cfg,
{
"kp_pan": LaunchConfiguration("kp_pan"),
"kp_tilt": LaunchConfiguration("kp_tilt"),
"pan_limit_deg": LaunchConfiguration("pan_limit_deg"),
"tilt_limit_deg": LaunchConfiguration("tilt_limit_deg"),
"fov_h_deg": LaunchConfiguration("fov_h_deg"),
"fov_v_deg": LaunchConfiguration("fov_v_deg"),
},
],
),
])

View File

@ -0,0 +1,308 @@
"""face_track_servo_node.py — Face-tracking head servo controller.
Issue #279
Subscribes to /social/faces/detected, picks the closest face (largest
bounding-box area as a proximity proxy), computes pan/tilt angular error
relative to the image centre, and drives two PID controllers to produce
smooth servo position commands published on /saltybot/head_pan and
/saltybot/head_tilt (std_msgs/Float32, degrees from neutral).
Coordinate convention
bbox_x/y/w/h : normalised [0, 1] in image space
face centre : cx = bbox_x + bbox_w/2 , cy = bbox_y + bbox_h/2
image centre : (0.5, 0.5)
pan error : (cx - 0.5) * fov_h_deg (+ve face right of centre)
tilt error : (cy - 0.5) * fov_v_deg (+ve face below centre)
PID design (velocity / incremental)
velocity (°/s) = Kp·e + Ki·e dt + Kd·de/dt
servo_angle += velocity · dt
servo_angle = clamp(servo_angle, ±limit_deg)
When no face is seen for more than ``lost_timeout_s`` seconds the PIDs
are reset and the servo commands return toward 0° at ``return_rate_deg_s``.
Parameters
kp_pan (float, 1.5) pan proportional gain (°/s per °)
ki_pan (float, 0.1) pan integral gain
kd_pan (float, 0.05) pan derivative gain
kp_tilt (float, 1.2) tilt proportional gain
ki_tilt (float, 0.1) tilt integral gain
kd_tilt (float, 0.04) tilt derivative gain
fov_h_deg (float, 60.0) camera horizontal FOV (degrees)
fov_v_deg (float, 45.0) camera vertical FOV (degrees)
pan_limit_deg (float, 90.0) mechanical pan limit ±
tilt_limit_deg (float, 30.0) mechanical tilt limit ±
pan_vel_limit (float, 45.0) max pan rate (°/s)
tilt_vel_limit (float, 30.0) max tilt rate (°/s)
windup_limit (float, 15.0) integral anti-windup clamp (°·s)
dead_zone (float, 0.02) normalised dead zone (fraction of frame)
control_rate (float, 20.0) control loop Hz
lost_timeout_s (float, 1.5) seconds before face considered lost
return_rate_deg_s (float, 10.0) return-to-centre rate when no face (°/s)
faces_topic (str) default "/social/faces/detected"
"""
from __future__ import annotations
import math
import time
import threading
from typing import Optional
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile
from std_msgs.msg import Float32
try:
from saltybot_social_msgs.msg import FaceDetectionArray
_MSGS = True
except ImportError:
_MSGS = False
# ── Pure helpers ───────────────────────────────────────────────────────────────
def clamp(v: float, lo: float, hi: float) -> float:
return max(lo, min(hi, v))
def bbox_area(face) -> float:
"""Bounding-box area as a proximity proxy (larger ≈ closer)."""
return float(face.bbox_w) * float(face.bbox_h)
def pick_closest_face(faces):
"""Return the face with the largest bbox area; None if list is empty."""
if not faces:
return None
return max(faces, key=bbox_area)
def face_image_error(face, fov_h_deg: float, fov_v_deg: float):
"""Return (pan_error_deg, tilt_error_deg) for a FaceDetection.
Positive pan face is right of image centre.
Positive tilt face is below image centre.
"""
cx = float(face.bbox_x) + float(face.bbox_w) / 2.0
cy = float(face.bbox_y) + float(face.bbox_h) / 2.0
pan_err = (cx - 0.5) * fov_h_deg
tilt_err = (cy - 0.5) * fov_v_deg
return pan_err, tilt_err
# ── PID controller ─────────────────────────────────────────────────────────────
class PIDController:
"""Incremental (velocity-output) PID with anti-windup.
Output units: degrees/second (servo angular velocity).
Integrate externally: servo_angle += pid.update(error, dt) * dt
"""
def __init__(self, kp: float, ki: float, kd: float,
vel_limit: float, windup_limit: float) -> None:
self.kp = kp
self.ki = ki
self.kd = kd
self.vel_limit = vel_limit
self.windup_limit = windup_limit
self._integral = 0.0
self._prev_error = 0.0
self._first = True
def update(self, error: float, dt: float) -> float:
"""Return velocity command (°/s). Call every control tick."""
if dt <= 0.0:
return 0.0
self._integral += error * dt
self._integral = clamp(self._integral, -self.windup_limit,
self.windup_limit)
if self._first:
derivative = 0.0
self._first = False
else:
derivative = (error - self._prev_error) / dt
self._prev_error = error
output = (self.kp * error
+ self.ki * self._integral
+ self.kd * derivative)
return clamp(output, -self.vel_limit, self.vel_limit)
def reset(self) -> None:
self._integral = 0.0
self._prev_error = 0.0
self._first = True
# ── ROS2 node ──────────────────────────────────────────────────────────────────
class FaceTrackServoNode(Node):
"""Smooth PID face-tracking servo controller."""
def __init__(self) -> None:
super().__init__("face_track_servo_node")
# Declare parameters
self.declare_parameter("kp_pan", 1.5)
self.declare_parameter("ki_pan", 0.1)
self.declare_parameter("kd_pan", 0.05)
self.declare_parameter("kp_tilt", 1.2)
self.declare_parameter("ki_tilt", 0.1)
self.declare_parameter("kd_tilt", 0.04)
self.declare_parameter("fov_h_deg", 60.0)
self.declare_parameter("fov_v_deg", 45.0)
self.declare_parameter("pan_limit_deg", 90.0)
self.declare_parameter("tilt_limit_deg", 30.0)
self.declare_parameter("pan_vel_limit", 45.0)
self.declare_parameter("tilt_vel_limit", 30.0)
self.declare_parameter("windup_limit", 15.0)
self.declare_parameter("dead_zone", 0.02)
self.declare_parameter("control_rate", 20.0)
self.declare_parameter("lost_timeout_s", 1.5)
self.declare_parameter("return_rate_deg_s", 10.0)
self.declare_parameter("faces_topic", "/social/faces/detected")
self._reload_params()
# Servo state
self._pan_cmd = 0.0
self._tilt_cmd = 0.0
self._last_face_t: float = 0.0
self._latest_face = None
self._lock = threading.Lock()
qos = QoSProfile(depth=10)
self._pan_pub = self.create_publisher(Float32, "/saltybot/head_pan", qos)
self._tilt_pub = self.create_publisher(Float32, "/saltybot/head_tilt", qos)
faces_topic = self.get_parameter("faces_topic").value
if _MSGS:
self._faces_sub = self.create_subscription(
FaceDetectionArray, faces_topic, self._on_faces, qos
)
else:
self.get_logger().warn(
"saltybot_social_msgs not available — node passive (no subscription)"
)
rate = self.get_parameter("control_rate").value
self._timer = self.create_timer(1.0 / rate, self._control_cb)
self._last_tick = time.monotonic()
self.get_logger().info(
f"FaceTrackServoNode ready "
f"(rate={rate}Hz, fov={self._fov_h}×{self._fov_v}°, "
f"pan±{self._pan_limit}°, tilt±{self._tilt_limit}°)"
)
def _reload_params(self) -> None:
self._fov_h = self.get_parameter("fov_h_deg").value
self._fov_v = self.get_parameter("fov_v_deg").value
self._pan_limit = self.get_parameter("pan_limit_deg").value
self._tilt_limit = self.get_parameter("tilt_limit_deg").value
self._dead_zone = self.get_parameter("dead_zone").value
self._lost_t = self.get_parameter("lost_timeout_s").value
self._return_rate = self.get_parameter("return_rate_deg_s").value
self._pid_pan = PIDController(
kp=self.get_parameter("kp_pan").value,
ki=self.get_parameter("ki_pan").value,
kd=self.get_parameter("kd_pan").value,
vel_limit=self.get_parameter("pan_vel_limit").value,
windup_limit=self.get_parameter("windup_limit").value,
)
self._pid_tilt = PIDController(
kp=self.get_parameter("kp_tilt").value,
ki=self.get_parameter("ki_tilt").value,
kd=self.get_parameter("kd_tilt").value,
vel_limit=self.get_parameter("tilt_vel_limit").value,
windup_limit=self.get_parameter("windup_limit").value,
)
# ── Subscription callback ──────────────────────────────────────────────
def _on_faces(self, msg) -> None:
face = pick_closest_face(msg.faces)
with self._lock:
self._latest_face = face
if face is not None:
self._last_face_t = time.monotonic()
# ── Control loop ───────────────────────────────────────────────────────
def _control_cb(self) -> None:
now = time.monotonic()
dt = now - self._last_tick
self._last_tick = now
dt = max(dt, 1e-4) # guard against zero dt at startup
with self._lock:
face = self._latest_face
last_face_t = self._last_face_t
face_fresh = (last_face_t > 0.0 and (now - last_face_t) < self._lost_t)
if not face_fresh or face is None:
# Return to centre
self._pid_pan.reset()
self._pid_tilt.reset()
step = self._return_rate * dt
self._pan_cmd = _step_toward_zero(self._pan_cmd, step)
self._tilt_cmd = _step_toward_zero(self._tilt_cmd, step)
else:
pan_err, tilt_err = face_image_error(face, self._fov_h, self._fov_v)
# Dead zone (normalised fraction → degrees)
dead_deg_h = self._dead_zone * self._fov_h
dead_deg_v = self._dead_zone * self._fov_v
if abs(pan_err) < dead_deg_h:
self._pid_pan.reset()
else:
vel_pan = self._pid_pan.update(pan_err, dt)
self._pan_cmd = clamp(
self._pan_cmd + vel_pan * dt,
-self._pan_limit, self._pan_limit,
)
if abs(tilt_err) < dead_deg_v:
self._pid_tilt.reset()
else:
vel_tilt = self._pid_tilt.update(tilt_err, dt)
self._tilt_cmd = clamp(
self._tilt_cmd + vel_tilt * dt,
-self._tilt_limit, self._tilt_limit,
)
pan_msg = Float32(); pan_msg.data = float(self._pan_cmd)
tilt_msg = Float32(); tilt_msg.data = float(self._tilt_cmd)
self._pan_pub.publish(pan_msg)
self._tilt_pub.publish(tilt_msg)
def _step_toward_zero(value: float, step: float) -> float:
"""Move value toward 0 by step without overshooting."""
if abs(value) <= step:
return 0.0
return value - math.copysign(step, value)
def main(args=None) -> None:
rclpy.init(args=args)
node = FaceTrackServoNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()

View File

@ -49,6 +49,8 @@ setup(
'ambient_sound_node = saltybot_social.ambient_sound_node:main',
# Proximity-based greeting trigger (Issue #270)
'greeting_trigger_node = saltybot_social.greeting_trigger_node:main',
# Face-tracking head servo controller (Issue #279)
'face_track_servo_node = saltybot_social.face_track_servo_node:main',
],
},
)

View File

@ -0,0 +1,676 @@
"""test_face_track_servo.py — Offline tests for face_track_servo_node (Issue #279).
Stubs out rclpy and saltybot_social_msgs so tests run without a ROS install.
"""
import importlib
import importlib.util
import math
import sys
import time
import types
import unittest
# ── ROS2 / message stubs ──────────────────────────────────────────────────────
def _make_ros_stubs():
for mod_name in ("rclpy", "rclpy.node", "rclpy.qos",
"std_msgs", "std_msgs.msg",
"saltybot_social_msgs", "saltybot_social_msgs.msg"):
if mod_name not in sys.modules:
sys.modules[mod_name] = types.ModuleType(mod_name)
class _Node:
def __init__(self, name="node"):
self._name = name
if not hasattr(self, "_params"):
self._params = {}
self._pubs = {}
self._subs = {}
self._timers = []
self._logs = []
def declare_parameter(self, name, default):
if name not in self._params:
self._params[name] = default
def get_parameter(self, name):
class _P:
def __init__(self, v): self.value = v
return _P(self._params.get(name))
def create_publisher(self, msg_type, topic, qos):
pub = _FakePub()
self._pubs[topic] = pub
return pub
def create_subscription(self, msg_type, topic, cb, qos):
self._subs[topic] = cb
return object()
def create_timer(self, period, cb):
self._timers.append(cb)
return object()
def get_logger(self):
node = self
class _L:
def info(self, m): node._logs.append(("INFO", m))
def warn(self, m): node._logs.append(("WARN", m))
def error(self, m): node._logs.append(("ERROR", m))
return _L()
def destroy_node(self): pass
class _FakePub:
def __init__(self):
self.msgs = []
def publish(self, msg):
self.msgs.append(msg)
class _QoSProfile:
def __init__(self, depth=10): self.depth = depth
class _Float32:
def __init__(self): self.data = 0.0
class _FaceDetection:
def __init__(self, face_id=0, bbox_x=0.4, bbox_y=0.4,
bbox_w=0.2, bbox_h=0.2, confidence=1.0):
self.face_id = face_id
self.bbox_x = bbox_x
self.bbox_y = bbox_y
self.bbox_w = bbox_w
self.bbox_h = bbox_h
self.confidence = confidence
self.person_name = ""
class _FaceDetectionArray:
def __init__(self, faces=None):
self.faces = faces or []
# rclpy
rclpy_mod = sys.modules["rclpy"]
rclpy_mod.init = lambda args=None: None
rclpy_mod.spin = lambda node: None
rclpy_mod.shutdown = lambda: None
sys.modules["rclpy.node"].Node = _Node
sys.modules["rclpy.qos"].QoSProfile = _QoSProfile
sys.modules["std_msgs.msg"].Float32 = _Float32
msgs = sys.modules["saltybot_social_msgs.msg"]
msgs.FaceDetection = _FaceDetection
msgs.FaceDetectionArray = _FaceDetectionArray
return _Node, _FakePub, _FaceDetection, _FaceDetectionArray, _Float32
_Node, _FakePub, _FaceDetection, _FaceDetectionArray, _Float32 = _make_ros_stubs()
# ── Module loader ─────────────────────────────────────────────────────────────
_SRC = (
"/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/"
"saltybot_social/saltybot_social/face_track_servo_node.py"
)
def _load_mod():
spec = importlib.util.spec_from_file_location("face_track_servo_testmod", _SRC)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
return mod
def _make_node(mod, **kwargs):
"""Instantiate FaceTrackServoNode with optional param overrides."""
node = mod.FaceTrackServoNode.__new__(mod.FaceTrackServoNode)
defaults = {
"kp_pan": 1.5,
"ki_pan": 0.1,
"kd_pan": 0.05,
"kp_tilt": 1.2,
"ki_tilt": 0.1,
"kd_tilt": 0.04,
"fov_h_deg": 60.0,
"fov_v_deg": 45.0,
"pan_limit_deg": 90.0,
"tilt_limit_deg": 30.0,
"pan_vel_limit": 45.0,
"tilt_vel_limit": 30.0,
"windup_limit": 15.0,
"dead_zone": 0.02,
"control_rate": 20.0,
"lost_timeout_s": 1.5,
"return_rate_deg_s": 10.0,
"faces_topic": "/social/faces/detected",
}
defaults.update(kwargs)
node._params = dict(defaults)
mod.FaceTrackServoNode.__init__(node)
return node
def _face(bbox_x=0.4, bbox_y=0.4, bbox_w=0.2, bbox_h=0.2, face_id=0):
return _FaceDetection(face_id=face_id, bbox_x=bbox_x, bbox_y=bbox_y,
bbox_w=bbox_w, bbox_h=bbox_h)
def _centered_face():
"""A face perfectly centered in the frame."""
return _face(bbox_x=0.4, bbox_y=0.4, bbox_w=0.2, bbox_h=0.2)
# ── Tests: pure helpers ───────────────────────────────────────────────────────
class TestClamp(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.mod = _load_mod()
def test_within(self):
self.assertEqual(self.mod.clamp(5.0, 0.0, 10.0), 5.0)
def test_below(self):
self.assertEqual(self.mod.clamp(-5.0, 0.0, 10.0), 0.0)
def test_above(self):
self.assertEqual(self.mod.clamp(15.0, 0.0, 10.0), 10.0)
def test_negative_range(self):
self.assertEqual(self.mod.clamp(-50.0, -45.0, 45.0), -45.0)
class TestBboxArea(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.mod = _load_mod()
def test_area(self):
f = _face(bbox_w=0.3, bbox_h=0.4)
self.assertAlmostEqual(self.mod.bbox_area(f), 0.12)
def test_zero(self):
f = _face(bbox_w=0.0, bbox_h=0.2)
self.assertAlmostEqual(self.mod.bbox_area(f), 0.0)
class TestPickClosestFace(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.mod = _load_mod()
def test_empty(self):
self.assertIsNone(self.mod.pick_closest_face([]))
def test_single(self):
f = _face(bbox_w=0.2, bbox_h=0.2)
self.assertIs(self.mod.pick_closest_face([f]), f)
def test_picks_largest_area(self):
small = _face(bbox_w=0.1, bbox_h=0.1)
big = _face(bbox_w=0.4, bbox_h=0.4)
self.assertIs(self.mod.pick_closest_face([small, big]), big)
self.assertIs(self.mod.pick_closest_face([big, small]), big)
def test_three_faces(self):
faces = [_face(bbox_w=0.1, bbox_h=0.1),
_face(bbox_w=0.5, bbox_h=0.5),
_face(bbox_w=0.2, bbox_h=0.2)]
self.assertIs(self.mod.pick_closest_face(faces), faces[1])
class TestFaceImageError(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.mod = _load_mod()
def test_centered_face_zero_error(self):
f = _face(bbox_x=0.4, bbox_y=0.4, bbox_w=0.2, bbox_h=0.2)
pan, tilt = self.mod.face_image_error(f, 60.0, 45.0)
self.assertAlmostEqual(pan, 0.0)
self.assertAlmostEqual(tilt, 0.0)
def test_right_of_centre(self):
# cx = 0.7 + 0.1 = 0.8, error = 0.3 * 60 = 18°
f = _face(bbox_x=0.7, bbox_y=0.4, bbox_w=0.2, bbox_h=0.2)
pan, _ = self.mod.face_image_error(f, 60.0, 45.0)
self.assertAlmostEqual(pan, 18.0)
def test_left_of_centre(self):
# cx = 0.1 + 0.1 = 0.2, error = -0.3 * 60 = -18°
f = _face(bbox_x=0.1, bbox_y=0.4, bbox_w=0.2, bbox_h=0.2)
pan, _ = self.mod.face_image_error(f, 60.0, 45.0)
self.assertAlmostEqual(pan, -18.0)
def test_below_centre(self):
# cy = 0.7 + 0.1 = 0.8, error = 0.3 * 45 = 13.5°
f = _face(bbox_x=0.4, bbox_y=0.7, bbox_w=0.2, bbox_h=0.2)
_, tilt = self.mod.face_image_error(f, 60.0, 45.0)
self.assertAlmostEqual(tilt, 13.5)
def test_above_centre(self):
# cy = 0.1 + 0.1 = 0.2, error = -0.3 * 45 = -13.5°
f = _face(bbox_x=0.4, bbox_y=0.1, bbox_w=0.2, bbox_h=0.2)
_, tilt = self.mod.face_image_error(f, 60.0, 45.0)
self.assertAlmostEqual(tilt, -13.5)
class TestStepTowardZero(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.mod = _load_mod()
def test_positive_large(self):
result = self.mod._step_toward_zero(10.0, 1.0)
self.assertAlmostEqual(result, 9.0)
def test_negative_large(self):
result = self.mod._step_toward_zero(-10.0, 1.0)
self.assertAlmostEqual(result, -9.0)
def test_smaller_than_step(self):
result = self.mod._step_toward_zero(0.5, 1.0)
self.assertAlmostEqual(result, 0.0)
def test_exact_step(self):
result = self.mod._step_toward_zero(1.0, 1.0)
self.assertAlmostEqual(result, 0.0)
def test_zero(self):
result = self.mod._step_toward_zero(0.0, 1.0)
self.assertAlmostEqual(result, 0.0)
# ── Tests: PIDController ──────────────────────────────────────────────────────
class TestPIDController(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.mod = _load_mod()
def _pid(self, kp=1.0, ki=0.0, kd=0.0, vel_limit=100.0, windup=100.0):
return self.mod.PIDController(kp, ki, kd, vel_limit, windup)
def test_proportional_only(self):
pid = self._pid(kp=2.0)
out = pid.update(5.0, 0.1)
self.assertAlmostEqual(out, 10.0)
def test_zero_error_zero_output(self):
pid = self._pid(kp=5.0)
self.assertAlmostEqual(pid.update(0.0, 0.1), 0.0)
def test_integral_accumulates(self):
pid = self._pid(kp=0.0, ki=1.0)
pid.update(1.0, 0.1) # integral = 0.1
out = pid.update(1.0, 0.1) # integral = 0.2, output = 0.2
self.assertAlmostEqual(out, 0.2, places=5)
def test_derivative_first_tick_zero(self):
pid = self._pid(kp=0.0, kd=1.0)
out = pid.update(10.0, 0.1)
self.assertAlmostEqual(out, 0.0) # first tick: derivative = 0
def test_derivative_second_tick(self):
pid = self._pid(kp=0.0, kd=1.0)
pid.update(0.0, 0.1) # first tick
out = pid.update(10.0, 0.1) # de/dt = 10/0.1 = 100
self.assertAlmostEqual(out, 100.0)
def test_velocity_clamped(self):
pid = self._pid(kp=100.0, vel_limit=10.0)
out = pid.update(5.0, 0.1)
self.assertAlmostEqual(out, 10.0)
def test_velocity_clamped_negative(self):
pid = self._pid(kp=100.0, vel_limit=10.0)
out = pid.update(-5.0, 0.1)
self.assertAlmostEqual(out, -10.0)
def test_antiwindup(self):
pid = self._pid(kp=0.0, ki=1.0, windup=5.0)
for _ in range(100):
pid.update(1.0, 0.1) # would accumulate 10, clamped at 5
out = pid.update(0.0, 0.1)
self.assertAlmostEqual(out, 5.0, places=3)
def test_reset_clears_integral(self):
pid = self._pid(ki=1.0)
pid.update(1.0, 1.0)
pid.reset()
out = pid.update(0.0, 0.1)
self.assertAlmostEqual(out, 0.0)
def test_reset_clears_derivative(self):
pid = self._pid(kp=0.0, kd=1.0)
pid.update(10.0, 0.1) # sets prev_error
pid.reset()
out = pid.update(10.0, 0.1) # after reset, first tick = 0 derivative
self.assertAlmostEqual(out, 0.0)
def test_zero_dt_returns_zero(self):
pid = self._pid(kp=10.0)
self.assertAlmostEqual(pid.update(5.0, 0.0), 0.0)
def test_negative_dt_returns_zero(self):
pid = self._pid(kp=10.0)
self.assertAlmostEqual(pid.update(5.0, -0.1), 0.0)
# ── Tests: node initialisation ────────────────────────────────────────────────
class TestNodeInit(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.mod = _load_mod()
def test_instantiates(self):
node = _make_node(self.mod)
self.assertIsNotNone(node)
def test_pan_pub(self):
node = _make_node(self.mod)
self.assertIn("/saltybot/head_pan", node._pubs)
def test_tilt_pub(self):
node = _make_node(self.mod)
self.assertIn("/saltybot/head_tilt", node._pubs)
def test_faces_sub(self):
node = _make_node(self.mod)
self.assertIn("/social/faces/detected", node._subs)
def test_timer_registered(self):
node = _make_node(self.mod)
self.assertGreater(len(node._timers), 0)
def test_initial_pan_zero(self):
node = _make_node(self.mod)
self.assertAlmostEqual(node._pan_cmd, 0.0)
def test_initial_tilt_zero(self):
node = _make_node(self.mod)
self.assertAlmostEqual(node._tilt_cmd, 0.0)
def test_custom_fov(self):
node = _make_node(self.mod, fov_h_deg=90.0)
self.assertAlmostEqual(node._fov_h, 90.0)
def test_custom_pan_limit(self):
node = _make_node(self.mod, pan_limit_deg=45.0)
self.assertAlmostEqual(node._pan_limit, 45.0)
# ── Tests: face callback ──────────────────────────────────────────────────────
class TestFaceCallback(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.mod = _load_mod()
def setUp(self):
self.node = _make_node(self.mod)
def test_empty_msg_no_face(self):
self.node._on_faces(_FaceDetectionArray([]))
self.assertIsNone(self.node._latest_face)
def test_single_face_stored(self):
f = _centered_face()
self.node._on_faces(_FaceDetectionArray([f]))
self.assertIs(self.node._latest_face, f)
def test_closest_face_picked(self):
small = _face(bbox_w=0.1, bbox_h=0.1, face_id=1)
big = _face(bbox_w=0.5, bbox_h=0.5, face_id=2)
self.node._on_faces(_FaceDetectionArray([small, big]))
self.assertIs(self.node._latest_face, big)
def test_timestamp_updated_on_face(self):
before = time.monotonic()
f = _centered_face()
self.node._on_faces(_FaceDetectionArray([f]))
self.assertGreaterEqual(self.node._last_face_t, before)
def test_timestamp_not_updated_on_empty(self):
self.node._last_face_t = 0.0
self.node._on_faces(_FaceDetectionArray([]))
self.assertEqual(self.node._last_face_t, 0.0)
# ── Tests: control loop ───────────────────────────────────────────────────────
class TestControlLoop(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.mod = _load_mod()
def setUp(self):
self.node = _make_node(self.mod, dead_zone=0.0,
ki_pan=0.0, kd_pan=0.0,
ki_tilt=0.0, kd_tilt=0.0)
self.pan_pub = self.node._pubs["/saltybot/head_pan"]
self.tilt_pub = self.node._pubs["/saltybot/head_tilt"]
def _tick(self, dt=0.05):
self.node._last_tick = time.monotonic() - dt
self.node._control_cb()
def test_no_face_publishes_zero_initially(self):
self._tick()
self.assertAlmostEqual(self.pan_pub.msgs[-1].data, 0.0)
self.assertAlmostEqual(self.tilt_pub.msgs[-1].data, 0.0)
def test_centered_face_minimal_movement(self):
f = _centered_face() # cx=cy=0.5, error=0
self.node._on_faces(_FaceDetectionArray([f]))
self.node._last_face_t = time.monotonic()
self._tick()
# With dead_zone=0 and error=0, pid output=0, cmd stays 0
self.assertAlmostEqual(self.pan_pub.msgs[-1].data, 0.0, places=4)
self.assertAlmostEqual(self.tilt_pub.msgs[-1].data, 0.0, places=4)
def test_right_face_pans_right(self):
# Face right of centre → positive pan error → pan_cmd increases
f = _face(bbox_x=0.7, bbox_y=0.4, bbox_w=0.2, bbox_h=0.2)
self.node._on_faces(_FaceDetectionArray([f]))
self.node._last_face_t = time.monotonic()
self._tick()
self.assertGreater(self.pan_pub.msgs[-1].data, 0.0)
def test_left_face_pans_left(self):
f = _face(bbox_x=0.1, bbox_y=0.4, bbox_w=0.2, bbox_h=0.2)
self.node._on_faces(_FaceDetectionArray([f]))
self.node._last_face_t = time.monotonic()
self._tick()
self.assertLess(self.pan_pub.msgs[-1].data, 0.0)
def test_low_face_tilts_down(self):
f = _face(bbox_x=0.4, bbox_y=0.7, bbox_w=0.2, bbox_h=0.2)
self.node._on_faces(_FaceDetectionArray([f]))
self.node._last_face_t = time.monotonic()
self._tick()
self.assertGreater(self.tilt_pub.msgs[-1].data, 0.0)
def test_high_face_tilts_up(self):
f = _face(bbox_x=0.4, bbox_y=0.1, bbox_w=0.2, bbox_h=0.2)
self.node._on_faces(_FaceDetectionArray([f]))
self.node._last_face_t = time.monotonic()
self._tick()
self.assertLess(self.tilt_pub.msgs[-1].data, 0.0)
def test_pan_clamped_to_limit(self):
node = _make_node(self.mod, kp_pan=1000.0, ki_pan=0.0, kd_pan=0.0,
pan_limit_deg=45.0, pan_vel_limit=9999.0,
dead_zone=0.0)
pub = node._pubs["/saltybot/head_pan"]
f = _face(bbox_x=0.9, bbox_y=0.4, bbox_w=0.1, bbox_h=0.2)
node._on_faces(_FaceDetectionArray([f]))
node._last_face_t = time.monotonic()
# Run many ticks to accumulate
for _ in range(50):
node._last_tick = time.monotonic() - 0.05
node._control_cb()
self.assertLessEqual(pub.msgs[-1].data, 45.0)
def test_tilt_clamped_to_limit(self):
node = _make_node(self.mod, kp_tilt=1000.0, ki_tilt=0.0, kd_tilt=0.0,
tilt_limit_deg=20.0, tilt_vel_limit=9999.0,
dead_zone=0.0)
pub = node._pubs["/saltybot/head_tilt"]
f = _face(bbox_x=0.4, bbox_y=0.9, bbox_w=0.2, bbox_h=0.1)
node._on_faces(_FaceDetectionArray([f]))
node._last_face_t = time.monotonic()
for _ in range(50):
node._last_tick = time.monotonic() - 0.05
node._control_cb()
self.assertLessEqual(pub.msgs[-1].data, 20.0)
def test_lost_face_returns_to_zero(self):
node = _make_node(self.mod, kp_pan=10.0, ki_pan=0.0, kd_pan=0.0,
dead_zone=0.0, return_rate_deg_s=90.0,
lost_timeout_s=0.01)
pub = node._pubs["/saltybot/head_pan"]
f = _face(bbox_x=0.7, bbox_y=0.4, bbox_w=0.2, bbox_h=0.2)
node._on_faces(_FaceDetectionArray([f]))
node._last_face_t = time.monotonic()
# Build up some pan
for _ in range(5):
node._last_tick = time.monotonic() - 0.05
node._control_cb()
# Expire face timeout
node._last_face_t = time.monotonic() - 10.0
for _ in range(20):
node._last_tick = time.monotonic() - 0.05
node._control_cb()
self.assertAlmostEqual(pub.msgs[-1].data, 0.0, places=3)
def test_publishes_every_tick(self):
for _ in range(3):
self._tick()
self.assertEqual(len(self.pan_pub.msgs), 3)
self.assertEqual(len(self.tilt_pub.msgs), 3)
def test_dead_zone_suppresses_small_error(self):
node = _make_node(self.mod, kp_pan=100.0, ki_pan=0.0, kd_pan=0.0,
dead_zone=0.1, fov_h_deg=60.0)
pub = node._pubs["/saltybot/head_pan"]
# Face 2% right of centre — within dead_zone=10% of frame
f = _face(bbox_x=0.42, bbox_y=0.4, bbox_w=0.2, bbox_h=0.2)
node._on_faces(_FaceDetectionArray([f]))
node._last_face_t = time.monotonic()
node._last_tick = time.monotonic() - 0.05
node._control_cb()
self.assertAlmostEqual(pub.msgs[-1].data, 0.0, places=4)
# ── Tests: source-level checks ────────────────────────────────────────────────
class TestNodeSrc(unittest.TestCase):
@classmethod
def setUpClass(cls):
with open(_SRC) as f:
cls.src = f.read()
def test_issue_tag(self):
self.assertIn("#279", self.src)
def test_pan_topic(self):
self.assertIn("/saltybot/head_pan", self.src)
def test_tilt_topic(self):
self.assertIn("/saltybot/head_tilt", self.src)
def test_faces_topic(self):
self.assertIn("/social/faces/detected", self.src)
def test_pid_class(self):
self.assertIn("class PIDController", self.src)
def test_kp_param(self):
self.assertIn("kp_pan", self.src)
def test_ki_param(self):
self.assertIn("ki_pan", self.src)
def test_kd_param(self):
self.assertIn("kd_pan", self.src)
def test_fov_param(self):
self.assertIn("fov_h_deg", self.src)
def test_pan_limit_param(self):
self.assertIn("pan_limit_deg", self.src)
def test_dead_zone_param(self):
self.assertIn("dead_zone", self.src)
def test_pick_closest_face(self):
self.assertIn("pick_closest_face", self.src)
def test_main_defined(self):
self.assertIn("def main", self.src)
def test_antiwindup(self):
self.assertIn("windup", self.src)
def test_threading_lock(self):
self.assertIn("threading.Lock", self.src)
class TestConfig(unittest.TestCase):
_CONFIG = (
"/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/"
"saltybot_social/config/face_track_servo_params.yaml"
)
_LAUNCH = (
"/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/"
"saltybot_social/launch/face_track_servo.launch.py"
)
_SETUP = (
"/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/"
"saltybot_social/setup.py"
)
def test_config_exists(self):
import os; self.assertTrue(os.path.exists(self._CONFIG))
def test_config_kp_pan(self):
with open(self._CONFIG) as f: c = f.read()
self.assertIn("kp_pan", c)
def test_config_fov(self):
with open(self._CONFIG) as f: c = f.read()
self.assertIn("fov_h_deg", c)
def test_config_pan_limit(self):
with open(self._CONFIG) as f: c = f.read()
self.assertIn("pan_limit_deg", c)
def test_config_dead_zone(self):
with open(self._CONFIG) as f: c = f.read()
self.assertIn("dead_zone", c)
def test_launch_exists(self):
import os; self.assertTrue(os.path.exists(self._LAUNCH))
def test_launch_kp_pan_arg(self):
with open(self._LAUNCH) as f: c = f.read()
self.assertIn("kp_pan", c)
def test_launch_pan_limit_arg(self):
with open(self._LAUNCH) as f: c = f.read()
self.assertIn("pan_limit_deg", c)
def test_entry_point(self):
with open(self._SETUP) as f: c = f.read()
self.assertIn("face_track_servo_node", c)
if __name__ == "__main__":
unittest.main()