From c3d36e9943c53372ac3095b4a03efda4706bec38 Mon Sep 17 00:00:00 2001 From: sl-jetson Date: Mon, 2 Mar 2026 17:37:57 -0500 Subject: [PATCH] =?UTF-8?q?feat(social):=20face-tracking=20head=20servo=20?= =?UTF-8?q?controller=20=E2=80=94=20Issue=20#279?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds face_track_servo_node to saltybot_social: - Subscribes /social/faces/detected (FaceDetectionArray) - Picks closest face by largest bbox area (proximity proxy) - Computes pan/tilt error from bbox centre vs image centre using configurable FOV (fov_h_deg=60°, fov_v_deg=45°) - Independent PID controllers for pan and tilt (velocity/incremental output with anti-windup); servo position integrates velocity*dt - Clamps commands to ±pan_limit_deg / ±tilt_limit_deg - Returns to centre at return_rate_deg_s when face lost >lost_timeout_s - Dead zone suppresses jitter for small errors - Publishes Float32 on /saltybot/head_pan and /saltybot/head_tilt - 81/81 tests passing Closes #279 --- .../config/face_track_servo_params.yaml | 30 + .../launch/face_track_servo.launch.py | 51 ++ .../saltybot_social/face_track_servo_node.py | 308 ++++++++ jetson/ros2_ws/src/saltybot_social/setup.py | 2 + .../test/test_face_track_servo.py | 676 ++++++++++++++++++ 5 files changed, 1067 insertions(+) create mode 100644 jetson/ros2_ws/src/saltybot_social/config/face_track_servo_params.yaml create mode 100644 jetson/ros2_ws/src/saltybot_social/launch/face_track_servo.launch.py create mode 100644 jetson/ros2_ws/src/saltybot_social/saltybot_social/face_track_servo_node.py create mode 100644 jetson/ros2_ws/src/saltybot_social/test/test_face_track_servo.py diff --git a/jetson/ros2_ws/src/saltybot_social/config/face_track_servo_params.yaml b/jetson/ros2_ws/src/saltybot_social/config/face_track_servo_params.yaml new file mode 100644 index 0000000..938d278 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_social/config/face_track_servo_params.yaml @@ -0,0 +1,30 @@ +face_track_servo_node: + ros__parameters: + # PID gains — pan axis + kp_pan: 1.5 # proportional gain (°/s per ° error) + ki_pan: 0.1 # integral gain + kd_pan: 0.05 # derivative gain (damping) + + # PID gains — tilt axis + kp_tilt: 1.2 + ki_tilt: 0.1 + kd_tilt: 0.04 + + # Camera FOV + fov_h_deg: 60.0 # horizontal field of view (degrees) + fov_v_deg: 45.0 # vertical field of view (degrees) + + # Servo limits + pan_limit_deg: 90.0 # mechanical pan range ± (degrees) + tilt_limit_deg: 30.0 # mechanical tilt range ± (degrees) + pan_vel_limit: 45.0 # max pan rate (°/s) + tilt_vel_limit: 30.0 # max tilt rate (°/s) + windup_limit: 15.0 # integral anti-windup clamp (°·s) + + # Tracking behaviour + dead_zone: 0.02 # normalised dead zone (fraction of frame width/height) + control_rate: 20.0 # control loop frequency (Hz) + lost_timeout_s: 1.5 # seconds before face considered lost + return_rate_deg_s: 10.0 # return-to-centre speed when no face (°/s) + + faces_topic: "/social/faces/detected" diff --git a/jetson/ros2_ws/src/saltybot_social/launch/face_track_servo.launch.py b/jetson/ros2_ws/src/saltybot_social/launch/face_track_servo.launch.py new file mode 100644 index 0000000..9232a64 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_social/launch/face_track_servo.launch.py @@ -0,0 +1,51 @@ +"""face_track_servo.launch.py — Launch face-tracking head servo controller (Issue #279). + +Usage: + ros2 launch saltybot_social face_track_servo.launch.py + ros2 launch saltybot_social face_track_servo.launch.py kp_pan:=2.0 pan_limit_deg:=60.0 +""" + +import os +from ament_index_python.packages import get_package_share_directory +from launch import LaunchDescription +from launch.actions import DeclareLaunchArgument +from launch.substitutions import LaunchConfiguration +from launch_ros.actions import Node + + +def generate_launch_description(): + pkg = get_package_share_directory("saltybot_social") + cfg = os.path.join(pkg, "config", "face_track_servo_params.yaml") + + return LaunchDescription([ + DeclareLaunchArgument("kp_pan", default_value="1.5", + description="Pan proportional gain (°/s per °)"), + DeclareLaunchArgument("kp_tilt", default_value="1.2", + description="Tilt proportional gain (°/s per °)"), + DeclareLaunchArgument("pan_limit_deg", default_value="90.0", + description="Mechanical pan limit ± (degrees)"), + DeclareLaunchArgument("tilt_limit_deg", default_value="30.0", + description="Mechanical tilt limit ± (degrees)"), + DeclareLaunchArgument("fov_h_deg", default_value="60.0", + description="Camera horizontal FOV (degrees)"), + DeclareLaunchArgument("fov_v_deg", default_value="45.0", + description="Camera vertical FOV (degrees)"), + + Node( + package="saltybot_social", + executable="face_track_servo_node", + name="face_track_servo_node", + output="screen", + parameters=[ + cfg, + { + "kp_pan": LaunchConfiguration("kp_pan"), + "kp_tilt": LaunchConfiguration("kp_tilt"), + "pan_limit_deg": LaunchConfiguration("pan_limit_deg"), + "tilt_limit_deg": LaunchConfiguration("tilt_limit_deg"), + "fov_h_deg": LaunchConfiguration("fov_h_deg"), + "fov_v_deg": LaunchConfiguration("fov_v_deg"), + }, + ], + ), + ]) diff --git a/jetson/ros2_ws/src/saltybot_social/saltybot_social/face_track_servo_node.py b/jetson/ros2_ws/src/saltybot_social/saltybot_social/face_track_servo_node.py new file mode 100644 index 0000000..054260e --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_social/saltybot_social/face_track_servo_node.py @@ -0,0 +1,308 @@ +"""face_track_servo_node.py — Face-tracking head servo controller. +Issue #279 + +Subscribes to /social/faces/detected, picks the closest face (largest +bounding-box area as a proximity proxy), computes pan/tilt angular error +relative to the image centre, and drives two PID controllers to produce +smooth servo position commands published on /saltybot/head_pan and +/saltybot/head_tilt (std_msgs/Float32, degrees from neutral). + +Coordinate convention +───────────────────── + bbox_x/y/w/h : normalised [0, 1] in image space + face centre : cx = bbox_x + bbox_w/2 , cy = bbox_y + bbox_h/2 + image centre : (0.5, 0.5) + pan error : (cx - 0.5) * fov_h_deg (+ve → face right of centre) + tilt error : (cy - 0.5) * fov_v_deg (+ve → face below centre) + +PID design (velocity / incremental) +──────────────────────────────────── + velocity (°/s) = Kp·e + Ki·∫e dt + Kd·de/dt + servo_angle += velocity · dt + servo_angle = clamp(servo_angle, ±limit_deg) + +When no face is seen for more than ``lost_timeout_s`` seconds the PIDs +are reset and the servo commands return toward 0° at ``return_rate_deg_s``. + +Parameters +────────── + kp_pan (float, 1.5) pan proportional gain (°/s per °) + ki_pan (float, 0.1) pan integral gain + kd_pan (float, 0.05) pan derivative gain + kp_tilt (float, 1.2) tilt proportional gain + ki_tilt (float, 0.1) tilt integral gain + kd_tilt (float, 0.04) tilt derivative gain + fov_h_deg (float, 60.0) camera horizontal FOV (degrees) + fov_v_deg (float, 45.0) camera vertical FOV (degrees) + pan_limit_deg (float, 90.0) mechanical pan limit ± + tilt_limit_deg (float, 30.0) mechanical tilt limit ± + pan_vel_limit (float, 45.0) max pan rate (°/s) + tilt_vel_limit (float, 30.0) max tilt rate (°/s) + windup_limit (float, 15.0) integral anti-windup clamp (°·s) + dead_zone (float, 0.02) normalised dead zone (fraction of frame) + control_rate (float, 20.0) control loop Hz + lost_timeout_s (float, 1.5) seconds before face considered lost + return_rate_deg_s (float, 10.0) return-to-centre rate when no face (°/s) + faces_topic (str) default "/social/faces/detected" +""" + +from __future__ import annotations + +import math +import time +import threading +from typing import Optional + +import rclpy +from rclpy.node import Node +from rclpy.qos import QoSProfile +from std_msgs.msg import Float32 + +try: + from saltybot_social_msgs.msg import FaceDetectionArray + _MSGS = True +except ImportError: + _MSGS = False + + +# ── Pure helpers ─────────────────────────────────────────────────────────────── + +def clamp(v: float, lo: float, hi: float) -> float: + return max(lo, min(hi, v)) + + +def bbox_area(face) -> float: + """Bounding-box area as a proximity proxy (larger ≈ closer).""" + return float(face.bbox_w) * float(face.bbox_h) + + +def pick_closest_face(faces): + """Return the face with the largest bbox area; None if list is empty.""" + if not faces: + return None + return max(faces, key=bbox_area) + + +def face_image_error(face, fov_h_deg: float, fov_v_deg: float): + """Return (pan_error_deg, tilt_error_deg) for a FaceDetection. + + Positive pan → face is right of image centre. + Positive tilt → face is below image centre. + """ + cx = float(face.bbox_x) + float(face.bbox_w) / 2.0 + cy = float(face.bbox_y) + float(face.bbox_h) / 2.0 + pan_err = (cx - 0.5) * fov_h_deg + tilt_err = (cy - 0.5) * fov_v_deg + return pan_err, tilt_err + + +# ── PID controller ───────────────────────────────────────────────────────────── + +class PIDController: + """Incremental (velocity-output) PID with anti-windup. + + Output units: degrees/second (servo angular velocity). + Integrate externally: servo_angle += pid.update(error, dt) * dt + """ + + def __init__(self, kp: float, ki: float, kd: float, + vel_limit: float, windup_limit: float) -> None: + self.kp = kp + self.ki = ki + self.kd = kd + self.vel_limit = vel_limit + self.windup_limit = windup_limit + self._integral = 0.0 + self._prev_error = 0.0 + self._first = True + + def update(self, error: float, dt: float) -> float: + """Return velocity command (°/s). Call every control tick.""" + if dt <= 0.0: + return 0.0 + + self._integral += error * dt + self._integral = clamp(self._integral, -self.windup_limit, + self.windup_limit) + + if self._first: + derivative = 0.0 + self._first = False + else: + derivative = (error - self._prev_error) / dt + + self._prev_error = error + output = (self.kp * error + + self.ki * self._integral + + self.kd * derivative) + return clamp(output, -self.vel_limit, self.vel_limit) + + def reset(self) -> None: + self._integral = 0.0 + self._prev_error = 0.0 + self._first = True + + +# ── ROS2 node ────────────────────────────────────────────────────────────────── + +class FaceTrackServoNode(Node): + """Smooth PID face-tracking servo controller.""" + + def __init__(self) -> None: + super().__init__("face_track_servo_node") + + # Declare parameters + self.declare_parameter("kp_pan", 1.5) + self.declare_parameter("ki_pan", 0.1) + self.declare_parameter("kd_pan", 0.05) + self.declare_parameter("kp_tilt", 1.2) + self.declare_parameter("ki_tilt", 0.1) + self.declare_parameter("kd_tilt", 0.04) + self.declare_parameter("fov_h_deg", 60.0) + self.declare_parameter("fov_v_deg", 45.0) + self.declare_parameter("pan_limit_deg", 90.0) + self.declare_parameter("tilt_limit_deg", 30.0) + self.declare_parameter("pan_vel_limit", 45.0) + self.declare_parameter("tilt_vel_limit", 30.0) + self.declare_parameter("windup_limit", 15.0) + self.declare_parameter("dead_zone", 0.02) + self.declare_parameter("control_rate", 20.0) + self.declare_parameter("lost_timeout_s", 1.5) + self.declare_parameter("return_rate_deg_s", 10.0) + self.declare_parameter("faces_topic", "/social/faces/detected") + + self._reload_params() + + # Servo state + self._pan_cmd = 0.0 + self._tilt_cmd = 0.0 + self._last_face_t: float = 0.0 + self._latest_face = None + self._lock = threading.Lock() + + qos = QoSProfile(depth=10) + self._pan_pub = self.create_publisher(Float32, "/saltybot/head_pan", qos) + self._tilt_pub = self.create_publisher(Float32, "/saltybot/head_tilt", qos) + + faces_topic = self.get_parameter("faces_topic").value + if _MSGS: + self._faces_sub = self.create_subscription( + FaceDetectionArray, faces_topic, self._on_faces, qos + ) + else: + self.get_logger().warn( + "saltybot_social_msgs not available — node passive (no subscription)" + ) + + rate = self.get_parameter("control_rate").value + self._timer = self.create_timer(1.0 / rate, self._control_cb) + self._last_tick = time.monotonic() + + self.get_logger().info( + f"FaceTrackServoNode ready " + f"(rate={rate}Hz, fov={self._fov_h}×{self._fov_v}°, " + f"pan±{self._pan_limit}°, tilt±{self._tilt_limit}°)" + ) + + def _reload_params(self) -> None: + self._fov_h = self.get_parameter("fov_h_deg").value + self._fov_v = self.get_parameter("fov_v_deg").value + self._pan_limit = self.get_parameter("pan_limit_deg").value + self._tilt_limit = self.get_parameter("tilt_limit_deg").value + self._dead_zone = self.get_parameter("dead_zone").value + self._lost_t = self.get_parameter("lost_timeout_s").value + self._return_rate = self.get_parameter("return_rate_deg_s").value + + self._pid_pan = PIDController( + kp=self.get_parameter("kp_pan").value, + ki=self.get_parameter("ki_pan").value, + kd=self.get_parameter("kd_pan").value, + vel_limit=self.get_parameter("pan_vel_limit").value, + windup_limit=self.get_parameter("windup_limit").value, + ) + self._pid_tilt = PIDController( + kp=self.get_parameter("kp_tilt").value, + ki=self.get_parameter("ki_tilt").value, + kd=self.get_parameter("kd_tilt").value, + vel_limit=self.get_parameter("tilt_vel_limit").value, + windup_limit=self.get_parameter("windup_limit").value, + ) + + # ── Subscription callback ────────────────────────────────────────────── + + def _on_faces(self, msg) -> None: + face = pick_closest_face(msg.faces) + with self._lock: + self._latest_face = face + if face is not None: + self._last_face_t = time.monotonic() + + # ── Control loop ─────────────────────────────────────────────────────── + + def _control_cb(self) -> None: + now = time.monotonic() + dt = now - self._last_tick + self._last_tick = now + dt = max(dt, 1e-4) # guard against zero dt at startup + + with self._lock: + face = self._latest_face + last_face_t = self._last_face_t + + face_fresh = (last_face_t > 0.0 and (now - last_face_t) < self._lost_t) + + if not face_fresh or face is None: + # Return to centre + self._pid_pan.reset() + self._pid_tilt.reset() + step = self._return_rate * dt + self._pan_cmd = _step_toward_zero(self._pan_cmd, step) + self._tilt_cmd = _step_toward_zero(self._tilt_cmd, step) + else: + pan_err, tilt_err = face_image_error(face, self._fov_h, self._fov_v) + + # Dead zone (normalised fraction → degrees) + dead_deg_h = self._dead_zone * self._fov_h + dead_deg_v = self._dead_zone * self._fov_v + + if abs(pan_err) < dead_deg_h: + self._pid_pan.reset() + else: + vel_pan = self._pid_pan.update(pan_err, dt) + self._pan_cmd = clamp( + self._pan_cmd + vel_pan * dt, + -self._pan_limit, self._pan_limit, + ) + + if abs(tilt_err) < dead_deg_v: + self._pid_tilt.reset() + else: + vel_tilt = self._pid_tilt.update(tilt_err, dt) + self._tilt_cmd = clamp( + self._tilt_cmd + vel_tilt * dt, + -self._tilt_limit, self._tilt_limit, + ) + + pan_msg = Float32(); pan_msg.data = float(self._pan_cmd) + tilt_msg = Float32(); tilt_msg.data = float(self._tilt_cmd) + self._pan_pub.publish(pan_msg) + self._tilt_pub.publish(tilt_msg) + + +def _step_toward_zero(value: float, step: float) -> float: + """Move value toward 0 by step without overshooting.""" + if abs(value) <= step: + return 0.0 + return value - math.copysign(step, value) + + +def main(args=None) -> None: + rclpy.init(args=args) + node = FaceTrackServoNode() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.shutdown() diff --git a/jetson/ros2_ws/src/saltybot_social/setup.py b/jetson/ros2_ws/src/saltybot_social/setup.py index c341b1f..a128723 100644 --- a/jetson/ros2_ws/src/saltybot_social/setup.py +++ b/jetson/ros2_ws/src/saltybot_social/setup.py @@ -49,6 +49,8 @@ setup( 'ambient_sound_node = saltybot_social.ambient_sound_node:main', # Proximity-based greeting trigger (Issue #270) 'greeting_trigger_node = saltybot_social.greeting_trigger_node:main', + # Face-tracking head servo controller (Issue #279) + 'face_track_servo_node = saltybot_social.face_track_servo_node:main', ], }, ) diff --git a/jetson/ros2_ws/src/saltybot_social/test/test_face_track_servo.py b/jetson/ros2_ws/src/saltybot_social/test/test_face_track_servo.py new file mode 100644 index 0000000..e6cfd4a --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_social/test/test_face_track_servo.py @@ -0,0 +1,676 @@ +"""test_face_track_servo.py — Offline tests for face_track_servo_node (Issue #279). + +Stubs out rclpy and saltybot_social_msgs so tests run without a ROS install. +""" + +import importlib +import importlib.util +import math +import sys +import time +import types +import unittest + + +# ── ROS2 / message stubs ────────────────────────────────────────────────────── + +def _make_ros_stubs(): + for mod_name in ("rclpy", "rclpy.node", "rclpy.qos", + "std_msgs", "std_msgs.msg", + "saltybot_social_msgs", "saltybot_social_msgs.msg"): + if mod_name not in sys.modules: + sys.modules[mod_name] = types.ModuleType(mod_name) + + class _Node: + def __init__(self, name="node"): + self._name = name + if not hasattr(self, "_params"): + self._params = {} + self._pubs = {} + self._subs = {} + self._timers = [] + self._logs = [] + + def declare_parameter(self, name, default): + if name not in self._params: + self._params[name] = default + + def get_parameter(self, name): + class _P: + def __init__(self, v): self.value = v + return _P(self._params.get(name)) + + def create_publisher(self, msg_type, topic, qos): + pub = _FakePub() + self._pubs[topic] = pub + return pub + + def create_subscription(self, msg_type, topic, cb, qos): + self._subs[topic] = cb + return object() + + def create_timer(self, period, cb): + self._timers.append(cb) + return object() + + def get_logger(self): + node = self + class _L: + def info(self, m): node._logs.append(("INFO", m)) + def warn(self, m): node._logs.append(("WARN", m)) + def error(self, m): node._logs.append(("ERROR", m)) + return _L() + + def destroy_node(self): pass + + class _FakePub: + def __init__(self): + self.msgs = [] + def publish(self, msg): + self.msgs.append(msg) + + class _QoSProfile: + def __init__(self, depth=10): self.depth = depth + + class _Float32: + def __init__(self): self.data = 0.0 + + class _FaceDetection: + def __init__(self, face_id=0, bbox_x=0.4, bbox_y=0.4, + bbox_w=0.2, bbox_h=0.2, confidence=1.0): + self.face_id = face_id + self.bbox_x = bbox_x + self.bbox_y = bbox_y + self.bbox_w = bbox_w + self.bbox_h = bbox_h + self.confidence = confidence + self.person_name = "" + + class _FaceDetectionArray: + def __init__(self, faces=None): + self.faces = faces or [] + + # rclpy + rclpy_mod = sys.modules["rclpy"] + rclpy_mod.init = lambda args=None: None + rclpy_mod.spin = lambda node: None + rclpy_mod.shutdown = lambda: None + + sys.modules["rclpy.node"].Node = _Node + sys.modules["rclpy.qos"].QoSProfile = _QoSProfile + sys.modules["std_msgs.msg"].Float32 = _Float32 + + msgs = sys.modules["saltybot_social_msgs.msg"] + msgs.FaceDetection = _FaceDetection + msgs.FaceDetectionArray = _FaceDetectionArray + + return _Node, _FakePub, _FaceDetection, _FaceDetectionArray, _Float32 + + +_Node, _FakePub, _FaceDetection, _FaceDetectionArray, _Float32 = _make_ros_stubs() + + +# ── Module loader ───────────────────────────────────────────────────────────── + +_SRC = ( + "/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/" + "saltybot_social/saltybot_social/face_track_servo_node.py" +) + + +def _load_mod(): + spec = importlib.util.spec_from_file_location("face_track_servo_testmod", _SRC) + mod = importlib.util.module_from_spec(spec) + spec.loader.exec_module(mod) + return mod + + +def _make_node(mod, **kwargs): + """Instantiate FaceTrackServoNode with optional param overrides.""" + node = mod.FaceTrackServoNode.__new__(mod.FaceTrackServoNode) + + defaults = { + "kp_pan": 1.5, + "ki_pan": 0.1, + "kd_pan": 0.05, + "kp_tilt": 1.2, + "ki_tilt": 0.1, + "kd_tilt": 0.04, + "fov_h_deg": 60.0, + "fov_v_deg": 45.0, + "pan_limit_deg": 90.0, + "tilt_limit_deg": 30.0, + "pan_vel_limit": 45.0, + "tilt_vel_limit": 30.0, + "windup_limit": 15.0, + "dead_zone": 0.02, + "control_rate": 20.0, + "lost_timeout_s": 1.5, + "return_rate_deg_s": 10.0, + "faces_topic": "/social/faces/detected", + } + defaults.update(kwargs) + node._params = dict(defaults) + mod.FaceTrackServoNode.__init__(node) + return node + + +def _face(bbox_x=0.4, bbox_y=0.4, bbox_w=0.2, bbox_h=0.2, face_id=0): + return _FaceDetection(face_id=face_id, bbox_x=bbox_x, bbox_y=bbox_y, + bbox_w=bbox_w, bbox_h=bbox_h) + + +def _centered_face(): + """A face perfectly centered in the frame.""" + return _face(bbox_x=0.4, bbox_y=0.4, bbox_w=0.2, bbox_h=0.2) + + +# ── Tests: pure helpers ─────────────────────────────────────────────────────── + +class TestClamp(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.mod = _load_mod() + + def test_within(self): + self.assertEqual(self.mod.clamp(5.0, 0.0, 10.0), 5.0) + + def test_below(self): + self.assertEqual(self.mod.clamp(-5.0, 0.0, 10.0), 0.0) + + def test_above(self): + self.assertEqual(self.mod.clamp(15.0, 0.0, 10.0), 10.0) + + def test_negative_range(self): + self.assertEqual(self.mod.clamp(-50.0, -45.0, 45.0), -45.0) + + +class TestBboxArea(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.mod = _load_mod() + + def test_area(self): + f = _face(bbox_w=0.3, bbox_h=0.4) + self.assertAlmostEqual(self.mod.bbox_area(f), 0.12) + + def test_zero(self): + f = _face(bbox_w=0.0, bbox_h=0.2) + self.assertAlmostEqual(self.mod.bbox_area(f), 0.0) + + +class TestPickClosestFace(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.mod = _load_mod() + + def test_empty(self): + self.assertIsNone(self.mod.pick_closest_face([])) + + def test_single(self): + f = _face(bbox_w=0.2, bbox_h=0.2) + self.assertIs(self.mod.pick_closest_face([f]), f) + + def test_picks_largest_area(self): + small = _face(bbox_w=0.1, bbox_h=0.1) + big = _face(bbox_w=0.4, bbox_h=0.4) + self.assertIs(self.mod.pick_closest_face([small, big]), big) + self.assertIs(self.mod.pick_closest_face([big, small]), big) + + def test_three_faces(self): + faces = [_face(bbox_w=0.1, bbox_h=0.1), + _face(bbox_w=0.5, bbox_h=0.5), + _face(bbox_w=0.2, bbox_h=0.2)] + self.assertIs(self.mod.pick_closest_face(faces), faces[1]) + + +class TestFaceImageError(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.mod = _load_mod() + + def test_centered_face_zero_error(self): + f = _face(bbox_x=0.4, bbox_y=0.4, bbox_w=0.2, bbox_h=0.2) + pan, tilt = self.mod.face_image_error(f, 60.0, 45.0) + self.assertAlmostEqual(pan, 0.0) + self.assertAlmostEqual(tilt, 0.0) + + def test_right_of_centre(self): + # cx = 0.7 + 0.1 = 0.8, error = 0.3 * 60 = 18° + f = _face(bbox_x=0.7, bbox_y=0.4, bbox_w=0.2, bbox_h=0.2) + pan, _ = self.mod.face_image_error(f, 60.0, 45.0) + self.assertAlmostEqual(pan, 18.0) + + def test_left_of_centre(self): + # cx = 0.1 + 0.1 = 0.2, error = -0.3 * 60 = -18° + f = _face(bbox_x=0.1, bbox_y=0.4, bbox_w=0.2, bbox_h=0.2) + pan, _ = self.mod.face_image_error(f, 60.0, 45.0) + self.assertAlmostEqual(pan, -18.0) + + def test_below_centre(self): + # cy = 0.7 + 0.1 = 0.8, error = 0.3 * 45 = 13.5° + f = _face(bbox_x=0.4, bbox_y=0.7, bbox_w=0.2, bbox_h=0.2) + _, tilt = self.mod.face_image_error(f, 60.0, 45.0) + self.assertAlmostEqual(tilt, 13.5) + + def test_above_centre(self): + # cy = 0.1 + 0.1 = 0.2, error = -0.3 * 45 = -13.5° + f = _face(bbox_x=0.4, bbox_y=0.1, bbox_w=0.2, bbox_h=0.2) + _, tilt = self.mod.face_image_error(f, 60.0, 45.0) + self.assertAlmostEqual(tilt, -13.5) + + +class TestStepTowardZero(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.mod = _load_mod() + + def test_positive_large(self): + result = self.mod._step_toward_zero(10.0, 1.0) + self.assertAlmostEqual(result, 9.0) + + def test_negative_large(self): + result = self.mod._step_toward_zero(-10.0, 1.0) + self.assertAlmostEqual(result, -9.0) + + def test_smaller_than_step(self): + result = self.mod._step_toward_zero(0.5, 1.0) + self.assertAlmostEqual(result, 0.0) + + def test_exact_step(self): + result = self.mod._step_toward_zero(1.0, 1.0) + self.assertAlmostEqual(result, 0.0) + + def test_zero(self): + result = self.mod._step_toward_zero(0.0, 1.0) + self.assertAlmostEqual(result, 0.0) + + +# ── Tests: PIDController ────────────────────────────────────────────────────── + +class TestPIDController(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.mod = _load_mod() + + def _pid(self, kp=1.0, ki=0.0, kd=0.0, vel_limit=100.0, windup=100.0): + return self.mod.PIDController(kp, ki, kd, vel_limit, windup) + + def test_proportional_only(self): + pid = self._pid(kp=2.0) + out = pid.update(5.0, 0.1) + self.assertAlmostEqual(out, 10.0) + + def test_zero_error_zero_output(self): + pid = self._pid(kp=5.0) + self.assertAlmostEqual(pid.update(0.0, 0.1), 0.0) + + def test_integral_accumulates(self): + pid = self._pid(kp=0.0, ki=1.0) + pid.update(1.0, 0.1) # integral = 0.1 + out = pid.update(1.0, 0.1) # integral = 0.2, output = 0.2 + self.assertAlmostEqual(out, 0.2, places=5) + + def test_derivative_first_tick_zero(self): + pid = self._pid(kp=0.0, kd=1.0) + out = pid.update(10.0, 0.1) + self.assertAlmostEqual(out, 0.0) # first tick: derivative = 0 + + def test_derivative_second_tick(self): + pid = self._pid(kp=0.0, kd=1.0) + pid.update(0.0, 0.1) # first tick + out = pid.update(10.0, 0.1) # de/dt = 10/0.1 = 100 + self.assertAlmostEqual(out, 100.0) + + def test_velocity_clamped(self): + pid = self._pid(kp=100.0, vel_limit=10.0) + out = pid.update(5.0, 0.1) + self.assertAlmostEqual(out, 10.0) + + def test_velocity_clamped_negative(self): + pid = self._pid(kp=100.0, vel_limit=10.0) + out = pid.update(-5.0, 0.1) + self.assertAlmostEqual(out, -10.0) + + def test_antiwindup(self): + pid = self._pid(kp=0.0, ki=1.0, windup=5.0) + for _ in range(100): + pid.update(1.0, 0.1) # would accumulate 10, clamped at 5 + out = pid.update(0.0, 0.1) + self.assertAlmostEqual(out, 5.0, places=3) + + def test_reset_clears_integral(self): + pid = self._pid(ki=1.0) + pid.update(1.0, 1.0) + pid.reset() + out = pid.update(0.0, 0.1) + self.assertAlmostEqual(out, 0.0) + + def test_reset_clears_derivative(self): + pid = self._pid(kp=0.0, kd=1.0) + pid.update(10.0, 0.1) # sets prev_error + pid.reset() + out = pid.update(10.0, 0.1) # after reset, first tick = 0 derivative + self.assertAlmostEqual(out, 0.0) + + def test_zero_dt_returns_zero(self): + pid = self._pid(kp=10.0) + self.assertAlmostEqual(pid.update(5.0, 0.0), 0.0) + + def test_negative_dt_returns_zero(self): + pid = self._pid(kp=10.0) + self.assertAlmostEqual(pid.update(5.0, -0.1), 0.0) + + +# ── Tests: node initialisation ──────────────────────────────────────────────── + +class TestNodeInit(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.mod = _load_mod() + + def test_instantiates(self): + node = _make_node(self.mod) + self.assertIsNotNone(node) + + def test_pan_pub(self): + node = _make_node(self.mod) + self.assertIn("/saltybot/head_pan", node._pubs) + + def test_tilt_pub(self): + node = _make_node(self.mod) + self.assertIn("/saltybot/head_tilt", node._pubs) + + def test_faces_sub(self): + node = _make_node(self.mod) + self.assertIn("/social/faces/detected", node._subs) + + def test_timer_registered(self): + node = _make_node(self.mod) + self.assertGreater(len(node._timers), 0) + + def test_initial_pan_zero(self): + node = _make_node(self.mod) + self.assertAlmostEqual(node._pan_cmd, 0.0) + + def test_initial_tilt_zero(self): + node = _make_node(self.mod) + self.assertAlmostEqual(node._tilt_cmd, 0.0) + + def test_custom_fov(self): + node = _make_node(self.mod, fov_h_deg=90.0) + self.assertAlmostEqual(node._fov_h, 90.0) + + def test_custom_pan_limit(self): + node = _make_node(self.mod, pan_limit_deg=45.0) + self.assertAlmostEqual(node._pan_limit, 45.0) + + +# ── Tests: face callback ────────────────────────────────────────────────────── + +class TestFaceCallback(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.mod = _load_mod() + + def setUp(self): + self.node = _make_node(self.mod) + + def test_empty_msg_no_face(self): + self.node._on_faces(_FaceDetectionArray([])) + self.assertIsNone(self.node._latest_face) + + def test_single_face_stored(self): + f = _centered_face() + self.node._on_faces(_FaceDetectionArray([f])) + self.assertIs(self.node._latest_face, f) + + def test_closest_face_picked(self): + small = _face(bbox_w=0.1, bbox_h=0.1, face_id=1) + big = _face(bbox_w=0.5, bbox_h=0.5, face_id=2) + self.node._on_faces(_FaceDetectionArray([small, big])) + self.assertIs(self.node._latest_face, big) + + def test_timestamp_updated_on_face(self): + before = time.monotonic() + f = _centered_face() + self.node._on_faces(_FaceDetectionArray([f])) + self.assertGreaterEqual(self.node._last_face_t, before) + + def test_timestamp_not_updated_on_empty(self): + self.node._last_face_t = 0.0 + self.node._on_faces(_FaceDetectionArray([])) + self.assertEqual(self.node._last_face_t, 0.0) + + +# ── Tests: control loop ─────────────────────────────────────────────────────── + +class TestControlLoop(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.mod = _load_mod() + + def setUp(self): + self.node = _make_node(self.mod, dead_zone=0.0, + ki_pan=0.0, kd_pan=0.0, + ki_tilt=0.0, kd_tilt=0.0) + self.pan_pub = self.node._pubs["/saltybot/head_pan"] + self.tilt_pub = self.node._pubs["/saltybot/head_tilt"] + + def _tick(self, dt=0.05): + self.node._last_tick = time.monotonic() - dt + self.node._control_cb() + + def test_no_face_publishes_zero_initially(self): + self._tick() + self.assertAlmostEqual(self.pan_pub.msgs[-1].data, 0.0) + self.assertAlmostEqual(self.tilt_pub.msgs[-1].data, 0.0) + + def test_centered_face_minimal_movement(self): + f = _centered_face() # cx=cy=0.5, error=0 + self.node._on_faces(_FaceDetectionArray([f])) + self.node._last_face_t = time.monotonic() + self._tick() + # With dead_zone=0 and error=0, pid output=0, cmd stays 0 + self.assertAlmostEqual(self.pan_pub.msgs[-1].data, 0.0, places=4) + self.assertAlmostEqual(self.tilt_pub.msgs[-1].data, 0.0, places=4) + + def test_right_face_pans_right(self): + # Face right of centre → positive pan error → pan_cmd increases + f = _face(bbox_x=0.7, bbox_y=0.4, bbox_w=0.2, bbox_h=0.2) + self.node._on_faces(_FaceDetectionArray([f])) + self.node._last_face_t = time.monotonic() + self._tick() + self.assertGreater(self.pan_pub.msgs[-1].data, 0.0) + + def test_left_face_pans_left(self): + f = _face(bbox_x=0.1, bbox_y=0.4, bbox_w=0.2, bbox_h=0.2) + self.node._on_faces(_FaceDetectionArray([f])) + self.node._last_face_t = time.monotonic() + self._tick() + self.assertLess(self.pan_pub.msgs[-1].data, 0.0) + + def test_low_face_tilts_down(self): + f = _face(bbox_x=0.4, bbox_y=0.7, bbox_w=0.2, bbox_h=0.2) + self.node._on_faces(_FaceDetectionArray([f])) + self.node._last_face_t = time.monotonic() + self._tick() + self.assertGreater(self.tilt_pub.msgs[-1].data, 0.0) + + def test_high_face_tilts_up(self): + f = _face(bbox_x=0.4, bbox_y=0.1, bbox_w=0.2, bbox_h=0.2) + self.node._on_faces(_FaceDetectionArray([f])) + self.node._last_face_t = time.monotonic() + self._tick() + self.assertLess(self.tilt_pub.msgs[-1].data, 0.0) + + def test_pan_clamped_to_limit(self): + node = _make_node(self.mod, kp_pan=1000.0, ki_pan=0.0, kd_pan=0.0, + pan_limit_deg=45.0, pan_vel_limit=9999.0, + dead_zone=0.0) + pub = node._pubs["/saltybot/head_pan"] + f = _face(bbox_x=0.9, bbox_y=0.4, bbox_w=0.1, bbox_h=0.2) + node._on_faces(_FaceDetectionArray([f])) + node._last_face_t = time.monotonic() + # Run many ticks to accumulate + for _ in range(50): + node._last_tick = time.monotonic() - 0.05 + node._control_cb() + self.assertLessEqual(pub.msgs[-1].data, 45.0) + + def test_tilt_clamped_to_limit(self): + node = _make_node(self.mod, kp_tilt=1000.0, ki_tilt=0.0, kd_tilt=0.0, + tilt_limit_deg=20.0, tilt_vel_limit=9999.0, + dead_zone=0.0) + pub = node._pubs["/saltybot/head_tilt"] + f = _face(bbox_x=0.4, bbox_y=0.9, bbox_w=0.2, bbox_h=0.1) + node._on_faces(_FaceDetectionArray([f])) + node._last_face_t = time.monotonic() + for _ in range(50): + node._last_tick = time.monotonic() - 0.05 + node._control_cb() + self.assertLessEqual(pub.msgs[-1].data, 20.0) + + def test_lost_face_returns_to_zero(self): + node = _make_node(self.mod, kp_pan=10.0, ki_pan=0.0, kd_pan=0.0, + dead_zone=0.0, return_rate_deg_s=90.0, + lost_timeout_s=0.01) + pub = node._pubs["/saltybot/head_pan"] + f = _face(bbox_x=0.7, bbox_y=0.4, bbox_w=0.2, bbox_h=0.2) + node._on_faces(_FaceDetectionArray([f])) + node._last_face_t = time.monotonic() + # Build up some pan + for _ in range(5): + node._last_tick = time.monotonic() - 0.05 + node._control_cb() + # Expire face timeout + node._last_face_t = time.monotonic() - 10.0 + for _ in range(20): + node._last_tick = time.monotonic() - 0.05 + node._control_cb() + self.assertAlmostEqual(pub.msgs[-1].data, 0.0, places=3) + + def test_publishes_every_tick(self): + for _ in range(3): + self._tick() + self.assertEqual(len(self.pan_pub.msgs), 3) + self.assertEqual(len(self.tilt_pub.msgs), 3) + + def test_dead_zone_suppresses_small_error(self): + node = _make_node(self.mod, kp_pan=100.0, ki_pan=0.0, kd_pan=0.0, + dead_zone=0.1, fov_h_deg=60.0) + pub = node._pubs["/saltybot/head_pan"] + # Face 2% right of centre — within dead_zone=10% of frame + f = _face(bbox_x=0.42, bbox_y=0.4, bbox_w=0.2, bbox_h=0.2) + node._on_faces(_FaceDetectionArray([f])) + node._last_face_t = time.monotonic() + node._last_tick = time.monotonic() - 0.05 + node._control_cb() + self.assertAlmostEqual(pub.msgs[-1].data, 0.0, places=4) + + +# ── Tests: source-level checks ──────────────────────────────────────────────── + +class TestNodeSrc(unittest.TestCase): + @classmethod + def setUpClass(cls): + with open(_SRC) as f: + cls.src = f.read() + + def test_issue_tag(self): + self.assertIn("#279", self.src) + + def test_pan_topic(self): + self.assertIn("/saltybot/head_pan", self.src) + + def test_tilt_topic(self): + self.assertIn("/saltybot/head_tilt", self.src) + + def test_faces_topic(self): + self.assertIn("/social/faces/detected", self.src) + + def test_pid_class(self): + self.assertIn("class PIDController", self.src) + + def test_kp_param(self): + self.assertIn("kp_pan", self.src) + + def test_ki_param(self): + self.assertIn("ki_pan", self.src) + + def test_kd_param(self): + self.assertIn("kd_pan", self.src) + + def test_fov_param(self): + self.assertIn("fov_h_deg", self.src) + + def test_pan_limit_param(self): + self.assertIn("pan_limit_deg", self.src) + + def test_dead_zone_param(self): + self.assertIn("dead_zone", self.src) + + def test_pick_closest_face(self): + self.assertIn("pick_closest_face", self.src) + + def test_main_defined(self): + self.assertIn("def main", self.src) + + def test_antiwindup(self): + self.assertIn("windup", self.src) + + def test_threading_lock(self): + self.assertIn("threading.Lock", self.src) + + +class TestConfig(unittest.TestCase): + _CONFIG = ( + "/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/" + "saltybot_social/config/face_track_servo_params.yaml" + ) + _LAUNCH = ( + "/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/" + "saltybot_social/launch/face_track_servo.launch.py" + ) + _SETUP = ( + "/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/" + "saltybot_social/setup.py" + ) + + def test_config_exists(self): + import os; self.assertTrue(os.path.exists(self._CONFIG)) + + def test_config_kp_pan(self): + with open(self._CONFIG) as f: c = f.read() + self.assertIn("kp_pan", c) + + def test_config_fov(self): + with open(self._CONFIG) as f: c = f.read() + self.assertIn("fov_h_deg", c) + + def test_config_pan_limit(self): + with open(self._CONFIG) as f: c = f.read() + self.assertIn("pan_limit_deg", c) + + def test_config_dead_zone(self): + with open(self._CONFIG) as f: c = f.read() + self.assertIn("dead_zone", c) + + def test_launch_exists(self): + import os; self.assertTrue(os.path.exists(self._LAUNCH)) + + def test_launch_kp_pan_arg(self): + with open(self._LAUNCH) as f: c = f.read() + self.assertIn("kp_pan", c) + + def test_launch_pan_limit_arg(self): + with open(self._LAUNCH) as f: c = f.read() + self.assertIn("pan_limit_deg", c) + + def test_entry_point(self): + with open(self._SETUP) as f: c = f.read() + self.assertIn("face_track_servo_node", c) + + +if __name__ == "__main__": + unittest.main() -- 2.47.2