Compare commits
No commits in common. "3b352ad2c5e5548529678d375869b0e441d1298c" and "a566ab61cd5cfbf3d7c9cbb118c1a9f7e44b1514" have entirely different histories.
3b352ad2c5
...
a566ab61cd
@ -1,161 +0,0 @@
|
|||||||
"""
|
|
||||||
_landmark_kalman.py — Vectorised constant-velocity Kalman filter for 2-D facial landmarks.
|
|
||||||
|
|
||||||
No ROS2 dependencies — pure numpy.
|
|
||||||
|
|
||||||
Each face track has one LandmarkKalmanFilter instance that maintains a
|
|
||||||
(N, 4) state array where each row is [x, y, vx, vy] for one landmark.
|
|
||||||
All N points are predicted / updated simultaneously using broadcast numpy
|
|
||||||
operations, so there is no per-point Python loop.
|
|
||||||
|
|
||||||
Usage
|
|
||||||
-----
|
|
||||||
kf = LandmarkKalmanFilter(n_points=68)
|
|
||||||
smoothed = kf.update(obs, dt) # obs: (68, 2) ndarray, dt: float (s)
|
|
||||||
smoothed = kf.update(obs, dt) # subsequent calls
|
|
||||||
"""
|
|
||||||
|
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import numpy as np
|
|
||||||
from typing import Optional
|
|
||||||
|
|
||||||
|
|
||||||
class LandmarkKalmanFilter:
    """
    Constant-velocity Kalman filter over N 2-D landmark points.

    State per point:       [x, y, vx, vy]
    Observation per point: [x, y]

    All N points share the same transition/observation model, so the predict
    and update steps are fully vectorised over the leading point axis — there
    is no per-point Python loop.

    Parameters
    ----------
    n_points : number of landmarks (default 68)
    process_noise_std : std-dev of velocity random-walk noise (normalised coords/s)
    obs_noise_std : std-dev of observation noise (normalised coords)
    """

    def __init__(
        self,
        n_points: int = 68,
        process_noise_std: float = 0.01,
        obs_noise_std: float = 0.005,
    ) -> None:
        self._n = n_points
        self._q_std = process_noise_std
        self._r_std = obs_noise_std

        # State: (n, 4) rows of [x, y, vx, vy]; created lazily on first update.
        self._x: Optional[np.ndarray] = None

        # Covariance: (n, 4, 4) — one 4x4 matrix per point.
        self._P: Optional[np.ndarray] = None

        # Fixed matrices (shared across all points via broadcasting).
        # Observation matrix H: (2, 4) — selects the x and y components.
        self._H = np.array([[1., 0., 0., 0.],
                            [0., 1., 0., 0.]])  # (2, 4)

        # Observation noise R: (2, 2), isotropic.
        self._R = (obs_noise_std ** 2) * np.eye(2)  # (2, 2)

    # ── Public API ────────────────────────────────────────────────────────────

    def update(self, obs: np.ndarray, dt: float) -> np.ndarray:
        """
        Predict then update with new observations.

        Parameters
        ----------
        obs : (n_points, 2) array of [x, y] normalised landmark coordinates
        dt : elapsed time since last call (seconds); unused on the first call

        Returns
        -------
        (n_points, 2) array of smoothed [x, y] coordinates

        Raises
        ------
        ValueError
            If obs has the wrong shape or contains NaN/inf (a non-finite
            observation would silently corrupt the filter state forever).
        """
        obs = np.asarray(obs, dtype=np.float64)
        if obs.shape != (self._n, 2):
            raise ValueError(f'Expected obs shape ({self._n}, 2), got {obs.shape}')
        if not np.all(np.isfinite(obs)):
            raise ValueError('obs contains non-finite values')

        if self._x is None:
            self._initialise(obs)
        else:
            # Clamp dt so a zero or negative stamp delta cannot zero out F/Q.
            self._predict(max(dt, 1e-6))
        self._update(obs)

        return self._x[:, :2].astype(np.float32)

    @property
    def is_initialised(self) -> bool:
        """True once the first observation has been absorbed."""
        return self._x is not None

    # ── Internal ──────────────────────────────────────────────────────────────

    def _initialise(self, obs: np.ndarray) -> None:
        """Initialise state from first observation (zero velocity)."""
        self._x = np.zeros((self._n, 4), dtype=np.float64)
        self._x[:, :2] = obs

        # Start with moderate position uncertainty, modest velocity uncertainty.
        init_pos_var = (self._r_std * 10) ** 2
        self._P = np.zeros((self._n, 4, 4), dtype=np.float64)
        self._P[:, 0, 0] = init_pos_var
        self._P[:, 1, 1] = init_pos_var
        self._P[:, 2, 2] = (self._q_std * 5) ** 2
        self._P[:, 3, 3] = (self._q_std * 5) ** 2

    def _predict(self, dt: float) -> None:
        """Constant-velocity predict step: x' = F x, P' = F P Fᵀ + Q."""
        # Transition matrix F: (4, 4)
        F = np.array([[1., 0., dt, 0.],
                      [0., 1., 0., dt],
                      [0., 0., 1., 0.],
                      [0., 0., 0., 1.]])

        # Process noise Q: (4, 4) — velocity driven (discrete Wiener model)
        q = self._q_std ** 2
        dt2 = dt * dt
        dt3 = dt2 * dt
        dt4 = dt3 * dt
        Q = q * np.array([[dt4/4, 0.,    dt3/2, 0.   ],
                          [0.,    dt4/4, 0.,    dt3/2],
                          [dt3/2, 0.,    dt2,   0.   ],
                          [0.,    dt3/2, 0.,    dt2  ]])

        # Vectorised over n points: (n,4) @ (4,4).T → (n,4)
        self._x = self._x @ F.T  # (n, 4)

        # P' = F P Fᵀ + Q for each of n points
        FP = np.einsum('ij,njk->nik', F, self._P)   # (n, 4, 4)
        FPFt = np.einsum('nij,kj->nik', FP, F)      # (n, 4, 4)
        self._P = FPFt + Q[np.newaxis]              # (n, 4, 4)

    def _update(self, obs: np.ndarray) -> None:
        """Kalman update step with measurement obs: (n, 2)."""
        H = self._H  # (2, 4)
        R = self._R  # (2, 2)

        # Innovation: y = obs − H x → (n, 2)
        Hx = np.einsum('ij,nj->ni', H, self._x)     # (n, 2)
        y = obs - Hx                                # (n, 2)

        # Innovation covariance S = H P Hᵀ + R → (n, 2, 2)
        HP = np.einsum('ij,njk->nik', H, self._P)               # (n, 2, 4)
        S = np.einsum('nij,kj->nik', HP, H) + R[np.newaxis]     # (n, 2, 2)

        # Kalman gain K = P Hᵀ S⁻¹ → (n, 4, 2).
        # Solve S X = (P Hᵀ)ᵀ instead of forming S⁻¹ explicitly — a batched
        # solve is numerically better conditioned than np.linalg.inv.
        # S is symmetric, so this yields X = Kᵀ exactly.
        PHt = np.einsum('nij,kj->nik', self._P, H)              # (n, 4, 2)
        K = np.linalg.solve(S, PHt.transpose(0, 2, 1)).transpose(0, 2, 1)

        # State update: x += K y
        Ky = np.einsum('nij,nj->ni', K, y)          # (n, 4)
        self._x = self._x + Ky

        # Covariance update: P = (I - K H) P
        I = np.broadcast_to(np.eye(4), (self._n, 4, 4))  # read-only view, no copy
        KH = np.einsum('nij,jk->nik', K, H)              # (n, 4, 4)
        self._P = np.einsum('nij,njk->nik', I - KH, self._P)  # (n, 4, 4)
|
|
||||||
@ -1,165 +0,0 @@
|
|||||||
"""
|
|
||||||
landmark_smoother_node.py — Per-person 68-point Kalman landmark smoother (Issue #227).
|
|
||||||
|
|
||||||
Subscribes to raw 68-point facial landmarks and republishes a smoothed version
|
|
||||||
using a constant-velocity Kalman filter maintained per face track.
|
|
||||||
|
|
||||||
Subscribes:
|
|
||||||
/social/faces/landmarks saltybot_social_msgs/FaceLandmarksArray
|
|
||||||
|
|
||||||
Publishes:
|
|
||||||
/social/faces/landmarks_smooth saltybot_social_msgs/FaceLandmarksArray
|
|
||||||
|
|
||||||
Algorithm
|
|
||||||
---------
|
|
||||||
1. For each FaceLandmarks message, look up (or create) a per-track
|
|
||||||
LandmarkKalmanFilter keyed by person_id (or face_id when person_id == -1).
|
|
||||||
2. Call kf.update(obs, dt) → smoothed (68, 2) positions.
|
|
||||||
3. Publish immediately — no extra timer delay.
|
|
||||||
4. A 1 Hz timer prunes tracks not updated within `prune_age_s` (default 3.0 s).
|
|
||||||
|
|
||||||
Parameters
|
|
||||||
----------
|
|
||||||
prune_age_s float 3.0 Seconds of inactivity before a track is dropped
|
|
||||||
process_noise_std float 0.01 Kalman process noise std-dev (normalised coords/s)
|
|
||||||
obs_noise_std float 0.005 Kalman observation noise std-dev (normalised coords)
|
|
||||||
"""
|
|
||||||
|
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import time
|
|
||||||
from typing import Dict, Optional, Tuple
|
|
||||||
|
|
||||||
import numpy as np
|
|
||||||
|
|
||||||
import rclpy
|
|
||||||
from rclpy.node import Node
|
|
||||||
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
|
|
||||||
|
|
||||||
from saltybot_social_msgs.msg import FaceLandmarks, FaceLandmarksArray
|
|
||||||
|
|
||||||
from ._landmark_kalman import LandmarkKalmanFilter
|
|
||||||
|
|
||||||
|
|
||||||
# Best-effort sensor QoS: landmark frames arrive at camera rate and are
# disposable, so dropping under load is preferable to queueing stale frames.
_SENSOR_QOS = QoSProfile(
    reliability=ReliabilityPolicy.BEST_EFFORT,
    history=HistoryPolicy.KEEP_LAST,
    depth=4,  # keep only the freshest few frames
)
|
|
||||||
|
|
||||||
|
|
||||||
class LandmarkSmootherNode(Node):
    """
    Per-face-track 68-point landmark smoother (Issue #227).

    Subscribes to raw FaceLandmarksArray messages, maintains one
    LandmarkKalmanFilter per track (keyed by person_id, falling back to
    face_id), and republishes the smoothed landmarks immediately. A 1 Hz
    timer drops tracks not updated within `prune_age_s`.
    """

    def __init__(self) -> None:
        super().__init__('landmark_smoother_node')

        self.declare_parameter('prune_age_s', 3.0)
        self.declare_parameter('process_noise_std', 0.01)
        self.declare_parameter('obs_noise_std', 0.005)

        self._prune_age = self.get_parameter('prune_age_s').value
        self._proc_noise = self.get_parameter('process_noise_std').value
        self._obs_noise = self.get_parameter('obs_noise_std').value

        # track_key → (LandmarkKalmanFilter, last wall-clock update, last msg stamp)
        self._tracks: Dict[int, Tuple[LandmarkKalmanFilter, float, float]] = {}

        self._sub = self.create_subscription(
            FaceLandmarksArray,
            '/social/faces/landmarks',
            self._on_landmarks,
            _SENSOR_QOS,
        )
        self._pub = self.create_publisher(
            FaceLandmarksArray,
            '/social/faces/landmarks_smooth',
            10,
        )
        self.create_timer(1.0, self._prune_stale_tracks)

        self.get_logger().info(
            f'landmark_smoother_node ready — '
            f'prune_age={self._prune_age}s '
            f'q={self._proc_noise} r={self._obs_noise}'
        )

    # ── Callback ──────────────────────────────────────────────────────────────

    def _on_landmarks(self, msg: FaceLandmarksArray) -> None:
        """Smooth every face in `msg` and republish the full array."""
        now = time.monotonic()
        # Hoisted: the header stamp is the same for every face in this message
        # (it was previously recomputed inside the loop and again per branch).
        msg_stamp = _stamp_to_sec(msg.header.stamp)

        out = FaceLandmarksArray()
        out.header = msg.header

        for face in msg.faces:
            # Validate landmark count (68 points × 2 coords = 136 floats)
            if len(face.landmarks) != 136:
                self.get_logger().warn(
                    f'face_id={face.face_id} has {len(face.landmarks)} floats, '
                    f'expected 136 — skipping',
                    throttle_duration_sec=5.0,
                )
                out.faces.append(face)  # pass through unchanged
                continue

            obs = np.array(face.landmarks, dtype=np.float64).reshape(68, 2)
            track_key = self._track_key(face)

            if track_key in self._tracks:
                kf, _last_t, last_stamp = self._tracks[track_key]
                # Fall back to a nominal 30 fps step when stamps are not
                # strictly increasing (duplicate stamps, replayed bags).
                dt = msg_stamp - last_stamp if msg_stamp > last_stamp else 1.0 / 30.0
                smoothed = kf.update(obs, dt)
            else:
                kf = LandmarkKalmanFilter(
                    n_points=68,
                    process_noise_std=self._proc_noise,
                    obs_noise_std=self._obs_noise,
                )
                smoothed = kf.update(obs, dt=0.0)  # dt is ignored on first call

            self._tracks[track_key] = (kf, now, msg_stamp)

            smoothed_face = FaceLandmarks()
            smoothed_face.header = face.header
            smoothed_face.face_id = face.face_id
            smoothed_face.person_id = face.person_id
            smoothed_face.confidence = face.confidence
            smoothed_face.landmarks = smoothed.flatten().tolist()
            out.faces.append(smoothed_face)

        self._pub.publish(out)

    # ── Helpers ───────────────────────────────────────────────────────────────

    @staticmethod
    def _track_key(face: FaceLandmarks) -> int:
        """Use person_id when valid, otherwise fall back to face_id.

        face_id keys are mapped to -(face_id + 1), so they occupy the strictly
        negative range and can never collide with a non-negative person_id.
        """
        return face.person_id if face.person_id >= 0 else -(face.face_id + 1)

    def _prune_stale_tracks(self) -> None:
        """Drop tracks with no update within `prune_age_s` (runs at 1 Hz)."""
        now = time.monotonic()
        stale = [k for k, (_, t, _) in self._tracks.items()
                 if now - t > self._prune_age]
        for k in stale:
            del self._tracks[k]
        if stale:
            self.get_logger().debug(f'Pruned {len(stale)} stale track(s)')
|
|
||||||
|
|
||||||
|
|
||||||
def _stamp_to_sec(stamp) -> float:
|
|
||||||
return stamp.sec + stamp.nanosec * 1e-9
|
|
||||||
|
|
||||||
|
|
||||||
def main(args=None) -> None:
    """Entry point: initialise rclpy, spin the smoother node, clean up on exit."""
    rclpy.init(args=args)
    smoother = LandmarkSmootherNode()
    try:
        rclpy.spin(smoother)
    finally:
        # Always release the node and the rclpy context, even on Ctrl-C.
        smoother.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()
|
|
||||||
@ -25,8 +25,6 @@ setup(
|
|||||||
'console_scripts': [
|
'console_scripts': [
|
||||||
'face_recognition = saltybot_social_face.face_recognition_node:main',
|
'face_recognition = saltybot_social_face.face_recognition_node:main',
|
||||||
'enrollment_cli = saltybot_social_face.enrollment_cli:main',
|
'enrollment_cli = saltybot_social_face.enrollment_cli:main',
|
||||||
# 68-point Kalman landmark smoother (Issue #227)
|
|
||||||
'landmark_smoother = saltybot_social_face.landmark_smoother_node:main',
|
|
||||||
],
|
],
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|||||||
@ -1,246 +0,0 @@
|
|||||||
"""
|
|
||||||
test_landmark_smoother.py — Unit tests for LandmarkKalmanFilter (no ROS2 required).
|
|
||||||
|
|
||||||
Covers:
|
|
||||||
- Initialisation: first call returns observations unchanged
|
|
||||||
- Smoothing: output tracks input over multiple steps
|
|
||||||
- Unit vector: direction is preserved
|
|
||||||
- Shape / dtype
|
|
||||||
- dt handling: zero vs positive dt
|
|
||||||
- Noise reduction: repeated identical observations converge
|
|
||||||
- Track key logic tested via helper
|
|
||||||
- Fallback for short landmark arrays (pass-through guard)
|
|
||||||
"""
|
|
||||||
|
|
||||||
import sys
|
|
||||||
import os
|
|
||||||
import math
|
|
||||||
|
|
||||||
import numpy as np
|
|
||||||
import pytest
|
|
||||||
|
|
||||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
|
|
||||||
|
|
||||||
from saltybot_social_face._landmark_kalman import LandmarkKalmanFilter
|
|
||||||
|
|
||||||
|
|
||||||
# ── Helpers ───────────────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
def _random_lm(n=68, seed=0):
|
|
||||||
rng = np.random.default_rng(seed)
|
|
||||||
return rng.uniform(0.1, 0.9, size=(n, 2)).astype(np.float32)
|
|
||||||
|
|
||||||
|
|
||||||
def _zero_lm(n=68):
|
|
||||||
return np.zeros((n, 2), dtype=np.float32)
|
|
||||||
|
|
||||||
|
|
||||||
# ── Shape and dtype ───────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
class TestOutputShape:
    """Shape and dtype contracts of LandmarkKalmanFilter.update."""

    def test_output_shape_68(self):
        filt = LandmarkKalmanFilter(n_points=68)
        result = filt.update(_random_lm(68), dt=0.0)
        assert result.shape == (68, 2)

    def test_output_dtype_float32(self):
        filt = LandmarkKalmanFilter(n_points=68)
        result = filt.update(_random_lm(68), dt=0.0)
        assert result.dtype == np.float32

    def test_custom_n_points(self):
        filt = LandmarkKalmanFilter(n_points=5)
        result = filt.update(_random_lm(5), dt=0.0)
        assert result.shape == (5, 2)

    def test_wrong_shape_raises(self):
        filt = LandmarkKalmanFilter(n_points=68)
        bad = _random_lm(5)  # wrong number of points
        with pytest.raises(ValueError):
            filt.update(bad, dt=0.0)
|
|
||||||
|
|
||||||
|
|
||||||
# ── Initialisation ────────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
class TestInitialisation:
    """Lazy-initialisation lifecycle of the filter."""

    def test_first_update_returns_obs(self):
        """On first call the filter has not yet built momentum, so output ≈ input."""
        filt = LandmarkKalmanFilter(n_points=68)
        points = _random_lm(68)
        result = filt.update(points, dt=0.0)
        np.testing.assert_allclose(result, points, atol=1e-5)

    def test_not_initialised_before_update(self):
        fresh = LandmarkKalmanFilter(n_points=68)
        assert fresh.is_initialised is False

    def test_initialised_after_first_update(self):
        filt = LandmarkKalmanFilter(n_points=68)
        filt.update(_random_lm(), dt=0.0)
        assert filt.is_initialised is True
|
|
||||||
|
|
||||||
|
|
||||||
# ── Smoothing behaviour ───────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
class TestSmoothingBehaviour:
    """Functional smoothing properties: convergence, boundedness, jitter damping,
    and velocity tracking. NOTE: the RNG draw order inside each test is part of
    the fixture — reordering calls changes the data and the pass margins."""

    def test_stationary_obs_converges(self):
        """Repeated identical observations should produce output ≈ input."""
        kf = LandmarkKalmanFilter(n_points=68, obs_noise_std=0.005)
        obs = _random_lm(68, seed=1)
        # 30 frames at 30 fps ≈ 1 s of identical input; the state should settle.
        for _ in range(30):
            out = kf.update(obs, dt=1.0 / 30.0)
        np.testing.assert_allclose(out, obs, atol=0.01)

    def test_output_stays_bounded(self):
        """Output should remain within [0, 1] when inputs are in [0, 1]."""
        kf = LandmarkKalmanFilter(n_points=68)
        rng = np.random.default_rng(42)
        for _ in range(50):
            obs = rng.uniform(0.05, 0.95, size=(68, 2)).astype(np.float32)
            out = kf.update(obs, dt=1.0 / 30.0)
            # Allow a small margin beyond [0,1] due to velocity extrapolation
            assert out.min() > -0.1
            assert out.max() < 1.1

    def test_smoothing_reduces_noise(self):
        """
        With low process noise the filter should dampen high-frequency jitter.
        Average per-frame change in output should be less than in noisy input.
        """
        kf = LandmarkKalmanFilter(n_points=68, process_noise_std=0.002, obs_noise_std=0.01)
        rng = np.random.default_rng(7)
        base = rng.uniform(0.2, 0.8, size=(68, 2)).astype(np.float32)
        noise = 0.05  # std-dev of added noise
        prev_obs = None
        prev_out = None
        obs_deltas = []   # per-frame mean |Δ| of the raw (noisy) input
        out_deltas = []   # per-frame mean |Δ| of the filtered output
        for _ in range(40):
            obs = (base + rng.normal(0, noise, size=(68, 2))).astype(np.float32)
            out = kf.update(obs, dt=1.0 / 30.0)
            if prev_obs is not None:
                obs_deltas.append(np.abs(obs - prev_obs).mean())
                out_deltas.append(np.abs(out - prev_out).mean())
            # .copy() so the deltas compare against last frame, not a live view.
            prev_obs = obs.copy()
            prev_out = out.copy()
        assert np.mean(out_deltas) < np.mean(obs_deltas)

    def test_velocity_prediction_follows_linear_motion(self):
        """
        Points moving linearly should be predicted ahead-of-time so the
        filter output lags by less than the raw measurement noise.
        """
        kf = LandmarkKalmanFilter(n_points=68, process_noise_std=0.005, obs_noise_std=0.001)
        dt = 1.0 / 30.0
        vel = 0.002  # normalised coords/frame
        obs = np.tile([0.3, 0.4], (68, 1)).astype(np.float32)
        # Warm up the filter so the velocity estimate converges first.
        for i in range(20):
            obs_step = obs + vel * i
            kf.update(obs_step, dt=dt)
        # After warm-up, output should track the moving target
        out = kf.update(obs + vel * 20, dt=dt)
        target = obs + vel * 20
        np.testing.assert_allclose(out, target, atol=0.01)
|
|
||||||
|
|
||||||
|
|
||||||
# ── dt handling ───────────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
class TestDtHandling:
    """dt edge cases: the filter must tolerate zero, large, and repeated steps."""

    def test_zero_dt_on_first_call_ok(self):
        filt = LandmarkKalmanFilter(n_points=68)
        points = _random_lm()
        result = filt.update(points, dt=0.0)  # must not crash
        assert result.shape == (68, 2)

    def test_large_dt_does_not_crash(self):
        filt = LandmarkKalmanFilter(n_points=68)
        points = _random_lm()
        filt.update(points, dt=0.0)
        result = filt.update(points, dt=5.0)  # 5 second gap
        assert result.shape == (68, 2)

    def test_multiple_calls_increase_smoothness(self):
        """Calling update repeatedly should not raise exceptions."""
        filt = LandmarkKalmanFilter(n_points=68)
        points = _random_lm()
        for _ in range(100):
            filt.update(points, dt=1.0 / 30.0)
|
|
||||||
|
|
||||||
|
|
||||||
# ── Process and observation noise parameters ──────────────────────────────────
|
|
||||||
|
|
||||||
class TestNoiseParameters:
    """Effect of the noise std-dev parameters on filter responsiveness.
    NOTE: these tests read the private `_x` state and rely on the exact RNG
    draw order — treat the body as order-sensitive."""

    def test_high_obs_noise_smooths_more(self):
        """Higher obs noise → output moves less per step (more smoothing)."""
        rng = np.random.default_rng(99)
        base = rng.uniform(0.2, 0.8, size=(68, 2)).astype(np.float32)

        kf_low = LandmarkKalmanFilter(n_points=68, obs_noise_std=0.001)
        kf_high = LandmarkKalmanFilter(n_points=68, obs_noise_std=0.1)

        # Feed both filters the identical noisy sequence so only the R
        # parameter differs between them.
        for i in range(10):
            obs = (base + rng.normal(0, 0.02, size=(68, 2))).astype(np.float32)
            kf_low.update(obs, dt=1.0 / 30.0)
            kf_high.update(obs, dt=1.0 / 30.0)

        obs_new = (base + rng.normal(0, 0.02, size=(68, 2))).astype(np.float32)
        # Snapshot the position part of the internal state before one more step.
        prev_low = kf_low._x[:, :2].copy()
        prev_high = kf_high._x[:, :2].copy()
        kf_low.update(obs_new, dt=1.0 / 30.0)
        kf_high.update(obs_new, dt=1.0 / 30.0)

        # The high-R filter trusts the new observation less, so it moves less.
        delta_low = np.abs(kf_low._x[:, :2] - prev_low).mean()
        delta_high = np.abs(kf_high._x[:, :2] - prev_high).mean()
        assert delta_high < delta_low

    def test_known_stationary_value(self):
        """With very low noise a stationary signal should converge closely."""
        kf = LandmarkKalmanFilter(n_points=1, process_noise_std=1e-4, obs_noise_std=1e-4)
        obs = np.array([[0.5, 0.5]], dtype=np.float32)
        for _ in range(50):
            out = kf.update(obs, dt=1.0 / 30.0)
        np.testing.assert_allclose(out, obs, atol=0.001)
|
|
||||||
|
|
||||||
|
|
||||||
# ── Track key helper (reimplemented to mirror node logic without importing ROS) ─
|
|
||||||
|
|
||||||
def _track_key(person_id: int, face_id: int) -> int:
|
|
||||||
"""Mirror of LandmarkSmootherNode._track_key logic."""
|
|
||||||
return person_id if person_id >= 0 else -(face_id + 1)
|
|
||||||
|
|
||||||
|
|
||||||
class TestTrackKeyLogic:
    """Key derivation: non-negative person_id wins; else a negative face slot."""

    def test_positive_person_id_used(self):
        assert _track_key(person_id=3, face_id=7) == 3

    def test_zero_person_id_used(self):
        assert _track_key(person_id=0, face_id=7) == 0

    def test_minus_one_person_id_uses_face_id(self):
        result = _track_key(person_id=-1, face_id=2)
        assert result == -3  # -(2 + 1)

    def test_different_face_ids_give_different_keys(self):
        keys = {_track_key(person_id=-1, face_id=f) for f in (0, 1)}
        assert len(keys) == 2

    def test_person_key_does_not_collide_with_face_key(self):
        """Person ID 3 and face_id 3 should not produce the same key."""
        person_key = _track_key(person_id=3, face_id=99)
        face_key = _track_key(person_id=-1, face_id=3)
        assert person_key != face_key
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
    # Allow running this test module directly, outside a pytest invocation.
    pytest.main([__file__, '-v'])
|
|
||||||
@ -46,9 +46,6 @@ rosidl_generate_interfaces(${PROJECT_NAME}
|
|||||||
"msg/MeshHandoff.msg"
|
"msg/MeshHandoff.msg"
|
||||||
# Issue #221 — hand gesture pointing direction
|
# Issue #221 — hand gesture pointing direction
|
||||||
"msg/PointingTarget.msg"
|
"msg/PointingTarget.msg"
|
||||||
# Issue #227 — face landmark smoothing
|
|
||||||
"msg/FaceLandmarks.msg"
|
|
||||||
"msg/FaceLandmarksArray.msg"
|
|
||||||
DEPENDENCIES std_msgs geometry_msgs builtin_interfaces
|
DEPENDENCIES std_msgs geometry_msgs builtin_interfaces
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@ -1,15 +0,0 @@
|
|||||||
# FaceLandmarks.msg — 68-point facial landmark positions (Issue #227)
|
|
||||||
#
|
|
||||||
# Companion to FaceDetection.msg (which carries 5-point SCRFD keypoints).
|
|
||||||
# Carries full 68-point MediaPipe/dlib-style landmarks for downstream use
|
|
||||||
# (smoothing, expression analysis, gaze estimation, etc.).
|
|
||||||
#
|
|
||||||
# landmarks: flat array [x0,y0, x1,y1, ..., x67,y67] — 136 float32 values.
|
|
||||||
# Coordinates are normalised to [0,1] relative to the image (same convention
|
|
||||||
# as MediaPipe FaceMesh nx, ny).
|
|
||||||
#
|
|
||||||
std_msgs/Header header
|
|
||||||
int32 face_id # matches FaceDetection.face_id (-1 if unset)
|
|
||||||
int32 person_id # matches gallery person ID (-1 = unidentified)
|
|
||||||
float32[] landmarks # 136 floats: x0,y0, x1,y1 … x67,y67 (normalised)
|
|
||||||
float32 confidence # detector confidence [0,1]
|
|
||||||
@ -1,3 +0,0 @@
|
|||||||
# FaceLandmarksArray.msg — array of 68-point landmark sets (Issue #227)
|
|
||||||
std_msgs/Header header
|
|
||||||
FaceLandmarks[] faces
|
|
||||||
Loading…
x
Reference in New Issue
Block a user