feat(social): proximity-based greeting trigger (Issue #270) #272

Merged
sl-jetson merged 1 commits from sl-jetson/issue-270-greeting-trigger into main 2026-03-02 17:30:54 -05:00
5 changed files with 671 additions and 0 deletions

View File

@ -0,0 +1,8 @@
greeting_trigger_node:
ros__parameters:
proximity_m: 2.0 # Trigger when person is within this distance (m)
cooldown_s: 300.0 # Re-greeting suppression window per face_id (s)
unknown_distance: 0.0 # Distance assumed when PersonState not yet available
# 0.0 → always greet faces with no state yet
faces_topic: "/social/faces/detected"
states_topic: "/social/person_states"

View File

@ -0,0 +1,39 @@
"""greeting_trigger.launch.py -- Launch proximity-based greeting trigger (Issue #270).
Usage:
ros2 launch saltybot_social greeting_trigger.launch.py
ros2 launch saltybot_social greeting_trigger.launch.py proximity_m:=1.5 cooldown_s:=120.0
"""
import os
from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def generate_launch_description():
pkg = get_package_share_directory("saltybot_social")
cfg = os.path.join(pkg, "config", "greeting_trigger_params.yaml")
return LaunchDescription([
DeclareLaunchArgument("proximity_m", default_value="2.0",
description="Greeting proximity threshold (m)"),
DeclareLaunchArgument("cooldown_s", default_value="300.0",
description="Per-face_id re-greeting cooldown (s)"),
Node(
package="saltybot_social",
executable="greeting_trigger_node",
name="greeting_trigger_node",
output="screen",
parameters=[
cfg,
{
"proximity_m": LaunchConfiguration("proximity_m"),
"cooldown_s": LaunchConfiguration("cooldown_s"),
},
],
),
])

View File

@ -0,0 +1,150 @@
"""greeting_trigger_node.py -- Proximity-based greeting trigger.
Issue #270
Monitors face detections and person states. When a new face_id is seen
within ``proximity_m`` metres (default 2 m) and has not been greeted within
``cooldown_s`` seconds, publishes a JSON greeting trigger on
/saltybot/greeting_trigger.
Distance is looked up from the /social/person_states topic which carries a
face_id distance mapping. When no state is available for a face the node
applies a configurable default distance so it can still fire on face-only
pipelines.
Subscriptions:
/social/faces/detected saltybot_social_msgs/FaceDetectionArray
/social/person_states saltybot_social_msgs/PersonStateArray
Publication:
/saltybot/greeting_trigger std_msgs/String (JSON)
{"face_id": <int>, "person_name": <str>, "distance_m": <float>,
"ts": <float unix epoch>}
Parameters:
proximity_m (float, 2.0) -- trigger when distance <= this
cooldown_s (float, 300.0) -- suppress re-greeting same face_id
unknown_distance (float, 0.0) -- distance assumed when PersonState
is not yet available (0.0 always
trigger for unknown faces)
faces_topic (str, "/social/faces/detected")
states_topic (str, "/social/person_states")
"""
from __future__ import annotations
import json
import time
import threading
from typing import Dict
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile
from std_msgs.msg import String
# Custom messages — imported at runtime so offline tests can stub them
try:
from saltybot_social_msgs.msg import FaceDetectionArray, PersonStateArray
_MSGS = True
except ImportError:
_MSGS = False
class GreetingTriggerNode(Node):
"""Publishes greeting trigger when a person enters proximity."""
def __init__(self) -> None:
super().__init__("greeting_trigger_node")
self.declare_parameter("proximity_m", 2.0)
self.declare_parameter("cooldown_s", 300.0)
self.declare_parameter("unknown_distance", 0.0)
self.declare_parameter("faces_topic", "/social/faces/detected")
self.declare_parameter("states_topic", "/social/person_states")
self._proximity = self.get_parameter("proximity_m").value
self._cooldown = self.get_parameter("cooldown_s").value
self._unknown_dist = self.get_parameter("unknown_distance").value
faces_topic = self.get_parameter("faces_topic").value
states_topic = self.get_parameter("states_topic").value
# face_id → last known distance (m); updated from PersonStateArray
self._distance_cache: Dict[int, float] = {}
# face_id → unix timestamp of last greeting
self._last_greeted: Dict[int, float] = {}
self._lock = threading.Lock()
qos = QoSProfile(depth=10)
self._pub = self.create_publisher(String, "/saltybot/greeting_trigger", qos)
if _MSGS:
self._states_sub = self.create_subscription(
PersonStateArray, states_topic, self._on_person_states, qos
)
self._faces_sub = self.create_subscription(
FaceDetectionArray, faces_topic, self._on_faces, qos
)
else:
self.get_logger().warn(
"saltybot_social_msgs not available — node is passive (no subscriptions)"
)
self.get_logger().info(
f"GreetingTriggerNode ready "
f"(proximity={self._proximity}m, cooldown={self._cooldown}s)"
)
# ── Callbacks ──────────────────────────────────────────────────────────
def _on_person_states(self, msg: "PersonStateArray") -> None:
"""Cache face_id → distance from incoming PersonState array."""
with self._lock:
for ps in msg.persons:
if ps.face_id >= 0:
self._distance_cache[ps.face_id] = float(ps.distance)
def _on_faces(self, msg: "FaceDetectionArray") -> None:
"""Evaluate each detected face; fire greeting if conditions met."""
now = time.monotonic()
with self._lock:
for face in msg.faces:
fid = int(face.face_id)
dist = self._distance_cache.get(fid, self._unknown_dist)
if dist > self._proximity:
continue # too far
last = self._last_greeted.get(fid, 0.0)
if now - last < self._cooldown:
continue # still in cooldown
# Fire!
self._last_greeted[fid] = now
self._fire(fid, str(face.person_name), dist)
def _fire(self, face_id: int, person_name: str, distance_m: float) -> None:
payload = {
"face_id": face_id,
"person_name": person_name,
"distance_m": round(distance_m, 3),
"ts": time.time(),
}
msg = String()
msg.data = json.dumps(payload)
self._pub.publish(msg)
self.get_logger().info(
f"Greeting trigger: face_id={face_id} name={person_name!r} "
f"dist={distance_m:.2f}m"
)
def main(args=None) -> None:
rclpy.init(args=args)
node = GreetingTriggerNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()

View File

@ -47,6 +47,8 @@ setup(
'vad_node = saltybot_social.vad_node:main', 'vad_node = saltybot_social.vad_node:main',
# Ambient sound classifier — mel-spectrogram (Issue #252) # Ambient sound classifier — mel-spectrogram (Issue #252)
'ambient_sound_node = saltybot_social.ambient_sound_node:main', 'ambient_sound_node = saltybot_social.ambient_sound_node:main',
# Proximity-based greeting trigger (Issue #270)
'greeting_trigger_node = saltybot_social.greeting_trigger_node:main',
], ],
}, },
) )

View File

@ -0,0 +1,472 @@
"""test_greeting_trigger.py -- Offline tests for greeting_trigger_node (Issue #270).
Stubs out rclpy and saltybot_social_msgs so tests run without a ROS install.
"""
import importlib
import json
import sys
import time
import types
import unittest
# ── ROS2 / message stubs ──────────────────────────────────────────────────────
def _make_ros_stubs():
"""Install minimal stubs for rclpy and message packages."""
for mod_name in ("rclpy", "rclpy.node", "rclpy.qos",
"std_msgs", "std_msgs.msg",
"saltybot_social_msgs", "saltybot_social_msgs.msg"):
sys.modules[mod_name] = types.ModuleType(mod_name)
class _Node:
def __init__(self, name):
self._name = name
# Preserve _params if pre-set by _make_node (super().__init__() is
# called from GreetingTriggerNode.__init__, so don't reset here)
if not hasattr(self, '_params'):
self._params = {}
self._pubs = {}
self._subs = {}
self._logs = []
def declare_parameter(self, name, default):
# Don't overwrite values pre-set by _make_node
if name not in self._params:
self._params[name] = default
def get_parameter(self, name):
class _P:
def __init__(self, v):
self.value = v
return _P(self._params[name])
def create_publisher(self, msg_type, topic, qos):
pub = _FakePub()
self._pubs[topic] = pub
return pub
def create_subscription(self, msg_type, topic, cb, qos):
self._subs[topic] = cb
return object()
def get_logger(self):
node = self
class _L:
def info(self, m): node._logs.append(("INFO", m))
def warn(self, m): node._logs.append(("WARN", m))
def error(self, m): node._logs.append(("ERROR", m))
return _L()
def destroy_node(self): pass
class _FakePub:
def __init__(self):
self.msgs = []
def publish(self, msg):
self.msgs.append(msg)
class _QoSProfile:
def __init__(self, depth=10): self.depth = depth
class _String:
def __init__(self): self.data = ""
# rclpy
rclpy_mod = sys.modules["rclpy"]
rclpy_mod.init = lambda args=None: None
rclpy_mod.spin = lambda node: None
rclpy_mod.shutdown = lambda: None
# rclpy.node
sys.modules["rclpy.node"].Node = _Node
# rclpy.qos
sys.modules["rclpy.qos"].QoSProfile = _QoSProfile
# std_msgs.msg
sys.modules["std_msgs.msg"].String = _String
# saltybot_social_msgs.msg (FaceDetectionArray + PersonStateArray)
class _FaceDetection:
def __init__(self, face_id=0, person_name="", confidence=1.0):
self.face_id = face_id
self.person_name = person_name
self.confidence = confidence
class _FaceDetectionArray:
def __init__(self, faces=None):
self.faces = faces or []
class _PersonState:
def __init__(self, face_id=0, distance=0.0):
self.face_id = face_id
self.distance = distance
class _PersonStateArray:
def __init__(self, persons=None):
self.persons = persons or []
msgs = sys.modules["saltybot_social_msgs.msg"]
msgs.FaceDetection = _FaceDetection
msgs.FaceDetectionArray = _FaceDetectionArray
msgs.PersonState = _PersonState
msgs.PersonStateArray = _PersonStateArray
return _Node, _FakePub, _QoSProfile, _String, _FaceDetection, _FaceDetectionArray, _PersonState, _PersonStateArray
_Node, _FakePub, _QoSProfile, _String, _FaceDetection, _FaceDetectionArray, _PersonState, _PersonStateArray = _make_ros_stubs()
# ── Load module under test ────────────────────────────────────────────────────
_SRC = (
"/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/"
"saltybot_social/saltybot_social/greeting_trigger_node.py"
)
def _load_mod():
spec = importlib.util.spec_from_file_location("greeting_trigger_node_testmod", _SRC)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
return mod
# ── Helpers ───────────────────────────────────────────────────────────────────
def _make_node(mod, **kwargs):
"""Instantiate GreetingTriggerNode with overridden parameters."""
node = mod.GreetingTriggerNode.__new__(mod.GreetingTriggerNode)
# Pre-populate _params BEFORE __init__ so super().__init__() (which calls
# _Node.__init__) sees them and skips reset due to hasattr guard.
defaults = {
"proximity_m": 2.0,
"cooldown_s": 300.0,
"unknown_distance": 0.0,
"faces_topic": "/social/faces/detected",
"states_topic": "/social/person_states",
}
defaults.update(kwargs)
node._params = dict(defaults)
mod.GreetingTriggerNode.__init__(node)
return node
def _face_msg(faces):
return _FaceDetectionArray(faces=faces)
def _state_msg(persons):
return _PersonStateArray(persons=persons)
# ── Test suites ───────────────────────────────────────────────────────────────
class TestNodeInit(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.mod = _load_mod()
def test_imports_cleanly(self):
self.assertTrue(hasattr(self.mod, "GreetingTriggerNode"))
def test_default_proximity(self):
node = _make_node(self.mod)
self.assertEqual(node._proximity, 2.0)
def test_default_cooldown(self):
node = _make_node(self.mod)
self.assertEqual(node._cooldown, 300.0)
def test_default_unknown_distance(self):
node = _make_node(self.mod)
self.assertEqual(node._unknown_dist, 0.0)
def test_pub_topic(self):
node = _make_node(self.mod)
self.assertIn("/saltybot/greeting_trigger", node._pubs)
def test_subs_registered(self):
node = _make_node(self.mod)
self.assertIn("/social/faces/detected", node._subs)
self.assertIn("/social/person_states", node._subs)
def test_initial_caches_empty(self):
node = _make_node(self.mod)
self.assertEqual(node._distance_cache, {})
self.assertEqual(node._last_greeted, {})
class TestDistanceCache(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.mod = _load_mod()
def setUp(self):
self.node = _make_node(self.mod)
def test_state_updates_cache(self):
ps = _PersonState(face_id=1, distance=1.5)
self.node._on_person_states(_state_msg([ps]))
self.assertAlmostEqual(self.node._distance_cache[1], 1.5)
def test_multiple_states_cached(self):
persons = [_PersonState(face_id=i, distance=float(i)) for i in range(5)]
self.node._on_person_states(_state_msg(persons))
for i in range(5):
self.assertAlmostEqual(self.node._distance_cache[i], float(i))
def test_state_update_overwrites(self):
self.node._on_person_states(_state_msg([_PersonState(face_id=1, distance=3.0)]))
self.node._on_person_states(_state_msg([_PersonState(face_id=1, distance=1.0)]))
self.assertAlmostEqual(self.node._distance_cache[1], 1.0)
def test_negative_face_id_ignored(self):
self.node._on_person_states(_state_msg([_PersonState(face_id=-1, distance=1.0)]))
self.assertNotIn(-1, self.node._distance_cache)
def test_zero_distance_cached(self):
self.node._on_person_states(_state_msg([_PersonState(face_id=5, distance=0.0)]))
self.assertAlmostEqual(self.node._distance_cache[5], 0.0)
class TestGreetingTrigger(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.mod = _load_mod()
def setUp(self):
self.node = _make_node(self.mod, proximity_m=2.0, cooldown_s=300.0)
self.pub = self.node._pubs["/saltybot/greeting_trigger"]
def _inject_distance(self, face_id, distance):
self.node._on_person_states(_state_msg([_PersonState(face_id=face_id, distance=distance)]))
def test_triggers_within_proximity(self):
self._inject_distance(1, 1.5)
self.node._on_faces(_face_msg([_FaceDetection(face_id=1, person_name="alice")]))
self.assertEqual(len(self.pub.msgs), 1)
def test_no_trigger_beyond_proximity(self):
self._inject_distance(2, 3.0)
self.node._on_faces(_face_msg([_FaceDetection(face_id=2, person_name="bob")]))
self.assertEqual(len(self.pub.msgs), 0)
def test_trigger_at_exact_proximity(self):
self._inject_distance(3, 2.0)
self.node._on_faces(_face_msg([_FaceDetection(face_id=3, person_name="carol")]))
self.assertEqual(len(self.pub.msgs), 1)
def test_no_trigger_just_beyond(self):
self._inject_distance(4, 2.001)
self.node._on_faces(_face_msg([_FaceDetection(face_id=4, person_name="dave")]))
self.assertEqual(len(self.pub.msgs), 0)
def test_cooldown_suppresses_retrigger(self):
self._inject_distance(5, 1.0)
face = _FaceDetection(face_id=5, person_name="eve")
self.node._on_faces(_face_msg([face]))
self.node._on_faces(_face_msg([face])) # second call in cooldown
self.assertEqual(len(self.pub.msgs), 1)
def test_cooldown_per_face_id(self):
self._inject_distance(6, 1.0)
self._inject_distance(7, 1.0)
self.node._on_faces(_face_msg([_FaceDetection(face_id=6, person_name="f")]))
self.node._on_faces(_face_msg([_FaceDetection(face_id=7, person_name="g")]))
self.assertEqual(len(self.pub.msgs), 2)
def test_expired_cooldown_retrigers(self):
self._inject_distance(8, 1.0)
face = _FaceDetection(face_id=8, person_name="hank")
self.node._on_faces(_face_msg([face]))
# Manually expire the cooldown
self.node._last_greeted[8] = time.monotonic() - 400.0
self.node._on_faces(_face_msg([face]))
self.assertEqual(len(self.pub.msgs), 2)
def test_unknown_face_uses_unknown_distance(self):
# unknown_distance=0.0 → should trigger (0.0 <= 2.0)
node = _make_node(self.mod, unknown_distance=0.0)
pub = node._pubs["/saltybot/greeting_trigger"]
node._on_faces(_face_msg([_FaceDetection(face_id=99, person_name="stranger")]))
self.assertEqual(len(pub.msgs), 1)
def test_unknown_face_large_distance_no_trigger(self):
# unknown_distance=10.0 → should NOT trigger
node = _make_node(self.mod, unknown_distance=10.0)
pub = node._pubs["/saltybot/greeting_trigger"]
node._on_faces(_face_msg([_FaceDetection(face_id=100, person_name="far")]))
self.assertEqual(len(pub.msgs), 0)
def test_multiple_faces_triggers_each_within_range(self):
self._inject_distance(10, 1.0)
self._inject_distance(11, 3.0) # out of range
faces = [
_FaceDetection(face_id=10, person_name="near"),
_FaceDetection(face_id=11, person_name="far"),
]
self.node._on_faces(_face_msg(faces))
self.assertEqual(len(self.pub.msgs), 1)
def test_empty_face_array_no_trigger(self):
self.node._on_faces(_face_msg([]))
self.assertEqual(len(self.pub.msgs), 0)
class TestPayload(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.mod = _load_mod()
def setUp(self):
self.node = _make_node(self.mod)
self.pub = self.node._pubs["/saltybot/greeting_trigger"]
def _trigger(self, face_id=1, person_name="alice", distance=1.5):
self.node._on_person_states(_state_msg([_PersonState(face_id=face_id, distance=distance)]))
self.node._on_faces(_face_msg([_FaceDetection(face_id=face_id, person_name=person_name)]))
def test_payload_is_json(self):
self._trigger()
payload = json.loads(self.pub.msgs[0].data)
self.assertIsInstance(payload, dict)
def test_payload_face_id(self):
self._trigger(face_id=42)
payload = json.loads(self.pub.msgs[0].data)
self.assertEqual(payload["face_id"], 42)
def test_payload_person_name(self):
self._trigger(person_name="zara")
payload = json.loads(self.pub.msgs[0].data)
self.assertEqual(payload["person_name"], "zara")
def test_payload_distance(self):
self._trigger(distance=1.234)
payload = json.loads(self.pub.msgs[0].data)
self.assertAlmostEqual(payload["distance_m"], 1.234, places=2)
def test_payload_has_ts(self):
self._trigger()
payload = json.loads(self.pub.msgs[0].data)
self.assertIn("ts", payload)
self.assertIsInstance(payload["ts"], float)
def test_ts_is_recent(self):
before = time.time()
self._trigger()
after = time.time()
payload = json.loads(self.pub.msgs[0].data)
self.assertGreaterEqual(payload["ts"], before)
self.assertLessEqual(payload["ts"], after + 1.0)
class TestNodeSrc(unittest.TestCase):
"""Source-level checks — verify node structure without instantiation."""
@classmethod
def setUpClass(cls):
with open(_SRC) as f:
cls.src = f.read()
def test_issue_tag(self):
self.assertIn("#270", self.src)
def test_pub_topic(self):
self.assertIn("/saltybot/greeting_trigger", self.src)
def test_faces_topic(self):
self.assertIn("/social/faces/detected", self.src)
def test_states_topic(self):
self.assertIn("/social/person_states", self.src)
def test_proximity_param(self):
self.assertIn("proximity_m", self.src)
def test_cooldown_param(self):
self.assertIn("cooldown_s", self.src)
def test_unknown_distance_param(self):
self.assertIn("unknown_distance", self.src)
def test_json_output(self):
self.assertIn("json", self.src)
def test_face_id_in_payload(self):
self.assertIn("face_id", self.src)
def test_person_name_in_payload(self):
self.assertIn("person_name", self.src)
def test_distance_in_payload(self):
self.assertIn("distance_m", self.src)
def test_main_defined(self):
self.assertIn("def main", self.src)
def test_threading_lock(self):
self.assertIn("threading.Lock", self.src)
class TestConfig(unittest.TestCase):
"""Checks on config/launch/setup files."""
_CONFIG = (
"/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/"
"saltybot_social/config/greeting_trigger_params.yaml"
)
_LAUNCH = (
"/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/"
"saltybot_social/launch/greeting_trigger.launch.py"
)
_SETUP = (
"/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/"
"saltybot_social/setup.py"
)
def test_config_exists(self):
import os
self.assertTrue(os.path.exists(self._CONFIG))
def test_config_proximity(self):
with open(self._CONFIG) as f:
content = f.read()
self.assertIn("proximity_m", content)
def test_config_cooldown(self):
with open(self._CONFIG) as f:
content = f.read()
self.assertIn("cooldown_s", content)
def test_config_node_name(self):
with open(self._CONFIG) as f:
content = f.read()
self.assertIn("greeting_trigger_node", content)
def test_launch_exists(self):
import os
self.assertTrue(os.path.exists(self._LAUNCH))
def test_launch_proximity_arg(self):
with open(self._LAUNCH) as f:
content = f.read()
self.assertIn("proximity_m", content)
def test_launch_cooldown_arg(self):
with open(self._LAUNCH) as f:
content = f.read()
self.assertIn("cooldown_s", content)
def test_entry_point(self):
with open(self._SETUP) as f:
content = f.read()
self.assertIn("greeting_trigger_node", content)
if __name__ == "__main__":
unittest.main()