feat(social): face-tracking head servo controller (Issue #279) #284
@ -0,0 +1,30 @@
|
|||||||
|
face_track_servo_node:
|
||||||
|
ros__parameters:
|
||||||
|
# PID gains — pan axis
|
||||||
|
kp_pan: 1.5 # proportional gain (°/s per ° error)
|
||||||
|
ki_pan: 0.1 # integral gain
|
||||||
|
kd_pan: 0.05 # derivative gain (damping)
|
||||||
|
|
||||||
|
# PID gains — tilt axis
|
||||||
|
kp_tilt: 1.2
|
||||||
|
ki_tilt: 0.1
|
||||||
|
kd_tilt: 0.04
|
||||||
|
|
||||||
|
# Camera FOV
|
||||||
|
fov_h_deg: 60.0 # horizontal field of view (degrees)
|
||||||
|
fov_v_deg: 45.0 # vertical field of view (degrees)
|
||||||
|
|
||||||
|
# Servo limits
|
||||||
|
pan_limit_deg: 90.0 # mechanical pan range ± (degrees)
|
||||||
|
tilt_limit_deg: 30.0 # mechanical tilt range ± (degrees)
|
||||||
|
pan_vel_limit: 45.0 # max pan rate (°/s)
|
||||||
|
tilt_vel_limit: 30.0 # max tilt rate (°/s)
|
||||||
|
windup_limit: 15.0 # integral anti-windup clamp (°·s)
|
||||||
|
|
||||||
|
# Tracking behaviour
|
||||||
|
dead_zone: 0.02 # normalised dead zone (fraction of frame width/height)
|
||||||
|
control_rate: 20.0 # control loop frequency (Hz)
|
||||||
|
lost_timeout_s: 1.5 # seconds before face considered lost
|
||||||
|
return_rate_deg_s: 10.0 # return-to-centre speed when no face (°/s)
|
||||||
|
|
||||||
|
faces_topic: "/social/faces/detected"
|
||||||
@ -0,0 +1,51 @@
|
|||||||
|
"""face_track_servo.launch.py — Launch face-tracking head servo controller (Issue #279).
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
ros2 launch saltybot_social face_track_servo.launch.py
|
||||||
|
ros2 launch saltybot_social face_track_servo.launch.py kp_pan:=2.0 pan_limit_deg:=60.0
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
from ament_index_python.packages import get_package_share_directory
|
||||||
|
from launch import LaunchDescription
|
||||||
|
from launch.actions import DeclareLaunchArgument
|
||||||
|
from launch.substitutions import LaunchConfiguration
|
||||||
|
from launch_ros.actions import Node
|
||||||
|
|
||||||
|
|
||||||
|
def generate_launch_description():
|
||||||
|
pkg = get_package_share_directory("saltybot_social")
|
||||||
|
cfg = os.path.join(pkg, "config", "face_track_servo_params.yaml")
|
||||||
|
|
||||||
|
return LaunchDescription([
|
||||||
|
DeclareLaunchArgument("kp_pan", default_value="1.5",
|
||||||
|
description="Pan proportional gain (°/s per °)"),
|
||||||
|
DeclareLaunchArgument("kp_tilt", default_value="1.2",
|
||||||
|
description="Tilt proportional gain (°/s per °)"),
|
||||||
|
DeclareLaunchArgument("pan_limit_deg", default_value="90.0",
|
||||||
|
description="Mechanical pan limit ± (degrees)"),
|
||||||
|
DeclareLaunchArgument("tilt_limit_deg", default_value="30.0",
|
||||||
|
description="Mechanical tilt limit ± (degrees)"),
|
||||||
|
DeclareLaunchArgument("fov_h_deg", default_value="60.0",
|
||||||
|
description="Camera horizontal FOV (degrees)"),
|
||||||
|
DeclareLaunchArgument("fov_v_deg", default_value="45.0",
|
||||||
|
description="Camera vertical FOV (degrees)"),
|
||||||
|
|
||||||
|
Node(
|
||||||
|
package="saltybot_social",
|
||||||
|
executable="face_track_servo_node",
|
||||||
|
name="face_track_servo_node",
|
||||||
|
output="screen",
|
||||||
|
parameters=[
|
||||||
|
cfg,
|
||||||
|
{
|
||||||
|
"kp_pan": LaunchConfiguration("kp_pan"),
|
||||||
|
"kp_tilt": LaunchConfiguration("kp_tilt"),
|
||||||
|
"pan_limit_deg": LaunchConfiguration("pan_limit_deg"),
|
||||||
|
"tilt_limit_deg": LaunchConfiguration("tilt_limit_deg"),
|
||||||
|
"fov_h_deg": LaunchConfiguration("fov_h_deg"),
|
||||||
|
"fov_v_deg": LaunchConfiguration("fov_v_deg"),
|
||||||
|
},
|
||||||
|
],
|
||||||
|
),
|
||||||
|
])
|
||||||
@ -0,0 +1,308 @@
|
|||||||
|
"""face_track_servo_node.py — Face-tracking head servo controller.
|
||||||
|
Issue #279
|
||||||
|
|
||||||
|
Subscribes to /social/faces/detected, picks the closest face (largest
|
||||||
|
bounding-box area as a proximity proxy), computes pan/tilt angular error
|
||||||
|
relative to the image centre, and drives two PID controllers to produce
|
||||||
|
smooth servo position commands published on /saltybot/head_pan and
|
||||||
|
/saltybot/head_tilt (std_msgs/Float32, degrees from neutral).
|
||||||
|
|
||||||
|
Coordinate convention
|
||||||
|
─────────────────────
|
||||||
|
bbox_x/y/w/h : normalised [0, 1] in image space
|
||||||
|
face centre : cx = bbox_x + bbox_w/2 , cy = bbox_y + bbox_h/2
|
||||||
|
image centre : (0.5, 0.5)
|
||||||
|
pan error : (cx - 0.5) * fov_h_deg (+ve → face right of centre)
|
||||||
|
tilt error : (cy - 0.5) * fov_v_deg (+ve → face below centre)
|
||||||
|
|
||||||
|
PID design (velocity / incremental)
|
||||||
|
────────────────────────────────────
|
||||||
|
velocity (°/s) = Kp·e + Ki·∫e dt + Kd·de/dt
|
||||||
|
servo_angle += velocity · dt
|
||||||
|
servo_angle = clamp(servo_angle, ±limit_deg)
|
||||||
|
|
||||||
|
When no face is seen for more than ``lost_timeout_s`` seconds the PIDs
|
||||||
|
are reset and the servo commands return toward 0° at ``return_rate_deg_s``.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
──────────
|
||||||
|
kp_pan (float, 1.5) pan proportional gain (°/s per °)
|
||||||
|
ki_pan (float, 0.1) pan integral gain
|
||||||
|
kd_pan (float, 0.05) pan derivative gain
|
||||||
|
kp_tilt (float, 1.2) tilt proportional gain
|
||||||
|
ki_tilt (float, 0.1) tilt integral gain
|
||||||
|
kd_tilt (float, 0.04) tilt derivative gain
|
||||||
|
fov_h_deg (float, 60.0) camera horizontal FOV (degrees)
|
||||||
|
fov_v_deg (float, 45.0) camera vertical FOV (degrees)
|
||||||
|
pan_limit_deg (float, 90.0) mechanical pan limit ±
|
||||||
|
tilt_limit_deg (float, 30.0) mechanical tilt limit ±
|
||||||
|
pan_vel_limit (float, 45.0) max pan rate (°/s)
|
||||||
|
tilt_vel_limit (float, 30.0) max tilt rate (°/s)
|
||||||
|
windup_limit (float, 15.0) integral anti-windup clamp (°·s)
|
||||||
|
dead_zone (float, 0.02) normalised dead zone (fraction of frame)
|
||||||
|
control_rate (float, 20.0) control loop Hz
|
||||||
|
lost_timeout_s (float, 1.5) seconds before face considered lost
|
||||||
|
return_rate_deg_s (float, 10.0) return-to-centre rate when no face (°/s)
|
||||||
|
faces_topic (str) default "/social/faces/detected"
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import math
|
||||||
|
import time
|
||||||
|
import threading
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
import rclpy
|
||||||
|
from rclpy.node import Node
|
||||||
|
from rclpy.qos import QoSProfile
|
||||||
|
from std_msgs.msg import Float32
|
||||||
|
|
||||||
|
try:
|
||||||
|
from saltybot_social_msgs.msg import FaceDetectionArray
|
||||||
|
_MSGS = True
|
||||||
|
except ImportError:
|
||||||
|
_MSGS = False
|
||||||
|
|
||||||
|
|
||||||
|
# ── Pure helpers ───────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def clamp(v: float, lo: float, hi: float) -> float:
|
||||||
|
return max(lo, min(hi, v))
|
||||||
|
|
||||||
|
|
||||||
|
def bbox_area(face) -> float:
|
||||||
|
"""Bounding-box area as a proximity proxy (larger ≈ closer)."""
|
||||||
|
return float(face.bbox_w) * float(face.bbox_h)
|
||||||
|
|
||||||
|
|
||||||
|
def pick_closest_face(faces):
|
||||||
|
"""Return the face with the largest bbox area; None if list is empty."""
|
||||||
|
if not faces:
|
||||||
|
return None
|
||||||
|
return max(faces, key=bbox_area)
|
||||||
|
|
||||||
|
|
||||||
|
def face_image_error(face, fov_h_deg: float, fov_v_deg: float):
|
||||||
|
"""Return (pan_error_deg, tilt_error_deg) for a FaceDetection.
|
||||||
|
|
||||||
|
Positive pan → face is right of image centre.
|
||||||
|
Positive tilt → face is below image centre.
|
||||||
|
"""
|
||||||
|
cx = float(face.bbox_x) + float(face.bbox_w) / 2.0
|
||||||
|
cy = float(face.bbox_y) + float(face.bbox_h) / 2.0
|
||||||
|
pan_err = (cx - 0.5) * fov_h_deg
|
||||||
|
tilt_err = (cy - 0.5) * fov_v_deg
|
||||||
|
return pan_err, tilt_err
|
||||||
|
|
||||||
|
|
||||||
|
# ── PID controller ─────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class PIDController:
|
||||||
|
"""Incremental (velocity-output) PID with anti-windup.
|
||||||
|
|
||||||
|
Output units: degrees/second (servo angular velocity).
|
||||||
|
Integrate externally: servo_angle += pid.update(error, dt) * dt
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, kp: float, ki: float, kd: float,
|
||||||
|
vel_limit: float, windup_limit: float) -> None:
|
||||||
|
self.kp = kp
|
||||||
|
self.ki = ki
|
||||||
|
self.kd = kd
|
||||||
|
self.vel_limit = vel_limit
|
||||||
|
self.windup_limit = windup_limit
|
||||||
|
self._integral = 0.0
|
||||||
|
self._prev_error = 0.0
|
||||||
|
self._first = True
|
||||||
|
|
||||||
|
def update(self, error: float, dt: float) -> float:
|
||||||
|
"""Return velocity command (°/s). Call every control tick."""
|
||||||
|
if dt <= 0.0:
|
||||||
|
return 0.0
|
||||||
|
|
||||||
|
self._integral += error * dt
|
||||||
|
self._integral = clamp(self._integral, -self.windup_limit,
|
||||||
|
self.windup_limit)
|
||||||
|
|
||||||
|
if self._first:
|
||||||
|
derivative = 0.0
|
||||||
|
self._first = False
|
||||||
|
else:
|
||||||
|
derivative = (error - self._prev_error) / dt
|
||||||
|
|
||||||
|
self._prev_error = error
|
||||||
|
output = (self.kp * error
|
||||||
|
+ self.ki * self._integral
|
||||||
|
+ self.kd * derivative)
|
||||||
|
return clamp(output, -self.vel_limit, self.vel_limit)
|
||||||
|
|
||||||
|
def reset(self) -> None:
|
||||||
|
self._integral = 0.0
|
||||||
|
self._prev_error = 0.0
|
||||||
|
self._first = True
|
||||||
|
|
||||||
|
|
||||||
|
# ── ROS2 node ──────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class FaceTrackServoNode(Node):
|
||||||
|
"""Smooth PID face-tracking servo controller."""
|
||||||
|
|
||||||
|
def __init__(self) -> None:
|
||||||
|
super().__init__("face_track_servo_node")
|
||||||
|
|
||||||
|
# Declare parameters
|
||||||
|
self.declare_parameter("kp_pan", 1.5)
|
||||||
|
self.declare_parameter("ki_pan", 0.1)
|
||||||
|
self.declare_parameter("kd_pan", 0.05)
|
||||||
|
self.declare_parameter("kp_tilt", 1.2)
|
||||||
|
self.declare_parameter("ki_tilt", 0.1)
|
||||||
|
self.declare_parameter("kd_tilt", 0.04)
|
||||||
|
self.declare_parameter("fov_h_deg", 60.0)
|
||||||
|
self.declare_parameter("fov_v_deg", 45.0)
|
||||||
|
self.declare_parameter("pan_limit_deg", 90.0)
|
||||||
|
self.declare_parameter("tilt_limit_deg", 30.0)
|
||||||
|
self.declare_parameter("pan_vel_limit", 45.0)
|
||||||
|
self.declare_parameter("tilt_vel_limit", 30.0)
|
||||||
|
self.declare_parameter("windup_limit", 15.0)
|
||||||
|
self.declare_parameter("dead_zone", 0.02)
|
||||||
|
self.declare_parameter("control_rate", 20.0)
|
||||||
|
self.declare_parameter("lost_timeout_s", 1.5)
|
||||||
|
self.declare_parameter("return_rate_deg_s", 10.0)
|
||||||
|
self.declare_parameter("faces_topic", "/social/faces/detected")
|
||||||
|
|
||||||
|
self._reload_params()
|
||||||
|
|
||||||
|
# Servo state
|
||||||
|
self._pan_cmd = 0.0
|
||||||
|
self._tilt_cmd = 0.0
|
||||||
|
self._last_face_t: float = 0.0
|
||||||
|
self._latest_face = None
|
||||||
|
self._lock = threading.Lock()
|
||||||
|
|
||||||
|
qos = QoSProfile(depth=10)
|
||||||
|
self._pan_pub = self.create_publisher(Float32, "/saltybot/head_pan", qos)
|
||||||
|
self._tilt_pub = self.create_publisher(Float32, "/saltybot/head_tilt", qos)
|
||||||
|
|
||||||
|
faces_topic = self.get_parameter("faces_topic").value
|
||||||
|
if _MSGS:
|
||||||
|
self._faces_sub = self.create_subscription(
|
||||||
|
FaceDetectionArray, faces_topic, self._on_faces, qos
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
self.get_logger().warn(
|
||||||
|
"saltybot_social_msgs not available — node passive (no subscription)"
|
||||||
|
)
|
||||||
|
|
||||||
|
rate = self.get_parameter("control_rate").value
|
||||||
|
self._timer = self.create_timer(1.0 / rate, self._control_cb)
|
||||||
|
self._last_tick = time.monotonic()
|
||||||
|
|
||||||
|
self.get_logger().info(
|
||||||
|
f"FaceTrackServoNode ready "
|
||||||
|
f"(rate={rate}Hz, fov={self._fov_h}×{self._fov_v}°, "
|
||||||
|
f"pan±{self._pan_limit}°, tilt±{self._tilt_limit}°)"
|
||||||
|
)
|
||||||
|
|
||||||
|
def _reload_params(self) -> None:
|
||||||
|
self._fov_h = self.get_parameter("fov_h_deg").value
|
||||||
|
self._fov_v = self.get_parameter("fov_v_deg").value
|
||||||
|
self._pan_limit = self.get_parameter("pan_limit_deg").value
|
||||||
|
self._tilt_limit = self.get_parameter("tilt_limit_deg").value
|
||||||
|
self._dead_zone = self.get_parameter("dead_zone").value
|
||||||
|
self._lost_t = self.get_parameter("lost_timeout_s").value
|
||||||
|
self._return_rate = self.get_parameter("return_rate_deg_s").value
|
||||||
|
|
||||||
|
self._pid_pan = PIDController(
|
||||||
|
kp=self.get_parameter("kp_pan").value,
|
||||||
|
ki=self.get_parameter("ki_pan").value,
|
||||||
|
kd=self.get_parameter("kd_pan").value,
|
||||||
|
vel_limit=self.get_parameter("pan_vel_limit").value,
|
||||||
|
windup_limit=self.get_parameter("windup_limit").value,
|
||||||
|
)
|
||||||
|
self._pid_tilt = PIDController(
|
||||||
|
kp=self.get_parameter("kp_tilt").value,
|
||||||
|
ki=self.get_parameter("ki_tilt").value,
|
||||||
|
kd=self.get_parameter("kd_tilt").value,
|
||||||
|
vel_limit=self.get_parameter("tilt_vel_limit").value,
|
||||||
|
windup_limit=self.get_parameter("windup_limit").value,
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── Subscription callback ──────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _on_faces(self, msg) -> None:
|
||||||
|
face = pick_closest_face(msg.faces)
|
||||||
|
with self._lock:
|
||||||
|
self._latest_face = face
|
||||||
|
if face is not None:
|
||||||
|
self._last_face_t = time.monotonic()
|
||||||
|
|
||||||
|
# ── Control loop ───────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _control_cb(self) -> None:
|
||||||
|
now = time.monotonic()
|
||||||
|
dt = now - self._last_tick
|
||||||
|
self._last_tick = now
|
||||||
|
dt = max(dt, 1e-4) # guard against zero dt at startup
|
||||||
|
|
||||||
|
with self._lock:
|
||||||
|
face = self._latest_face
|
||||||
|
last_face_t = self._last_face_t
|
||||||
|
|
||||||
|
face_fresh = (last_face_t > 0.0 and (now - last_face_t) < self._lost_t)
|
||||||
|
|
||||||
|
if not face_fresh or face is None:
|
||||||
|
# Return to centre
|
||||||
|
self._pid_pan.reset()
|
||||||
|
self._pid_tilt.reset()
|
||||||
|
step = self._return_rate * dt
|
||||||
|
self._pan_cmd = _step_toward_zero(self._pan_cmd, step)
|
||||||
|
self._tilt_cmd = _step_toward_zero(self._tilt_cmd, step)
|
||||||
|
else:
|
||||||
|
pan_err, tilt_err = face_image_error(face, self._fov_h, self._fov_v)
|
||||||
|
|
||||||
|
# Dead zone (normalised fraction → degrees)
|
||||||
|
dead_deg_h = self._dead_zone * self._fov_h
|
||||||
|
dead_deg_v = self._dead_zone * self._fov_v
|
||||||
|
|
||||||
|
if abs(pan_err) < dead_deg_h:
|
||||||
|
self._pid_pan.reset()
|
||||||
|
else:
|
||||||
|
vel_pan = self._pid_pan.update(pan_err, dt)
|
||||||
|
self._pan_cmd = clamp(
|
||||||
|
self._pan_cmd + vel_pan * dt,
|
||||||
|
-self._pan_limit, self._pan_limit,
|
||||||
|
)
|
||||||
|
|
||||||
|
if abs(tilt_err) < dead_deg_v:
|
||||||
|
self._pid_tilt.reset()
|
||||||
|
else:
|
||||||
|
vel_tilt = self._pid_tilt.update(tilt_err, dt)
|
||||||
|
self._tilt_cmd = clamp(
|
||||||
|
self._tilt_cmd + vel_tilt * dt,
|
||||||
|
-self._tilt_limit, self._tilt_limit,
|
||||||
|
)
|
||||||
|
|
||||||
|
pan_msg = Float32(); pan_msg.data = float(self._pan_cmd)
|
||||||
|
tilt_msg = Float32(); tilt_msg.data = float(self._tilt_cmd)
|
||||||
|
self._pan_pub.publish(pan_msg)
|
||||||
|
self._tilt_pub.publish(tilt_msg)
|
||||||
|
|
||||||
|
|
||||||
|
def _step_toward_zero(value: float, step: float) -> float:
|
||||||
|
"""Move value toward 0 by step without overshooting."""
|
||||||
|
if abs(value) <= step:
|
||||||
|
return 0.0
|
||||||
|
return value - math.copysign(step, value)
|
||||||
|
|
||||||
|
|
||||||
|
def main(args=None) -> None:
|
||||||
|
rclpy.init(args=args)
|
||||||
|
node = FaceTrackServoNode()
|
||||||
|
try:
|
||||||
|
rclpy.spin(node)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
node.destroy_node()
|
||||||
|
rclpy.shutdown()
|
||||||
@ -49,6 +49,8 @@ setup(
|
|||||||
'ambient_sound_node = saltybot_social.ambient_sound_node:main',
|
'ambient_sound_node = saltybot_social.ambient_sound_node:main',
|
||||||
# Proximity-based greeting trigger (Issue #270)
|
# Proximity-based greeting trigger (Issue #270)
|
||||||
'greeting_trigger_node = saltybot_social.greeting_trigger_node:main',
|
'greeting_trigger_node = saltybot_social.greeting_trigger_node:main',
|
||||||
|
# Face-tracking head servo controller (Issue #279)
|
||||||
|
'face_track_servo_node = saltybot_social.face_track_servo_node:main',
|
||||||
],
|
],
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|||||||
676
jetson/ros2_ws/src/saltybot_social/test/test_face_track_servo.py
Normal file
676
jetson/ros2_ws/src/saltybot_social/test/test_face_track_servo.py
Normal file
@ -0,0 +1,676 @@
|
|||||||
|
"""test_face_track_servo.py — Offline tests for face_track_servo_node (Issue #279).
|
||||||
|
|
||||||
|
Stubs out rclpy and saltybot_social_msgs so tests run without a ROS install.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import importlib
|
||||||
|
import importlib.util
|
||||||
|
import math
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
import types
|
||||||
|
import unittest
|
||||||
|
|
||||||
|
|
||||||
|
# ── ROS2 / message stubs ──────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _make_ros_stubs():
|
||||||
|
for mod_name in ("rclpy", "rclpy.node", "rclpy.qos",
|
||||||
|
"std_msgs", "std_msgs.msg",
|
||||||
|
"saltybot_social_msgs", "saltybot_social_msgs.msg"):
|
||||||
|
if mod_name not in sys.modules:
|
||||||
|
sys.modules[mod_name] = types.ModuleType(mod_name)
|
||||||
|
|
||||||
|
class _Node:
|
||||||
|
def __init__(self, name="node"):
|
||||||
|
self._name = name
|
||||||
|
if not hasattr(self, "_params"):
|
||||||
|
self._params = {}
|
||||||
|
self._pubs = {}
|
||||||
|
self._subs = {}
|
||||||
|
self._timers = []
|
||||||
|
self._logs = []
|
||||||
|
|
||||||
|
def declare_parameter(self, name, default):
|
||||||
|
if name not in self._params:
|
||||||
|
self._params[name] = default
|
||||||
|
|
||||||
|
def get_parameter(self, name):
|
||||||
|
class _P:
|
||||||
|
def __init__(self, v): self.value = v
|
||||||
|
return _P(self._params.get(name))
|
||||||
|
|
||||||
|
def create_publisher(self, msg_type, topic, qos):
|
||||||
|
pub = _FakePub()
|
||||||
|
self._pubs[topic] = pub
|
||||||
|
return pub
|
||||||
|
|
||||||
|
def create_subscription(self, msg_type, topic, cb, qos):
|
||||||
|
self._subs[topic] = cb
|
||||||
|
return object()
|
||||||
|
|
||||||
|
def create_timer(self, period, cb):
|
||||||
|
self._timers.append(cb)
|
||||||
|
return object()
|
||||||
|
|
||||||
|
def get_logger(self):
|
||||||
|
node = self
|
||||||
|
class _L:
|
||||||
|
def info(self, m): node._logs.append(("INFO", m))
|
||||||
|
def warn(self, m): node._logs.append(("WARN", m))
|
||||||
|
def error(self, m): node._logs.append(("ERROR", m))
|
||||||
|
return _L()
|
||||||
|
|
||||||
|
def destroy_node(self): pass
|
||||||
|
|
||||||
|
class _FakePub:
|
||||||
|
def __init__(self):
|
||||||
|
self.msgs = []
|
||||||
|
def publish(self, msg):
|
||||||
|
self.msgs.append(msg)
|
||||||
|
|
||||||
|
class _QoSProfile:
|
||||||
|
def __init__(self, depth=10): self.depth = depth
|
||||||
|
|
||||||
|
class _Float32:
|
||||||
|
def __init__(self): self.data = 0.0
|
||||||
|
|
||||||
|
class _FaceDetection:
|
||||||
|
def __init__(self, face_id=0, bbox_x=0.4, bbox_y=0.4,
|
||||||
|
bbox_w=0.2, bbox_h=0.2, confidence=1.0):
|
||||||
|
self.face_id = face_id
|
||||||
|
self.bbox_x = bbox_x
|
||||||
|
self.bbox_y = bbox_y
|
||||||
|
self.bbox_w = bbox_w
|
||||||
|
self.bbox_h = bbox_h
|
||||||
|
self.confidence = confidence
|
||||||
|
self.person_name = ""
|
||||||
|
|
||||||
|
class _FaceDetectionArray:
|
||||||
|
def __init__(self, faces=None):
|
||||||
|
self.faces = faces or []
|
||||||
|
|
||||||
|
# rclpy
|
||||||
|
rclpy_mod = sys.modules["rclpy"]
|
||||||
|
rclpy_mod.init = lambda args=None: None
|
||||||
|
rclpy_mod.spin = lambda node: None
|
||||||
|
rclpy_mod.shutdown = lambda: None
|
||||||
|
|
||||||
|
sys.modules["rclpy.node"].Node = _Node
|
||||||
|
sys.modules["rclpy.qos"].QoSProfile = _QoSProfile
|
||||||
|
sys.modules["std_msgs.msg"].Float32 = _Float32
|
||||||
|
|
||||||
|
msgs = sys.modules["saltybot_social_msgs.msg"]
|
||||||
|
msgs.FaceDetection = _FaceDetection
|
||||||
|
msgs.FaceDetectionArray = _FaceDetectionArray
|
||||||
|
|
||||||
|
return _Node, _FakePub, _FaceDetection, _FaceDetectionArray, _Float32
|
||||||
|
|
||||||
|
|
||||||
|
_Node, _FakePub, _FaceDetection, _FaceDetectionArray, _Float32 = _make_ros_stubs()
|
||||||
|
|
||||||
|
|
||||||
|
# ── Module loader ─────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
_SRC = (
|
||||||
|
"/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/"
|
||||||
|
"saltybot_social/saltybot_social/face_track_servo_node.py"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _load_mod():
|
||||||
|
spec = importlib.util.spec_from_file_location("face_track_servo_testmod", _SRC)
|
||||||
|
mod = importlib.util.module_from_spec(spec)
|
||||||
|
spec.loader.exec_module(mod)
|
||||||
|
return mod
|
||||||
|
|
||||||
|
|
||||||
|
def _make_node(mod, **kwargs):
|
||||||
|
"""Instantiate FaceTrackServoNode with optional param overrides."""
|
||||||
|
node = mod.FaceTrackServoNode.__new__(mod.FaceTrackServoNode)
|
||||||
|
|
||||||
|
defaults = {
|
||||||
|
"kp_pan": 1.5,
|
||||||
|
"ki_pan": 0.1,
|
||||||
|
"kd_pan": 0.05,
|
||||||
|
"kp_tilt": 1.2,
|
||||||
|
"ki_tilt": 0.1,
|
||||||
|
"kd_tilt": 0.04,
|
||||||
|
"fov_h_deg": 60.0,
|
||||||
|
"fov_v_deg": 45.0,
|
||||||
|
"pan_limit_deg": 90.0,
|
||||||
|
"tilt_limit_deg": 30.0,
|
||||||
|
"pan_vel_limit": 45.0,
|
||||||
|
"tilt_vel_limit": 30.0,
|
||||||
|
"windup_limit": 15.0,
|
||||||
|
"dead_zone": 0.02,
|
||||||
|
"control_rate": 20.0,
|
||||||
|
"lost_timeout_s": 1.5,
|
||||||
|
"return_rate_deg_s": 10.0,
|
||||||
|
"faces_topic": "/social/faces/detected",
|
||||||
|
}
|
||||||
|
defaults.update(kwargs)
|
||||||
|
node._params = dict(defaults)
|
||||||
|
mod.FaceTrackServoNode.__init__(node)
|
||||||
|
return node
|
||||||
|
|
||||||
|
|
||||||
|
def _face(bbox_x=0.4, bbox_y=0.4, bbox_w=0.2, bbox_h=0.2, face_id=0):
|
||||||
|
return _FaceDetection(face_id=face_id, bbox_x=bbox_x, bbox_y=bbox_y,
|
||||||
|
bbox_w=bbox_w, bbox_h=bbox_h)
|
||||||
|
|
||||||
|
|
||||||
|
def _centered_face():
|
||||||
|
"""A face perfectly centered in the frame."""
|
||||||
|
return _face(bbox_x=0.4, bbox_y=0.4, bbox_w=0.2, bbox_h=0.2)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Tests: pure helpers ───────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestClamp(unittest.TestCase):
|
||||||
|
@classmethod
|
||||||
|
def setUpClass(cls):
|
||||||
|
cls.mod = _load_mod()
|
||||||
|
|
||||||
|
def test_within(self):
|
||||||
|
self.assertEqual(self.mod.clamp(5.0, 0.0, 10.0), 5.0)
|
||||||
|
|
||||||
|
def test_below(self):
|
||||||
|
self.assertEqual(self.mod.clamp(-5.0, 0.0, 10.0), 0.0)
|
||||||
|
|
||||||
|
def test_above(self):
|
||||||
|
self.assertEqual(self.mod.clamp(15.0, 0.0, 10.0), 10.0)
|
||||||
|
|
||||||
|
def test_negative_range(self):
|
||||||
|
self.assertEqual(self.mod.clamp(-50.0, -45.0, 45.0), -45.0)
|
||||||
|
|
||||||
|
|
||||||
|
class TestBboxArea(unittest.TestCase):
|
||||||
|
@classmethod
|
||||||
|
def setUpClass(cls):
|
||||||
|
cls.mod = _load_mod()
|
||||||
|
|
||||||
|
def test_area(self):
|
||||||
|
f = _face(bbox_w=0.3, bbox_h=0.4)
|
||||||
|
self.assertAlmostEqual(self.mod.bbox_area(f), 0.12)
|
||||||
|
|
||||||
|
def test_zero(self):
|
||||||
|
f = _face(bbox_w=0.0, bbox_h=0.2)
|
||||||
|
self.assertAlmostEqual(self.mod.bbox_area(f), 0.0)
|
||||||
|
|
||||||
|
|
||||||
|
class TestPickClosestFace(unittest.TestCase):
|
||||||
|
@classmethod
|
||||||
|
def setUpClass(cls):
|
||||||
|
cls.mod = _load_mod()
|
||||||
|
|
||||||
|
def test_empty(self):
|
||||||
|
self.assertIsNone(self.mod.pick_closest_face([]))
|
||||||
|
|
||||||
|
def test_single(self):
|
||||||
|
f = _face(bbox_w=0.2, bbox_h=0.2)
|
||||||
|
self.assertIs(self.mod.pick_closest_face([f]), f)
|
||||||
|
|
||||||
|
def test_picks_largest_area(self):
|
||||||
|
small = _face(bbox_w=0.1, bbox_h=0.1)
|
||||||
|
big = _face(bbox_w=0.4, bbox_h=0.4)
|
||||||
|
self.assertIs(self.mod.pick_closest_face([small, big]), big)
|
||||||
|
self.assertIs(self.mod.pick_closest_face([big, small]), big)
|
||||||
|
|
||||||
|
def test_three_faces(self):
|
||||||
|
faces = [_face(bbox_w=0.1, bbox_h=0.1),
|
||||||
|
_face(bbox_w=0.5, bbox_h=0.5),
|
||||||
|
_face(bbox_w=0.2, bbox_h=0.2)]
|
||||||
|
self.assertIs(self.mod.pick_closest_face(faces), faces[1])
|
||||||
|
|
||||||
|
|
||||||
|
class TestFaceImageError(unittest.TestCase):
|
||||||
|
@classmethod
|
||||||
|
def setUpClass(cls):
|
||||||
|
cls.mod = _load_mod()
|
||||||
|
|
||||||
|
def test_centered_face_zero_error(self):
|
||||||
|
f = _face(bbox_x=0.4, bbox_y=0.4, bbox_w=0.2, bbox_h=0.2)
|
||||||
|
pan, tilt = self.mod.face_image_error(f, 60.0, 45.0)
|
||||||
|
self.assertAlmostEqual(pan, 0.0)
|
||||||
|
self.assertAlmostEqual(tilt, 0.0)
|
||||||
|
|
||||||
|
def test_right_of_centre(self):
|
||||||
|
# cx = 0.7 + 0.1 = 0.8, error = 0.3 * 60 = 18°
|
||||||
|
f = _face(bbox_x=0.7, bbox_y=0.4, bbox_w=0.2, bbox_h=0.2)
|
||||||
|
pan, _ = self.mod.face_image_error(f, 60.0, 45.0)
|
||||||
|
self.assertAlmostEqual(pan, 18.0)
|
||||||
|
|
||||||
|
def test_left_of_centre(self):
|
||||||
|
# cx = 0.1 + 0.1 = 0.2, error = -0.3 * 60 = -18°
|
||||||
|
f = _face(bbox_x=0.1, bbox_y=0.4, bbox_w=0.2, bbox_h=0.2)
|
||||||
|
pan, _ = self.mod.face_image_error(f, 60.0, 45.0)
|
||||||
|
self.assertAlmostEqual(pan, -18.0)
|
||||||
|
|
||||||
|
def test_below_centre(self):
|
||||||
|
# cy = 0.7 + 0.1 = 0.8, error = 0.3 * 45 = 13.5°
|
||||||
|
f = _face(bbox_x=0.4, bbox_y=0.7, bbox_w=0.2, bbox_h=0.2)
|
||||||
|
_, tilt = self.mod.face_image_error(f, 60.0, 45.0)
|
||||||
|
self.assertAlmostEqual(tilt, 13.5)
|
||||||
|
|
||||||
|
def test_above_centre(self):
|
||||||
|
# cy = 0.1 + 0.1 = 0.2, error = -0.3 * 45 = -13.5°
|
||||||
|
f = _face(bbox_x=0.4, bbox_y=0.1, bbox_w=0.2, bbox_h=0.2)
|
||||||
|
_, tilt = self.mod.face_image_error(f, 60.0, 45.0)
|
||||||
|
self.assertAlmostEqual(tilt, -13.5)
|
||||||
|
|
||||||
|
|
||||||
|
class TestStepTowardZero(unittest.TestCase):
|
||||||
|
@classmethod
|
||||||
|
def setUpClass(cls):
|
||||||
|
cls.mod = _load_mod()
|
||||||
|
|
||||||
|
def test_positive_large(self):
|
||||||
|
result = self.mod._step_toward_zero(10.0, 1.0)
|
||||||
|
self.assertAlmostEqual(result, 9.0)
|
||||||
|
|
||||||
|
def test_negative_large(self):
|
||||||
|
result = self.mod._step_toward_zero(-10.0, 1.0)
|
||||||
|
self.assertAlmostEqual(result, -9.0)
|
||||||
|
|
||||||
|
def test_smaller_than_step(self):
|
||||||
|
result = self.mod._step_toward_zero(0.5, 1.0)
|
||||||
|
self.assertAlmostEqual(result, 0.0)
|
||||||
|
|
||||||
|
def test_exact_step(self):
|
||||||
|
result = self.mod._step_toward_zero(1.0, 1.0)
|
||||||
|
self.assertAlmostEqual(result, 0.0)
|
||||||
|
|
||||||
|
def test_zero(self):
|
||||||
|
result = self.mod._step_toward_zero(0.0, 1.0)
|
||||||
|
self.assertAlmostEqual(result, 0.0)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Tests: PIDController ──────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestPIDController(unittest.TestCase):
|
||||||
|
@classmethod
|
||||||
|
def setUpClass(cls):
|
||||||
|
cls.mod = _load_mod()
|
||||||
|
|
||||||
|
def _pid(self, kp=1.0, ki=0.0, kd=0.0, vel_limit=100.0, windup=100.0):
|
||||||
|
return self.mod.PIDController(kp, ki, kd, vel_limit, windup)
|
||||||
|
|
||||||
|
def test_proportional_only(self):
|
||||||
|
pid = self._pid(kp=2.0)
|
||||||
|
out = pid.update(5.0, 0.1)
|
||||||
|
self.assertAlmostEqual(out, 10.0)
|
||||||
|
|
||||||
|
def test_zero_error_zero_output(self):
|
||||||
|
pid = self._pid(kp=5.0)
|
||||||
|
self.assertAlmostEqual(pid.update(0.0, 0.1), 0.0)
|
||||||
|
|
||||||
|
def test_integral_accumulates(self):
|
||||||
|
pid = self._pid(kp=0.0, ki=1.0)
|
||||||
|
pid.update(1.0, 0.1) # integral = 0.1
|
||||||
|
out = pid.update(1.0, 0.1) # integral = 0.2, output = 0.2
|
||||||
|
self.assertAlmostEqual(out, 0.2, places=5)
|
||||||
|
|
||||||
|
def test_derivative_first_tick_zero(self):
|
||||||
|
pid = self._pid(kp=0.0, kd=1.0)
|
||||||
|
out = pid.update(10.0, 0.1)
|
||||||
|
self.assertAlmostEqual(out, 0.0) # first tick: derivative = 0
|
||||||
|
|
||||||
|
def test_derivative_second_tick(self):
|
||||||
|
pid = self._pid(kp=0.0, kd=1.0)
|
||||||
|
pid.update(0.0, 0.1) # first tick
|
||||||
|
out = pid.update(10.0, 0.1) # de/dt = 10/0.1 = 100
|
||||||
|
self.assertAlmostEqual(out, 100.0)
|
||||||
|
|
||||||
|
def test_velocity_clamped(self):
|
||||||
|
pid = self._pid(kp=100.0, vel_limit=10.0)
|
||||||
|
out = pid.update(5.0, 0.1)
|
||||||
|
self.assertAlmostEqual(out, 10.0)
|
||||||
|
|
||||||
|
def test_velocity_clamped_negative(self):
|
||||||
|
pid = self._pid(kp=100.0, vel_limit=10.0)
|
||||||
|
out = pid.update(-5.0, 0.1)
|
||||||
|
self.assertAlmostEqual(out, -10.0)
|
||||||
|
|
||||||
|
def test_antiwindup(self):
|
||||||
|
pid = self._pid(kp=0.0, ki=1.0, windup=5.0)
|
||||||
|
for _ in range(100):
|
||||||
|
pid.update(1.0, 0.1) # would accumulate 10, clamped at 5
|
||||||
|
out = pid.update(0.0, 0.1)
|
||||||
|
self.assertAlmostEqual(out, 5.0, places=3)
|
||||||
|
|
||||||
|
def test_reset_clears_integral(self):
|
||||||
|
pid = self._pid(ki=1.0)
|
||||||
|
pid.update(1.0, 1.0)
|
||||||
|
pid.reset()
|
||||||
|
out = pid.update(0.0, 0.1)
|
||||||
|
self.assertAlmostEqual(out, 0.0)
|
||||||
|
|
||||||
|
def test_reset_clears_derivative(self):
|
||||||
|
pid = self._pid(kp=0.0, kd=1.0)
|
||||||
|
pid.update(10.0, 0.1) # sets prev_error
|
||||||
|
pid.reset()
|
||||||
|
out = pid.update(10.0, 0.1) # after reset, first tick = 0 derivative
|
||||||
|
self.assertAlmostEqual(out, 0.0)
|
||||||
|
|
||||||
|
def test_zero_dt_returns_zero(self):
|
||||||
|
pid = self._pid(kp=10.0)
|
||||||
|
self.assertAlmostEqual(pid.update(5.0, 0.0), 0.0)
|
||||||
|
|
||||||
|
def test_negative_dt_returns_zero(self):
|
||||||
|
pid = self._pid(kp=10.0)
|
||||||
|
self.assertAlmostEqual(pid.update(5.0, -0.1), 0.0)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Tests: node initialisation ────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestNodeInit(unittest.TestCase):
|
||||||
|
@classmethod
|
||||||
|
def setUpClass(cls):
|
||||||
|
cls.mod = _load_mod()
|
||||||
|
|
||||||
|
def test_instantiates(self):
|
||||||
|
node = _make_node(self.mod)
|
||||||
|
self.assertIsNotNone(node)
|
||||||
|
|
||||||
|
def test_pan_pub(self):
|
||||||
|
node = _make_node(self.mod)
|
||||||
|
self.assertIn("/saltybot/head_pan", node._pubs)
|
||||||
|
|
||||||
|
def test_tilt_pub(self):
|
||||||
|
node = _make_node(self.mod)
|
||||||
|
self.assertIn("/saltybot/head_tilt", node._pubs)
|
||||||
|
|
||||||
|
def test_faces_sub(self):
|
||||||
|
node = _make_node(self.mod)
|
||||||
|
self.assertIn("/social/faces/detected", node._subs)
|
||||||
|
|
||||||
|
def test_timer_registered(self):
|
||||||
|
node = _make_node(self.mod)
|
||||||
|
self.assertGreater(len(node._timers), 0)
|
||||||
|
|
||||||
|
def test_initial_pan_zero(self):
|
||||||
|
node = _make_node(self.mod)
|
||||||
|
self.assertAlmostEqual(node._pan_cmd, 0.0)
|
||||||
|
|
||||||
|
def test_initial_tilt_zero(self):
|
||||||
|
node = _make_node(self.mod)
|
||||||
|
self.assertAlmostEqual(node._tilt_cmd, 0.0)
|
||||||
|
|
||||||
|
def test_custom_fov(self):
|
||||||
|
node = _make_node(self.mod, fov_h_deg=90.0)
|
||||||
|
self.assertAlmostEqual(node._fov_h, 90.0)
|
||||||
|
|
||||||
|
def test_custom_pan_limit(self):
|
||||||
|
node = _make_node(self.mod, pan_limit_deg=45.0)
|
||||||
|
self.assertAlmostEqual(node._pan_limit, 45.0)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Tests: face callback ──────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestFaceCallback(unittest.TestCase):
|
||||||
|
@classmethod
|
||||||
|
def setUpClass(cls):
|
||||||
|
cls.mod = _load_mod()
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
self.node = _make_node(self.mod)
|
||||||
|
|
||||||
|
def test_empty_msg_no_face(self):
|
||||||
|
self.node._on_faces(_FaceDetectionArray([]))
|
||||||
|
self.assertIsNone(self.node._latest_face)
|
||||||
|
|
||||||
|
def test_single_face_stored(self):
|
||||||
|
f = _centered_face()
|
||||||
|
self.node._on_faces(_FaceDetectionArray([f]))
|
||||||
|
self.assertIs(self.node._latest_face, f)
|
||||||
|
|
||||||
|
def test_closest_face_picked(self):
|
||||||
|
small = _face(bbox_w=0.1, bbox_h=0.1, face_id=1)
|
||||||
|
big = _face(bbox_w=0.5, bbox_h=0.5, face_id=2)
|
||||||
|
self.node._on_faces(_FaceDetectionArray([small, big]))
|
||||||
|
self.assertIs(self.node._latest_face, big)
|
||||||
|
|
||||||
|
def test_timestamp_updated_on_face(self):
|
||||||
|
before = time.monotonic()
|
||||||
|
f = _centered_face()
|
||||||
|
self.node._on_faces(_FaceDetectionArray([f]))
|
||||||
|
self.assertGreaterEqual(self.node._last_face_t, before)
|
||||||
|
|
||||||
|
def test_timestamp_not_updated_on_empty(self):
|
||||||
|
self.node._last_face_t = 0.0
|
||||||
|
self.node._on_faces(_FaceDetectionArray([]))
|
||||||
|
self.assertEqual(self.node._last_face_t, 0.0)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Tests: control loop ───────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestControlLoop(unittest.TestCase):
|
||||||
|
@classmethod
|
||||||
|
def setUpClass(cls):
|
||||||
|
cls.mod = _load_mod()
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
self.node = _make_node(self.mod, dead_zone=0.0,
|
||||||
|
ki_pan=0.0, kd_pan=0.0,
|
||||||
|
ki_tilt=0.0, kd_tilt=0.0)
|
||||||
|
self.pan_pub = self.node._pubs["/saltybot/head_pan"]
|
||||||
|
self.tilt_pub = self.node._pubs["/saltybot/head_tilt"]
|
||||||
|
|
||||||
|
def _tick(self, dt=0.05):
|
||||||
|
self.node._last_tick = time.monotonic() - dt
|
||||||
|
self.node._control_cb()
|
||||||
|
|
||||||
|
def test_no_face_publishes_zero_initially(self):
|
||||||
|
self._tick()
|
||||||
|
self.assertAlmostEqual(self.pan_pub.msgs[-1].data, 0.0)
|
||||||
|
self.assertAlmostEqual(self.tilt_pub.msgs[-1].data, 0.0)
|
||||||
|
|
||||||
|
def test_centered_face_minimal_movement(self):
|
||||||
|
f = _centered_face() # cx=cy=0.5, error=0
|
||||||
|
self.node._on_faces(_FaceDetectionArray([f]))
|
||||||
|
self.node._last_face_t = time.monotonic()
|
||||||
|
self._tick()
|
||||||
|
# With dead_zone=0 and error=0, pid output=0, cmd stays 0
|
||||||
|
self.assertAlmostEqual(self.pan_pub.msgs[-1].data, 0.0, places=4)
|
||||||
|
self.assertAlmostEqual(self.tilt_pub.msgs[-1].data, 0.0, places=4)
|
||||||
|
|
||||||
|
def test_right_face_pans_right(self):
|
||||||
|
# Face right of centre → positive pan error → pan_cmd increases
|
||||||
|
f = _face(bbox_x=0.7, bbox_y=0.4, bbox_w=0.2, bbox_h=0.2)
|
||||||
|
self.node._on_faces(_FaceDetectionArray([f]))
|
||||||
|
self.node._last_face_t = time.monotonic()
|
||||||
|
self._tick()
|
||||||
|
self.assertGreater(self.pan_pub.msgs[-1].data, 0.0)
|
||||||
|
|
||||||
|
def test_left_face_pans_left(self):
|
||||||
|
f = _face(bbox_x=0.1, bbox_y=0.4, bbox_w=0.2, bbox_h=0.2)
|
||||||
|
self.node._on_faces(_FaceDetectionArray([f]))
|
||||||
|
self.node._last_face_t = time.monotonic()
|
||||||
|
self._tick()
|
||||||
|
self.assertLess(self.pan_pub.msgs[-1].data, 0.0)
|
||||||
|
|
||||||
|
def test_low_face_tilts_down(self):
|
||||||
|
f = _face(bbox_x=0.4, bbox_y=0.7, bbox_w=0.2, bbox_h=0.2)
|
||||||
|
self.node._on_faces(_FaceDetectionArray([f]))
|
||||||
|
self.node._last_face_t = time.monotonic()
|
||||||
|
self._tick()
|
||||||
|
self.assertGreater(self.tilt_pub.msgs[-1].data, 0.0)
|
||||||
|
|
||||||
|
def test_high_face_tilts_up(self):
|
||||||
|
f = _face(bbox_x=0.4, bbox_y=0.1, bbox_w=0.2, bbox_h=0.2)
|
||||||
|
self.node._on_faces(_FaceDetectionArray([f]))
|
||||||
|
self.node._last_face_t = time.monotonic()
|
||||||
|
self._tick()
|
||||||
|
self.assertLess(self.tilt_pub.msgs[-1].data, 0.0)
|
||||||
|
|
||||||
|
def test_pan_clamped_to_limit(self):
|
||||||
|
node = _make_node(self.mod, kp_pan=1000.0, ki_pan=0.0, kd_pan=0.0,
|
||||||
|
pan_limit_deg=45.0, pan_vel_limit=9999.0,
|
||||||
|
dead_zone=0.0)
|
||||||
|
pub = node._pubs["/saltybot/head_pan"]
|
||||||
|
f = _face(bbox_x=0.9, bbox_y=0.4, bbox_w=0.1, bbox_h=0.2)
|
||||||
|
node._on_faces(_FaceDetectionArray([f]))
|
||||||
|
node._last_face_t = time.monotonic()
|
||||||
|
# Run many ticks to accumulate
|
||||||
|
for _ in range(50):
|
||||||
|
node._last_tick = time.monotonic() - 0.05
|
||||||
|
node._control_cb()
|
||||||
|
self.assertLessEqual(pub.msgs[-1].data, 45.0)
|
||||||
|
|
||||||
|
def test_tilt_clamped_to_limit(self):
|
||||||
|
node = _make_node(self.mod, kp_tilt=1000.0, ki_tilt=0.0, kd_tilt=0.0,
|
||||||
|
tilt_limit_deg=20.0, tilt_vel_limit=9999.0,
|
||||||
|
dead_zone=0.0)
|
||||||
|
pub = node._pubs["/saltybot/head_tilt"]
|
||||||
|
f = _face(bbox_x=0.4, bbox_y=0.9, bbox_w=0.2, bbox_h=0.1)
|
||||||
|
node._on_faces(_FaceDetectionArray([f]))
|
||||||
|
node._last_face_t = time.monotonic()
|
||||||
|
for _ in range(50):
|
||||||
|
node._last_tick = time.monotonic() - 0.05
|
||||||
|
node._control_cb()
|
||||||
|
self.assertLessEqual(pub.msgs[-1].data, 20.0)
|
||||||
|
|
||||||
|
def test_lost_face_returns_to_zero(self):
|
||||||
|
node = _make_node(self.mod, kp_pan=10.0, ki_pan=0.0, kd_pan=0.0,
|
||||||
|
dead_zone=0.0, return_rate_deg_s=90.0,
|
||||||
|
lost_timeout_s=0.01)
|
||||||
|
pub = node._pubs["/saltybot/head_pan"]
|
||||||
|
f = _face(bbox_x=0.7, bbox_y=0.4, bbox_w=0.2, bbox_h=0.2)
|
||||||
|
node._on_faces(_FaceDetectionArray([f]))
|
||||||
|
node._last_face_t = time.monotonic()
|
||||||
|
# Build up some pan
|
||||||
|
for _ in range(5):
|
||||||
|
node._last_tick = time.monotonic() - 0.05
|
||||||
|
node._control_cb()
|
||||||
|
# Expire face timeout
|
||||||
|
node._last_face_t = time.monotonic() - 10.0
|
||||||
|
for _ in range(20):
|
||||||
|
node._last_tick = time.monotonic() - 0.05
|
||||||
|
node._control_cb()
|
||||||
|
self.assertAlmostEqual(pub.msgs[-1].data, 0.0, places=3)
|
||||||
|
|
||||||
|
def test_publishes_every_tick(self):
|
||||||
|
for _ in range(3):
|
||||||
|
self._tick()
|
||||||
|
self.assertEqual(len(self.pan_pub.msgs), 3)
|
||||||
|
self.assertEqual(len(self.tilt_pub.msgs), 3)
|
||||||
|
|
||||||
|
def test_dead_zone_suppresses_small_error(self):
|
||||||
|
node = _make_node(self.mod, kp_pan=100.0, ki_pan=0.0, kd_pan=0.0,
|
||||||
|
dead_zone=0.1, fov_h_deg=60.0)
|
||||||
|
pub = node._pubs["/saltybot/head_pan"]
|
||||||
|
# Face 2% right of centre — within dead_zone=10% of frame
|
||||||
|
f = _face(bbox_x=0.42, bbox_y=0.4, bbox_w=0.2, bbox_h=0.2)
|
||||||
|
node._on_faces(_FaceDetectionArray([f]))
|
||||||
|
node._last_face_t = time.monotonic()
|
||||||
|
node._last_tick = time.monotonic() - 0.05
|
||||||
|
node._control_cb()
|
||||||
|
self.assertAlmostEqual(pub.msgs[-1].data, 0.0, places=4)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Tests: source-level checks ────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestNodeSrc(unittest.TestCase):
|
||||||
|
@classmethod
|
||||||
|
def setUpClass(cls):
|
||||||
|
with open(_SRC) as f:
|
||||||
|
cls.src = f.read()
|
||||||
|
|
||||||
|
def test_issue_tag(self):
|
||||||
|
self.assertIn("#279", self.src)
|
||||||
|
|
||||||
|
def test_pan_topic(self):
|
||||||
|
self.assertIn("/saltybot/head_pan", self.src)
|
||||||
|
|
||||||
|
def test_tilt_topic(self):
|
||||||
|
self.assertIn("/saltybot/head_tilt", self.src)
|
||||||
|
|
||||||
|
def test_faces_topic(self):
|
||||||
|
self.assertIn("/social/faces/detected", self.src)
|
||||||
|
|
||||||
|
def test_pid_class(self):
|
||||||
|
self.assertIn("class PIDController", self.src)
|
||||||
|
|
||||||
|
def test_kp_param(self):
|
||||||
|
self.assertIn("kp_pan", self.src)
|
||||||
|
|
||||||
|
def test_ki_param(self):
|
||||||
|
self.assertIn("ki_pan", self.src)
|
||||||
|
|
||||||
|
def test_kd_param(self):
|
||||||
|
self.assertIn("kd_pan", self.src)
|
||||||
|
|
||||||
|
def test_fov_param(self):
|
||||||
|
self.assertIn("fov_h_deg", self.src)
|
||||||
|
|
||||||
|
def test_pan_limit_param(self):
|
||||||
|
self.assertIn("pan_limit_deg", self.src)
|
||||||
|
|
||||||
|
def test_dead_zone_param(self):
|
||||||
|
self.assertIn("dead_zone", self.src)
|
||||||
|
|
||||||
|
def test_pick_closest_face(self):
|
||||||
|
self.assertIn("pick_closest_face", self.src)
|
||||||
|
|
||||||
|
def test_main_defined(self):
|
||||||
|
self.assertIn("def main", self.src)
|
||||||
|
|
||||||
|
def test_antiwindup(self):
|
||||||
|
self.assertIn("windup", self.src)
|
||||||
|
|
||||||
|
def test_threading_lock(self):
|
||||||
|
self.assertIn("threading.Lock", self.src)
|
||||||
|
|
||||||
|
|
||||||
|
class TestConfig(unittest.TestCase):
|
||||||
|
_CONFIG = (
|
||||||
|
"/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/"
|
||||||
|
"saltybot_social/config/face_track_servo_params.yaml"
|
||||||
|
)
|
||||||
|
_LAUNCH = (
|
||||||
|
"/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/"
|
||||||
|
"saltybot_social/launch/face_track_servo.launch.py"
|
||||||
|
)
|
||||||
|
_SETUP = (
|
||||||
|
"/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/"
|
||||||
|
"saltybot_social/setup.py"
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_config_exists(self):
|
||||||
|
import os; self.assertTrue(os.path.exists(self._CONFIG))
|
||||||
|
|
||||||
|
def test_config_kp_pan(self):
|
||||||
|
with open(self._CONFIG) as f: c = f.read()
|
||||||
|
self.assertIn("kp_pan", c)
|
||||||
|
|
||||||
|
def test_config_fov(self):
|
||||||
|
with open(self._CONFIG) as f: c = f.read()
|
||||||
|
self.assertIn("fov_h_deg", c)
|
||||||
|
|
||||||
|
def test_config_pan_limit(self):
|
||||||
|
with open(self._CONFIG) as f: c = f.read()
|
||||||
|
self.assertIn("pan_limit_deg", c)
|
||||||
|
|
||||||
|
def test_config_dead_zone(self):
|
||||||
|
with open(self._CONFIG) as f: c = f.read()
|
||||||
|
self.assertIn("dead_zone", c)
|
||||||
|
|
||||||
|
def test_launch_exists(self):
|
||||||
|
import os; self.assertTrue(os.path.exists(self._LAUNCH))
|
||||||
|
|
||||||
|
def test_launch_kp_pan_arg(self):
|
||||||
|
with open(self._LAUNCH) as f: c = f.read()
|
||||||
|
self.assertIn("kp_pan", c)
|
||||||
|
|
||||||
|
def test_launch_pan_limit_arg(self):
|
||||||
|
with open(self._LAUNCH) as f: c = f.read()
|
||||||
|
self.assertIn("pan_limit_deg", c)
|
||||||
|
|
||||||
|
def test_entry_point(self):
|
||||||
|
with open(self._SETUP) as f: c = f.read()
|
||||||
|
self.assertIn("face_track_servo_node", c)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
Loading…
x
Reference in New Issue
Block a user