Merge pull request 'feat(social): 68-point Kalman landmark smoother (Issue #227)' (#231) from sl-perception/issue-227-landmark-smooth into main
Some checks failed
Some checks failed
This commit is contained in:
commit
3b352ad2c5
@ -0,0 +1,161 @@
|
||||
"""
|
||||
_landmark_kalman.py — Vectorised constant-velocity Kalman filter for 2-D facial landmarks.
|
||||
|
||||
No ROS2 dependencies — pure numpy.
|
||||
|
||||
Each face track has one LandmarkKalmanFilter instance that maintains a
|
||||
(N, 4) state array where each row is [x, y, vx, vy] for one landmark.
|
||||
All N points are predicted / updated simultaneously using broadcast numpy
|
||||
operations, so there is no per-point Python loop.
|
||||
|
||||
Usage
|
||||
-----
|
||||
kf = LandmarkKalmanFilter(n_points=68)
|
||||
smoothed = kf.update(obs, dt) # obs: (68, 2) ndarray, dt: float (s)
|
||||
smoothed = kf.update(obs, dt) # subsequent calls
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import numpy as np
|
||||
from typing import Optional
|
||||
|
||||
|
||||
class LandmarkKalmanFilter:
    """
    Constant-velocity Kalman filter over N 2-D landmark points.

    State per point: [x, y, vx, vy]
    Observation per point: [x, y]

    Parameters
    ----------
    n_points : number of landmarks (default 68)
    process_noise_std : std-dev of velocity random-walk noise (normalised coords/s)
    obs_noise_std : std-dev of observation noise (normalised coords)
    """

    def __init__(
        self,
        n_points: int = 68,
        process_noise_std: float = 0.01,
        obs_noise_std: float = 0.005,
    ) -> None:
        self._n = n_points
        self._q_std = process_noise_std
        self._r_std = obs_noise_std

        # Per-point state (n, 4) and covariance (n, 4, 4); created lazily on
        # the first observation so the filter starts exactly at the data.
        self._x: Optional[np.ndarray] = None
        self._P: Optional[np.ndarray] = None

        # Observation model shared by every point: H selects [x, y] out of
        # the 4-vector, R is the isotropic measurement noise.
        self._H = np.eye(2, 4)                          # (2, 4)
        self._R = (obs_noise_std ** 2) * np.eye(2)      # (2, 2)

    # ── Public API ────────────────────────────────────────────────────────────

    def update(self, obs: np.ndarray, dt: float) -> np.ndarray:
        """
        Run one predict/update cycle and return the smoothed positions.

        Parameters
        ----------
        obs : (n_points, 2) array of [x, y] normalised landmark coordinates
        dt : elapsed time since last call (seconds); unused on the first call

        Returns
        -------
        (n_points, 2) float32 array of smoothed [x, y] coordinates
        """
        obs = np.asarray(obs, dtype=np.float64)
        if obs.shape != (self._n, 2):
            raise ValueError(f'Expected obs shape ({self._n}, 2), got {obs.shape}')

        if self._x is None:
            # Bootstrap: adopt the first observation verbatim.
            self._initialise(obs)
        else:
            # Clamp dt so a duplicated timestamp cannot collapse the model.
            self._predict(max(dt, 1e-6))
            self._update(obs)

        return self._x[:, :2].astype(np.float32)

    @property
    def is_initialised(self) -> bool:
        """True once the first observation has seeded the state."""
        return self._x is not None

    # ── Internal ──────────────────────────────────────────────────────────────

    def _initialise(self, obs: np.ndarray) -> None:
        """Seed the state from the first observation with zero velocity."""
        self._x = np.hstack([obs, np.zeros_like(obs)])  # (n, 4)

        # Diagonal prior: moderate position uncertainty, small velocity one.
        pos_var = (self._r_std * 10) ** 2
        vel_var = (self._q_std * 5) ** 2
        prior = np.diag([pos_var, pos_var, vel_var, vel_var])
        self._P = np.broadcast_to(prior, (self._n, 4, 4)).copy()

    def _predict(self, dt: float) -> None:
        """Constant-velocity predict step: x' = F x, P' = F P Fᵀ + Q."""
        # Transition matrix: positions advance by velocity * dt.
        F = np.eye(4)
        F[0, 2] = F[1, 3] = dt

        # Discrete Wiener process noise; the Kronecker product with I₂
        # interleaves the per-axis 2×2 noise block into [x, y, vx, vy] order.
        dt2 = dt * dt
        dt3 = dt2 * dt
        dt4 = dt3 * dt
        q = self._q_std ** 2
        Q = q * np.kron(np.array([[dt4 / 4, dt3 / 2],
                                  [dt3 / 2, dt2]]), np.eye(2))

        # Batched over all n points via broadcasting matmul.
        self._x = self._x @ F.T              # (n, 4)
        self._P = F @ self._P @ F.T + Q      # (n, 4, 4)

    def _update(self, obs: np.ndarray) -> None:
        """Kalman measurement update with obs: (n, 2)."""
        H = self._H                                      # (2, 4)

        # Innovation and its covariance, batched over n points.
        y = obs - self._x @ H.T                          # (n, 2)
        S = H @ self._P @ H.T + self._R                  # (n, 2, 2)

        # Gain K = P Hᵀ S⁻¹ — S is only 2×2 per point, so inverting the
        # whole batch at once is cheap.
        K = self._P @ H.T @ np.linalg.inv(S)             # (n, 4, 2)

        # x ← x + K y  (matmul needs a trailing unit axis on y)
        self._x = self._x + (K @ y[:, :, None])[:, :, 0]

        # P ← (I − K H) P
        self._P = (np.eye(4) - K @ H) @ self._P
||||
@ -0,0 +1,165 @@
|
||||
"""
|
||||
landmark_smoother_node.py — Per-person 68-point Kalman landmark smoother (Issue #227).
|
||||
|
||||
Subscribes to raw 68-point facial landmarks and republishes a smoothed version
|
||||
using a constant-velocity Kalman filter maintained per face track.
|
||||
|
||||
Subscribes:
|
||||
/social/faces/landmarks saltybot_social_msgs/FaceLandmarksArray
|
||||
|
||||
Publishes:
|
||||
/social/faces/landmarks_smooth saltybot_social_msgs/FaceLandmarksArray
|
||||
|
||||
Algorithm
|
||||
---------
|
||||
1. For each FaceLandmarks message, look up (or create) a per-track
|
||||
LandmarkKalmanFilter keyed by person_id (or face_id when person_id == -1).
|
||||
2. Call kf.update(obs, dt) → smoothed (68, 2) positions.
|
||||
3. Publish immediately — no extra timer delay.
|
||||
4. A 1 Hz timer prunes tracks not updated within `prune_age_s` (default 3.0 s).
|
||||
|
||||
Parameters
|
||||
----------
|
||||
prune_age_s float 3.0 Seconds of inactivity before a track is dropped
|
||||
process_noise_std float 0.01 Kalman process noise std-dev (normalised coords/s)
|
||||
obs_noise_std float 0.005 Kalman observation noise std-dev (normalised coords)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
from typing import Dict, Optional, Tuple
|
||||
|
||||
import numpy as np
|
||||
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
|
||||
|
||||
from saltybot_social_msgs.msg import FaceLandmarks, FaceLandmarksArray
|
||||
|
||||
from ._landmark_kalman import LandmarkKalmanFilter
|
||||
|
||||
|
||||
# Sensor-stream QoS: best-effort delivery with a shallow keep-last queue so
# the smoother always works on the freshest landmarks rather than a backlog.
_SENSOR_QOS = QoSProfile(
    depth=4,
    history=HistoryPolicy.KEEP_LAST,
    reliability=ReliabilityPolicy.BEST_EFFORT,
)
|
||||
|
||||
|
||||
class LandmarkSmootherNode(Node):
    """Per-face 68-point Kalman landmark smoother (Issue #227).

    Subscribes to raw landmark arrays, maintains one LandmarkKalmanFilter per
    face track (keyed by person_id, falling back to face_id), and republishes
    the smoothed landmarks immediately. A 1 Hz timer prunes inactive tracks.
    """

    def __init__(self) -> None:
        super().__init__('landmark_smoother_node')

        self.declare_parameter('prune_age_s', 3.0)
        self.declare_parameter('process_noise_std', 0.01)
        self.declare_parameter('obs_noise_std', 0.005)

        self._prune_age = self.get_parameter('prune_age_s').value
        self._proc_noise = self.get_parameter('process_noise_std').value
        self._obs_noise = self.get_parameter('obs_noise_std').value

        # track_key → (LandmarkKalmanFilter, last_update_time, last_stamp_sec)
        self._tracks: Dict[int, Tuple[LandmarkKalmanFilter, float, float]] = {}

        self._sub = self.create_subscription(
            FaceLandmarksArray,
            '/social/faces/landmarks',
            self._on_landmarks,
            _SENSOR_QOS,
        )
        self._pub = self.create_publisher(
            FaceLandmarksArray,
            '/social/faces/landmarks_smooth',
            10,
        )
        self.create_timer(1.0, self._prune_stale_tracks)

        self.get_logger().info(
            f'landmark_smoother_node ready — '
            f'prune_age={self._prune_age}s '
            f'q={self._proc_noise} r={self._obs_noise}'
        )

    # ── Callback ──────────────────────────────────────────────────────────────

    def _on_landmarks(self, msg: FaceLandmarksArray) -> None:
        """Smooth every face in *msg* and republish the whole array.

        Faces with a malformed landmark count are passed through unchanged so
        downstream consumers still see them.
        """
        now = time.monotonic()
        # The message stamp is shared by all faces — compute it once per
        # message instead of once (or twice) per face inside the loop.
        msg_stamp = _stamp_to_sec(msg.header.stamp)

        out = FaceLandmarksArray()
        out.header = msg.header

        for face in msg.faces:
            # Validate landmark count (68 points × 2 coords = 136 floats)
            if len(face.landmarks) != 136:
                self.get_logger().warn(
                    f'face_id={face.face_id} has {len(face.landmarks)} floats, '
                    f'expected 136 — skipping',
                    throttle_duration_sec=5.0,
                )
                out.faces.append(face)  # pass through unchanged
                continue

            obs = np.array(face.landmarks, dtype=np.float64).reshape(68, 2)
            track_key = self._track_key(face)

            if track_key in self._tracks:
                kf, _last_t, last_stamp = self._tracks[track_key]
                # dt from message stamps; fall back to a nominal 30 fps frame
                # period when stamps do not advance (e.g. no clock source).
                dt = msg_stamp - last_stamp if msg_stamp > last_stamp else 1.0 / 30.0
                smoothed = kf.update(obs, dt)
            else:
                kf = LandmarkKalmanFilter(
                    n_points=68,
                    process_noise_std=self._proc_noise,
                    obs_noise_std=self._obs_noise,
                )
                smoothed = kf.update(obs, dt=0.0)  # dt unused on first update

            self._tracks[track_key] = (kf, now, msg_stamp)

            smoothed_face = FaceLandmarks()
            smoothed_face.header = face.header
            smoothed_face.face_id = face.face_id
            smoothed_face.person_id = face.person_id
            smoothed_face.confidence = face.confidence
            smoothed_face.landmarks = smoothed.flatten().tolist()
            out.faces.append(smoothed_face)

        self._pub.publish(out)

    # ── Helpers ───────────────────────────────────────────────────────────────

    @staticmethod
    def _track_key(face: FaceLandmarks) -> int:
        """Use person_id when valid, otherwise fall back to face_id.

        face_id keys are mapped to negative values (-(face_id + 1)) so they
        can never collide with non-negative person IDs.
        """
        return face.person_id if face.person_id >= 0 else -(face.face_id + 1)

    def _prune_stale_tracks(self) -> None:
        """Drop tracks not updated within prune_age_s (runs at 1 Hz)."""
        now = time.monotonic()
        stale = [k for k, (_, t, _) in self._tracks.items()
                 if now - t > self._prune_age]
        for k in stale:
            del self._tracks[k]
        if stale:
            self.get_logger().debug(f'Pruned {len(stale)} stale track(s)')
|
||||
|
||||
|
||||
def _stamp_to_sec(stamp) -> float:
|
||||
return stamp.sec + stamp.nanosec * 1e-9
|
||||
|
||||
|
||||
def main(args=None) -> None:
    """CLI entry point: initialise rclpy and spin the smoother node."""
    rclpy.init(args=args)
    smoother = LandmarkSmootherNode()
    try:
        rclpy.spin(smoother)
    finally:
        # Tear down the node and context even when spin exits via exception.
        smoother.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()
|
||||
@ -25,6 +25,8 @@ setup(
|
||||
'console_scripts': [
|
||||
'face_recognition = saltybot_social_face.face_recognition_node:main',
|
||||
'enrollment_cli = saltybot_social_face.enrollment_cli:main',
|
||||
# 68-point Kalman landmark smoother (Issue #227)
|
||||
'landmark_smoother = saltybot_social_face.landmark_smoother_node:main',
|
||||
],
|
||||
},
|
||||
)
|
||||
|
||||
@ -0,0 +1,246 @@
|
||||
"""
|
||||
test_landmark_smoother.py — Unit tests for LandmarkKalmanFilter (no ROS2 required).
|
||||
|
||||
Covers:
|
||||
- Initialisation: first call returns observations unchanged
|
||||
- Smoothing: output tracks input over multiple steps
|
||||
- Noise parameters: higher observation noise yields stronger smoothing
|
||||
- Shape / dtype
|
||||
- dt handling: zero vs positive dt
|
||||
- Noise reduction: repeated identical observations converge
|
||||
- Track key logic tested via helper
|
||||
- Known-value convergence for a stationary, low-noise signal
|
||||
"""
|
||||
|
||||
import sys
|
||||
import os
|
||||
import math
|
||||
|
||||
import numpy as np
|
||||
import pytest
|
||||
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
|
||||
|
||||
from saltybot_social_face._landmark_kalman import LandmarkKalmanFilter
|
||||
|
||||
|
||||
# ── Helpers ───────────────────────────────────────────────────────────────────
|
||||
|
||||
def _random_lm(n=68, seed=0):
|
||||
rng = np.random.default_rng(seed)
|
||||
return rng.uniform(0.1, 0.9, size=(n, 2)).astype(np.float32)
|
||||
|
||||
|
||||
def _zero_lm(n=68):
|
||||
return np.zeros((n, 2), dtype=np.float32)
|
||||
|
||||
|
||||
# ── Shape and dtype ───────────────────────────────────────────────────────────
|
||||
|
||||
class TestOutputShape:

    def test_output_shape_68(self):
        """A full 68-point observation yields a (68, 2) result."""
        result = LandmarkKalmanFilter(n_points=68).update(_random_lm(68), dt=0.0)
        assert result.shape == (68, 2)

    def test_output_dtype_float32(self):
        """Smoothed landmarks come back as float32."""
        result = LandmarkKalmanFilter(n_points=68).update(_random_lm(68), dt=0.0)
        assert result.dtype == np.float32

    def test_custom_n_points(self):
        """n_points is honoured for non-default sizes."""
        result = LandmarkKalmanFilter(n_points=5).update(_random_lm(5), dt=0.0)
        assert result.shape == (5, 2)

    def test_wrong_shape_raises(self):
        """A mismatched point count is rejected with ValueError."""
        filt = LandmarkKalmanFilter(n_points=68)
        with pytest.raises(ValueError):
            filt.update(_random_lm(5), dt=0.0)  # wrong number of points
|
||||
|
||||
|
||||
# ── Initialisation ────────────────────────────────────────────────────────────
|
||||
|
||||
class TestInitialisation:

    def test_first_update_returns_obs(self):
        """On first call the filter has not yet built momentum, so output ≈ input."""
        lm = _random_lm(68)
        smoothed = LandmarkKalmanFilter(n_points=68).update(lm, dt=0.0)
        np.testing.assert_allclose(smoothed, lm, atol=1e-5)

    def test_not_initialised_before_update(self):
        filt = LandmarkKalmanFilter(n_points=68)
        assert not filt.is_initialised

    def test_initialised_after_first_update(self):
        filt = LandmarkKalmanFilter(n_points=68)
        filt.update(_random_lm(), dt=0.0)
        assert filt.is_initialised
|
||||
|
||||
|
||||
# ── Smoothing behaviour ───────────────────────────────────────────────────────
|
||||
|
||||
class TestSmoothingBehaviour:

    def test_stationary_obs_converges(self):
        """Repeated identical observations should produce output ≈ input."""
        filt = LandmarkKalmanFilter(n_points=68, obs_noise_std=0.005)
        target = _random_lm(68, seed=1)
        out = None
        for _ in range(30):
            out = filt.update(target, dt=1.0 / 30.0)
        np.testing.assert_allclose(out, target, atol=0.01)

    def test_output_stays_bounded(self):
        """Output should remain within [0, 1] when inputs are in [0, 1]."""
        filt = LandmarkKalmanFilter(n_points=68)
        rng = np.random.default_rng(42)
        for _ in range(50):
            frame = rng.uniform(0.05, 0.95, size=(68, 2)).astype(np.float32)
            result = filt.update(frame, dt=1.0 / 30.0)
            # Allow a small margin beyond [0,1] due to velocity extrapolation
            assert result.min() > -0.1
            assert result.max() < 1.1

    def test_smoothing_reduces_noise(self):
        """
        With low process noise the filter should dampen high-frequency jitter.
        Average per-frame change in output should be less than in noisy input.
        """
        filt = LandmarkKalmanFilter(n_points=68, process_noise_std=0.002, obs_noise_std=0.01)
        rng = np.random.default_rng(7)
        centre = rng.uniform(0.2, 0.8, size=(68, 2)).astype(np.float32)
        jitter = 0.05  # std-dev of added noise

        observations, outputs = [], []
        for _ in range(40):
            frame = (centre + rng.normal(0, jitter, size=(68, 2))).astype(np.float32)
            observations.append(frame)
            outputs.append(filt.update(frame, dt=1.0 / 30.0))

        obs_deltas = [np.abs(b - a).mean()
                      for a, b in zip(observations, observations[1:])]
        out_deltas = [np.abs(b - a).mean()
                      for a, b in zip(outputs, outputs[1:])]
        assert np.mean(out_deltas) < np.mean(obs_deltas)

    def test_velocity_prediction_follows_linear_motion(self):
        """
        Points moving linearly should be predicted ahead-of-time so the
        filter output lags by less than the raw measurement noise.
        """
        filt = LandmarkKalmanFilter(n_points=68, process_noise_std=0.005, obs_noise_std=0.001)
        frame_dt = 1.0 / 30.0
        speed = 0.002  # normalised coords/frame
        origin = np.tile([0.3, 0.4], (68, 1)).astype(np.float32)
        # Warm up the filter on 20 frames of uniform motion
        for step in range(20):
            filt.update(origin + speed * step, dt=frame_dt)
        # After warm-up, output should track the moving target
        target = origin + speed * 20
        out = filt.update(target, dt=frame_dt)
        np.testing.assert_allclose(out, target, atol=0.01)
|
||||
|
||||
|
||||
# ── dt handling ───────────────────────────────────────────────────────────────
|
||||
|
||||
class TestDtHandling:

    def test_zero_dt_on_first_call_ok(self):
        filt = LandmarkKalmanFilter(n_points=68)
        result = filt.update(_random_lm(), dt=0.0)  # must not crash
        assert result.shape == (68, 2)

    def test_large_dt_does_not_crash(self):
        filt = LandmarkKalmanFilter(n_points=68)
        lm = _random_lm()
        filt.update(lm, dt=0.0)
        result = filt.update(lm, dt=5.0)  # 5 second gap
        assert result.shape == (68, 2)

    def test_multiple_calls_increase_smoothness(self):
        """Calling update repeatedly should not raise exceptions."""
        filt = LandmarkKalmanFilter(n_points=68)
        lm = _random_lm()
        for _ in range(100):
            filt.update(lm, dt=1.0 / 30.0)
|
||||
|
||||
|
||||
# ── Process and observation noise parameters ──────────────────────────────────
|
||||
|
||||
class TestNoiseParameters:

    def test_high_obs_noise_smooths_more(self):
        """Higher obs noise → output moves less per step (more smoothing)."""
        rng = np.random.default_rng(99)
        centre = rng.uniform(0.2, 0.8, size=(68, 2)).astype(np.float32)

        low_r = LandmarkKalmanFilter(n_points=68, obs_noise_std=0.001)
        high_r = LandmarkKalmanFilter(n_points=68, obs_noise_std=0.1)

        for _ in range(10):
            frame = (centre + rng.normal(0, 0.02, size=(68, 2))).astype(np.float32)
            low_r.update(frame, dt=1.0 / 30.0)
            high_r.update(frame, dt=1.0 / 30.0)

        final = (centre + rng.normal(0, 0.02, size=(68, 2))).astype(np.float32)
        before_low = low_r._x[:, :2].copy()
        before_high = high_r._x[:, :2].copy()
        low_r.update(final, dt=1.0 / 30.0)
        high_r.update(final, dt=1.0 / 30.0)

        moved_low = np.abs(low_r._x[:, :2] - before_low).mean()
        moved_high = np.abs(high_r._x[:, :2] - before_high).mean()
        assert moved_high < moved_low

    def test_known_stationary_value(self):
        """With very low noise a stationary signal should converge closely."""
        filt = LandmarkKalmanFilter(n_points=1, process_noise_std=1e-4, obs_noise_std=1e-4)
        point = np.array([[0.5, 0.5]], dtype=np.float32)
        out = None
        for _ in range(50):
            out = filt.update(point, dt=1.0 / 30.0)
        np.testing.assert_allclose(out, point, atol=0.001)
|
||||
|
||||
|
||||
# ── Track key helper (reimplemented to mirror node logic without importing ROS) ─
|
||||
|
||||
def _track_key(person_id: int, face_id: int) -> int:
|
||||
"""Mirror of LandmarkSmootherNode._track_key logic."""
|
||||
return person_id if person_id >= 0 else -(face_id + 1)
|
||||
|
||||
|
||||
class TestTrackKeyLogic:

    def test_positive_person_id_used(self):
        assert _track_key(person_id=3, face_id=7) == 3

    def test_zero_person_id_used(self):
        assert _track_key(person_id=0, face_id=7) == 0

    def test_minus_one_person_id_uses_face_id(self):
        assert _track_key(person_id=-1, face_id=2) == -(2 + 1)  # == -3

    def test_different_face_ids_give_different_keys(self):
        keys = {_track_key(person_id=-1, face_id=fid) for fid in (0, 1)}
        assert len(keys) == 2

    def test_person_key_does_not_collide_with_face_key(self):
        """Person ID 3 and face_id 3 should not produce the same key."""
        assert _track_key(person_id=3, face_id=99) != _track_key(person_id=-1, face_id=3)
|
||||
|
||||
|
||||
# Allow running this test file directly (python test_landmark_smoother.py)
# instead of invoking pytest from the command line.
if __name__ == '__main__':
    pytest.main([__file__, '-v'])
|
||||
@ -46,6 +46,9 @@ rosidl_generate_interfaces(${PROJECT_NAME}
|
||||
"msg/MeshHandoff.msg"
|
||||
# Issue #221 — hand gesture pointing direction
|
||||
"msg/PointingTarget.msg"
|
||||
# Issue #227 — face landmark smoothing
|
||||
"msg/FaceLandmarks.msg"
|
||||
"msg/FaceLandmarksArray.msg"
|
||||
DEPENDENCIES std_msgs geometry_msgs builtin_interfaces
|
||||
)
|
||||
|
||||
|
||||
@ -0,0 +1,15 @@
|
||||
# FaceLandmarks.msg — 68-point facial landmark positions (Issue #227)
|
||||
#
|
||||
# Companion to FaceDetection.msg (which carries 5-point SCRFD keypoints).
|
||||
# Carries full 68-point MediaPipe/dlib-style landmarks for downstream use
|
||||
# (smoothing, expression analysis, gaze estimation, etc.).
|
||||
#
|
||||
# landmarks: flat array [x0,y0, x1,y1, ..., x67,y67] — 136 float32 values.
|
||||
# Coordinates are normalised to [0,1] relative to the image (same convention
|
||||
# as MediaPipe FaceMesh nx, ny).
|
||||
#
|
||||
std_msgs/Header header
|
||||
int32 face_id # matches FaceDetection.face_id (-1 if unset)
|
||||
int32 person_id # matches gallery person ID (-1 = unidentified)
|
||||
float32[] landmarks # 136 floats: x0,y0, x1,y1 … x67,y67 (normalised)
|
||||
float32 confidence # detector confidence [0,1]
|
||||
@ -0,0 +1,3 @@
|
||||
# FaceLandmarksArray.msg — array of 68-point landmark sets (Issue #227)
|
||||
std_msgs/Header header
|
||||
FaceLandmarks[] faces
|
||||
Loading…
x
Reference in New Issue
Block a user