feat(social): add 68-point Kalman landmark smoother (Issue #227)
Some checks failed
social-bot integration tests / Lint (flake8 + pep257) (pull_request) Failing after 2s
social-bot integration tests / Core integration tests (mock sensors, no GPU) (pull_request) Has been skipped
social-bot integration tests / Latency profiling (GPU, Orin) (pull_request) Has been cancelled

Adds per-person constant-velocity Kalman filter that smooths raw 68-point
facial landmarks and republishes on /social/faces/landmarks_smooth at input
rate.  New FaceLandmarks / FaceLandmarksArray messages added to
saltybot_social_msgs.  21/21 pure-Python tests pass (no ROS2 required).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
sl-perception 2026-03-02 12:05:24 -05:00
parent 3a7cd3649a
commit 24340dea9b
8 changed files with 595 additions and 0 deletions

View File

@ -0,0 +1,161 @@
"""
_landmark_kalman.py — vectorised constant-velocity Kalman filter for 2-D facial landmarks.
No ROS2 dependencies — pure numpy.
Each face track has one LandmarkKalmanFilter instance that maintains a
(N, 4) state array where each row is [x, y, vx, vy] for one landmark.
All N points are predicted / updated simultaneously using broadcast numpy
operations, so there is no per-point Python loop.
Usage
-----
kf = LandmarkKalmanFilter(n_points=68)
smoothed = kf.update(obs, dt) # obs: (68, 2) ndarray, dt: float (s)
smoothed = kf.update(obs, dt) # subsequent calls
"""
from __future__ import annotations
import numpy as np
from typing import Optional
class LandmarkKalmanFilter:
    """
    Constant-velocity Kalman filter over N 2-D landmark points.

    State per point: [x, y, vx, vy]
    Observation per point: [x, y]

    All N points share one transition/observation model and are predicted and
    updated simultaneously with broadcast numpy operations — there is no
    per-point Python loop.

    Parameters
    ----------
    n_points : number of landmarks (default 68)
    process_noise_std : std-dev of velocity random-walk noise (normalised coords/s)
    obs_noise_std : std-dev of observation noise (normalised coords)
    """
    def __init__(
        self,
        n_points: int = 68,
        process_noise_std: float = 0.01,
        obs_noise_std: float = 0.005,
    ) -> None:
        self._n = n_points
        self._q_std = process_noise_std
        self._r_std = obs_noise_std
        # State: (n, 4) rows of [x, y, vx, vy]; created lazily on the first
        # update() so the filter starts exactly at the first observation.
        self._x: Optional[np.ndarray] = None
        # Covariance: (n, 4, 4) — one 4x4 matrix per landmark.
        self._P: Optional[np.ndarray] = None
        # Fixed matrices (shared across all points via broadcasting)
        # Observation matrix H: (2, 4) — selects x and y
        self._H = np.array([[1., 0., 0., 0.],
                            [0., 1., 0., 0.]])  # (2, 4)
        # Observation noise R: (2, 2) — isotropic, shared by all points.
        self._R = (obs_noise_std ** 2) * np.eye(2)  # (2, 2)

    # ── Public API ────────────────────────────────────────────────────────────
    def update(self, obs: np.ndarray, dt: float) -> np.ndarray:
        """
        Predict then update with new observations.

        Parameters
        ----------
        obs : (n_points, 2) array of [x, y] normalised landmark coordinates
        dt : elapsed time since last call (seconds); unused on the first call

        Returns
        -------
        (n_points, 2) float32 array of smoothed [x, y] coordinates

        Raises
        ------
        ValueError
            If obs does not have shape (n_points, 2).
        """
        obs = np.asarray(obs, dtype=np.float64)
        if obs.shape != (self._n, 2):
            raise ValueError(f'Expected obs shape ({self._n}, 2), got {obs.shape}')
        if self._x is None:
            self._initialise(obs)
        else:
            # Clamp dt away from zero so Q stays non-degenerate.
            self._predict(max(dt, 1e-6))
        self._update(obs)
        return self._x[:, :2].astype(np.float32)

    @property
    def is_initialised(self) -> bool:
        """True once the first observation has been absorbed."""
        return self._x is not None

    # ── Internal ──────────────────────────────────────────────────────────────
    def _initialise(self, obs: np.ndarray) -> None:
        """Initialise state from first observation (zero velocity)."""
        self._x = np.zeros((self._n, 4), dtype=np.float64)
        self._x[:, :2] = obs
        # Start with moderate position uncertainty and a non-zero velocity
        # variance so the filter can pick up real motion quickly.
        init_pos_var = (self._r_std * 10) ** 2
        self._P = np.zeros((self._n, 4, 4), dtype=np.float64)
        self._P[:, 0, 0] = init_pos_var
        self._P[:, 1, 1] = init_pos_var
        self._P[:, 2, 2] = (self._q_std * 5) ** 2
        self._P[:, 3, 3] = (self._q_std * 5) ** 2

    def _predict(self, dt: float) -> None:
        """Constant-velocity predict step: x' = F x, P' = F P Fᵀ + Q."""
        # Transition matrix F: (4, 4)
        F = np.array([[1., 0., dt, 0.],
                      [0., 1., 0., dt],
                      [0., 0., 1., 0.],
                      [0., 0., 0., 1.]])
        # Process noise Q: (4, 4) — discrete Wiener (velocity random walk):
        # position picks up dt⁴/4 and the dt³/2 cross term couples x↔vx.
        q = self._q_std ** 2
        dt2 = dt * dt
        dt3 = dt2 * dt
        dt4 = dt3 * dt
        Q = q * np.array([[dt4/4, 0.,    dt3/2, 0.   ],
                          [0.,    dt4/4, 0.,    dt3/2],
                          [dt3/2, 0.,    dt2,   0.   ],
                          [0.,    dt3/2, 0.,    dt2  ]])
        # Vectorised over n points: (n,4) @ (4,4).T → (n,4)
        self._x = self._x @ F.T                     # (n, 4)
        # P' = F P Fᵀ + Q for each of the n per-point covariances.
        FP = np.einsum('ij,njk->nik', F, self._P)   # (n, 4, 4)
        FPFt = np.einsum('nij,kj->nik', FP, F)      # (n, 4, 4)
        self._P = FPFt + Q[np.newaxis]              # (n, 4, 4)

    def _update(self, obs: np.ndarray) -> None:
        """Kalman update step with measurement obs: (n, 2)."""
        H = self._H  # (2, 4)
        R = self._R  # (2, 2)
        # Innovation: y = obs − H x → (n, 2)
        Hx = np.einsum('ij,nj->ni', H, self._x)     # (n, 2)
        y = obs - Hx                                # (n, 2)
        # Innovation covariance S = H P Hᵀ + R → (n, 2, 2)
        HP = np.einsum('ij,njk->nik', H, self._P)   # (n, 2, 4)
        S = np.einsum('nij,kj->nik', HP, H) + R[np.newaxis]  # (n, 2, 2)
        # Kalman gain K = P Hᵀ S⁻¹ → (n, 4, 2)
        PHt = np.einsum('nij,kj->nik', self._P, H)  # (n, 4, 2)
        S_inv = np.linalg.inv(S)                    # batched (n, 2, 2) inverse
        K = np.einsum('nij,njk->nik', PHt, S_inv)   # (n, 4, 2)
        # State update: x += K y
        Ky = np.einsum('nij,nj->ni', K, y)          # (n, 4)
        self._x = self._x + Ky
        # Covariance update: P = (I − K H) P.
        # np.eye(4) broadcasts against the batch axis — no per-point identity
        # copy (previously `np.eye(4)[np.newaxis].repeat(n, axis=0)`, which
        # also used the flake8-E741 ambiguous local name `I`).
        KH = np.einsum('nij,jk->nik', K, H)         # (n, 4, 4)
        self._P = np.einsum('nij,njk->nik', np.eye(4) - KH, self._P)

View File

@ -0,0 +1,165 @@
"""
landmark_smoother_node.py — per-person 68-point Kalman landmark smoother (Issue #227).
Subscribes to raw 68-point facial landmarks and republishes a smoothed version
using a constant-velocity Kalman filter maintained per face track.
Subscribes:
/social/faces/landmarks saltybot_social_msgs/FaceLandmarksArray
Publishes:
/social/faces/landmarks_smooth saltybot_social_msgs/FaceLandmarksArray
Algorithm
---------
1. For each FaceLandmarks message, look up (or create) a per-track
LandmarkKalmanFilter keyed by person_id (or face_id when person_id == -1).
2. Call kf.update(obs, dt) → smoothed (68, 2) positions.
3. Publish immediately — no extra timer delay.
4. A 1 Hz timer prunes tracks not updated within `prune_age_s` (default 3.0 s).
Parameters
----------
prune_age_s float 3.0 Seconds of inactivity before a track is dropped
process_noise_std float 0.01 Kalman process noise std-dev (normalised coords/s)
obs_noise_std float 0.005 Kalman observation noise std-dev (normalised coords)
"""
from __future__ import annotations
import time
from typing import Dict, Optional, Tuple
import numpy as np
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
from saltybot_social_msgs.msg import FaceLandmarks, FaceLandmarksArray
from ._landmark_kalman import LandmarkKalmanFilter
# Sensor-style QoS: best-effort delivery favours freshness over completeness;
# the shallow keep-last queue (depth 4) bounds latency if the callback
# briefly falls behind the publisher.
_SENSOR_QOS = QoSProfile(
    reliability=ReliabilityPolicy.BEST_EFFORT,
    history=HistoryPolicy.KEEP_LAST,
    depth=4,
)
class LandmarkSmootherNode(Node):
    """Per-person 68-point Kalman landmark smoother (Issue #227).

    Subscribes to raw landmark arrays, smooths each face's 68 points with a
    per-track constant-velocity Kalman filter, and republishes at input rate.
    """

    def __init__(self) -> None:
        super().__init__('landmark_smoother_node')
        self.declare_parameter('prune_age_s', 3.0)
        self.declare_parameter('process_noise_std', 0.01)
        self.declare_parameter('obs_noise_std', 0.005)
        self._prune_age = self.get_parameter('prune_age_s').value
        self._proc_noise = self.get_parameter('process_noise_std').value
        self._obs_noise = self.get_parameter('obs_noise_std').value
        # track_key → (LandmarkKalmanFilter, last wall-clock update, last msg stamp)
        self._tracks: Dict[int, Tuple[LandmarkKalmanFilter, float, float]] = {}
        self._sub = self.create_subscription(
            FaceLandmarksArray,
            '/social/faces/landmarks',
            self._on_landmarks,
            _SENSOR_QOS,
        )
        self._pub = self.create_publisher(
            FaceLandmarksArray,
            '/social/faces/landmarks_smooth',
            10,
        )
        # Low-rate housekeeping: drop filters for faces that disappeared.
        self.create_timer(1.0, self._prune_stale_tracks)
        self.get_logger().info(
            f'landmark_smoother_node ready — '
            f'prune_age={self._prune_age}s '
            f'q={self._proc_noise} r={self._obs_noise}'
        )

    # ── Callback ──────────────────────────────────────────────────────────────
    def _on_landmarks(self, msg: FaceLandmarksArray) -> None:
        """Smooth every face in *msg* and republish the whole array."""
        now = time.monotonic()
        # Hoisted: the stamp is identical for every face in the message
        # (previously recomputed per face, twice in the existing-track branch).
        msg_stamp = _stamp_to_sec(msg.header.stamp)
        out = FaceLandmarksArray()
        out.header = msg.header
        for face in msg.faces:
            # Validate landmark count: 68 (x, y) pairs → 136 floats.
            if len(face.landmarks) != 136:
                # `warning` — rclpy's `warn` alias is deprecated.
                self.get_logger().warning(
                    f'face_id={face.face_id} has {len(face.landmarks)} floats, '
                    f'expected 136 — skipping',
                    throttle_duration_sec=5.0,
                )
                out.faces.append(face)  # pass through unchanged
                continue
            obs = np.array(face.landmarks, dtype=np.float64).reshape(68, 2)
            track_key = self._track_key(face)
            if track_key in self._tracks:
                kf, _last_t, last_stamp = self._tracks[track_key]
                # Fall back to a nominal 30 fps step when stamps are not
                # strictly increasing (e.g. looping bags, duplicate stamps).
                dt = msg_stamp - last_stamp if msg_stamp > last_stamp else 1.0 / 30.0
                smoothed = kf.update(obs, dt)
            else:
                kf = LandmarkKalmanFilter(
                    n_points=68,
                    process_noise_std=self._proc_noise,
                    obs_noise_std=self._obs_noise,
                )
                smoothed = kf.update(obs, dt=0.0)  # dt unused on first call
            self._tracks[track_key] = (kf, now, msg_stamp)
            smoothed_face = FaceLandmarks()
            smoothed_face.header = face.header
            smoothed_face.face_id = face.face_id
            smoothed_face.person_id = face.person_id
            smoothed_face.confidence = face.confidence
            smoothed_face.landmarks = smoothed.flatten().tolist()
            out.faces.append(smoothed_face)
        self._pub.publish(out)

    # ── Helpers ───────────────────────────────────────────────────────────────
    @staticmethod
    def _track_key(face: FaceLandmarks) -> int:
        """Use person_id when valid, otherwise fall back to face_id.

        The -(face_id + 1) mapping keeps anonymous-face keys strictly
        negative, so they can never collide with valid (>= 0) person IDs.
        """
        return face.person_id if face.person_id >= 0 else -(face.face_id + 1)

    def _prune_stale_tracks(self) -> None:
        """Drop tracks not updated within `prune_age_s` (runs at 1 Hz)."""
        now = time.monotonic()
        stale = [k for k, (_, t, _) in self._tracks.items()
                 if now - t > self._prune_age]
        for k in stale:
            del self._tracks[k]
        if stale:
            self.get_logger().debug(f'Pruned {len(stale)} stale track(s)')
def _stamp_to_sec(stamp) -> float:
return stamp.sec + stamp.nanosec * 1e-9
def main(args=None) -> None:
    """Entry point: spin the landmark smoother node until shutdown."""
    rclpy.init(args=args)
    smoother = LandmarkSmootherNode()
    try:
        rclpy.spin(smoother)
    finally:
        # Always release node resources and the rclpy context, even if
        # spin exits via KeyboardInterrupt or an exception.
        smoother.destroy_node()
        rclpy.shutdown()
# Allow direct execution (e.g. `python landmark_smoother_node.py`) in
# addition to the ros2-run console_scripts entry point.
if __name__ == '__main__':
    main()

View File

@ -25,6 +25,8 @@ setup(
'console_scripts': [ 'console_scripts': [
'face_recognition = saltybot_social_face.face_recognition_node:main', 'face_recognition = saltybot_social_face.face_recognition_node:main',
'enrollment_cli = saltybot_social_face.enrollment_cli:main', 'enrollment_cli = saltybot_social_face.enrollment_cli:main',
# 68-point Kalman landmark smoother (Issue #227)
'landmark_smoother = saltybot_social_face.landmark_smoother_node:main',
], ],
}, },
) )

View File

@ -0,0 +1,246 @@
"""
test_landmark_smoother.py — unit tests for LandmarkKalmanFilter (no ROS2 required).
Covers:
- Initialisation: first call returns observations unchanged
- Smoothing: output tracks input over multiple steps
- Shape / dtype
- dt handling: zero vs positive dt
- Noise reduction: repeated identical observations converge
- Track key logic tested via a mirrored helper (no ROS2 import needed)
"""
import sys
import os
import math
import numpy as np
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from saltybot_social_face._landmark_kalman import LandmarkKalmanFilter
# ── Helpers ───────────────────────────────────────────────────────────────────
def _random_lm(n=68, seed=0):
rng = np.random.default_rng(seed)
return rng.uniform(0.1, 0.9, size=(n, 2)).astype(np.float32)
def _zero_lm(n=68):
return np.zeros((n, 2), dtype=np.float32)
# ── Shape and dtype ───────────────────────────────────────────────────────────
class TestOutputShape:
    """Shape and dtype contract of LandmarkKalmanFilter.update()."""

    def test_output_shape_68(self):
        flt = LandmarkKalmanFilter(n_points=68)
        result = flt.update(_random_lm(68), dt=0.0)
        assert result.shape == (68, 2)

    def test_output_dtype_float32(self):
        flt = LandmarkKalmanFilter(n_points=68)
        result = flt.update(_random_lm(68), dt=0.0)
        assert result.dtype == np.float32

    def test_custom_n_points(self):
        flt = LandmarkKalmanFilter(n_points=5)
        result = flt.update(_random_lm(5), dt=0.0)
        assert result.shape == (5, 2)

    def test_wrong_shape_raises(self):
        flt = LandmarkKalmanFilter(n_points=68)
        wrong = _random_lm(5)  # wrong number of points for this filter
        with pytest.raises(ValueError):
            flt.update(wrong, dt=0.0)
# ── Initialisation ────────────────────────────────────────────────────────────
class TestInitialisation:
    """Lazy-initialisation behaviour: state is created on the first update."""

    def test_first_update_returns_obs(self):
        """On first call the filter has not yet built momentum, so output ≈ input."""
        flt = LandmarkKalmanFilter(n_points=68)
        first = _random_lm(68)
        result = flt.update(first, dt=0.0)
        np.testing.assert_allclose(result, first, atol=1e-5)

    def test_not_initialised_before_update(self):
        flt = LandmarkKalmanFilter(n_points=68)
        assert flt.is_initialised is False

    def test_initialised_after_first_update(self):
        flt = LandmarkKalmanFilter(n_points=68)
        flt.update(_random_lm(), dt=0.0)
        assert flt.is_initialised is True
# ── Smoothing behaviour ───────────────────────────────────────────────────────
class TestSmoothingBehaviour:
    """Behavioural tests: convergence, boundedness, jitter damping, tracking.

    NOTE: several tests draw from a seeded RNG inside a loop, so the exact
    sequence of rng calls is part of the expected behaviour — do not reorder
    statements when editing.
    """

    def test_stationary_obs_converges(self):
        """Repeated identical observations should produce output ≈ input."""
        kf = LandmarkKalmanFilter(n_points=68, obs_noise_std=0.005)
        obs = _random_lm(68, seed=1)
        for _ in range(30):
            out = kf.update(obs, dt=1.0 / 30.0)
        # After ~1 s of identical input the estimate should sit on the input.
        np.testing.assert_allclose(out, obs, atol=0.01)

    def test_output_stays_bounded(self):
        """Output should remain within [0, 1] when inputs are in [0, 1]."""
        kf = LandmarkKalmanFilter(n_points=68)
        rng = np.random.default_rng(42)
        for _ in range(50):
            obs = rng.uniform(0.05, 0.95, size=(68, 2)).astype(np.float32)
            out = kf.update(obs, dt=1.0 / 30.0)
            # Allow a small margin beyond [0,1] due to velocity extrapolation
            assert out.min() > -0.1
            assert out.max() < 1.1

    def test_smoothing_reduces_noise(self):
        """
        With low process noise the filter should dampen high-frequency jitter.
        Average per-frame change in output should be less than in noisy input.
        """
        kf = LandmarkKalmanFilter(n_points=68, process_noise_std=0.002, obs_noise_std=0.01)
        rng = np.random.default_rng(7)
        base = rng.uniform(0.2, 0.8, size=(68, 2)).astype(np.float32)
        noise = 0.05  # std-dev of added noise
        prev_obs = None
        prev_out = None
        obs_deltas = []
        out_deltas = []
        for _ in range(40):
            obs = (base + rng.normal(0, noise, size=(68, 2))).astype(np.float32)
            out = kf.update(obs, dt=1.0 / 30.0)
            if prev_obs is not None:
                # Compare frame-to-frame motion of raw vs smoothed points.
                obs_deltas.append(np.abs(obs - prev_obs).mean())
                out_deltas.append(np.abs(out - prev_out).mean())
            prev_obs = obs.copy()
            prev_out = out.copy()
        assert np.mean(out_deltas) < np.mean(obs_deltas)

    def test_velocity_prediction_follows_linear_motion(self):
        """
        Points moving linearly should be predicted ahead-of-time so the
        filter output lags by less than the raw measurement noise.
        """
        kf = LandmarkKalmanFilter(n_points=68, process_noise_std=0.005, obs_noise_std=0.001)
        dt = 1.0 / 30.0
        vel = 0.002  # normalised coords/frame
        obs = np.tile([0.3, 0.4], (68, 1)).astype(np.float32)
        # Warm up the filter
        for i in range(20):
            obs_step = obs + vel * i
            kf.update(obs_step, dt=dt)
        # After warm-up, output should track the moving target
        out = kf.update(obs + vel * 20, dt=dt)
        target = obs + vel * 20
        np.testing.assert_allclose(out, target, atol=0.01)
# ── dt handling ───────────────────────────────────────────────────────────────
class TestDtHandling:
    """dt argument handling: zero on first call, large gaps, repeated steps."""

    def test_zero_dt_on_first_call_ok(self):
        kf = LandmarkKalmanFilter(n_points=68)
        obs = _random_lm()
        out = kf.update(obs, dt=0.0)  # must not crash
        assert out.shape == (68, 2)

    def test_large_dt_does_not_crash(self):
        kf = LandmarkKalmanFilter(n_points=68)
        obs = _random_lm()
        kf.update(obs, dt=0.0)
        out = kf.update(obs, dt=5.0)  # 5 second gap
        assert out.shape == (68, 2)

    def test_multiple_calls_increase_smoothness(self):
        """Many consecutive updates keep producing valid, finite output."""
        kf = LandmarkKalmanFilter(n_points=68)
        obs = _random_lm()
        # `_` instead of the original unused `i`; the original also asserted
        # nothing, so at minimum verify the final result is well-formed.
        for _ in range(100):
            out = kf.update(obs, dt=1.0 / 30.0)
        assert out.shape == (68, 2)
        assert np.isfinite(out).all()
# ── Process and observation noise parameters ──────────────────────────────────
class TestNoiseParameters:
    """Effect of process/observation noise parameters on filter behaviour.

    NOTE: test_high_obs_noise_smooths_more inspects the private `_x` state
    directly and depends on the exact rng call order — edit with care.
    """

    def test_high_obs_noise_smooths_more(self):
        """Higher obs noise → output moves less per step (more smoothing)."""
        rng = np.random.default_rng(99)
        base = rng.uniform(0.2, 0.8, size=(68, 2)).astype(np.float32)
        kf_low = LandmarkKalmanFilter(n_points=68, obs_noise_std=0.001)
        kf_high = LandmarkKalmanFilter(n_points=68, obs_noise_std=0.1)
        # Warm both filters on the same observation sequence.
        for i in range(10):
            obs = (base + rng.normal(0, 0.02, size=(68, 2))).astype(np.float32)
            kf_low.update(obs, dt=1.0 / 30.0)
            kf_high.update(obs, dt=1.0 / 30.0)
        obs_new = (base + rng.normal(0, 0.02, size=(68, 2))).astype(np.float32)
        # Snapshot positions before one more step, then compare step sizes.
        prev_low = kf_low._x[:, :2].copy()
        prev_high = kf_high._x[:, :2].copy()
        kf_low.update(obs_new, dt=1.0 / 30.0)
        kf_high.update(obs_new, dt=1.0 / 30.0)
        delta_low = np.abs(kf_low._x[:, :2] - prev_low).mean()
        delta_high = np.abs(kf_high._x[:, :2] - prev_high).mean()
        assert delta_high < delta_low

    def test_known_stationary_value(self):
        """With very low noise a stationary signal should converge closely."""
        kf = LandmarkKalmanFilter(n_points=1, process_noise_std=1e-4, obs_noise_std=1e-4)
        obs = np.array([[0.5, 0.5]], dtype=np.float32)
        for _ in range(50):
            out = kf.update(obs, dt=1.0 / 30.0)
        np.testing.assert_allclose(out, obs, atol=0.001)
# ── Track key helper (reimplemented to mirror node logic without importing ROS) ─
def _track_key(person_id: int, face_id: int) -> int:
"""Mirror of LandmarkSmootherNode._track_key logic."""
return person_id if person_id >= 0 else -(face_id + 1)
class TestTrackKeyLogic:
    """Key derivation mirrors the node: person_id when valid, else -(face_id+1)."""

    def test_positive_person_id_used(self):
        assert _track_key(person_id=3, face_id=7) == 3

    def test_zero_person_id_used(self):
        assert _track_key(person_id=0, face_id=7) == 0

    def test_minus_one_person_id_uses_face_id(self):
        derived = _track_key(person_id=-1, face_id=2)
        assert derived == -3  # -(2 + 1)

    def test_different_face_ids_give_different_keys(self):
        keys = {_track_key(person_id=-1, face_id=fid) for fid in (0, 1)}
        assert len(keys) == 2

    def test_person_key_does_not_collide_with_face_key(self):
        """Person ID 3 and face_id 3 should not produce the same key."""
        person_key = _track_key(person_id=3, face_id=99)
        face_key = _track_key(person_id=-1, face_id=3)
        assert person_key != face_key
# Allow running this file directly (python test_landmark_smoother.py)
# in addition to discovery via a plain `pytest` invocation.
if __name__ == '__main__':
    pytest.main([__file__, '-v'])

View File

@ -46,6 +46,9 @@ rosidl_generate_interfaces(${PROJECT_NAME}
"msg/MeshHandoff.msg" "msg/MeshHandoff.msg"
# Issue #221 hand gesture pointing direction # Issue #221 hand gesture pointing direction
"msg/PointingTarget.msg" "msg/PointingTarget.msg"
# Issue #227 face landmark smoothing
"msg/FaceLandmarks.msg"
"msg/FaceLandmarksArray.msg"
DEPENDENCIES std_msgs geometry_msgs builtin_interfaces DEPENDENCIES std_msgs geometry_msgs builtin_interfaces
) )

View File

@ -0,0 +1,15 @@
# FaceLandmarks.msg — 68-point facial landmark positions (Issue #227)
#
# Companion to FaceDetection.msg (which carries 5-point SCRFD keypoints).
# Carries full 68-point MediaPipe/dlib-style landmarks for downstream use
# (smoothing, expression analysis, gaze estimation, etc.).
#
# landmarks: flat array [x0,y0, x1,y1, ..., x67,y67] — 136 float32 values.
# Coordinates are normalised to [0,1] relative to the image (same convention
# as MediaPipe FaceMesh nx, ny).
#
std_msgs/Header header
int32 face_id # matches FaceDetection.face_id (-1 if unset)
int32 person_id # matches gallery person ID (-1 = unidentified)
float32[] landmarks # 136 floats: x0,y0, x1,y1 … x67,y67 (normalised)
float32 confidence # detector confidence [0,1]

View File

@ -0,0 +1,3 @@
# FaceLandmarksArray.msg — array of 68-point landmark sets (Issue #227)
std_msgs/Header header
FaceLandmarks[] faces