Merge pull request 'feat(social): robot mesh comms — peer announce + person handoff (Issue #171)' (#202) from sl-jetson/issue-171-mesh-comms into main
Some checks failed
social-bot integration tests / Lint (flake8 + pep257) (push) Failing after 2s
social-bot integration tests / Core integration tests (mock sensors, no GPU) (push) Has been skipped
social-bot integration tests / Latency profiling (GPU, Orin) (push) Has been cancelled

This commit is contained in:
sl-jetson 2026-03-02 11:17:03 -05:00
commit bfd58fc98c
8 changed files with 626 additions and 0 deletions

View File

@ -0,0 +1,8 @@
mesh_comms_node:
ros__parameters:
robot_id: "saltybot_1" # Set to $ROBOT_ID; override per-robot
namespace: "" # "" = default namespace; e.g. "/saltybot_1"
announce_rate_hz: 1.0 # Heartbeat rate (Hz) — keep <= 2 Hz on mesh
peer_timeout_s: 5.0 # Drop peer after this many seconds without announce
handoff_distance_m: 3.0 # (guard) min distance for handoff trigger (m)
greeted_memory_s: 300.0 # Forget a greeted name after N seconds

View File

@ -0,0 +1,51 @@
"""mesh.launch.py — Launch the social mesh comms node (Issue #171).
Usage:
ros2 launch saltybot_social mesh.launch.py
ros2 launch saltybot_social mesh.launch.py robot_id:=saltybot_2
"""
import os
from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def generate_launch_description():
pkg = get_package_share_directory("saltybot_social")
cfg = os.path.join(pkg, "config", "mesh_params.yaml")
return LaunchDescription([
DeclareLaunchArgument(
"robot_id",
default_value=os.environ.get("ROBOT_ID", "saltybot_1"),
description="Unique robot identifier for mesh announcements",
),
DeclareLaunchArgument(
"announce_rate_hz",
default_value="1.0",
description="Heartbeat rate (Hz)",
),
DeclareLaunchArgument(
"peer_timeout_s",
default_value="5.0",
description="Seconds before a peer is considered lost",
),
Node(
package="saltybot_social",
executable="mesh_comms_node",
name="mesh_comms_node",
output="screen",
parameters=[
cfg,
{
"robot_id": LaunchConfiguration("robot_id"),
"announce_rate_hz": LaunchConfiguration("announce_rate_hz"),
"peer_timeout_s": LaunchConfiguration("peer_timeout_s"),
},
],
),
])

View File

@ -0,0 +1,237 @@
"""mesh_comms_node.py — Robot-to-robot social mesh communication.
Issue #171
Topics:
Publish: /social/mesh/announce (MeshPeer, 1 Hz) presence heartbeat
/social/mesh/handoff (MeshHandoff) person context on LEAVING
/social/mesh/peers (String / JSON) peer list for UI
/social/mesh/greeted (String / JSON) mesh-wide greeted names
Subscribe: /social/mesh/announce (MeshPeer) remote peer announcements
/social/mesh/handoff (MeshHandoff) incoming handoffs
/social/person_states (PersonStateArray)
/social/orchestrator/state (String / JSON)
Parameters:
robot_id (str, $ROBOT_ID or "saltybot_1") unique robot identifier
namespace (str, $ROS_NAMESPACE or "") ROS2 namespace
announce_rate_hz (float, 1.0) heartbeat rate
peer_timeout_s (float, 5.0) drop peer after N seconds without announce
handoff_distance_m (float, 3.0) max distance to trigger handoff (unused guard)
greeted_memory_s (float, 300.0) forget a greeted name after N seconds
"""
from __future__ import annotations
import json
import os
import threading
import time
from typing import Dict, Set
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, DurabilityPolicy
from std_msgs.msg import String
from saltybot_social_msgs.msg import PersonStateArray, MeshPeer, MeshHandoff
_LEAVING = 4 # PersonState.STATE_LEAVING
_ENGAGED = 2 # PersonState.STATE_ENGAGED
_TALKING = 3 # PersonState.STATE_TALKING
class MeshCommsNode(Node):
"""Robot mesh comms — peer announce, person handoff, greeting deduplication."""
def __init__(self) -> None:
super().__init__("mesh_comms_node")
self.declare_parameter("robot_id", os.environ.get("ROBOT_ID", "saltybot_1"))
self.declare_parameter("namespace", os.environ.get("ROS_NAMESPACE", ""))
self.declare_parameter("announce_rate_hz", 1.0)
self.declare_parameter("peer_timeout_s", 5.0)
self.declare_parameter("handoff_distance_m", 3.0)
self.declare_parameter("greeted_memory_s", 300.0)
self._robot_id = self.get_parameter("robot_id").value
self._ns = self.get_parameter("namespace").value
self._peer_timeout = self.get_parameter("peer_timeout_s").value
self._handoff_dist = self.get_parameter("handoff_distance_m").value
self._greeted_mem = self.get_parameter("greeted_memory_s").value
qos = QoSProfile(depth=10)
mesh_qos = QoSProfile(
depth=10,
reliability=ReliabilityPolicy.BEST_EFFORT,
durability=DurabilityPolicy.VOLATILE,
)
self._announce_pub = self.create_publisher(MeshPeer, "/social/mesh/announce", mesh_qos)
self._handoff_pub = self.create_publisher(MeshHandoff, "/social/mesh/handoff", qos)
self._peers_pub = self.create_publisher(String, "/social/mesh/peers", qos)
self._greeted_pub = self.create_publisher(String, "/social/mesh/greeted", qos)
self._announce_sub = self.create_subscription(
MeshPeer, "/social/mesh/announce", self._on_announce, mesh_qos)
self._handoff_sub = self.create_subscription(
MeshHandoff, "/social/mesh/handoff", self._on_handoff, qos)
self._persons_sub = self.create_subscription(
PersonStateArray, "/social/person_states", self._on_person_states, qos)
self._orch_sub = self.create_subscription(
String, "/social/orchestrator/state", self._on_orch_state, qos)
self._lock = threading.Lock()
self._peers: Dict[str, dict] = {} # robot_id → {last_seen, msg}
self._person_states: dict = {} # person_id → PersonState
self._social_state = "idle"
self._greeted: Dict[str, float] = {} # person_name → timestamp
self._handoff_sent: Set[str] = set() # str(person_id) already handed off
rate = self.get_parameter("announce_rate_hz").value
self._announce_timer = self.create_timer(1.0 / rate, self._publish_announce)
self._cleanup_timer = self.create_timer(2.0, self._cleanup)
self.get_logger().info(
f"MeshCommsNode ready (robot_id={self._robot_id}, "
f"peer_timeout={self._peer_timeout}s, handoff_dist={self._handoff_dist}m)"
)
# ── Subscriber callbacks ──────────────────────────────────────────────────
def _on_orch_state(self, msg: String) -> None:
try:
d = json.loads(msg.data)
with self._lock:
self._social_state = d.get("state", "idle")
except Exception:
pass
def _on_person_states(self, msg: PersonStateArray) -> None:
with self._lock:
new_states = {p.person_id: p for p in msg.persons}
for pid, ps in new_states.items():
prev = self._person_states.get(pid)
if (prev is not None
and prev.state != _LEAVING
and ps.state == _LEAVING
and str(pid) not in self._handoff_sent):
self._trigger_handoff(ps)
self._person_states = new_states
def _on_announce(self, msg: MeshPeer) -> None:
if msg.robot_id == self._robot_id:
return
with self._lock:
self._peers[msg.robot_id] = {"last_seen": time.time(), "msg": msg}
for name in msg.greeted_person_names:
if name and name not in self._greeted:
self._greeted[name] = time.time()
self._publish_greeted()
def _on_handoff(self, msg: MeshHandoff) -> None:
if msg.from_robot_id == self._robot_id:
return
self.get_logger().info(
f"Handoff received from [{msg.from_robot_id}]: "
f"person='{msg.person_name or msg.person_id}' "
f"lang={msg.language or '?'} "
f"bearing={msg.bearing_to_target_deg:.0f}° dist={msg.distance_m:.1f}m"
)
# ── Publish helpers ───────────────────────────────────────────────────────
def _trigger_handoff(self, ps) -> None:
"""Build and publish a MeshHandoff for a LEAVING person. Must hold self._lock."""
msg = MeshHandoff()
msg.header.stamp = self.get_clock().now().to_msg()
msg.from_robot_id = self._robot_id
msg.to_robot_id = ""
msg.person_id = str(ps.person_id)
msg.person_name = ps.person_name
msg.face_id = ps.face_id
msg.speaker_id = ps.speaker_id
msg.bearing_to_target_deg = ps.bearing_deg
msg.distance_m = ps.distance
msg.engagement_score = ps.engagement_score
msg.conversation_summary = ""
msg.last_utterance = ""
msg.last_response = ""
msg.language = ""
self._handoff_sent.add(str(ps.person_id))
self._handoff_pub.publish(msg)
self.get_logger().info(
f"Handoff → '{ps.person_name or ps.person_id}' "
f"bearing={ps.bearing_deg:.0f}° dist={ps.distance:.1f}m"
)
def _publish_announce(self) -> None:
with self._lock:
active_ids = [
pid for pid, ps in self._person_states.items()
if ps.state in (_ENGAGED, _TALKING)
]
greeted_names = [
name for name, t in self._greeted.items()
if time.time() - t < self._greeted_mem
]
state = self._social_state
msg = MeshPeer()
msg.header.stamp = self.get_clock().now().to_msg()
msg.robot_id = self._robot_id
msg.namespace = self._ns
msg.social_state = state
msg.active_person_ids = active_ids
msg.greeted_person_names = greeted_names
msg.battery_pct = -1.0
self._announce_pub.publish(msg)
self._publish_peers_status()
def _publish_peers_status(self) -> None:
with self._lock:
entries = [
{
"robot_id": rid,
"social_state": d["msg"].social_state,
"active_persons": len(d["msg"].active_person_ids),
"age_s": round(time.time() - d["last_seen"], 1),
}
for rid, d in self._peers.items()
]
msg = String()
msg.data = json.dumps({"peers": entries, "ts": time.time()})
self._peers_pub.publish(msg)
def _publish_greeted(self) -> None:
with self._lock:
names = [n for n, t in self._greeted.items()
if time.time() - t < self._greeted_mem]
msg = String()
msg.data = json.dumps({"greeted": names, "ts": time.time()})
self._greeted_pub.publish(msg)
# ── Cleanup ───────────────────────────────────────────────────────────────
def _cleanup(self) -> None:
now = time.time()
with self._lock:
dead = [rid for rid, d in self._peers.items()
if now - d["last_seen"] > self._peer_timeout]
for rid in dead:
self.get_logger().info(f"Peer lost: {rid}")
del self._peers[rid]
self._greeted = {n: t for n, t in self._greeted.items()
if now - t < self._greeted_mem}
active_pids = {str(p) for p in self._person_states}
self._handoff_sent &= active_pids
def main(args=None) -> None:
rclpy.init(args=args)
node = MeshCommsNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()

View File

@ -39,6 +39,8 @@ setup(
'gesture_node = saltybot_social.gesture_node:main',
# Facial expression recognition (Issue #161)
'emotion_node = saltybot_social.emotion_node:main',
# Robot mesh communication (Issue #171)
'mesh_comms_node = saltybot_social.mesh_comms_node:main',
],
},
)

View File

@ -0,0 +1,278 @@
"""test_mesh_comms.py -- Unit tests for Issue #171 robot mesh comms."""
from __future__ import annotations
import json, os, time
from typing import Dict
import pytest
def _pkg_root():
return os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
def _read_src(rel_path):
with open(os.path.join(_pkg_root(), rel_path)) as f:
return f.read()
# ── Message definitions ───────────────────────────────────────────────────────
class TestMeshPeerMsg:
@pytest.fixture(scope="class")
def src(self):
return _read_src("../saltybot_social_msgs/msg/MeshPeer.msg")
def test_robot_id(self, src): assert "string robot_id" in src
def test_namespace(self, src): assert "string namespace" in src
def test_social_state(self, src): assert "string social_state" in src
def test_active_ids(self, src): assert "int32[] active_person_ids" in src
def test_greeted(self, src): assert "string[] greeted_person_names" in src
def test_battery(self, src): assert "float32 battery_pct" in src
def test_header(self, src): assert "std_msgs/Header" in src
def test_issue_tag(self, src): assert "171" in src
class TestMeshHandoffMsg:
@pytest.fixture(scope="class")
def src(self):
return _read_src("../saltybot_social_msgs/msg/MeshHandoff.msg")
def test_from_robot(self, src): assert "from_robot_id" in src
def test_to_robot(self, src): assert "to_robot_id" in src
def test_person_id(self, src): assert "string person_id" in src
def test_person_name(self, src): assert "string person_name" in src
def test_face_id(self, src): assert "int32 face_id" in src
def test_speaker_id(self, src): assert "string speaker_id" in src
def test_bearing(self, src): assert "bearing_to_target_deg" in src
def test_distance(self, src): assert "distance_m" in src
def test_engagement(self, src): assert "engagement_score" in src
def test_summary(self, src): assert "conversation_summary" in src
def test_last_utterance(self, src):assert "last_utterance" in src
def test_last_response(self, src): assert "last_response" in src
def test_language(self, src): assert "string language" in src
def test_header(self, src): assert "std_msgs/Header" in src
# ── CMakeLists includes new msgs ──────────────────────────────────────────────
class TestCMakeLists:
@pytest.fixture(scope="class")
def src(self):
return _read_src("../saltybot_social_msgs/CMakeLists.txt")
def test_mesh_peer(self, src): assert '"msg/MeshPeer.msg"' in src
def test_mesh_handoff(self, src): assert '"msg/MeshHandoff.msg"' in src
def test_issue_tag(self, src): assert "Issue #171" in src
# ── Node source structure ─────────────────────────────────────────────────────
class TestMeshCommsNodeSrc:
@pytest.fixture(scope="class")
def src(self):
return _read_src("saltybot_social/mesh_comms_node.py")
def test_class_defined(self, src): assert "class MeshCommsNode" in src
def test_robot_id_param(self, src): assert '"robot_id"' in src
def test_announce_rate_param(self, src): assert '"announce_rate_hz"' in src
def test_peer_timeout_param(self, src): assert '"peer_timeout_s"' in src
def test_handoff_dist_param(self, src): assert '"handoff_distance_m"' in src
def test_greeted_memory_param(self, src): assert '"greeted_memory_s"' in src
def test_announce_pub(self, src): assert "/social/mesh/announce" in src
def test_handoff_pub(self, src): assert "/social/mesh/handoff" in src
def test_peers_pub(self, src): assert "/social/mesh/peers" in src
def test_greeted_pub(self, src): assert "/social/mesh/greeted" in src
def test_person_states_sub(self, src): assert "/social/person_states" in src
def test_orch_state_sub(self, src): assert "/social/orchestrator/state" in src
def test_on_announce(self, src): assert "_on_announce" in src
def test_on_handoff(self, src): assert "_on_handoff" in src
def test_on_person_states(self, src): assert "_on_person_states" in src
def test_trigger_handoff(self, src): assert "_trigger_handoff" in src
def test_publish_announce(self, src): assert "_publish_announce" in src
def test_cleanup(self, src): assert "_cleanup" in src
def test_leaving_constant(self, src): assert "_LEAVING" in src
def test_engaged_constant(self, src): assert "_ENGAGED" in src
def test_talking_constant(self, src): assert "_TALKING" in src
def test_main(self, src): assert "def main" in src
def test_no_self_announce(self, src):
assert "self._robot_id" in src # ignores own announcements
def test_mesh_qos_best_effort(self, src): assert "BEST_EFFORT" in src
def test_issue_tag(self, src): assert "171" in src
# ── setup.py entry point ──────────────────────────────────────────────────────
class TestSetupPy:
@pytest.fixture(scope="class")
def src(self):
return _read_src("setup.py")
def test_mesh_entry_point(self, src):
assert "mesh_comms_node = saltybot_social.mesh_comms_node:main" in src
# ── Config file ───────────────────────────────────────────────────────────────
class TestMeshConfig:
@pytest.fixture(scope="class")
def src(self):
return _read_src("config/mesh_params.yaml")
def test_node_name(self, src): assert "mesh_comms_node:" in src
def test_robot_id(self, src): assert "robot_id:" in src
def test_announce_rate(self, src): assert "announce_rate_hz:" in src
def test_peer_timeout(self, src): assert "peer_timeout_s:" in src
def test_handoff_dist(self, src): assert "handoff_distance_m:" in src
def test_greeted_memory(self, src): assert "greeted_memory_s:" in src
# ── Peer tracking logic (pure Python, no ROS) ────────────────────────────────
class TestPeerTracking:
def _make_peers(self):
return {}
def _add_peer(self, peers, rid, state, active, greeted):
peers[rid] = {
"last_seen": time.time(),
"msg": type("M", (), {
"social_state": state,
"active_person_ids": active,
"greeted_person_names": greeted,
})(),
}
def _cleanup(self, peers, timeout=5.0):
now = time.time()
return {rid: d for rid, d in peers.items()
if now - d["last_seen"] <= timeout}
def test_add_peer(self):
peers = self._make_peers()
self._add_peer(peers, "bot2", "idle", [], [])
assert "bot2" in peers
def test_ignore_self(self):
peers = self._make_peers()
robot_id = "saltybot_1"
incoming_id = "saltybot_1"
if incoming_id != robot_id:
self._add_peer(peers, incoming_id, "idle", [], [])
assert "saltybot_1" not in peers
def test_peer_replaced_on_update(self):
peers = self._make_peers()
self._add_peer(peers, "bot2", "idle", [], [])
self._add_peer(peers, "bot2", "speaking", [42], [])
assert peers["bot2"]["msg"].social_state == "speaking"
def test_cleanup_removes_stale(self):
peers = self._make_peers()
peers["bot3"] = {"last_seen": time.time() - 10, "msg": None}
self._add_peer(peers, "bot4", "idle", [], [])
cleaned = self._cleanup(peers, timeout=5.0)
assert "bot3" not in cleaned
assert "bot4" in cleaned
def test_multiple_peers(self):
peers = self._make_peers()
for i in range(5):
self._add_peer(peers, f"bot{i}", "idle", [], [])
assert len(peers) == 5
# ── Greeting deduplication logic ─────────────────────────────────────────────
class TestGreetingDedup:
def test_new_name_added(self):
greeted: Dict[str, float] = {}
name = "Alice"
if name not in greeted:
greeted[name] = time.time()
assert "Alice" in greeted
def test_existing_name_not_overwritten(self):
greeted = {"Alice": 1000.0}
t_before = greeted["Alice"]
if "Alice" not in greeted:
greeted["Alice"] = time.time()
assert greeted["Alice"] == t_before
def test_peer_names_merged(self):
greeted: Dict[str, float] = {}
peer_names = ["Alice", "Bob"]
for n in peer_names:
if n and n not in greeted:
greeted[n] = time.time()
assert "Alice" in greeted and "Bob" in greeted
def test_expire_old_names(self):
greeted = {"Alice": time.time() - 400, "Bob": time.time()}
memory_s = 300.0
fresh = {n: t for n, t in greeted.items() if time.time() - t < memory_s}
assert "Alice" not in fresh and "Bob" in fresh
# ── Handoff trigger logic ─────────────────────────────────────────────────────
class TestHandoffTrigger:
def _should_handoff(self, prev_state, new_state, pid, sent):
_LEAVING = 4
return (prev_state != _LEAVING
and new_state == _LEAVING
and str(pid) not in sent)
def test_triggers_on_leaving(self):
assert self._should_handoff(2, 4, 1, set())
def test_no_duplicate_handoff(self):
sent = {"1"}
assert not self._should_handoff(2, 4, 1, sent)
def test_no_trigger_if_already_leaving(self):
assert not self._should_handoff(4, 4, 1, set())
def test_triggers_for_different_person(self):
sent = {"1"}
assert self._should_handoff(2, 4, 2, sent)
def test_handoff_sent_cleared_when_person_gone(self):
sent = {"1", "2", "3"}
active_pids = {"2", "3"}
sent &= active_pids
assert "1" not in sent and "2" in sent
# ── JSON status blob ──────────────────────────────────────────────────────────
class TestPeersBlob:
def test_peers_json(self):
peers = {
"bot2": {
"last_seen": time.time(),
"msg": type("M", (), {
"social_state": "idle",
"active_person_ids": [],
})(),
}
}
entries = [
{
"robot_id": rid,
"social_state": d["msg"].social_state,
"active_persons": len(d["msg"].active_person_ids),
"age_s": round(time.time() - d["last_seen"], 1),
}
for rid, d in peers.items()
]
blob = json.dumps({"peers": entries, "ts": time.time()})
parsed = json.loads(blob)
assert len(parsed["peers"]) == 1
assert parsed["peers"][0]["robot_id"] == "bot2"
def test_greeted_json(self):
greeted = {"Alice": time.time(), "Bob": time.time()}
names = list(greeted.keys())
blob = json.dumps({"greeted": names, "ts": time.time()})
parsed = json.loads(blob)
assert "Alice" in parsed["greeted"] and "Bob" in parsed["greeted"]

View File

@ -41,6 +41,9 @@ rosidl_generate_interfaces(${PROJECT_NAME}
# Issue #161 emotion detection
"msg/Expression.msg"
"msg/ExpressionArray.msg"
# Issue #171 robot mesh comms
"msg/MeshPeer.msg"
"msg/MeshHandoff.msg"
DEPENDENCIES std_msgs geometry_msgs builtin_interfaces
)

View File

@ -0,0 +1,27 @@
# MeshHandoff.msg — Person context transfer between robots.
# Published on /social/mesh/handoff when a tracked person transitions to
# STATE_LEAVING, enabling a nearby robot to warm-start LLM conversation context.
# Issue #171
std_msgs/Header header
string from_robot_id # Robot publishing this handoff
string to_robot_id # Intended recipient robot_id; "" = broadcast
# Person identity
string person_id # e.g. "person_42"
string person_name # "" if unknown
int32 face_id # -1 if unknown
string speaker_id # "" if unknown
# Spatial context at handoff time
float32 bearing_to_target_deg # Person's bearing from from_robot (°, +CW from forward)
float32 distance_m # Person's distance from from_robot (metres)
float32 engagement_score # 0..1
# Conversation warm-start context
string conversation_summary # LLM-generated summary of prior exchanges
string last_utterance # Person's last utterance to from_robot
string last_response # Robot's last response to person
string language # BCP-47 code e.g. "en", "fr" (empty = unknown)

View File

@ -0,0 +1,20 @@
# MeshPeer.msg — Robot social presence announcement.
# Published by mesh_comms_node on /social/mesh/announce at 1 Hz.
# All robots sharing the same DDS domain receive peer announcements via multicast.
# Issue #171
std_msgs/Header header
string robot_id # e.g. "saltybot_1"
string namespace # ROS2 namespace, e.g. "/saltybot_1" (empty = default)
# Current social pipeline state (mirrors orchestrator PipelineState)
string social_state # "idle" | "listening" | "thinking" | "speaking" | "throttled"
# Active person IDs this robot is currently engaged with or talking to
int32[] active_person_ids
# Names greeted since last boot — used for mesh-wide greeting deduplication
string[] greeted_person_names
float32 battery_pct # 0..100; -1 = unknown