feat(social): 68-point Kalman landmark smoother (Issue #227) #231
@ -0,0 +1,161 @@
|
|||||||
|
"""
|
||||||
|
_landmark_kalman.py — Vectorised constant-velocity Kalman filter for 2-D facial landmarks.
|
||||||
|
|
||||||
|
No ROS2 dependencies — pure numpy.
|
||||||
|
|
||||||
|
Each face track has one LandmarkKalmanFilter instance that maintains a
|
||||||
|
(N, 4) state array where each row is [x, y, vx, vy] for one landmark.
|
||||||
|
All N points are predicted / updated simultaneously using broadcast numpy
|
||||||
|
operations, so there is no per-point Python loop.
|
||||||
|
|
||||||
|
Usage
|
||||||
|
-----
|
||||||
|
kf = LandmarkKalmanFilter(n_points=68)
|
||||||
|
smoothed = kf.update(obs, dt) # obs: (68, 2) ndarray, dt: float (s)
|
||||||
|
smoothed = kf.update(obs, dt) # subsequent calls
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
|
||||||
|
class LandmarkKalmanFilter:
    """
    Constant-velocity Kalman filter over N 2-D landmark points.

    Every landmark carries a 4-D state [x, y, vx, vy]; measurements supply
    only [x, y].  All N points share one transition/observation model, so
    predict and update are batched over the leading axis with matmul
    broadcasting — no per-point Python loop.

    Parameters
    ----------
    n_points : number of landmarks (default 68)
    process_noise_std : std-dev of velocity random-walk noise (normalised coords/s)
    obs_noise_std : std-dev of observation noise (normalised coords)
    """

    def __init__(
        self,
        n_points: int = 68,
        process_noise_std: float = 0.01,
        obs_noise_std: float = 0.005,
    ) -> None:
        self._n = n_points
        self._q_std = process_noise_std
        self._r_std = obs_noise_std

        # Lazily created on the first update(): state (n, 4) as
        # [x, y, vx, vy] rows, covariance (n, 4, 4).
        self._x: Optional[np.ndarray] = None
        self._P: Optional[np.ndarray] = None

        # Shared observation model — H projects the state onto [x, y].
        self._H = np.array([[1., 0., 0., 0.],
                            [0., 1., 0., 0.]])      # (2, 4)
        # Observation noise covariance.
        self._R = (obs_noise_std ** 2) * np.eye(2)  # (2, 2)

    # ── Public API ────────────────────────────────────────────────────────────

    def update(self, obs: np.ndarray, dt: float) -> np.ndarray:
        """
        Run one predict/update cycle and return the smoothed positions.

        Parameters
        ----------
        obs : (n_points, 2) array of [x, y] normalised landmark coordinates
        dt : elapsed time since last call (seconds); unused on the first call

        Returns
        -------
        (n_points, 2) float32 array of smoothed [x, y] coordinates

        Raises
        ------
        ValueError : when obs does not have shape (n_points, 2)
        """
        obs = np.asarray(obs, dtype=np.float64)
        if obs.shape != (self._n, 2):
            raise ValueError(f'Expected obs shape ({self._n}, 2), got {obs.shape}')

        if self.is_initialised:
            # Clamp dt so F and Q stay well-defined for non-positive steps.
            self._predict(max(dt, 1e-6))
        else:
            self._initialise(obs)
        self._update(obs)

        return self._x[:, :2].astype(np.float32)

    @property
    def is_initialised(self) -> bool:
        """True once the first observation has seeded the state."""
        return self._x is not None

    # ── Internal ──────────────────────────────────────────────────────────────

    def _initialise(self, obs: np.ndarray) -> None:
        """Seed the state from the first observation with zero velocity."""
        state = np.zeros((self._n, 4), dtype=np.float64)
        state[:, :2] = obs
        self._x = state

        # Diagonal prior: moderate position uncertainty; velocity starts at
        # zero with a small uncertainty so it can adapt quickly.
        pos_var = (self._r_std * 10) ** 2
        vel_var = (self._q_std * 5) ** 2
        self._P = np.tile(np.diag([pos_var, pos_var, vel_var, vel_var]),
                          (self._n, 1, 1))

    def _predict(self, dt: float) -> None:
        """Constant-velocity predict step: x ← F x, P ← F P Fᵀ + Q."""
        # Transition matrix couples position to velocity over dt.
        F = np.eye(4)
        F[0, 2] = dt
        F[1, 3] = dt

        # Discrete Wiener process-noise model driven by velocity noise.
        q = self._q_std ** 2
        t2 = dt * dt
        t3 = t2 * dt
        t4 = t3 * dt
        Q = q * np.array([[t4 / 4, 0.,     t3 / 2, 0.    ],
                          [0.,     t4 / 4, 0.,     t3 / 2],
                          [t3 / 2, 0.,     t2,     0.    ],
                          [0.,     t3 / 2, 0.,     t2    ]])

        # Batched over all n points via matmul broadcasting:
        # (4,4) against (n,4,4) broadcasts the shared model per point.
        self._x = self._x @ F.T                      # (n, 4)
        self._P = F @ self._P @ F.T + Q              # (n, 4, 4)

    def _update(self, obs: np.ndarray) -> None:
        """Kalman measurement update with obs: (n, 2)."""
        H = self._H
        R = self._R

        # Innovation y = z − H x, per point.
        residual = obs - self._x @ H.T               # (n, 2)

        # Gain K = P Hᵀ (H P Hᵀ + R)⁻¹, batched over points.
        PHt = self._P @ H.T                          # (n, 4, 2)
        S = H @ PHt + R                              # (n, 2, 2)
        K = PHt @ np.linalg.inv(S)                   # (n, 4, 2)

        # State correction x ← x + K y.
        self._x = self._x + (K @ residual[:, :, None])[:, :, 0]

        # Covariance correction P ← (I − K H) P.
        self._P = (np.eye(4) - K @ H) @ self._P
|
||||||
@ -0,0 +1,165 @@
|
|||||||
|
"""
|
||||||
|
landmark_smoother_node.py — Per-person 68-point Kalman landmark smoother (Issue #227).
|
||||||
|
|
||||||
|
Subscribes to raw 68-point facial landmarks and republishes a smoothed version
|
||||||
|
using a constant-velocity Kalman filter maintained per face track.
|
||||||
|
|
||||||
|
Subscribes:
|
||||||
|
/social/faces/landmarks saltybot_social_msgs/FaceLandmarksArray
|
||||||
|
|
||||||
|
Publishes:
|
||||||
|
/social/faces/landmarks_smooth saltybot_social_msgs/FaceLandmarksArray
|
||||||
|
|
||||||
|
Algorithm
|
||||||
|
---------
|
||||||
|
1. For each FaceLandmarks message, look up (or create) a per-track
|
||||||
|
LandmarkKalmanFilter keyed by person_id (or face_id when person_id == -1).
|
||||||
|
2. Call kf.update(obs, dt) → smoothed (68, 2) positions.
|
||||||
|
3. Publish immediately — no extra timer delay.
|
||||||
|
4. A 1 Hz timer prunes tracks not updated within `prune_age_s` (default 3.0 s).
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
prune_age_s float 3.0 Seconds of inactivity before a track is dropped
|
||||||
|
process_noise_std float 0.01 Kalman process noise std-dev (normalised coords/s)
|
||||||
|
obs_noise_std float 0.005 Kalman observation noise std-dev (normalised coords)
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import time
|
||||||
|
from typing import Dict, Optional, Tuple
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
import rclpy
|
||||||
|
from rclpy.node import Node
|
||||||
|
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
|
||||||
|
|
||||||
|
from saltybot_social_msgs.msg import FaceLandmarks, FaceLandmarksArray
|
||||||
|
|
||||||
|
from ._landmark_kalman import LandmarkKalmanFilter
|
||||||
|
|
||||||
|
|
||||||
|
# QoS matching typical sensor publishers: best-effort delivery with a
# shallow keep-last queue, so stale landmark frames are dropped rather
# than buffered when the subscriber falls behind.
_SENSOR_QOS = QoSProfile(
    reliability=ReliabilityPolicy.BEST_EFFORT,
    history=HistoryPolicy.KEEP_LAST,
    depth=4,
)
|
||||||
|
|
||||||
|
|
||||||
|
class LandmarkSmootherNode(Node):
    """Per-person 68-point Kalman landmark smoother (Issue #227).

    Subscribes to raw FaceLandmarksArray messages, maintains one
    LandmarkKalmanFilter per face track, and republishes a smoothed copy
    of every face immediately.  A 1 Hz timer drops tracks that have not
    been updated within ``prune_age_s`` seconds.
    """

    # Landmark layout: N_POINTS (x, y) pairs flattened into one float array.
    N_POINTS = 68
    _EXPECTED_FLOATS = 2 * N_POINTS  # == 136

    def __init__(self) -> None:
        super().__init__('landmark_smoother_node')

        self.declare_parameter('prune_age_s', 3.0)
        self.declare_parameter('process_noise_std', 0.01)
        self.declare_parameter('obs_noise_std', 0.005)

        # Parameters are read once at startup; runtime changes are ignored.
        self._prune_age = self.get_parameter('prune_age_s').value
        self._proc_noise = self.get_parameter('process_noise_std').value
        self._obs_noise = self.get_parameter('obs_noise_std').value

        # track_key → (LandmarkKalmanFilter, last wall-clock update (monotonic),
        #              last message stamp in seconds)
        self._tracks: Dict[int, Tuple[LandmarkKalmanFilter, float, float]] = {}

        self._sub = self.create_subscription(
            FaceLandmarksArray,
            '/social/faces/landmarks',
            self._on_landmarks,
            _SENSOR_QOS,
        )
        self._pub = self.create_publisher(
            FaceLandmarksArray,
            '/social/faces/landmarks_smooth',
            10,
        )
        self.create_timer(1.0, self._prune_stale_tracks)

        self.get_logger().info(
            f'landmark_smoother_node ready — '
            f'prune_age={self._prune_age}s '
            f'q={self._proc_noise} r={self._obs_noise}'
        )

    # ── Callback ──────────────────────────────────────────────────────────────

    def _on_landmarks(self, msg: FaceLandmarksArray) -> None:
        """Smooth every face in the message and republish the array."""
        now = time.monotonic()
        # Hoisted out of the loop: the header stamp is identical for all
        # faces in one message (the original recomputed it per face).
        msg_stamp = _stamp_to_sec(msg.header.stamp)

        out = FaceLandmarksArray()
        out.header = msg.header

        for face in msg.faces:
            # Validate landmark count; malformed faces pass through unchanged
            # so downstream consumers still see them.
            if len(face.landmarks) != self._EXPECTED_FLOATS:
                self.get_logger().warn(
                    f'face_id={face.face_id} has {len(face.landmarks)} floats, '
                    f'expected 136 — skipping',
                    throttle_duration_sec=5.0,
                )
                out.faces.append(face)  # pass through unchanged
                continue

            obs = np.array(face.landmarks, dtype=np.float64).reshape(self.N_POINTS, 2)
            track_key = self._track_key(face)

            # Single dict lookup instead of `in` followed by `[]`.
            entry = self._tracks.get(track_key)
            if entry is not None:
                kf, _last_wall, last_stamp = entry
                # Fall back to a nominal 30 fps step when stamps are
                # non-increasing (e.g. bag replay or clock jumps).
                dt = msg_stamp - last_stamp if msg_stamp > last_stamp else 1.0 / 30.0
                smoothed = kf.update(obs, dt)
            else:
                kf = LandmarkKalmanFilter(
                    n_points=self.N_POINTS,
                    process_noise_std=self._proc_noise,
                    obs_noise_std=self._obs_noise,
                )
                smoothed = kf.update(obs, dt=0.0)

            self._tracks[track_key] = (kf, now, msg_stamp)

            smoothed_face = FaceLandmarks()
            smoothed_face.header = face.header
            smoothed_face.face_id = face.face_id
            smoothed_face.person_id = face.person_id
            smoothed_face.confidence = face.confidence
            smoothed_face.landmarks = smoothed.flatten().tolist()
            out.faces.append(smoothed_face)

        self._pub.publish(out)

    # ── Helpers ───────────────────────────────────────────────────────────────

    @staticmethod
    def _track_key(face: FaceLandmarks) -> int:
        """Use person_id when valid, otherwise fall back to face_id.

        Face-based keys are mapped to strictly negative values (-(face_id+1))
        so they can never collide with non-negative person IDs.
        """
        return face.person_id if face.person_id >= 0 else -(face.face_id + 1)

    def _prune_stale_tracks(self) -> None:
        """Drop tracks not updated within the configured prune age (1 Hz timer)."""
        now = time.monotonic()
        stale = [k for k, (_, t, _) in self._tracks.items()
                 if now - t > self._prune_age]
        for k in stale:
            del self._tracks[k]
        if stale:
            self.get_logger().debug(f'Pruned {len(stale)} stale track(s)')
|
||||||
|
|
||||||
|
|
||||||
|
def _stamp_to_sec(stamp) -> float:
|
||||||
|
return stamp.sec + stamp.nanosec * 1e-9
|
||||||
|
|
||||||
|
|
||||||
|
def main(args=None) -> None:
    """ROS2 entry point: initialise rclpy and spin LandmarkSmootherNode.

    Parameters
    ----------
    args : CLI arguments forwarded to rclpy.init (None → sys.argv)
    """
    rclpy.init(args=args)
    node = LandmarkSmootherNode()
    try:
        rclpy.spin(node)
    finally:
        # Always release node resources and the rclpy context, even when
        # spin exits via an exception (e.g. Ctrl-C).
        node.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()
|
||||||
@ -25,6 +25,8 @@ setup(
|
|||||||
'console_scripts': [
|
'console_scripts': [
|
||||||
'face_recognition = saltybot_social_face.face_recognition_node:main',
|
'face_recognition = saltybot_social_face.face_recognition_node:main',
|
||||||
'enrollment_cli = saltybot_social_face.enrollment_cli:main',
|
'enrollment_cli = saltybot_social_face.enrollment_cli:main',
|
||||||
|
# 68-point Kalman landmark smoother (Issue #227)
|
||||||
|
'landmark_smoother = saltybot_social_face.landmark_smoother_node:main',
|
||||||
],
|
],
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|||||||
@ -0,0 +1,246 @@
|
|||||||
|
"""
|
||||||
|
test_landmark_smoother.py — Unit tests for LandmarkKalmanFilter (no ROS2 required).
|
||||||
|
|
||||||
|
Covers:
|
||||||
|
- Initialisation: first call returns observations unchanged
|
||||||
|
- Smoothing: output tracks input over multiple steps
|
||||||
|
- Unit vector: direction is preserved
|
||||||
|
- Shape / dtype
|
||||||
|
- dt handling: zero vs positive dt
|
||||||
|
- Noise reduction: repeated identical observations converge
|
||||||
|
- Track key logic tested via helper
|
||||||
|
- Fallback for short landmark arrays (pass-through guard)
|
||||||
|
"""
|
||||||
|
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
import math
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
|
||||||
|
|
||||||
|
from saltybot_social_face._landmark_kalman import LandmarkKalmanFilter
|
||||||
|
|
||||||
|
|
||||||
|
# ── Helpers ───────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _random_lm(n=68, seed=0):
|
||||||
|
rng = np.random.default_rng(seed)
|
||||||
|
return rng.uniform(0.1, 0.9, size=(n, 2)).astype(np.float32)
|
||||||
|
|
||||||
|
|
||||||
|
def _zero_lm(n=68):
|
||||||
|
return np.zeros((n, 2), dtype=np.float32)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Shape and dtype ───────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestOutputShape:
    """Shape / dtype contract of LandmarkKalmanFilter.update."""

    def test_output_shape_68(self):
        smoother = LandmarkKalmanFilter(n_points=68)
        result = smoother.update(_random_lm(68), dt=0.0)
        assert result.shape == (68, 2)

    def test_output_dtype_float32(self):
        smoother = LandmarkKalmanFilter(n_points=68)
        result = smoother.update(_random_lm(68), dt=0.0)
        assert result.dtype == np.float32

    def test_custom_n_points(self):
        smoother = LandmarkKalmanFilter(n_points=5)
        result = smoother.update(_random_lm(5), dt=0.0)
        assert result.shape == (5, 2)

    def test_wrong_shape_raises(self):
        smoother = LandmarkKalmanFilter(n_points=68)
        bad = _random_lm(5)  # 5 points where 68 are required
        with pytest.raises(ValueError):
            smoother.update(bad, dt=0.0)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Initialisation ────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestInitialisation:
    """Lazy initialisation: the first update() seeds the state."""

    def test_first_update_returns_obs(self):
        """First call carries no momentum, so output ≈ input."""
        smoother = LandmarkKalmanFilter(n_points=68)
        first = _random_lm(68)
        result = smoother.update(first, dt=0.0)
        np.testing.assert_allclose(result, first, atol=1e-5)

    def test_not_initialised_before_update(self):
        assert LandmarkKalmanFilter(n_points=68).is_initialised is False

    def test_initialised_after_first_update(self):
        smoother = LandmarkKalmanFilter(n_points=68)
        smoother.update(_random_lm(), dt=0.0)
        assert smoother.is_initialised
|
||||||
|
|
||||||
|
|
||||||
|
# ── Smoothing behaviour ───────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestSmoothingBehaviour:
    """Filtering behaviour: convergence, boundedness, jitter damping, tracking.

    NOTE: these tests interleave RNG draws with filter updates, so the
    statement order fixes the exact observation sequence — do not reorder.
    """

    def test_stationary_obs_converges(self):
        """Repeated identical observations should produce output ≈ input."""
        kf = LandmarkKalmanFilter(n_points=68, obs_noise_std=0.005)
        obs = _random_lm(68, seed=1)
        # 30 frames at 30 fps ≈ one second of simulated input.
        for _ in range(30):
            out = kf.update(obs, dt=1.0 / 30.0)
        np.testing.assert_allclose(out, obs, atol=0.01)

    def test_output_stays_bounded(self):
        """Output should remain within [0, 1] when inputs are in [0, 1]."""
        kf = LandmarkKalmanFilter(n_points=68)
        rng = np.random.default_rng(42)
        for _ in range(50):
            obs = rng.uniform(0.05, 0.95, size=(68, 2)).astype(np.float32)
            out = kf.update(obs, dt=1.0 / 30.0)
            # Allow a small margin beyond [0,1] due to velocity extrapolation
            assert out.min() > -0.1
            assert out.max() < 1.1

    def test_smoothing_reduces_noise(self):
        """
        With low process noise the filter should dampen high-frequency jitter.
        Average per-frame change in output should be less than in noisy input.
        """
        kf = LandmarkKalmanFilter(n_points=68, process_noise_std=0.002, obs_noise_std=0.01)
        rng = np.random.default_rng(7)
        base = rng.uniform(0.2, 0.8, size=(68, 2)).astype(np.float32)
        noise = 0.05  # std-dev of added noise
        prev_obs = None
        prev_out = None
        obs_deltas = []  # mean |Δ| between consecutive noisy observations
        out_deltas = []  # mean |Δ| between consecutive filter outputs
        for _ in range(40):
            obs = (base + rng.normal(0, noise, size=(68, 2))).astype(np.float32)
            out = kf.update(obs, dt=1.0 / 30.0)
            if prev_obs is not None:
                obs_deltas.append(np.abs(obs - prev_obs).mean())
                out_deltas.append(np.abs(out - prev_out).mean())
            prev_obs = obs.copy()
            prev_out = out.copy()
        assert np.mean(out_deltas) < np.mean(obs_deltas)

    def test_velocity_prediction_follows_linear_motion(self):
        """
        Points moving linearly should be predicted ahead-of-time so the
        filter output lags by less than the raw measurement noise.
        """
        kf = LandmarkKalmanFilter(n_points=68, process_noise_std=0.005, obs_noise_std=0.001)
        dt = 1.0 / 30.0
        vel = 0.002  # normalised coords/frame
        obs = np.tile([0.3, 0.4], (68, 1)).astype(np.float32)
        # Warm up the filter
        for i in range(20):
            obs_step = obs + vel * i
            kf.update(obs_step, dt=dt)
        # After warm-up, output should track the moving target
        out = kf.update(obs + vel * 20, dt=dt)
        target = obs + vel * 20
        np.testing.assert_allclose(out, target, atol=0.01)
|
||||||
|
|
||||||
|
|
||||||
|
# ── dt handling ───────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestDtHandling:
    """Robustness of update() across degenerate and extreme dt values."""

    def test_zero_dt_on_first_call_ok(self):
        smoother = LandmarkKalmanFilter(n_points=68)
        result = smoother.update(_random_lm(), dt=0.0)  # must not crash
        assert result.shape == (68, 2)

    def test_large_dt_does_not_crash(self):
        smoother = LandmarkKalmanFilter(n_points=68)
        pts = _random_lm()
        smoother.update(pts, dt=0.0)
        result = smoother.update(pts, dt=5.0)  # simulate a 5 second dropout
        assert result.shape == (68, 2)

    def test_multiple_calls_increase_smoothness(self):
        """A long run of updates must complete without raising."""
        smoother = LandmarkKalmanFilter(n_points=68)
        pts = _random_lm()
        for _ in range(100):
            smoother.update(pts, dt=1.0 / 30.0)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Process and observation noise parameters ──────────────────────────────────
|
||||||
|
|
||||||
|
class TestNoiseParameters:
    """Effect of the noise std-dev parameters on filter responsiveness.

    NOTE: test_high_obs_noise_smooths_more reads the private `_x` state to
    compare per-step movement, and interleaves RNG draws with updates —
    both filters must see the identical observation sequence.
    """

    def test_high_obs_noise_smooths_more(self):
        """Higher obs noise → output moves less per step (more smoothing)."""
        rng = np.random.default_rng(99)
        base = rng.uniform(0.2, 0.8, size=(68, 2)).astype(np.float32)

        kf_low = LandmarkKalmanFilter(n_points=68, obs_noise_std=0.001)
        kf_high = LandmarkKalmanFilter(n_points=68, obs_noise_std=0.1)

        # Warm both filters up on the same noisy sequence.
        for i in range(10):
            obs = (base + rng.normal(0, 0.02, size=(68, 2))).astype(np.float32)
            kf_low.update(obs, dt=1.0 / 30.0)
            kf_high.update(obs, dt=1.0 / 30.0)

        obs_new = (base + rng.normal(0, 0.02, size=(68, 2))).astype(np.float32)
        prev_low = kf_low._x[:, :2].copy()
        prev_high = kf_high._x[:, :2].copy()
        kf_low.update(obs_new, dt=1.0 / 30.0)
        kf_high.update(obs_new, dt=1.0 / 30.0)

        # How far each filter's position estimate moved in one step.
        delta_low = np.abs(kf_low._x[:, :2] - prev_low).mean()
        delta_high = np.abs(kf_high._x[:, :2] - prev_high).mean()
        assert delta_high < delta_low

    def test_known_stationary_value(self):
        """With very low noise a stationary signal should converge closely."""
        kf = LandmarkKalmanFilter(n_points=1, process_noise_std=1e-4, obs_noise_std=1e-4)
        obs = np.array([[0.5, 0.5]], dtype=np.float32)
        for _ in range(50):
            out = kf.update(obs, dt=1.0 / 30.0)
        np.testing.assert_allclose(out, obs, atol=0.001)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Track key helper (reimplemented to mirror node logic without importing ROS) ─
|
||||||
|
|
||||||
|
def _track_key(person_id: int, face_id: int) -> int:
|
||||||
|
"""Mirror of LandmarkSmootherNode._track_key logic."""
|
||||||
|
return person_id if person_id >= 0 else -(face_id + 1)
|
||||||
|
|
||||||
|
|
||||||
|
class TestTrackKeyLogic:
    """Key derivation: non-negative person_id wins, else negative face-based key."""

    def test_positive_person_id_used(self):
        assert _track_key(person_id=3, face_id=7) == 3

    def test_zero_person_id_used(self):
        assert _track_key(person_id=0, face_id=7) == 0

    def test_minus_one_person_id_uses_face_id(self):
        assert _track_key(person_id=-1, face_id=2) == -3

    def test_different_face_ids_give_different_keys(self):
        keys = {_track_key(person_id=-1, face_id=fid) for fid in (0, 1)}
        assert len(keys) == 2

    def test_person_key_does_not_collide_with_face_key(self):
        """Person ID 3 and face_id 3 should not produce the same key."""
        person_key = _track_key(person_id=3, face_id=99)
        face_key = _track_key(person_id=-1, face_id=3)
        assert person_key != face_key
|
||||||
|
|
||||||
|
|
||||||
|
# Allow running this test file directly (outside the colcon/pytest harness)
# for quick local iteration.
if __name__ == '__main__':
    pytest.main([__file__, '-v'])
|
||||||
@ -46,6 +46,9 @@ rosidl_generate_interfaces(${PROJECT_NAME}
|
|||||||
"msg/MeshHandoff.msg"
|
"msg/MeshHandoff.msg"
|
||||||
# Issue #221 — hand gesture pointing direction
|
# Issue #221 — hand gesture pointing direction
|
||||||
"msg/PointingTarget.msg"
|
"msg/PointingTarget.msg"
|
||||||
|
# Issue #227 — face landmark smoothing
|
||||||
|
"msg/FaceLandmarks.msg"
|
||||||
|
"msg/FaceLandmarksArray.msg"
|
||||||
DEPENDENCIES std_msgs geometry_msgs builtin_interfaces
|
DEPENDENCIES std_msgs geometry_msgs builtin_interfaces
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@ -0,0 +1,15 @@
|
|||||||
|
# FaceLandmarks.msg — 68-point facial landmark positions (Issue #227)
|
||||||
|
#
|
||||||
|
# Companion to FaceDetection.msg (which carries 5-point SCRFD keypoints).
|
||||||
|
# Carries full 68-point MediaPipe/dlib-style landmarks for downstream use
|
||||||
|
# (smoothing, expression analysis, gaze estimation, etc.).
|
||||||
|
#
|
||||||
|
# landmarks: flat array [x0,y0, x1,y1, ..., x67,y67] — 136 float32 values.
|
||||||
|
# Coordinates are normalised to [0,1] relative to the image (same convention
|
||||||
|
# as MediaPipe FaceMesh nx, ny).
|
||||||
|
#
|
||||||
|
std_msgs/Header header
|
||||||
|
int32 face_id # matches FaceDetection.face_id (-1 if unset)
|
||||||
|
int32 person_id # matches gallery person ID (-1 = unidentified)
|
||||||
|
float32[] landmarks # 136 floats: x0,y0, x1,y1 … x67,y67 (normalised)
|
||||||
|
float32 confidence # detector confidence [0,1]
|
||||||
@ -0,0 +1,3 @@
|
|||||||
|
# FaceLandmarksArray.msg — array of 68-point landmark sets (Issue #227)
|
||||||
|
std_msgs/Header header
|
||||||
|
FaceLandmarks[] faces
|
||||||
Loading…
x
Reference in New Issue
Block a user