feat(social): proximity-based greeting trigger — Issue #270
Some checks failed
social-bot integration tests / Lint (flake8 + pep257) (pull_request) Failing after 12s
social-bot integration tests / Core integration tests (mock sensors, no GPU) (pull_request) Has been skipped
social-bot integration tests / Latency profiling (GPU, Orin) (pull_request) Has been cancelled
Some checks failed
social-bot integration tests / Lint (flake8 + pep257) (pull_request) Failing after 12s
social-bot integration tests / Core integration tests (mock sensors, no GPU) (pull_request) Has been skipped
social-bot integration tests / Latency profiling (GPU, Orin) (pull_request) Has been cancelled
Adds greeting_trigger_node to saltybot_social:
- Subscribes to /social/faces/detected (FaceDetectionArray) for face arrivals
- Subscribes to /social/person_states (PersonStateArray) to cache face_id→distance
- Fires greeting when face_id is within proximity_m (default 2m) and
not in per-face_id cooldown window (default 300s)
- Publishes JSON on /saltybot/greeting_trigger:
{face_id, person_name, distance_m, ts}
- unknown_distance param controls assumed distance for faces with no PersonState yet
- Thread-safe distance cache and greeted map
- 50/50 tests passing
Closes #270
This commit is contained in:
parent
01ee02f837
commit
c7dd07f9ed
@ -0,0 +1,8 @@
|
|||||||
|
greeting_trigger_node:
|
||||||
|
ros__parameters:
|
||||||
|
proximity_m: 2.0 # Trigger when person is within this distance (m)
|
||||||
|
cooldown_s: 300.0 # Re-greeting suppression window per face_id (s)
|
||||||
|
unknown_distance: 0.0 # Distance assumed when PersonState not yet available
|
||||||
|
# 0.0 → always greet faces with no state yet
|
||||||
|
faces_topic: "/social/faces/detected"
|
||||||
|
states_topic: "/social/person_states"
|
||||||
@ -0,0 +1,39 @@
|
|||||||
|
"""greeting_trigger.launch.py -- Launch proximity-based greeting trigger (Issue #270).
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
ros2 launch saltybot_social greeting_trigger.launch.py
|
||||||
|
ros2 launch saltybot_social greeting_trigger.launch.py proximity_m:=1.5 cooldown_s:=120.0
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
from ament_index_python.packages import get_package_share_directory
|
||||||
|
from launch import LaunchDescription
|
||||||
|
from launch.actions import DeclareLaunchArgument
|
||||||
|
from launch.substitutions import LaunchConfiguration
|
||||||
|
from launch_ros.actions import Node
|
||||||
|
|
||||||
|
|
||||||
|
def generate_launch_description():
|
||||||
|
pkg = get_package_share_directory("saltybot_social")
|
||||||
|
cfg = os.path.join(pkg, "config", "greeting_trigger_params.yaml")
|
||||||
|
|
||||||
|
return LaunchDescription([
|
||||||
|
DeclareLaunchArgument("proximity_m", default_value="2.0",
|
||||||
|
description="Greeting proximity threshold (m)"),
|
||||||
|
DeclareLaunchArgument("cooldown_s", default_value="300.0",
|
||||||
|
description="Per-face_id re-greeting cooldown (s)"),
|
||||||
|
|
||||||
|
Node(
|
||||||
|
package="saltybot_social",
|
||||||
|
executable="greeting_trigger_node",
|
||||||
|
name="greeting_trigger_node",
|
||||||
|
output="screen",
|
||||||
|
parameters=[
|
||||||
|
cfg,
|
||||||
|
{
|
||||||
|
"proximity_m": LaunchConfiguration("proximity_m"),
|
||||||
|
"cooldown_s": LaunchConfiguration("cooldown_s"),
|
||||||
|
},
|
||||||
|
],
|
||||||
|
),
|
||||||
|
])
|
||||||
@ -0,0 +1,150 @@
|
|||||||
|
"""greeting_trigger_node.py -- Proximity-based greeting trigger.
|
||||||
|
Issue #270
|
||||||
|
|
||||||
|
Monitors face detections and person states. When a new face_id is seen
|
||||||
|
within ``proximity_m`` metres (default 2 m) and has not been greeted within
|
||||||
|
``cooldown_s`` seconds, publishes a JSON greeting trigger on
|
||||||
|
/saltybot/greeting_trigger.
|
||||||
|
|
||||||
|
Distance is looked up from the /social/person_states topic which carries a
|
||||||
|
face_id → distance mapping. When no state is available for a face the node
|
||||||
|
applies a configurable default distance so it can still fire on face-only
|
||||||
|
pipelines.
|
||||||
|
|
||||||
|
Subscriptions:
|
||||||
|
/social/faces/detected saltybot_social_msgs/FaceDetectionArray
|
||||||
|
/social/person_states saltybot_social_msgs/PersonStateArray
|
||||||
|
|
||||||
|
Publication:
|
||||||
|
/saltybot/greeting_trigger std_msgs/String (JSON)
|
||||||
|
{"face_id": <int>, "person_name": <str>, "distance_m": <float>,
|
||||||
|
"ts": <float unix epoch>}
|
||||||
|
|
||||||
|
Parameters:
|
||||||
|
proximity_m (float, 2.0) -- trigger when distance <= this
|
||||||
|
cooldown_s (float, 300.0) -- suppress re-greeting same face_id
|
||||||
|
unknown_distance (float, 0.0) -- distance assumed when PersonState
|
||||||
|
is not yet available (0.0 → always
|
||||||
|
trigger for unknown faces)
|
||||||
|
faces_topic (str, "/social/faces/detected")
|
||||||
|
states_topic (str, "/social/person_states")
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
import time
|
||||||
|
import threading
|
||||||
|
from typing import Dict
|
||||||
|
|
||||||
|
import rclpy
|
||||||
|
from rclpy.node import Node
|
||||||
|
from rclpy.qos import QoSProfile
|
||||||
|
from std_msgs.msg import String
|
||||||
|
|
||||||
|
# Custom messages — imported at runtime so offline tests can stub them
|
||||||
|
try:
|
||||||
|
from saltybot_social_msgs.msg import FaceDetectionArray, PersonStateArray
|
||||||
|
_MSGS = True
|
||||||
|
except ImportError:
|
||||||
|
_MSGS = False
|
||||||
|
|
||||||
|
|
||||||
|
class GreetingTriggerNode(Node):
|
||||||
|
"""Publishes greeting trigger when a person enters proximity."""
|
||||||
|
|
||||||
|
def __init__(self) -> None:
|
||||||
|
super().__init__("greeting_trigger_node")
|
||||||
|
|
||||||
|
self.declare_parameter("proximity_m", 2.0)
|
||||||
|
self.declare_parameter("cooldown_s", 300.0)
|
||||||
|
self.declare_parameter("unknown_distance", 0.0)
|
||||||
|
self.declare_parameter("faces_topic", "/social/faces/detected")
|
||||||
|
self.declare_parameter("states_topic", "/social/person_states")
|
||||||
|
|
||||||
|
self._proximity = self.get_parameter("proximity_m").value
|
||||||
|
self._cooldown = self.get_parameter("cooldown_s").value
|
||||||
|
self._unknown_dist = self.get_parameter("unknown_distance").value
|
||||||
|
faces_topic = self.get_parameter("faces_topic").value
|
||||||
|
states_topic = self.get_parameter("states_topic").value
|
||||||
|
|
||||||
|
# face_id → last known distance (m); updated from PersonStateArray
|
||||||
|
self._distance_cache: Dict[int, float] = {}
|
||||||
|
# face_id → unix timestamp of last greeting
|
||||||
|
self._last_greeted: Dict[int, float] = {}
|
||||||
|
self._lock = threading.Lock()
|
||||||
|
|
||||||
|
qos = QoSProfile(depth=10)
|
||||||
|
self._pub = self.create_publisher(String, "/saltybot/greeting_trigger", qos)
|
||||||
|
|
||||||
|
if _MSGS:
|
||||||
|
self._states_sub = self.create_subscription(
|
||||||
|
PersonStateArray, states_topic, self._on_person_states, qos
|
||||||
|
)
|
||||||
|
self._faces_sub = self.create_subscription(
|
||||||
|
FaceDetectionArray, faces_topic, self._on_faces, qos
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
self.get_logger().warn(
|
||||||
|
"saltybot_social_msgs not available — node is passive (no subscriptions)"
|
||||||
|
)
|
||||||
|
|
||||||
|
self.get_logger().info(
|
||||||
|
f"GreetingTriggerNode ready "
|
||||||
|
f"(proximity={self._proximity}m, cooldown={self._cooldown}s)"
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── Callbacks ──────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _on_person_states(self, msg: "PersonStateArray") -> None:
|
||||||
|
"""Cache face_id → distance from incoming PersonState array."""
|
||||||
|
with self._lock:
|
||||||
|
for ps in msg.persons:
|
||||||
|
if ps.face_id >= 0:
|
||||||
|
self._distance_cache[ps.face_id] = float(ps.distance)
|
||||||
|
|
||||||
|
def _on_faces(self, msg: "FaceDetectionArray") -> None:
|
||||||
|
"""Evaluate each detected face; fire greeting if conditions met."""
|
||||||
|
now = time.monotonic()
|
||||||
|
with self._lock:
|
||||||
|
for face in msg.faces:
|
||||||
|
fid = int(face.face_id)
|
||||||
|
dist = self._distance_cache.get(fid, self._unknown_dist)
|
||||||
|
|
||||||
|
if dist > self._proximity:
|
||||||
|
continue # too far
|
||||||
|
|
||||||
|
last = self._last_greeted.get(fid, 0.0)
|
||||||
|
if now - last < self._cooldown:
|
||||||
|
continue # still in cooldown
|
||||||
|
|
||||||
|
# Fire!
|
||||||
|
self._last_greeted[fid] = now
|
||||||
|
self._fire(fid, str(face.person_name), dist)
|
||||||
|
|
||||||
|
def _fire(self, face_id: int, person_name: str, distance_m: float) -> None:
|
||||||
|
payload = {
|
||||||
|
"face_id": face_id,
|
||||||
|
"person_name": person_name,
|
||||||
|
"distance_m": round(distance_m, 3),
|
||||||
|
"ts": time.time(),
|
||||||
|
}
|
||||||
|
msg = String()
|
||||||
|
msg.data = json.dumps(payload)
|
||||||
|
self._pub.publish(msg)
|
||||||
|
self.get_logger().info(
|
||||||
|
f"Greeting trigger: face_id={face_id} name={person_name!r} "
|
||||||
|
f"dist={distance_m:.2f}m"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def main(args=None) -> None:
|
||||||
|
rclpy.init(args=args)
|
||||||
|
node = GreetingTriggerNode()
|
||||||
|
try:
|
||||||
|
rclpy.spin(node)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
node.destroy_node()
|
||||||
|
rclpy.shutdown()
|
||||||
@ -47,6 +47,8 @@ setup(
|
|||||||
'vad_node = saltybot_social.vad_node:main',
|
'vad_node = saltybot_social.vad_node:main',
|
||||||
# Ambient sound classifier — mel-spectrogram (Issue #252)
|
# Ambient sound classifier — mel-spectrogram (Issue #252)
|
||||||
'ambient_sound_node = saltybot_social.ambient_sound_node:main',
|
'ambient_sound_node = saltybot_social.ambient_sound_node:main',
|
||||||
|
# Proximity-based greeting trigger (Issue #270)
|
||||||
|
'greeting_trigger_node = saltybot_social.greeting_trigger_node:main',
|
||||||
],
|
],
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|||||||
472
jetson/ros2_ws/src/saltybot_social/test/test_greeting_trigger.py
Normal file
472
jetson/ros2_ws/src/saltybot_social/test/test_greeting_trigger.py
Normal file
@ -0,0 +1,472 @@
|
|||||||
|
"""test_greeting_trigger.py -- Offline tests for greeting_trigger_node (Issue #270).
|
||||||
|
|
||||||
|
Stubs out rclpy and saltybot_social_msgs so tests run without a ROS install.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import importlib
|
||||||
|
import json
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
import types
|
||||||
|
import unittest
|
||||||
|
|
||||||
|
|
||||||
|
# ── ROS2 / message stubs ──────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _make_ros_stubs():
|
||||||
|
"""Install minimal stubs for rclpy and message packages."""
|
||||||
|
for mod_name in ("rclpy", "rclpy.node", "rclpy.qos",
|
||||||
|
"std_msgs", "std_msgs.msg",
|
||||||
|
"saltybot_social_msgs", "saltybot_social_msgs.msg"):
|
||||||
|
sys.modules[mod_name] = types.ModuleType(mod_name)
|
||||||
|
|
||||||
|
class _Node:
|
||||||
|
def __init__(self, name):
|
||||||
|
self._name = name
|
||||||
|
# Preserve _params if pre-set by _make_node (super().__init__() is
|
||||||
|
# called from GreetingTriggerNode.__init__, so don't reset here)
|
||||||
|
if not hasattr(self, '_params'):
|
||||||
|
self._params = {}
|
||||||
|
self._pubs = {}
|
||||||
|
self._subs = {}
|
||||||
|
self._logs = []
|
||||||
|
|
||||||
|
def declare_parameter(self, name, default):
|
||||||
|
# Don't overwrite values pre-set by _make_node
|
||||||
|
if name not in self._params:
|
||||||
|
self._params[name] = default
|
||||||
|
|
||||||
|
def get_parameter(self, name):
|
||||||
|
class _P:
|
||||||
|
def __init__(self, v):
|
||||||
|
self.value = v
|
||||||
|
return _P(self._params[name])
|
||||||
|
|
||||||
|
def create_publisher(self, msg_type, topic, qos):
|
||||||
|
pub = _FakePub()
|
||||||
|
self._pubs[topic] = pub
|
||||||
|
return pub
|
||||||
|
|
||||||
|
def create_subscription(self, msg_type, topic, cb, qos):
|
||||||
|
self._subs[topic] = cb
|
||||||
|
return object()
|
||||||
|
|
||||||
|
def get_logger(self):
|
||||||
|
node = self
|
||||||
|
class _L:
|
||||||
|
def info(self, m): node._logs.append(("INFO", m))
|
||||||
|
def warn(self, m): node._logs.append(("WARN", m))
|
||||||
|
def error(self, m): node._logs.append(("ERROR", m))
|
||||||
|
return _L()
|
||||||
|
|
||||||
|
def destroy_node(self): pass
|
||||||
|
|
||||||
|
class _FakePub:
|
||||||
|
def __init__(self):
|
||||||
|
self.msgs = []
|
||||||
|
def publish(self, msg):
|
||||||
|
self.msgs.append(msg)
|
||||||
|
|
||||||
|
class _QoSProfile:
|
||||||
|
def __init__(self, depth=10): self.depth = depth
|
||||||
|
|
||||||
|
class _String:
|
||||||
|
def __init__(self): self.data = ""
|
||||||
|
|
||||||
|
# rclpy
|
||||||
|
rclpy_mod = sys.modules["rclpy"]
|
||||||
|
rclpy_mod.init = lambda args=None: None
|
||||||
|
rclpy_mod.spin = lambda node: None
|
||||||
|
rclpy_mod.shutdown = lambda: None
|
||||||
|
|
||||||
|
# rclpy.node
|
||||||
|
sys.modules["rclpy.node"].Node = _Node
|
||||||
|
|
||||||
|
# rclpy.qos
|
||||||
|
sys.modules["rclpy.qos"].QoSProfile = _QoSProfile
|
||||||
|
|
||||||
|
# std_msgs.msg
|
||||||
|
sys.modules["std_msgs.msg"].String = _String
|
||||||
|
|
||||||
|
# saltybot_social_msgs.msg (FaceDetectionArray + PersonStateArray)
|
||||||
|
class _FaceDetection:
|
||||||
|
def __init__(self, face_id=0, person_name="", confidence=1.0):
|
||||||
|
self.face_id = face_id
|
||||||
|
self.person_name = person_name
|
||||||
|
self.confidence = confidence
|
||||||
|
|
||||||
|
class _FaceDetectionArray:
|
||||||
|
def __init__(self, faces=None):
|
||||||
|
self.faces = faces or []
|
||||||
|
|
||||||
|
class _PersonState:
|
||||||
|
def __init__(self, face_id=0, distance=0.0):
|
||||||
|
self.face_id = face_id
|
||||||
|
self.distance = distance
|
||||||
|
|
||||||
|
class _PersonStateArray:
|
||||||
|
def __init__(self, persons=None):
|
||||||
|
self.persons = persons or []
|
||||||
|
|
||||||
|
msgs = sys.modules["saltybot_social_msgs.msg"]
|
||||||
|
msgs.FaceDetection = _FaceDetection
|
||||||
|
msgs.FaceDetectionArray = _FaceDetectionArray
|
||||||
|
msgs.PersonState = _PersonState
|
||||||
|
msgs.PersonStateArray = _PersonStateArray
|
||||||
|
|
||||||
|
return _Node, _FakePub, _QoSProfile, _String, _FaceDetection, _FaceDetectionArray, _PersonState, _PersonStateArray
|
||||||
|
|
||||||
|
|
||||||
|
_Node, _FakePub, _QoSProfile, _String, _FaceDetection, _FaceDetectionArray, _PersonState, _PersonStateArray = _make_ros_stubs()
|
||||||
|
|
||||||
|
|
||||||
|
# ── Load module under test ────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
_SRC = (
|
||||||
|
"/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/"
|
||||||
|
"saltybot_social/saltybot_social/greeting_trigger_node.py"
|
||||||
|
)
|
||||||
|
|
||||||
|
def _load_mod():
|
||||||
|
spec = importlib.util.spec_from_file_location("greeting_trigger_node_testmod", _SRC)
|
||||||
|
mod = importlib.util.module_from_spec(spec)
|
||||||
|
spec.loader.exec_module(mod)
|
||||||
|
return mod
|
||||||
|
|
||||||
|
|
||||||
|
# ── Helpers ───────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _make_node(mod, **kwargs):
|
||||||
|
"""Instantiate GreetingTriggerNode with overridden parameters."""
|
||||||
|
node = mod.GreetingTriggerNode.__new__(mod.GreetingTriggerNode)
|
||||||
|
|
||||||
|
# Pre-populate _params BEFORE __init__ so super().__init__() (which calls
|
||||||
|
# _Node.__init__) sees them and skips reset due to hasattr guard.
|
||||||
|
defaults = {
|
||||||
|
"proximity_m": 2.0,
|
||||||
|
"cooldown_s": 300.0,
|
||||||
|
"unknown_distance": 0.0,
|
||||||
|
"faces_topic": "/social/faces/detected",
|
||||||
|
"states_topic": "/social/person_states",
|
||||||
|
}
|
||||||
|
defaults.update(kwargs)
|
||||||
|
node._params = dict(defaults)
|
||||||
|
|
||||||
|
mod.GreetingTriggerNode.__init__(node)
|
||||||
|
return node
|
||||||
|
|
||||||
|
|
||||||
|
def _face_msg(faces):
|
||||||
|
return _FaceDetectionArray(faces=faces)
|
||||||
|
|
||||||
|
def _state_msg(persons):
|
||||||
|
return _PersonStateArray(persons=persons)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Test suites ───────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestNodeInit(unittest.TestCase):
|
||||||
|
@classmethod
|
||||||
|
def setUpClass(cls):
|
||||||
|
cls.mod = _load_mod()
|
||||||
|
|
||||||
|
def test_imports_cleanly(self):
|
||||||
|
self.assertTrue(hasattr(self.mod, "GreetingTriggerNode"))
|
||||||
|
|
||||||
|
def test_default_proximity(self):
|
||||||
|
node = _make_node(self.mod)
|
||||||
|
self.assertEqual(node._proximity, 2.0)
|
||||||
|
|
||||||
|
def test_default_cooldown(self):
|
||||||
|
node = _make_node(self.mod)
|
||||||
|
self.assertEqual(node._cooldown, 300.0)
|
||||||
|
|
||||||
|
def test_default_unknown_distance(self):
|
||||||
|
node = _make_node(self.mod)
|
||||||
|
self.assertEqual(node._unknown_dist, 0.0)
|
||||||
|
|
||||||
|
def test_pub_topic(self):
|
||||||
|
node = _make_node(self.mod)
|
||||||
|
self.assertIn("/saltybot/greeting_trigger", node._pubs)
|
||||||
|
|
||||||
|
def test_subs_registered(self):
|
||||||
|
node = _make_node(self.mod)
|
||||||
|
self.assertIn("/social/faces/detected", node._subs)
|
||||||
|
self.assertIn("/social/person_states", node._subs)
|
||||||
|
|
||||||
|
def test_initial_caches_empty(self):
|
||||||
|
node = _make_node(self.mod)
|
||||||
|
self.assertEqual(node._distance_cache, {})
|
||||||
|
self.assertEqual(node._last_greeted, {})
|
||||||
|
|
||||||
|
|
||||||
|
class TestDistanceCache(unittest.TestCase):
|
||||||
|
@classmethod
|
||||||
|
def setUpClass(cls):
|
||||||
|
cls.mod = _load_mod()
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
self.node = _make_node(self.mod)
|
||||||
|
|
||||||
|
def test_state_updates_cache(self):
|
||||||
|
ps = _PersonState(face_id=1, distance=1.5)
|
||||||
|
self.node._on_person_states(_state_msg([ps]))
|
||||||
|
self.assertAlmostEqual(self.node._distance_cache[1], 1.5)
|
||||||
|
|
||||||
|
def test_multiple_states_cached(self):
|
||||||
|
persons = [_PersonState(face_id=i, distance=float(i)) for i in range(5)]
|
||||||
|
self.node._on_person_states(_state_msg(persons))
|
||||||
|
for i in range(5):
|
||||||
|
self.assertAlmostEqual(self.node._distance_cache[i], float(i))
|
||||||
|
|
||||||
|
def test_state_update_overwrites(self):
|
||||||
|
self.node._on_person_states(_state_msg([_PersonState(face_id=1, distance=3.0)]))
|
||||||
|
self.node._on_person_states(_state_msg([_PersonState(face_id=1, distance=1.0)]))
|
||||||
|
self.assertAlmostEqual(self.node._distance_cache[1], 1.0)
|
||||||
|
|
||||||
|
def test_negative_face_id_ignored(self):
|
||||||
|
self.node._on_person_states(_state_msg([_PersonState(face_id=-1, distance=1.0)]))
|
||||||
|
self.assertNotIn(-1, self.node._distance_cache)
|
||||||
|
|
||||||
|
def test_zero_distance_cached(self):
|
||||||
|
self.node._on_person_states(_state_msg([_PersonState(face_id=5, distance=0.0)]))
|
||||||
|
self.assertAlmostEqual(self.node._distance_cache[5], 0.0)
|
||||||
|
|
||||||
|
|
||||||
|
class TestGreetingTrigger(unittest.TestCase):
|
||||||
|
@classmethod
|
||||||
|
def setUpClass(cls):
|
||||||
|
cls.mod = _load_mod()
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
self.node = _make_node(self.mod, proximity_m=2.0, cooldown_s=300.0)
|
||||||
|
self.pub = self.node._pubs["/saltybot/greeting_trigger"]
|
||||||
|
|
||||||
|
def _inject_distance(self, face_id, distance):
|
||||||
|
self.node._on_person_states(_state_msg([_PersonState(face_id=face_id, distance=distance)]))
|
||||||
|
|
||||||
|
def test_triggers_within_proximity(self):
|
||||||
|
self._inject_distance(1, 1.5)
|
||||||
|
self.node._on_faces(_face_msg([_FaceDetection(face_id=1, person_name="alice")]))
|
||||||
|
self.assertEqual(len(self.pub.msgs), 1)
|
||||||
|
|
||||||
|
def test_no_trigger_beyond_proximity(self):
|
||||||
|
self._inject_distance(2, 3.0)
|
||||||
|
self.node._on_faces(_face_msg([_FaceDetection(face_id=2, person_name="bob")]))
|
||||||
|
self.assertEqual(len(self.pub.msgs), 0)
|
||||||
|
|
||||||
|
def test_trigger_at_exact_proximity(self):
|
||||||
|
self._inject_distance(3, 2.0)
|
||||||
|
self.node._on_faces(_face_msg([_FaceDetection(face_id=3, person_name="carol")]))
|
||||||
|
self.assertEqual(len(self.pub.msgs), 1)
|
||||||
|
|
||||||
|
def test_no_trigger_just_beyond(self):
|
||||||
|
self._inject_distance(4, 2.001)
|
||||||
|
self.node._on_faces(_face_msg([_FaceDetection(face_id=4, person_name="dave")]))
|
||||||
|
self.assertEqual(len(self.pub.msgs), 0)
|
||||||
|
|
||||||
|
def test_cooldown_suppresses_retrigger(self):
|
||||||
|
self._inject_distance(5, 1.0)
|
||||||
|
face = _FaceDetection(face_id=5, person_name="eve")
|
||||||
|
self.node._on_faces(_face_msg([face]))
|
||||||
|
self.node._on_faces(_face_msg([face])) # second call in cooldown
|
||||||
|
self.assertEqual(len(self.pub.msgs), 1)
|
||||||
|
|
||||||
|
def test_cooldown_per_face_id(self):
|
||||||
|
self._inject_distance(6, 1.0)
|
||||||
|
self._inject_distance(7, 1.0)
|
||||||
|
self.node._on_faces(_face_msg([_FaceDetection(face_id=6, person_name="f")]))
|
||||||
|
self.node._on_faces(_face_msg([_FaceDetection(face_id=7, person_name="g")]))
|
||||||
|
self.assertEqual(len(self.pub.msgs), 2)
|
||||||
|
|
||||||
|
def test_expired_cooldown_retrigers(self):
|
||||||
|
self._inject_distance(8, 1.0)
|
||||||
|
face = _FaceDetection(face_id=8, person_name="hank")
|
||||||
|
self.node._on_faces(_face_msg([face]))
|
||||||
|
# Manually expire the cooldown
|
||||||
|
self.node._last_greeted[8] = time.monotonic() - 400.0
|
||||||
|
self.node._on_faces(_face_msg([face]))
|
||||||
|
self.assertEqual(len(self.pub.msgs), 2)
|
||||||
|
|
||||||
|
def test_unknown_face_uses_unknown_distance(self):
|
||||||
|
# unknown_distance=0.0 → should trigger (0.0 <= 2.0)
|
||||||
|
node = _make_node(self.mod, unknown_distance=0.0)
|
||||||
|
pub = node._pubs["/saltybot/greeting_trigger"]
|
||||||
|
node._on_faces(_face_msg([_FaceDetection(face_id=99, person_name="stranger")]))
|
||||||
|
self.assertEqual(len(pub.msgs), 1)
|
||||||
|
|
||||||
|
def test_unknown_face_large_distance_no_trigger(self):
|
||||||
|
# unknown_distance=10.0 → should NOT trigger
|
||||||
|
node = _make_node(self.mod, unknown_distance=10.0)
|
||||||
|
pub = node._pubs["/saltybot/greeting_trigger"]
|
||||||
|
node._on_faces(_face_msg([_FaceDetection(face_id=100, person_name="far")]))
|
||||||
|
self.assertEqual(len(pub.msgs), 0)
|
||||||
|
|
||||||
|
def test_multiple_faces_triggers_each_within_range(self):
|
||||||
|
self._inject_distance(10, 1.0)
|
||||||
|
self._inject_distance(11, 3.0) # out of range
|
||||||
|
faces = [
|
||||||
|
_FaceDetection(face_id=10, person_name="near"),
|
||||||
|
_FaceDetection(face_id=11, person_name="far"),
|
||||||
|
]
|
||||||
|
self.node._on_faces(_face_msg(faces))
|
||||||
|
self.assertEqual(len(self.pub.msgs), 1)
|
||||||
|
|
||||||
|
def test_empty_face_array_no_trigger(self):
|
||||||
|
self.node._on_faces(_face_msg([]))
|
||||||
|
self.assertEqual(len(self.pub.msgs), 0)
|
||||||
|
|
||||||
|
|
||||||
|
class TestPayload(unittest.TestCase):
|
||||||
|
@classmethod
|
||||||
|
def setUpClass(cls):
|
||||||
|
cls.mod = _load_mod()
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
self.node = _make_node(self.mod)
|
||||||
|
self.pub = self.node._pubs["/saltybot/greeting_trigger"]
|
||||||
|
|
||||||
|
def _trigger(self, face_id=1, person_name="alice", distance=1.5):
|
||||||
|
self.node._on_person_states(_state_msg([_PersonState(face_id=face_id, distance=distance)]))
|
||||||
|
self.node._on_faces(_face_msg([_FaceDetection(face_id=face_id, person_name=person_name)]))
|
||||||
|
|
||||||
|
def test_payload_is_json(self):
|
||||||
|
self._trigger()
|
||||||
|
payload = json.loads(self.pub.msgs[0].data)
|
||||||
|
self.assertIsInstance(payload, dict)
|
||||||
|
|
||||||
|
def test_payload_face_id(self):
|
||||||
|
self._trigger(face_id=42)
|
||||||
|
payload = json.loads(self.pub.msgs[0].data)
|
||||||
|
self.assertEqual(payload["face_id"], 42)
|
||||||
|
|
||||||
|
def test_payload_person_name(self):
|
||||||
|
self._trigger(person_name="zara")
|
||||||
|
payload = json.loads(self.pub.msgs[0].data)
|
||||||
|
self.assertEqual(payload["person_name"], "zara")
|
||||||
|
|
||||||
|
def test_payload_distance(self):
|
||||||
|
self._trigger(distance=1.234)
|
||||||
|
payload = json.loads(self.pub.msgs[0].data)
|
||||||
|
self.assertAlmostEqual(payload["distance_m"], 1.234, places=2)
|
||||||
|
|
||||||
|
def test_payload_has_ts(self):
|
||||||
|
self._trigger()
|
||||||
|
payload = json.loads(self.pub.msgs[0].data)
|
||||||
|
self.assertIn("ts", payload)
|
||||||
|
self.assertIsInstance(payload["ts"], float)
|
||||||
|
|
||||||
|
def test_ts_is_recent(self):
|
||||||
|
before = time.time()
|
||||||
|
self._trigger()
|
||||||
|
after = time.time()
|
||||||
|
payload = json.loads(self.pub.msgs[0].data)
|
||||||
|
self.assertGreaterEqual(payload["ts"], before)
|
||||||
|
self.assertLessEqual(payload["ts"], after + 1.0)
|
||||||
|
|
||||||
|
|
||||||
|
class TestNodeSrc(unittest.TestCase):
|
||||||
|
"""Source-level checks — verify node structure without instantiation."""
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def setUpClass(cls):
|
||||||
|
with open(_SRC) as f:
|
||||||
|
cls.src = f.read()
|
||||||
|
|
||||||
|
def test_issue_tag(self):
|
||||||
|
self.assertIn("#270", self.src)
|
||||||
|
|
||||||
|
def test_pub_topic(self):
|
||||||
|
self.assertIn("/saltybot/greeting_trigger", self.src)
|
||||||
|
|
||||||
|
def test_faces_topic(self):
|
||||||
|
self.assertIn("/social/faces/detected", self.src)
|
||||||
|
|
||||||
|
def test_states_topic(self):
|
||||||
|
self.assertIn("/social/person_states", self.src)
|
||||||
|
|
||||||
|
def test_proximity_param(self):
|
||||||
|
self.assertIn("proximity_m", self.src)
|
||||||
|
|
||||||
|
def test_cooldown_param(self):
|
||||||
|
self.assertIn("cooldown_s", self.src)
|
||||||
|
|
||||||
|
def test_unknown_distance_param(self):
|
||||||
|
self.assertIn("unknown_distance", self.src)
|
||||||
|
|
||||||
|
def test_json_output(self):
|
||||||
|
self.assertIn("json", self.src)
|
||||||
|
|
||||||
|
def test_face_id_in_payload(self):
|
||||||
|
self.assertIn("face_id", self.src)
|
||||||
|
|
||||||
|
def test_person_name_in_payload(self):
|
||||||
|
self.assertIn("person_name", self.src)
|
||||||
|
|
||||||
|
def test_distance_in_payload(self):
|
||||||
|
self.assertIn("distance_m", self.src)
|
||||||
|
|
||||||
|
def test_main_defined(self):
|
||||||
|
self.assertIn("def main", self.src)
|
||||||
|
|
||||||
|
def test_threading_lock(self):
|
||||||
|
self.assertIn("threading.Lock", self.src)
|
||||||
|
|
||||||
|
|
||||||
|
class TestConfig(unittest.TestCase):
|
||||||
|
"""Checks on config/launch/setup files."""
|
||||||
|
|
||||||
|
_CONFIG = (
|
||||||
|
"/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/"
|
||||||
|
"saltybot_social/config/greeting_trigger_params.yaml"
|
||||||
|
)
|
||||||
|
_LAUNCH = (
|
||||||
|
"/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/"
|
||||||
|
"saltybot_social/launch/greeting_trigger.launch.py"
|
||||||
|
)
|
||||||
|
_SETUP = (
|
||||||
|
"/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/"
|
||||||
|
"saltybot_social/setup.py"
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_config_exists(self):
|
||||||
|
import os
|
||||||
|
self.assertTrue(os.path.exists(self._CONFIG))
|
||||||
|
|
||||||
|
def test_config_proximity(self):
|
||||||
|
with open(self._CONFIG) as f:
|
||||||
|
content = f.read()
|
||||||
|
self.assertIn("proximity_m", content)
|
||||||
|
|
||||||
|
def test_config_cooldown(self):
|
||||||
|
with open(self._CONFIG) as f:
|
||||||
|
content = f.read()
|
||||||
|
self.assertIn("cooldown_s", content)
|
||||||
|
|
||||||
|
def test_config_node_name(self):
|
||||||
|
with open(self._CONFIG) as f:
|
||||||
|
content = f.read()
|
||||||
|
self.assertIn("greeting_trigger_node", content)
|
||||||
|
|
||||||
|
def test_launch_exists(self):
|
||||||
|
import os
|
||||||
|
self.assertTrue(os.path.exists(self._LAUNCH))
|
||||||
|
|
||||||
|
def test_launch_proximity_arg(self):
|
||||||
|
with open(self._LAUNCH) as f:
|
||||||
|
content = f.read()
|
||||||
|
self.assertIn("proximity_m", content)
|
||||||
|
|
||||||
|
def test_launch_cooldown_arg(self):
|
||||||
|
with open(self._LAUNCH) as f:
|
||||||
|
content = f.read()
|
||||||
|
self.assertIn("cooldown_s", content)
|
||||||
|
|
||||||
|
def test_entry_point(self):
|
||||||
|
with open(self._SETUP) as f:
|
||||||
|
content = f.read()
|
||||||
|
self.assertIn("greeting_trigger_node", content)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
Loading…
x
Reference in New Issue
Block a user