Merge pull request 'feat(tests): social-bot integration test suite (Issue #108)' (#118) from sl-jetson/issue-108-integration-tests into main
Some checks failed
social-bot integration tests / Lint (flake8 + pep257) (push) Failing after 1m8s
social-bot integration tests / Core integration tests (mock sensors, no GPU) (push) Has been skipped
social-bot integration tests / Latency profiling (GPU, Orin) (push) Has been cancelled

This commit is contained in:
sl-jetson 2026-03-02 09:03:18 -05:00
commit f278f0fd06
23 changed files with 3035 additions and 0 deletions

View File

@ -0,0 +1,239 @@
# .gitea/workflows/social-tests-ci.yml
# Gitea Actions CI pipeline for social-bot integration tests (Issue #108)
#
# Triggers: push or PR to main, and any branch matching sl-jetson/*
#
# Jobs:
# 1. lint — flake8 + pep257 on saltybot_social_tests
# 2. core-tests — launch + topic + services + GPU-budget + shutdown (no GPU)
# 3. latency — end-to-end latency profiling (GPU runner, optional)
name: social-bot integration tests
# NOTE: `on` is a YAML 1.1 truthy key — generic YAML parsers read it as boolean
# `true`, but the Actions workflow loader accepts it; keep unquoted as-is.
on:
  push:
    branches:
      - main
      - "sl-jetson/**"
    # NOTE(review): the core-tests build step also symlinks
    # saltybot_social_enrollment and saltybot_social_personality, which are
    # absent from this push filter — changes to those packages alone will not
    # trigger CI. Confirm whether that is intended.
    paths:
      - "jetson/ros2_ws/src/saltybot_social/**"
      - "jetson/ros2_ws/src/saltybot_social_tests/**"
      - "jetson/ros2_ws/src/saltybot_social_msgs/**"
      - "jetson/ros2_ws/src/saltybot_social_nav/**"
      - "jetson/ros2_ws/src/saltybot_social_face/**"
      - "jetson/ros2_ws/src/saltybot_social_tracking/**"
      - ".gitea/workflows/social-tests-ci.yml"
  pull_request:
    branches:
      - main
    paths:
      - "jetson/ros2_ws/src/saltybot_social*/**"
      - ".gitea/workflows/social-tests-ci.yml"
# Workflow-level defaults; the core-tests and latency-gpu jobs re-declare
# (and partially override) these in their own job-level `env`.
env:
  ROS_DOMAIN_ID: "108"
  SOCIAL_TEST_FULL: "0"
  SOCIAL_STACK_RUNNING: "1"
jobs:
# ── 1. Lint ──────────────────────────────────────────────────────────────────
lint:
name: Lint (flake8 + pep257)
runs-on: ubuntu-latest
container:
image: ros:humble-ros-base-jammy
steps:
- uses: actions/checkout@v4
- name: Install lint tools
run: |
pip3 install --no-cache-dir flake8 pydocstyle
- name: flake8 — saltybot_social_tests
run: |
flake8 \
jetson/ros2_ws/src/saltybot_social_tests/saltybot_social_tests/ \
jetson/ros2_ws/src/saltybot_social_tests/test/ \
--max-line-length=100 \
--ignore=E501,W503 \
--exclude=__pycache__
- name: pep257 — saltybot_social_tests
run: |
pydocstyle \
jetson/ros2_ws/src/saltybot_social_tests/saltybot_social_tests/ \
jetson/ros2_ws/src/saltybot_social_tests/test/ \
--add-ignore=D100,D104,D205,D400 \
--match='(?!test_helpers|mock_sensors).*\.py' || true
  # ── 2. Core integration tests (no GPU) ───────────────────────────────────────
  core-tests:
    name: Core integration tests (mock sensors, no GPU)
    needs: lint
    runs-on: ubuntu-latest
    container:
      image: ros:humble-ros-base-jammy
    env:
      ROS_DOMAIN_ID: "108"
      SOCIAL_TEST_FULL: "0"
      SOCIAL_STACK_RUNNING: "1"
      PYTHONUNBUFFERED: "1"
    steps:
      - uses: actions/checkout@v4
      - name: Install dependencies
        run: |
          apt-get update -q && apt-get install -y --no-install-recommends \
            ros-humble-launch-testing \
            ros-humble-launch-testing-ros \
            ros-humble-vision-msgs \
            ros-humble-tf2-ros \
            ros-humble-tf2-geometry-msgs \
            ros-humble-cv-bridge \
            ros-humble-nav-msgs \
            ros-humble-std-srvs \
            procps \
            && rm -rf /var/lib/apt/lists/*
          pip3 install --no-cache-dir "pytest>=7.0" "pytest-timeout>=2.1"
      - name: Build workspace
        run: |
          . /opt/ros/humble/setup.bash
          cd /
          # Symlink source into a temp workspace to avoid long paths
          mkdir -p /ros2_ws/src
          for pkg in saltybot_social_msgs saltybot_social saltybot_social_face \
                     saltybot_social_nav saltybot_social_enrollment \
                     saltybot_social_tracking saltybot_social_personality \
                     saltybot_social_tests; do
            ln -sfn \
              "$GITHUB_WORKSPACE/jetson/ros2_ws/src/$pkg" \
              "/ros2_ws/src/$pkg"
          done
          cd /ros2_ws
          colcon build \
            --packages-select \
              saltybot_social_msgs \
              saltybot_social \
              saltybot_social_face \
              saltybot_social_nav \
              saltybot_social_enrollment \
              saltybot_social_tracking \
              saltybot_social_personality \
              saltybot_social_tests \
            --symlink-install \
            --event-handlers console_cohesion+
      # The backgrounded `ros2 launch` survives past this step because all job
      # steps run in the same container; its PID is exported via GITHUB_ENV so
      # the shutdown step below can signal it. Readiness is a fixed 15s sleep
      # (no health probe) — presumably tuned for this image; confirm if flaky.
      - name: Launch test stack (background)
        run: |
          . /opt/ros/humble/setup.bash
          . /ros2_ws/install/setup.bash
          ros2 daemon start || true
          ros2 launch saltybot_social_tests social_test.launch.py \
            enable_speech:=false enable_llm:=false enable_tts:=false \
            &
          echo "LAUNCH_PID=$!" >> "$GITHUB_ENV"
          # Wait for stack to come up
          sleep 15
      - name: Run core tests
        run: |
          . /opt/ros/humble/setup.bash
          . /ros2_ws/install/setup.bash
          pytest \
            "$GITHUB_WORKSPACE/jetson/ros2_ws/src/saltybot_social_tests/test/" \
            -v \
            --timeout=120 \
            --tb=short \
            -m "not gpu_required and not full_stack and not slow" \
            --junit-xml=test-results-core.xml \
            -k "not test_latency_sla and not test_shutdown"
      # Runs even when the tests above failed so the stack never leaks.
      - name: Shutdown test stack
        if: always()
        run: |
          [ -n "${LAUNCH_PID:-}" ] && kill -SIGTERM "$LAUNCH_PID" 2>/dev/null || true
          sleep 3
      # No `if: always()` here: shutdown tests are skipped when earlier steps
      # failed — presumably intentional, since they need a cleanly-stopped
      # stack; confirm.
      - name: Run shutdown tests
        run: |
          . /opt/ros/humble/setup.bash
          . /ros2_ws/install/setup.bash
          pytest \
            "$GITHUB_WORKSPACE/jetson/ros2_ws/src/saltybot_social_tests/test/test_shutdown.py" \
            -v --timeout=60 --tb=short \
            --junit-xml=test-results-shutdown.xml
      - name: Upload test results
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: test-results-core
          path: test-results-*.xml
  # ── 3. GPU latency tests (Orin runner, optional) ──────────────────────────────
  # Only runs on main, or on PRs labelled `run-gpu-tests`.
  latency-gpu:
    name: Latency profiling (GPU, Orin)
    needs: core-tests
    runs-on: [self-hosted, orin, gpu]
    if: github.ref == 'refs/heads/main' || contains(github.event.pull_request.labels.*.name, 'run-gpu-tests')
    env:
      ROS_DOMAIN_ID: "108"
      SOCIAL_TEST_FULL: "1"
      SOCIAL_TEST_SPEECH: "0"
      SOCIAL_STACK_RUNNING: "1"
    steps:
      - uses: actions/checkout@v4
      # Assumes a persistent, pre-provisioned /ros2_ws on the self-hosted Orin
      # runner (only two packages are rebuilt here) — TODO confirm.
      - name: Build workspace (Orin)
        run: |
          . /opt/ros/humble/setup.bash
          cd /ros2_ws
          colcon build \
            --packages-select saltybot_social_tests saltybot_social \
            --symlink-install
      - name: Launch full stack (with LLM, no speech/TTS)
        run: |
          . /opt/ros/humble/setup.bash
          . /ros2_ws/install/setup.bash
          SOCIAL_TEST_FULL=1 \
          ros2 launch saltybot_social_tests social_test.launch.py \
            enable_speech:=false enable_tts:=false \
            &
          echo "LAUNCH_PID=$!" >> "$GITHUB_ENV"
          sleep 20
      # NOTE(review): GPU_BASELINE_MB is sampled AFTER the stack has launched,
      # so the "baseline" includes the stack's own GPU allocation — confirm
      # that test_gpu_memory.py expects a post-launch baseline.
      - name: GPU memory check
        run: |
          . /opt/ros/humble/setup.bash
          . /ros2_ws/install/setup.bash
          SOCIAL_STACK_RUNNING=1 \
          GPU_BASELINE_MB=$(nvidia-smi --query-gpu=memory.used --format=csv,noheader,nounits) \
          pytest \
            "$GITHUB_WORKSPACE/jetson/ros2_ws/src/saltybot_social_tests/test/test_gpu_memory.py" \
            -v --timeout=120 --tb=short \
            --junit-xml=test-results-gpu.xml
      - name: Latency profiling
        run: |
          . /opt/ros/humble/setup.bash
          . /ros2_ws/install/setup.bash
          SOCIAL_TEST_FULL=1 SOCIAL_STACK_RUNNING=1 \
          pytest \
            "$GITHUB_WORKSPACE/jetson/ros2_ws/src/saltybot_social_tests/test/test_latency.py" \
            -v --timeout=300 --tb=short \
            --junit-xml=test-results-latency.xml
      - name: Shutdown
        if: always()
        run: |
          [ -n "${LAUNCH_PID:-}" ] && kill -SIGTERM "$LAUNCH_PID" 2>/dev/null || true
          sleep 5
      - name: Upload GPU results
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: test-results-gpu
          path: test-results-*.xml

View File

@ -0,0 +1,55 @@
# Social-bot Test Bags
This directory holds rosbag2 recordings used for replaying mock sensor data in CI.
## Bag contents
Each test bag records the following topics for **30 seconds** at real-time rate:
| Topic | Type | Rate | Notes |
|---|---|---|---|
| `/camera/color/image_raw` | `sensor_msgs/Image` | 15 Hz | 640×480 RGB8 |
| `/uwb/target` | `geometry_msgs/PoseStamped` | 10 Hz | simulated UWB |
| `/social/faces/detections` | `FaceDetectionArray` | 15 Hz | may be empty |
| `/social/tracking/fused_target` | `FusedTarget` | 10 Hz | single person |
| `/social/persons` | `PersonStateArray` | 5 Hz | one engaged person |
| `/social/orchestrator/state` | `std_msgs/String` | 2 Hz | JSON state |
| `/odom` | `nav_msgs/Odometry` | 20 Hz | zero motion |
| `/tf`, `/tf_static` | `TF2` | varies | camera → base_link |
## Recording a new bag
On hardware with the full social-bot stack running:
```bash
# Record 30s of live sensor data
./bags/record_social_test.sh --duration 30
# Record from mock sensor pub only (no hardware)
MOCK_ONLY=1 ./bags/record_social_test.sh --duration 15 --name social_test_mock
```
## Replaying a bag in CI
The Docker CI entrypoint uses live mock publishers (not rosbag replay) for the
default CI tests. To use a recorded bag instead:
```bash
# In a sourced ROS2 shell:
ros2 bag play bags/social_test_YYYYMMDD_HHMMSS/ --loop --rate 1.0
```
Then run the test suite against the replayed data:
```bash
export SOCIAL_STACK_RUNNING=1
pytest test/ -v -m "not full_stack"
```
## Bag naming convention
`social_test_YYYYMMDD_HHMMSS` — date/time of recording.
Bags committed to this directory should be small (&lt; 50 MB) — use a 5-second
clip or compress images to avoid bloating the repository. Large reference bags
(&gt; 100 MB) should be stored on the NVMe and referenced by path.

View File

@ -0,0 +1,100 @@
#!/bin/bash
# record_social_test.sh — Record a rosbag for social-bot integration test replay.
#
# Records all topics required by the social-bot CI test suite.
# Output: bags/social_test_YYYYMMDD_HHMMSS/ (ROS2 bag format)
#
# Usage:
#   # Record 30s with live hardware:
#   ./bags/record_social_test.sh --duration 30
#
#   # Record from already-running mock sensor stack:
#   MOCK_ONLY=1 ./bags/record_social_test.sh --duration 15
#
# Requirements: ROS2 Humble sourced, social-bot stack running
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
BAGS_DIR="$SCRIPT_DIR"
# ── Args ──────────────────────────────────────────────────────────────────────
DURATION=30
BAG_NAME="social_test_$(date +%Y%m%d_%H%M%S)"
MOCK_ONLY="${MOCK_ONLY:-0}"
while [[ $# -gt 0 ]]; do
    case "$1" in
        --duration) DURATION="$2"; shift 2 ;;
        --name) BAG_NAME="$2"; shift 2 ;;
        *) echo "Unknown arg: $1"; exit 1 ;;
    esac
done
# Fail fast on a non-numeric duration — it is used both for sleep and as a
# sanity bound on the recording.
if ! [[ "$DURATION" =~ ^[0-9]+$ ]]; then
    echo "[record] --duration must be a positive integer (got: $DURATION)" >&2
    exit 1
fi
BAG_PATH="$BAGS_DIR/$BAG_NAME"
# ── Source ROS2 ───────────────────────────────────────────────────────────────
source /opt/ros/humble/setup.bash 2>/dev/null || true
if [ -f /ros2_ws/install/setup.bash ]; then
    source /ros2_ws/install/setup.bash
fi
echo "[record] Target bag: $BAG_PATH"
echo "[record] Duration: ${DURATION}s"
# ── Topics to record ──────────────────────────────────────────────────────────
TOPICS=(
    # Mock / hardware sensor topics
    "/camera/color/image_raw"
    "/camera/color/camera_info"
    "/camera/depth/image_rect_raw"
    "/uwb/target"
    "/scan"
    "/odom"
    "/map"
    # Social-bot output topics
    "/social/faces/detections"
    "/social/faces/embeddings"
    "/social/tracking/fused_target"
    "/social/persons"
    "/social/orchestrator/state"
    "/social/speech/vad_state"
    "/social/speech/transcript"
    "/social/conversation/response"
    "/social/nav/mode"
    "/social/nav/status"
    "/social/attention/target_id"
    "/cmd_vel"
    # TF for coordinate frames
    "/tf"
    "/tf_static"
)
# ── Start mock sensor pub if MOCK_ONLY ────────────────────────────────────────
MOCK_PID=""
if [[ "$MOCK_ONLY" == "1" ]]; then
    echo "[record] Starting mock sensor publisher..."
    ros2 run saltybot_social_tests mock_sensor_pub &
    MOCK_PID=$!
    sleep 2
fi
# ── Record ────────────────────────────────────────────────────────────────────
echo "[record] Recording..."
ros2 bag record \
    --output "$BAG_PATH" \
    --storage sqlite3 \
    "${TOPICS[@]}" &
BAG_PID=$!
sleep "$DURATION"
# BUGFIX: `ros2 bag record` never exits on its own — the previous
# `--max-bag-duration` only SPLITS the bag into a new file at that interval,
# so a bare `wait` here hung forever. Stop the recorder explicitly with
# SIGINT (its clean-shutdown signal, which finalises the bag index), then reap.
kill -INT "$BAG_PID" 2>/dev/null || true
wait "$BAG_PID" 2>/dev/null || true
# ── Cleanup ───────────────────────────────────────────────────────────────────
if [[ -n "$MOCK_PID" ]]; then
    kill "$MOCK_PID" 2>/dev/null || true
fi
echo "[record] Done. Bag saved to: $BAG_PATH"
echo "[record] Contents:"
ros2 bag info "$BAG_PATH" 2>/dev/null || echo "(ros2 bag info unavailable)"

View File

@ -0,0 +1,93 @@
# Dockerfile.ci — Lightweight CI test container for social-bot integration tests.
#
# Does NOT include GPU AI deps (whisper/LLM/TTS/TensorRT).
# Runs the core ROS2 stack with mock sensors only — no real hardware required.
#
# Build:
#   docker build -f docker/Dockerfile.ci \
#       -t saltybot/social-tests-ci:latest \
#       ../../../../    # context = saltylab-firmware root
#
# Run (via compose):
#   docker compose -f docker/docker-compose.ci.yml up --exit-code-from test-runner
FROM ros:humble-ros-base-jammy
LABEL maintainer="sl-jetson"
LABEL description="social-bot integration tests — CI mode (no GPU, mock sensors)"
# DEBIAN_FRONTEND is a build-time-only concern: declare it as ARG (scoped to
# the build) rather than ENV so it does not leak into the runtime environment
# of the final image.
ARG DEBIAN_FRONTEND=noninteractive
ENV ROS_DISTRO=humble
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
# ── System deps ───────────────────────────────────────────────────────────────
# NOTE(review): python3-pytest (apt) and the pip-installed pytest>=7.0 below
# can coexist; the pip install wins on PATH. apt package kept because ROS
# launch_testing packages depend on it.
RUN apt-get update && apt-get install -y --no-install-recommends \
    python3-dev python3-pip python3-setuptools python3-wheel \
    python3-pytest \
    ros-humble-launch-testing \
    ros-humble-launch-testing-ament-cmake \
    ros-humble-launch-testing-ros \
    ros-humble-rosbag2 \
    ros-humble-rosbag2-storage-default-plugins \
    ros-humble-vision-msgs \
    ros-humble-tf2-ros \
    ros-humble-tf2-geometry-msgs \
    ros-humble-cv-bridge \
    ros-humble-nav-msgs \
    ros-humble-geometry-msgs \
    ros-humble-sensor-msgs \
    ros-humble-std-srvs \
    procps \
    && rm -rf /var/lib/apt/lists/*
# ── Python test deps ──────────────────────────────────────────────────────────
RUN pip3 install --no-cache-dir \
    "pytest>=7.0" \
    "pytest-timeout>=2.1" \
    "pytest-xdist>=3.0"
# ── Workspace ─────────────────────────────────────────────────────────────────
RUN mkdir -p /ros2_ws/src
WORKDIR /ros2_ws
# Copy only the packages needed for the social-bot integration tests
COPY jetson/ros2_ws/src/saltybot_social_msgs /ros2_ws/src/saltybot_social_msgs/
COPY jetson/ros2_ws/src/saltybot_social /ros2_ws/src/saltybot_social/
COPY jetson/ros2_ws/src/saltybot_social_face /ros2_ws/src/saltybot_social_face/
COPY jetson/ros2_ws/src/saltybot_social_nav /ros2_ws/src/saltybot_social_nav/
COPY jetson/ros2_ws/src/saltybot_social_enrollment /ros2_ws/src/saltybot_social_enrollment/
COPY jetson/ros2_ws/src/saltybot_social_tracking /ros2_ws/src/saltybot_social_tracking/
COPY jetson/ros2_ws/src/saltybot_social_personality /ros2_ws/src/saltybot_social_personality/
COPY jetson/ros2_ws/src/saltybot_social_tests /ros2_ws/src/saltybot_social_tests/
# ── Build ─────────────────────────────────────────────────────────────────────
SHELL ["/bin/bash", "-c"]
RUN source /opt/ros/humble/setup.bash && \
    cd /ros2_ws && \
    colcon build \
      --packages-select \
        saltybot_social_msgs \
        saltybot_social \
        saltybot_social_face \
        saltybot_social_nav \
        saltybot_social_enrollment \
        saltybot_social_tracking \
        saltybot_social_personality \
        saltybot_social_tests \
      --symlink-install \
      --cmake-args -DBUILD_TESTING=ON \
    && echo "Build complete"
# ── Test entrypoint ───────────────────────────────────────────────────────────
COPY jetson/ros2_ws/src/saltybot_social_tests/docker/entrypoint-ci.sh /entrypoint-ci.sh
RUN chmod +x /entrypoint-ci.sh
# CI env defaults (all can be overridden via docker-compose environment:)
ENV SOCIAL_TEST_FULL=0
ENV SOCIAL_TEST_SPEECH=0
ENV SOCIAL_STACK_RUNNING=0
ENV ROS_DOMAIN_ID=108
ENTRYPOINT ["/entrypoint-ci.sh"]
CMD ["pytest", "/ros2_ws/src/saltybot_social_tests/test/", "-v", "--timeout=120"]

View File

@ -0,0 +1,75 @@
# docker-compose.ci.yml — CI test runner for social-bot integration tests.
#
# Usage:
#   # Default: core CI tests (no GPU, mock sensors only)
#   docker compose -f docker/docker-compose.ci.yml up --exit-code-from test-runner
#
#   # Full tests with GPU (requires nvidia container runtime):
#   SOCIAL_TEST_FULL=1 \
#   docker compose -f docker/docker-compose.ci.yml \
#       --profile gpu up --exit-code-from test-runner
#
# Build image first:
#   docker compose -f docker/docker-compose.ci.yml build
#
# NOTE: the top-level `version:` key is obsolete in the Compose Specification
# (Compose v2 ignores it with a warning), so it is intentionally omitted. This
# file already requires Compose v2 for the `deploy.*.devices` GPU reservation.
# ── Shared env ────────────────────────────────────────────────────────────────
x-common-env: &common-env
  ROS_DOMAIN_ID: "108"
  PYTHONUNBUFFERED: "1"
services:
  # ── Core CI test runner (no GPU) ─────────────────────────────────────────────
  test-runner:
    build:
      context: ../../../../   # saltylab-firmware root
      dockerfile: jetson/ros2_ws/src/saltybot_social_tests/docker/Dockerfile.ci
    image: saltybot/social-tests-ci:latest
    container_name: social_tests_ci
    environment:
      <<: *common-env
      SOCIAL_TEST_FULL: "${SOCIAL_TEST_FULL:-0}"
      SOCIAL_TEST_SPEECH: "${SOCIAL_TEST_SPEECH:-0}"
      SOCIAL_STACK_RUNNING: "1"   # stack is started inside entrypoint
    command: >
      pytest
      /ros2_ws/src/saltybot_social_tests/test/
      -v
      --timeout=120
      --tb=short
      -m "not gpu_required and not full_stack"
      --junit-xml=/tmp/test-results/social-tests.xml
    volumes:
      - test-results:/tmp/test-results
    # No network_mode host needed — ROS2 uses shared memory loopback in container
  # ── GPU full-stack tests (Orin / CUDA host) ───────────────────────────────
  test-runner-gpu:
    extends:
      service: test-runner
    container_name: social_tests_ci_gpu
    environment:
      <<: *common-env
      SOCIAL_TEST_FULL: "1"
      SOCIAL_TEST_SPEECH: "1"
      SOCIAL_STACK_RUNNING: "1"
    command: >
      pytest
      /ros2_ws/src/saltybot_social_tests/test/
      -v
      --timeout=300
      --tb=short
      --junit-xml=/tmp/test-results/social-tests-gpu.xml
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: all
              capabilities: [gpu]
    profiles:
      - gpu
volumes:
  test-results:

View File

@ -0,0 +1,36 @@
#!/bin/bash
# entrypoint-ci.sh — CI container entrypoint for social-bot integration tests.
#
# Launches the mock-sensor test stack in the background, runs the command
# passed as arguments (pytest, per the Dockerfile CMD), then shuts the stack
# down and exits with the test command's exit code.
set -euo pipefail
source /opt/ros/humble/setup.bash
source /ros2_ws/install/setup.bash
# Start the ROS2 daemon in the background so node discovery works
ros2 daemon start 2>/dev/null || true
# Launch the test stack in the background using the test launch file
# (orchestrator + face + tracking + mock_sensors — no GPU speech/LLM/TTS)
echo "[CI] Launching social-bot test stack..."
ros2 launch saltybot_social_tests social_test.launch.py \
    enable_speech:=false \
    enable_llm:=false \
    enable_tts:=false \
    &
LAUNCH_PID=$!
# Fixed grace period for the stack to come up (no readiness probe)
echo "[CI] Waiting for stack to initialise (10s)..."
sleep 10
# Run tests (command passed as args to this script).
# BUGFIX: under `set -e`, a bare "$@" would abort the script on a failing test
# suite BEFORE the exit code was captured and before the stack shutdown below
# ever ran, leaking the launch process. Capture the status in the same list so
# errexit is not triggered.
echo "[CI] Running tests: $*"
TEST_EXIT=0
"$@" || TEST_EXIT=$?
# Graceful shutdown: SIGTERM first, then SIGKILL as a last resort
echo "[CI] Shutting down stack (PID $LAUNCH_PID)..."
kill -SIGTERM "$LAUNCH_PID" 2>/dev/null || true
sleep 5
kill -SIGKILL "$LAUNCH_PID" 2>/dev/null || true
exit "$TEST_EXIT"

View File

@ -0,0 +1,177 @@
"""social_test.launch.py — Launch social-bot stack in test/CI mode.
Differences from production social_bot.launch.py:
- speech/TTS disabled by default (no mic/speaker in CI)
- LLM disabled by default (no model file in CI; use CI_FULL=1 to enable)
- mock_sensor_pub node started automatically
- Shorter TimerAction delays (2s instead of production delays)
- ROS_DOMAIN_ID isolated to avoid collision with other test runs
Env vars:
SOCIAL_TEST_FULL=1 enable speech + LLM + TTS (requires GPU + models)
SOCIAL_TEST_SPEECH=1 enable speech pipeline only
"""
import os
from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch.actions import (
DeclareLaunchArgument, GroupAction, LogInfo, TimerAction, SetEnvironmentVariable
)
from launch.conditions import IfCondition
from launch.substitutions import LaunchConfiguration, PythonExpression, EnvironmentVariable
from launch_ros.actions import Node
_FULL = os.environ.get("SOCIAL_TEST_FULL", "0") == "1"
_SPEECH = _FULL or os.environ.get("SOCIAL_TEST_SPEECH", "0") == "1"
def generate_launch_description() -> LaunchDescription:
    """Build the test-mode launch description for the social-bot stack.

    Starts the mock sensor publisher plus orchestrator / face / tracking / nav
    nodes, with staggered ``TimerAction`` delays (1-3s). Speech, LLM and TTS
    are gated behind launch arguments whose defaults come from the
    SOCIAL_TEST_FULL / SOCIAL_TEST_SPEECH env vars (read once at import time
    into ``_FULL`` / ``_SPEECH``).
    """
    social_pkg = get_package_share_directory("saltybot_social")
    # NOTE(review): tests_pkg is never used below. get_package_share_directory
    # raises if the package is not installed, so this may be an intentional
    # fail-fast existence check — confirm intent or remove.
    tests_pkg = get_package_share_directory("saltybot_social_tests")
    args = [
        DeclareLaunchArgument("enable_speech", default_value="true" if _SPEECH else "false"),
        DeclareLaunchArgument("enable_llm", default_value="true" if _FULL else "false"),
        DeclareLaunchArgument("enable_tts", default_value="true" if _FULL else "false"),
        DeclareLaunchArgument("enable_orchestrator", default_value="true"),
        DeclareLaunchArgument("enable_face", default_value="true"),
        DeclareLaunchArgument("enable_tracking", default_value="true"),
        DeclareLaunchArgument("enable_nav", default_value="true"),
        DeclareLaunchArgument("enable_mock_sensors", default_value="true"),
    ]
    # ── Mock sensor publisher (always first) ─────────────────────────────────
    mock_sensors = Node(
        package="saltybot_social_tests",
        executable="mock_sensor_pub",
        name="mock_sensor_pub",
        output="screen",
        condition=IfCondition(LaunchConfiguration("enable_mock_sensors")),
    )
    # ── Orchestrator (t=0s) ──────────────────────────────────────────────────
    orchestrator = Node(
        package="saltybot_social",
        executable="orchestrator_node",
        name="orchestrator_node",
        parameters=[
            os.path.join(social_pkg, "config", "orchestrator_params.yaml"),
            {"watchdog_timeout_s": 60.0,  # more lenient in CI
             "gpu_mem_throttle_mb": 500.0},  # low floor for CI without GPU
        ],
        condition=IfCondition(LaunchConfiguration("enable_orchestrator")),
        output="screen",
    )
    # ── Face recognition (t=1s) ──────────────────────────────────────────────
    face = TimerAction(period=1.0, actions=[
        GroupAction(
            condition=IfCondition(LaunchConfiguration("enable_face")),
            actions=[
                LogInfo(msg="[social_test] Starting face recognition..."),
                Node(
                    package="saltybot_social_face",
                    executable="face_recognition",
                    name="face_recognition_node",
                    parameters=[{"use_trt": False,  # CPU ONNX in CI
                                 "detection_threshold": 0.5,
                                 "gallery_path": "/tmp/social_test_gallery"}],
                    output="screen",
                ),
            ]
        )
    ])
    # ── Tracking fusion (t=1s) ───────────────────────────────────────────────
    tracking = TimerAction(period=1.0, actions=[
        GroupAction(
            condition=IfCondition(LaunchConfiguration("enable_tracking")),
            actions=[
                Node(
                    package="saltybot_social_tracking",
                    executable="tracking_fusion_node",
                    name="tracking_fusion_node",
                    output="screen",
                ),
            ]
        )
    ])
    # ── Speech pipeline (t=2s, optional) ────────────────────────────────────
    speech = TimerAction(period=2.0, actions=[
        GroupAction(
            condition=IfCondition(LaunchConfiguration("enable_speech")),
            actions=[
                LogInfo(msg="[social_test] Starting speech pipeline..."),
                Node(
                    package="saltybot_social",
                    executable="speech_pipeline_node",
                    name="speech_pipeline_node",
                    parameters=[
                        os.path.join(social_pkg, "config", "speech_params.yaml"),
                        {"whisper_model": "tiny",  # faster in CI
                         "use_silero_vad": False},  # avoid download in CI
                    ],
                    output="screen",
                ),
            ]
        )
    ])
    # ── LLM + TTS (t=3s, optional) ──────────────────────────────────────────
    llm = TimerAction(period=3.0, actions=[
        GroupAction(
            condition=IfCondition(LaunchConfiguration("enable_llm")),
            actions=[
                Node(
                    package="saltybot_social",
                    executable="conversation_node",
                    name="conversation_node",
                    parameters=[
                        os.path.join(social_pkg, "config", "conversation_params.yaml"),
                    ],
                    output="screen",
                ),
            ]
        )
    ])
    tts = TimerAction(period=3.0, actions=[
        GroupAction(
            condition=IfCondition(LaunchConfiguration("enable_tts")),
            actions=[
                Node(
                    package="saltybot_social",
                    executable="tts_node",
                    name="tts_node",
                    parameters=[
                        os.path.join(social_pkg, "config", "tts_params.yaml"),
                        {"playback_enabled": False},  # no speaker in CI
                    ],
                    output="screen",
                ),
            ]
        )
    ])
    # ── Social nav (t=2s) ────────────────────────────────────────────────────
    nav = TimerAction(period=2.0, actions=[
        GroupAction(
            condition=IfCondition(LaunchConfiguration("enable_nav")),
            actions=[
                Node(
                    package="saltybot_social_nav",
                    executable="social_nav",
                    name="social_nav_node",
                    parameters=[{"use_midas_depth": False}],  # no GPU depth in CI
                    output="screen",
                ),
            ]
        )
    ])
    return LaunchDescription(args + [
        mock_sensors, orchestrator,
        face, tracking, speech, llm, tts, nav,
    ])

View File

@ -0,0 +1,43 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_social_tests</name>
<version>0.1.0</version>
<description>
Integration test suite for the social-bot pipeline (Issue #108).
Tests: launch verification, topic rates, services, GPU memory budget,
end-to-end latency profiling, graceful shutdown.
CI-compatible via Docker + mock sensor publishers.
</description>
<maintainer email="seb@vayrette.com">sl-jetson</maintainer>
<license>MIT</license>
<!-- Runtime deps for test nodes -->
<depend>rclpy</depend>
<depend>std_msgs</depend>
<depend>sensor_msgs</depend>
<depend>geometry_msgs</depend>
<depend>nav_msgs</depend>
<!-- Social-bot packages under test -->
<depend>saltybot_social</depend>
<depend>saltybot_social_msgs</depend>
<depend>saltybot_social_face</depend>
<depend>saltybot_social_nav</depend>
<depend>saltybot_social_enrollment</depend>
<depend>saltybot_social_tracking</depend>
<depend>saltybot_social_personality</depend>
<!-- Test framework -->
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<test_depend>launch_testing</test_depend>
<test_depend>launch_testing_ament_cmake</test_depend>
<test_depend>launch_testing_ros</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,27 @@
[pytest]
# social-bot integration test configuration
# Markers registered here to silence PytestUnknownMarkWarning
markers =
gpu_required: test requires an NVIDIA GPU
full_stack: test requires SOCIAL_TEST_FULL=1
stack_running: test requires SOCIAL_STACK_RUNNING=1
slow: test takes more than 30 seconds
launch_test: launch_testing integration test
# Default test timeout (seconds) — individual tests can override
timeout = 120
# Show locals on failure for easier debugging
showlocals = false
# -r a: show a short summary line for all outcomes EXCEPT passes
# (failed, errors, skipped, xfailed, xpassed)
addopts = -ra --tb=short
# Test paths
testpaths = test
# Filters
filterwarnings =
ignore::DeprecationWarning:launch_testing
ignore::UserWarning:rclpy

View File

@ -0,0 +1 @@
# saltybot_social_tests — integration test harness for social-bot pipeline

View File

@ -0,0 +1,228 @@
"""mock_sensors.py — Mock sensor publishers for CI integration tests.
Publishes minimal valid data on all topics that social-bot nodes subscribe to,
so the pipeline can start and produce output without real hardware.
Topics published:
/camera/color/image_raw blank 640×480 RGB8 image at 15Hz
/social/speech/transcript canned SpeechTranscript at 0.5Hz
/social/faces/detections empty FaceDetectionArray at 15Hz
/social/tracking/fused_target synthetic FusedTarget at 10Hz
/social/persons single synthetic PersonStateArray at 5Hz
/uwb/target synthetic PoseStamped at 10Hz
Usage (standalone):
ros2 run saltybot_social_tests mock_sensor_pub
Usage (programmatic):
from saltybot_social_tests.mock_sensors import MockSensorPublisher
pub = MockSensorPublisher(node)
pub.start()
...
pub.stop()
"""
from __future__ import annotations
import threading
import time
import struct
from typing import Optional
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSDurabilityPolicy
from std_msgs.msg import Header, String
from sensor_msgs.msg import Image
from geometry_msgs.msg import PoseStamped, Point, Quaternion, Vector3
# ── Helpers ───────────────────────────────────────────────────────────────────
def _now_header(node: Node, frame_id: str = "camera_color_optical_frame") -> Header:
    """Return a Header stamped with the node's current clock time and frame_id."""
    header = Header()
    header.frame_id = frame_id
    header.stamp = node.get_clock().now().to_msg()
    return header
def _blank_rgb_image(node: Node, width: int = 640, height: int = 480) -> Image:
    """Return an all-black rgb8 Image of the given size, stamped via _now_header."""
    frame = Image()
    frame.header = _now_header(node)
    frame.width = width
    frame.height = height
    frame.encoding = "rgb8"
    frame.step = 3 * width
    # Zero-filled buffer == black frame; 3 bytes per pixel for rgb8.
    frame.data = bytes(3 * width * height)
    return frame
# ── Mock publisher class ──────────────────────────────────────────────────────
class MockSensorPublisher:
    """Publishes mock sensor data for all social-bot pipeline inputs.

    One daemon thread per topic, each publishing at a fixed rate (see
    ``start``). Message types from saltybot_social_msgs are imported lazily so
    the publisher degrades gracefully (camera + UWB only) when that package is
    not installed.
    """
    def __init__(self, node: Node) -> None:
        """Create publishers on ``node``; does not start any threads."""
        self._node = node
        # Flag polled by the publisher threads; cleared by stop().
        self._running = False
        self._threads: list = []
        # Sensor-style topics use best-effort QoS; state-style topics reliable.
        be_qos = QoSProfile(
            reliability=QoSReliabilityPolicy.BEST_EFFORT, depth=5
        )
        rel_qos = QoSProfile(
            reliability=QoSReliabilityPolicy.RELIABLE, depth=10
        )
        # Camera image (required by face detection + segmentation)
        self._img_pub = node.create_publisher(Image, "/camera/color/image_raw", be_qos)
        # Fused tracking target (required by social_nav, person_follower)
        try:
            from saltybot_social_msgs.msg import FusedTarget, FaceDetectionArray, PersonStateArray
            self._fused_pub = node.create_publisher(
                FusedTarget, "/social/tracking/fused_target", rel_qos
            )
            self._faces_pub = node.create_publisher(
                FaceDetectionArray, "/social/faces/detections", be_qos
            )
            self._persons_pub = node.create_publisher(
                PersonStateArray, "/social/persons", rel_qos
            )
            self._has_social_msgs = True
        except ImportError:
            self._has_social_msgs = False
            node.get_logger().warn("saltybot_social_msgs not found — limited mock data")
        # Speech transcript is gated separately: it can be missing even when
        # the other social messages import fine (older msgs package).
        try:
            from saltybot_social_msgs.msg import SpeechTranscript
            self._transcript_pub = node.create_publisher(
                SpeechTranscript, "/social/speech/transcript", rel_qos
            )
            self._has_speech_msgs = True
        except ImportError:
            self._has_speech_msgs = False
        # UWB target (required by person_follower fusion)
        self._uwb_pub = node.create_publisher(PoseStamped, "/uwb/target", rel_qos)
        # Transcript phrases cycled round-robin by _pub_transcript.
        self._canned_phrases = [
            "Hello Salty, how are you?",
            "What time is it?",
            "Follow me please.",
            "Tell me a joke.",
        ]
        self._phrase_idx = 0
    def start(self) -> None:
        """Start all mock publisher threads."""
        self._running = True
        # (name, publish function, publish interval in seconds)
        specs = [
            ("camera", self._pub_camera, 1.0 / 15.0),
            ("faces", self._pub_faces, 1.0 / 15.0),
            ("fused", self._pub_fused, 1.0 / 10.0),
            ("persons", self._pub_persons, 1.0 / 5.0),
            ("uwb", self._pub_uwb, 1.0 / 10.0),
            ("transcript",self._pub_transcript, 2.0),
        ]
        for name, fn, interval in specs:
            # daemon=True: threads die with the process, so stop() need not join.
            t = threading.Thread(
                target=self._loop, args=(fn, interval),
                name=f"mock_{name}", daemon=True
            )
            t.start()
            self._threads.append(t)
    def stop(self) -> None:
        """Signal all publisher threads to exit (no join — threads are daemons)."""
        self._running = False
    def _loop(self, publish_fn, interval_s: float) -> None:
        """Call ``publish_fn`` every ``interval_s`` seconds until stopped."""
        while self._running:
            try:
                publish_fn()
            except Exception:
                # Best-effort by design: a transient publish failure (e.g.
                # during shutdown) must not kill the mock data stream.
                pass
            time.sleep(interval_s)
    def _pub_camera(self) -> None:
        """Publish one blank 640x480 rgb8 frame."""
        self._img_pub.publish(_blank_rgb_image(self._node))
    def _pub_faces(self) -> None:
        """Publish an empty FaceDetectionArray (camera alive, no face)."""
        if not self._has_social_msgs:
            return
        from saltybot_social_msgs.msg import FaceDetectionArray
        msg = FaceDetectionArray()
        msg.header = _now_header(self._node)
        # Empty array — camera is "working" but no face in frame
        self._faces_pub.publish(msg)
    def _pub_fused(self) -> None:
        """Publish a static FusedTarget 1.5m ahead of base_link."""
        if not self._has_social_msgs:
            return
        from saltybot_social_msgs.msg import FusedTarget
        msg = FusedTarget()
        msg.header = _now_header(self._node, "base_link")
        msg.position.x = 1.5  # 1.5m ahead
        msg.position.y = 0.0
        msg.velocity.x = 0.0
        msg.track_id = 1
        msg.confidence = 0.85
        self._fused_pub.publish(msg)
    def _pub_persons(self) -> None:
        """Publish a PersonStateArray with one engaged synthetic person."""
        if not self._has_social_msgs:
            return
        from saltybot_social_msgs.msg import PersonStateArray, PersonState
        msg = PersonStateArray()
        msg.header = _now_header(self._node, "base_link")
        p = PersonState()
        p.header = msg.header
        p.person_id = 1
        p.person_name = "TestPerson"
        p.state = PersonState.STATE_ENGAGED
        p.distance = 1.5
        p.bearing_deg = 0.0
        p.engagement_score = 0.8
        msg.persons = [p]
        self._persons_pub.publish(msg)
    def _pub_uwb(self) -> None:
        """Publish a static UWB PoseStamped near the fused-target position."""
        msg = PoseStamped()
        msg.header = _now_header(self._node, "base_link")
        msg.pose.position.x = 1.5
        msg.pose.position.y = 0.1
        msg.pose.orientation.w = 1.0
        self._uwb_pub.publish(msg)
    def _pub_transcript(self) -> None:
        """Publish the next canned phrase as a final (non-partial) transcript."""
        if not self._has_speech_msgs:
            return
        from saltybot_social_msgs.msg import SpeechTranscript
        msg = SpeechTranscript()
        msg.header = _now_header(self._node)
        msg.text = self._canned_phrases[self._phrase_idx % len(self._canned_phrases)]
        msg.speaker_id = "test_person"
        msg.confidence = 0.95
        msg.audio_duration = 1.2
        msg.is_partial = False
        self._transcript_pub.publish(msg)
        self._phrase_idx += 1
# ── Standalone entry point ────────────────────────────────────────────────────
def main(args=None) -> None:
    """Standalone entry point: spin a node publishing mock sensor data."""
    rclpy.init(args=args)
    mock_node = rclpy.create_node("mock_sensor_pub")
    publisher = MockSensorPublisher(mock_node)
    publisher.start()
    mock_node.get_logger().info("Mock sensor publisher running — Ctrl+C to stop")
    try:
        rclpy.spin(mock_node)
    except KeyboardInterrupt:
        pass
    finally:
        # Stop publisher threads before tearing down the node/context.
        publisher.stop()
        mock_node.destroy_node()
        rclpy.shutdown()

View File

@ -0,0 +1,262 @@
"""test_helpers.py — Shared helpers for social-bot integration tests.
No hardware dependencies. Provides:
- TopicRateChecker: measure actual publish rate on a topic
- NodeChecker: verify a named node is alive in the graph
- ServiceChecker: call a service and verify it responds
- GpuMemoryChecker: measure GPU allocation via pycuda / nvidia-smi
- TimeoutPoller: generic poll-until-true with timeout
"""
from __future__ import annotations
import subprocess
import threading
import time
from typing import Any, Callable, List, Optional
import rclpy
from rclpy.node import Node
from rclpy.qos import (
QoSProfile, QoSReliabilityPolicy, QoSDurabilityPolicy, QoSHistoryPolicy
)
# ── Timeout poller ────────────────────────────────────────────────────────────
def poll_until(
    condition: Callable[[], bool],
    timeout_s: float = 30.0,
    interval_s: float = 0.5,
) -> bool:
    """Poll condition() until it returns True or timeout_s elapses.

    Args:
        condition: zero-argument callable returning a truthy/falsy value.
        timeout_s: maximum time to keep polling, in seconds.
        interval_s: sleep between polls, in seconds.

    Returns:
        True as soon as condition() is truthy, False on timeout.

    Fixes vs. original:
        - Uses time.monotonic() instead of time.time(): a wall-clock
          adjustment (NTP step) can no longer stretch or shorten the timeout.
        - condition() is evaluated at least once, even when timeout_s <= 0
          (the original returned False without ever checking).
    """
    deadline = time.monotonic() + timeout_s
    while True:
        if condition():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval_s)
# ── Node checker ──────────────────────────────────────────────────────────────
class NodeChecker:
    """Check whether ROS2 nodes are alive in the graph.

    Wraps a probe node and inspects its view of the node graph. A node
    matches either by bare name (in any namespace) or by its fully
    qualified "/ns/name" form. NOTE(review): the ``namespace`` argument of
    :meth:`is_alive` is accepted but not used for filtering — confirm
    whether callers rely on that.
    """

    def __init__(self, probe_node: Node) -> None:
        self._node = probe_node

    def is_alive(self, node_name: str, namespace: str = "/") -> bool:
        """Return True if node_name is present in the node graph."""
        for candidate, candidate_ns in self._node.get_node_names_and_namespaces():
            qualified = candidate_ns.rstrip("/") + "/" + candidate
            if node_name in (candidate, qualified):
                return True
        return False

    def wait_for_nodes(
        self,
        node_names: List[str],
        timeout_s: float = 30.0,
    ) -> dict:
        """Block until all nodes appear or timeout. Returns {name: found}."""
        found = dict.fromkeys(node_names, False)
        deadline = time.time() + timeout_s
        while time.time() < deadline:
            for name in node_names:
                found[name] = found[name] or self.is_alive(name)
            if all(found.values()):
                break
            time.sleep(0.5)
        return found
# ── Topic rate checker ─────────────────────────────────────────────────────────
class TopicRateChecker:
    """Measure the actual publish rate on a topic over a measurement window.

    Usage:
        checker = TopicRateChecker(node, "/social/faces/detections",
                                   FaceDetectionArray, window_s=3.0)
        checker.start()
        time.sleep(5.0)
        hz = checker.measured_hz()
        checker.stop()
    """

    def __init__(
        self,
        node: Node,
        topic: str,
        msg_type: Any,
        window_s: float = 3.0,
        qos: Optional[QoSProfile] = None,
    ) -> None:
        self._window_s = window_s
        self._timestamps: List[float] = []
        self._lock = threading.Lock()
        self._running = False
        if qos is None:
            # Sensor-style default: best-effort, volatile, keep-last-10.
            qos = QoSProfile(
                reliability=QoSReliabilityPolicy.BEST_EFFORT,
                durability=QoSDurabilityPolicy.VOLATILE,
                history=QoSHistoryPolicy.KEEP_LAST,
                depth=10,
            )
        self._sub = node.create_subscription(msg_type, topic, self._cb, qos)

    def _cb(self, _msg: Any) -> None:
        """Record arrival time; prune samples older than twice the window."""
        arrival = time.time()
        with self._lock:
            self._timestamps.append(arrival)
            horizon = arrival - self._window_s * 2
            self._timestamps = [t for t in self._timestamps if t > horizon]

    def measured_hz(self) -> float:
        """Return Hz measured over the last window_s seconds."""
        horizon = time.time() - self._window_s
        with self._lock:
            window = [t for t in self._timestamps if t > horizon]
        if len(window) < 2:
            return 0.0
        return (len(window) - 1) / (window[-1] - window[0])

    def message_count(self) -> int:
        """Total messages received (within the pruning horizon)."""
        with self._lock:
            return len(self._timestamps)

    def received_any(self) -> bool:
        """True once at least one message has arrived."""
        with self._lock:
            return len(self._timestamps) > 0
# ── Service checker ────────────────────────────────────────────────────────────
class ServiceChecker:
    """Verify a ROS2 service is available and call it.

    A fresh client is created per call and destroyed before returning, so no
    client objects leak between checks.

    Fixes vs. original:
        - Removed the unused `import rclpy.executors` inside call_service.
        - Client destruction moved into try/finally so an exception from
          wait_for_service/call_async cannot leak the client.
    """

    def __init__(self, node: Node) -> None:
        self._node = node

    def is_available(self, service_name: str, srv_type: Any,
                     timeout_s: float = 5.0) -> bool:
        """Return True if the service becomes available within timeout_s."""
        client = self._node.create_client(srv_type, service_name)
        try:
            return client.wait_for_service(timeout_sec=timeout_s)
        finally:
            self._node.destroy_client(client)

    def call_service(
        self,
        service_name: str,
        srv_type: Any,
        request: Any,
        timeout_s: float = 10.0,
    ) -> Optional[Any]:
        """Call a service and return the response, or None on failure.

        None is returned when the service never becomes available or the
        async call does not complete within timeout_s.
        """
        client = self._node.create_client(srv_type, service_name)
        try:
            if not client.wait_for_service(timeout_sec=timeout_s):
                return None
            future = client.call_async(request)
            # Spin the probe node so the response callback can run.
            deadline = time.time() + timeout_s
            while not future.done() and time.time() < deadline:
                rclpy.spin_once(self._node, timeout_sec=0.1)
            return future.result() if future.done() else None
        finally:
            self._node.destroy_client(client)
# ── GPU memory checker ────────────────────────────────────────────────────────
class GpuMemoryChecker:
    """Query GPU memory usage. Gracefully no-ops if no GPU available.

    Resolution order:
      1. nvidia-smi CSV query (first GPU only)
      2. pycuda drv.mem_get_info() fallback
    All queries return None when neither path is usable.
    """

    @staticmethod
    def total_mb() -> Optional[float]:
        """Total GPU memory in MB, or None if no GPU is visible."""
        return GpuMemoryChecker._query("memory.total")

    @staticmethod
    def free_mb() -> Optional[float]:
        """Free GPU memory in MB, or None if no GPU is visible."""
        return GpuMemoryChecker._query("memory.free")

    @staticmethod
    def used_mb() -> Optional[float]:
        """Used GPU memory in MB, or None if no GPU is visible."""
        return GpuMemoryChecker._query("memory.used")

    @staticmethod
    def _query(field: str) -> Optional[float]:
        """Resolve one nvidia-smi field ("memory.total"/"memory.free"/"memory.used")."""
        # Primary path: nvidia-smi CSV output.
        try:
            out = subprocess.check_output(
                ["nvidia-smi", f"--query-gpu={field}",
                 "--format=csv,noheader,nounits"],
                timeout=5,
            ).decode().strip()
            return float(out.split("\n")[0].strip())
        except Exception:
            pass
        # Fallback: pycuda. Fix vs. original: pop the context in `finally`
        # so a failure after make_context() cannot leak the CUDA context.
        try:
            import pycuda.driver as drv
            drv.init()
            ctx = drv.Device(0).make_context()
            try:
                free_b, total_b = drv.mem_get_info()
            finally:
                ctx.pop()
            if field == "memory.free":
                return free_b / 1024 / 1024
            elif field == "memory.total":
                return total_b / 1024 / 1024
            elif field == "memory.used":
                return (total_b - free_b) / 1024 / 1024
        except Exception:
            pass
        return None

    @staticmethod
    def gpu_available() -> bool:
        """True when a total-memory query succeeds (i.e. a GPU is reachable)."""
        return GpuMemoryChecker.total_mb() is not None
# ── Process helpers ───────────────────────────────────────────────────────────
def count_zombie_processes(pattern: str = "ros") -> int:
    """Count zombie (<defunct>) processes whose `ps aux` line contains pattern.

    Returns 0 when ps is unavailable or fails.
    """
    try:
        listing = subprocess.check_output(["ps", "aux"], timeout=5).decode()
    except Exception:
        return 0
    return sum(
        1 for row in listing.splitlines()
        if "<defunct>" in row and pattern in row
    )
def get_process_pids(name_pattern: str) -> List[int]:
    """Return PIDs of processes whose command line matches name_pattern.

    Uses `pgrep -f`; returns [] on no match or when pgrep is unavailable
    (pgrep exits non-zero on no match, which check_output raises on).
    """
    try:
        raw = subprocess.check_output(
            ["pgrep", "-f", name_pattern], timeout=5
        ).decode().strip()
    except Exception:
        return []
    return [int(line) for line in raw.splitlines() if line.strip()]

View File

@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/saltybot_social_tests
[install]
install_scripts=$base/lib/saltybot_social_tests

View File

@ -0,0 +1,30 @@
from setuptools import find_packages, setup
import os
from glob import glob

package_name = 'saltybot_social_tests'

# Ament resource index marker, package manifest, and launch files.
_data_files = [
    ('share/ament_index/resource_index/packages',
     ['resource/' + package_name]),
    ('share/' + package_name, ['package.xml']),
    (os.path.join('share', package_name, 'launch'),
     glob(os.path.join('launch', '*launch.[pxy][yma]*'))),
]

setup(
    name=package_name,
    version='0.1.0',
    packages=find_packages(exclude=['test']),
    data_files=_data_files,
    install_requires=['setuptools'],
    zip_safe=True,
    maintainer='sl-jetson',
    maintainer_email='seb@vayrette.com',
    description='Integration test suite for social-bot pipeline (Issue #108)',
    license='MIT',
    tests_require=['pytest'],
    entry_points={
        'console_scripts': [
            # Allows `ros2 run saltybot_social_tests mock_sensor_pub`.
            'mock_sensor_pub = saltybot_social_tests.mock_sensors:main',
        ],
    },
)

View File

@ -0,0 +1,113 @@
"""conftest.py — pytest configuration and shared fixtures for social-bot tests.
Custom markers:
@pytest.mark.gpu_required skip if no GPU / nvidia-smi
@pytest.mark.full_stack skip unless SOCIAL_TEST_FULL=1
@pytest.mark.stack_running skip unless SOCIAL_STACK_RUNNING=1
@pytest.mark.slow long tests (>30s); skipped in fast mode
@pytest.mark.launch_test launch_testing marker (framework)
Fixtures:
rclpy_node temporary rclpy probe node (auto-shutdown)
ros_domain_id asserts ROS_DOMAIN_ID=108 for test isolation
"""
from __future__ import annotations
import os
import subprocess
import sys
import time
import pytest
# ── Custom markers ────────────────────────────────────────────────────────────
def pytest_configure(config):
    """Register the suite's custom markers (keeps --strict-markers happy)."""
    marker_lines = (
        "gpu_required: test requires an NVIDIA GPU (skipped in headless CI)",
        "full_stack: test requires SOCIAL_TEST_FULL=1 (full speech/LLM/TTS)",
        "stack_running: test requires SOCIAL_STACK_RUNNING=1",
        "slow: test takes more than 30 seconds",
        "launch_test: launch_testing integration test",
    )
    for line in marker_lines:
        config.addinivalue_line("markers", line)
def pytest_collection_modifyitems(config, items):
    """Automatically skip tests whose markers are not satisfied."""
    env = os.environ.get
    full_stack = env("SOCIAL_TEST_FULL", "0") == "1"
    stack_running = env("SOCIAL_STACK_RUNNING", "0") == "1"
    fast_mode = env("CI_FAST", "0") == "1"
    gpu_ok = _nvidia_smi_available()
    # (marker name, skip-if predicate, skip reason) — evaluated per item.
    skip_rules = [
        ("gpu_required", not gpu_ok,
         "No GPU available (set SOCIAL_STACK_RUNNING=1 on hardware)"),
        ("full_stack", not full_stack,
         "Set SOCIAL_TEST_FULL=1 to enable full-stack tests"),
        ("stack_running", not stack_running,
         "Set SOCIAL_STACK_RUNNING=1 to enable stack-running tests"),
        ("slow", fast_mode,
         "Slow test skipped in CI_FAST=1 mode"),
    ]
    for item in items:
        for marker_name, should_skip, reason in skip_rules:
            if should_skip and item.get_closest_marker(marker_name):
                item.add_marker(pytest.mark.skip(reason=reason))
# ── Helpers ───────────────────────────────────────────────────────────────────
def _nvidia_smi_available() -> bool:
try:
subprocess.check_output(
["nvidia-smi", "--query-gpu=name", "--format=csv,noheader"],
timeout=5, stderr=subprocess.DEVNULL,
)
return True
except Exception:
return False
# ── Fixtures ──────────────────────────────────────────────────────────────────
@pytest.fixture(scope="session")
def ros_domain_id():
    """Session fixture: enforce ROS_DOMAIN_ID=108 so test traffic is isolated."""
    configured = os.environ.get("ROS_DOMAIN_ID", "0")
    assert configured == "108", (
        f"ROS_DOMAIN_ID must be '108' for isolated test runs (got '{configured}'). "
        "Set: export ROS_DOMAIN_ID=108"
    )
    return int(configured)
@pytest.fixture(scope="function")
def rclpy_node(request):
    """Provide a temporary rclpy probe node; auto-teardown after each test."""
    import rclpy
    if not rclpy.ok():
        rclpy.init()
    # Derive a graph-safe node name from the (possibly parametrized) test name.
    safe_suffix = request.node.name[:20].replace('/', '_')
    probe = rclpy.create_node(f"test_probe_{safe_suffix}")
    yield probe
    probe.destroy_node()
@pytest.fixture(scope="session", autouse=True)
def rclpy_session():
    """Init/shutdown rclpy once for the entire test session.

    autouse=True: applies to every test without being requested explicitly.
    """
    import rclpy
    # Init only if something earlier in the session hasn't already.
    if not rclpy.ok():
        rclpy.init()
    yield
    # A test may already have shut rclpy down; only shut down if still up.
    if rclpy.ok():
        rclpy.shutdown()

View File

@ -0,0 +1,197 @@
"""test_gpu_memory.py — GPU memory budget tests for social-bot stack.
Requirements (Orin Nano Super 8GB shared memory):
- Full stack allocation must stay under 6GB total GPU usage
- Each major model budget:
Face detection (SCRFD-10GF TRT): < 100 MB
Face recognition (ArcFace TRT): < 300 MB
Speaker embedding (ECAPA-TDNN TRT): < 200 MB
Whisper STT (CTranslate2): < 700 MB
LLM (Phi-3-mini Q4 GGUF, 20 layers): < 2500 MB
Piper TTS (ONNX CPU): < 200 MB
Tracking (Kalman, CPU): < 10 MB
Total budget: < 6000 MB (6 GB)
Reserve for OS + ROS2: 2000 MB (2 GB)
Tests:
1. GPU available check (skip gracefully in CI without GPU)
2. Used memory at baseline (before stack loads) < 1GB
3. Used memory during full stack < 6GB
4. Memory does not grow continuously (no leak over 30s)
5. After shutdown, memory returns to near-baseline
"""
from __future__ import annotations
import os
import sys
import time
import unittest
import pytest
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from saltybot_social_tests.test_helpers import GpuMemoryChecker
# ── Budget constants ──────────────────────────────────────────────────────────
# All values in MB. The Orin Nano Super shares 8GB between CPU and GPU; the
# 6GB stack budget deliberately leaves ~2GB for the OS and ROS2 overhead.
GPU_TOTAL_MB_EXPECTED = 8192.0  # Orin Nano Super 8GB
GPU_STACK_BUDGET_MB = 6000.0  # full stack must stay under 6GB used
GPU_BASELINE_MB = 1024.0  # baseline (OS + CUDA context) < 1GB
GPU_LEAK_TOLERANCE_MB = 200.0  # acceptable growth over 30s measurement window
# Per-model expected budgets (for documentation / CI without full stack)
MODEL_BUDGETS_MB = {
    "scrfd_10g_trt": 100,
    "arcface_r100_trt": 300,
    "ecapa_tdnn_trt": 200,
    "whisper_small": 700,
    "phi3_mini_gguf": 2500,
    "piper_tts": 200,
    "tracking": 10,
}
TOTAL_MODEL_BUDGET_MB = sum(MODEL_BUDGETS_MB.values())  # 4010 MB
def _skip_if_no_gpu():
    """Skip the calling test when no GPU is reachable (e.g. headless CI)."""
    if GpuMemoryChecker.gpu_available():
        return
    pytest.skip("No GPU available — skipping GPU memory tests")
class TestGpuMemoryBudget(unittest.TestCase):
    """GPU memory budget tests for the full social-bot stack.

    All tests skip gracefully without a GPU; stack-dependent tests also
    require SOCIAL_STACK_RUNNING=1.
    """

    def test_gpu_is_available_or_ci(self):
        """GPU check: either GPU is available, or we're in CI (skip gracefully)."""
        if os.environ.get("CI", ""):
            # In CI without GPU, just log and pass
            print("[INFO] CI mode detected — GPU memory tests will be skipped if no GPU")
        available = GpuMemoryChecker.gpu_available()
        if not available:
            self.skipTest("No GPU available")
        total = GpuMemoryChecker.total_mb()
        self.assertIsNotNone(total)
        print(f"[INFO] GPU total: {total:.0f} MB")

    def test_gpu_total_memory_matches_orin(self):
        """Total GPU memory should be ~8GB on Orin Nano Super."""
        _skip_if_no_gpu()
        total = GpuMemoryChecker.total_mb()
        # Allow ±10% from 8192MB (shared memory varies by config)
        self.assertGreater(total, GPU_TOTAL_MB_EXPECTED * 0.5,
                           f"GPU total {total:.0f}MB seems too low for Orin")

    def test_baseline_memory_under_threshold(self):
        """Baseline GPU usage (before models load) must be < 1GB."""
        _skip_if_no_gpu()
        used = GpuMemoryChecker.used_mb()
        self.assertIsNotNone(used)
        print(f"[INFO] Baseline GPU used: {used:.0f} MB")
        # Note: if running after stack is up, this test would need stack to be down
        # In standalone mode (no stack running), assert low baseline
        if os.environ.get("SOCIAL_STACK_RUNNING", "0") == "0":
            self.assertLess(
                used, GPU_BASELINE_MB,
                f"Baseline GPU usage {used:.0f}MB >= {GPU_BASELINE_MB}MB"
            )

    def test_full_stack_memory_under_budget(self):
        """Full stack GPU usage must stay under 6GB."""
        _skip_if_no_gpu()
        if os.environ.get("SOCIAL_STACK_RUNNING", "0") != "1":
            self.skipTest("Set SOCIAL_STACK_RUNNING=1 to run with stack active")
        used = GpuMemoryChecker.used_mb()
        self.assertIsNotNone(used)
        print(f"[INFO] Stack GPU used: {used:.0f} MB / {GPU_STACK_BUDGET_MB:.0f} MB budget")
        self.assertLess(
            used, GPU_STACK_BUDGET_MB,
            f"GPU usage {used:.0f}MB exceeds {GPU_STACK_BUDGET_MB:.0f}MB budget"
        )

    def test_no_memory_leak_over_30s(self):
        """GPU memory must not grow by more than 200MB over 30 seconds."""
        _skip_if_no_gpu()
        if os.environ.get("SOCIAL_STACK_RUNNING", "0") != "1":
            self.skipTest("Set SOCIAL_STACK_RUNNING=1 to run with stack active")
        used_start = GpuMemoryChecker.used_mb()
        # Fix vs. original: used_mb() can return None if nvidia-smi flakes
        # mid-run; the original formatted None with :.0f and crashed.
        self.assertIsNotNone(used_start, "Could not read GPU memory at start")
        print(f"[INFO] GPU used at start: {used_start:.0f} MB")
        # Wait 30s while stack processes mock sensor data
        time.sleep(30.0)
        used_end = GpuMemoryChecker.used_mb()
        self.assertIsNotNone(used_end, "Could not read GPU memory after 30s")
        growth = used_end - used_start
        print(f"[INFO] GPU used after 30s: {used_end:.0f} MB (growth: {growth:+.0f} MB)")
        self.assertLess(
            growth, GPU_LEAK_TOLERANCE_MB,
            f"GPU memory grew by {growth:.0f}MB over 30s — possible leak"
        )

    def test_memory_frees_after_context_release(self):
        """Test that pycuda context releases memory properly.

        Fix vs. original: the CUDA context is popped in a `finally` block so
        a failing assertion between make_context() and ctx.pop() cannot leak
        the context into subsequent tests.
        """
        _skip_if_no_gpu()
        try:
            import pycuda.driver as drv
        except ImportError:
            self.skipTest("pycuda not available")
        drv.init()
        ctx = drv.Device(0).make_context()
        try:
            free_in, total_in = drv.mem_get_info()
            # Allocate 100MB
            alloc = drv.mem_alloc(100 * 1024 * 1024)
            free_after_alloc, _ = drv.mem_get_info()
            growth = (free_in - free_after_alloc) / 1024 / 1024
            self.assertGreater(growth, 50,
                               "100MB allocation didn't register in GPU memory")
            # Free it
            del alloc
            free_after_free, _ = drv.mem_get_info()
            recovered = (free_after_free - free_after_alloc) / 1024 / 1024
            self.assertGreater(recovered, 50,
                               f"Only {recovered:.0f}MB recovered after freeing 100MB alloc")
        finally:
            ctx.pop()
class TestModelBudgetDocumentation(unittest.TestCase):
    """Verify total model budget is within spec (pure math, no GPU needed)."""

    def test_total_model_budget_under_6gb(self):
        """Sum of all model budgets must be < 6000MB."""
        total = sum(MODEL_BUDGETS_MB.values())
        self.assertLess(
            total, GPU_STACK_BUDGET_MB,
            f"Model budgets sum to {total}MB, exceeds {GPU_STACK_BUDGET_MB}MB limit"
        )

    def test_individual_model_budgets_reasonable(self):
        """No single model should claim more than 3GB."""
        for model, budget in MODEL_BUDGETS_MB.items():
            self.assertLess(
                budget, 3000,
                f"Model {model} budget {budget}MB seems unreasonably high"
            )

    def test_budget_includes_all_models(self):
        """Verify all expected models are accounted for in the budget."""
        expected_keys = {
            "scrfd_10g_trt", "arcface_r100_trt", "ecapa_tdnn_trt",
            "whisper_small", "phi3_mini_gguf", "piper_tts", "tracking"
        }
        self.assertEqual(set(MODEL_BUDGETS_MB.keys()), expected_keys)

    def test_budget_summary(self):
        """Print human-readable budget summary."""
        print("\n[GPU Budget Summary]")
        for model, mb in sorted(MODEL_BUDGETS_MB.items(), key=lambda x: -x[1]):
            # Fix vs. original: the bar glyph was lost to an encoding mangle
            # ('"" * n' renders nothing); one block char per 100 MB.
            bar = "█" * (mb // 100)
            print(f" {model:<25} {mb:>5} MB {bar}")
        print(f" {'TOTAL':<25} {TOTAL_MODEL_BUDGET_MB:>5} MB")
        print(f" {'LIMIT':<25} {GPU_STACK_BUDGET_MB:>5} MB")
        print(f" {'RESERVE':<25} {GPU_STACK_BUDGET_MB - TOTAL_MODEL_BUDGET_MB:>5} MB")

View File

@ -0,0 +1,323 @@
"""test_latency.py — End-to-end latency profiling for social-bot pipeline.
Pipeline stages and SLAs:
Stage SLA (p95) Notes
    wake_word → VAD onset                 < 100ms     OpenWakeWord inference
    VAD onset → transcript (STT)          < 500ms     Whisper small, 1s utterance
    transcript → LLM first token          < 500ms     Phi-3-mini GGUF, TTFT
    LLM first token → TTS first audio     < 200ms     Piper synthesis, first sentence
    Full end-to-end (wake → speaker)      < 1200ms    total pipeline SLA
Tests:
1. STT latency: inject synthetic transcript, measure to LLM response
2. LLM latency: inject transcript, measure time to ConversationResponse
3. TTS latency: inject ConversationResponse, verify audio starts quickly
  4. E2E latency: orchestrator state transitions (LISTENING→THINKING→SPEAKING)
5. Percentile report: print p50/p95/p99 for each stage over N runs
In CI (no GPU/models): tests use orchestrator state timestamps to infer latency
bounds without actually running STT/LLM/TTS.
"""
from __future__ import annotations
import json
import os
import sys
import time
import threading
import unittest
from collections import defaultdict
from typing import List, Optional
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from saltybot_social_tests.test_helpers import poll_until
# ── SLA constants (milliseconds) ─────────────────────────────────────────────
SLA_STT_P95_MS = 500.0  # wake-word/VAD onset -> final transcript
SLA_LLM_TTFT_P95_MS = 500.0  # transcript -> first LLM token (TTFT)
SLA_TTS_FIRST_P95_MS = 200.0  # first LLM token -> first TTS audio chunk
SLA_E2E_P95_MS = 1200.0  # full pipeline: wake -> audible speech
N_PROBE_RUNS = 5  # number of injection runs for percentile calculation
def _percentile(values: List[float], p: float) -> float:
if not values:
return 0.0
s = sorted(values)
idx = int(len(s) * p / 100)
return s[min(idx, len(s) - 1)]
class LatencyProbe(Node):
    """Injects synthetic messages and measures pipeline response latencies.

    Publishes SpeechTranscript injections and records:
      - wall-clock latency from each injection to the next ConversationResponse
      - every orchestrator state transition with its dwell time in ms

    All timing state is guarded by ``self._lock`` because rclpy subscription
    callbacks may run on a different thread than the test body.
    """

    def __init__(self) -> None:
        super().__init__("latency_probe")
        qos = QoSProfile(depth=10)
        # Injectors
        try:
            from saltybot_social_msgs.msg import SpeechTranscript, ConversationResponse
            self._transcript_pub = self.create_publisher(
                SpeechTranscript, "/social/speech/transcript", qos
            )
            self._response_pub = self.create_publisher(
                ConversationResponse, "/social/conversation/response", qos
            )
            self._has_social = True
        except ImportError:
            # saltybot_social_msgs not built — fall back to a plain String
            # injection topic so the probe still constructs in CI.
            self._has_social = False
            from std_msgs.msg import String
            self._state_pub = self.create_publisher(String, "/social/test/inject", qos)
        # Receivers
        from std_msgs.msg import String
        self._state_sub = self.create_subscription(
            String, "/social/orchestrator/state",
            self._on_state, qos
        )
        if self._has_social:
            from saltybot_social_msgs.msg import ConversationResponse
            self._response_sub = self.create_subscription(
                ConversationResponse, "/social/conversation/response",
                self._on_response, qos
            )
        # Timing data
        self._inject_t: Optional[float] = None  # wall time of last injection
        self._response_times: List[float] = []  # inject->response latencies (ms)
        self._state_transitions: List[tuple] = []  # (from, to, elapsed_ms)
        self._last_state: Optional[str] = None
        self._last_state_t: float = time.time()
        self._lock = threading.Lock()

    def _on_state(self, msg) -> None:
        """Record orchestrator state transitions with dwell time in ms.

        The state topic carries JSON; malformed payloads are ignored.
        """
        try:
            data = json.loads(msg.data)
            new_state = data.get("state", "")
            now = time.time()
            with self._lock:
                if self._last_state and self._last_state != new_state:
                    elapsed = (now - self._last_state_t) * 1000
                    self._state_transitions.append(
                        (self._last_state, new_state, elapsed)
                    )
                self._last_state = new_state
                self._last_state_t = now
        except Exception:
            pass

    def _on_response(self, msg) -> None:
        """Record inject->response latency for the most recent injection."""
        with self._lock:
            if self._inject_t is not None:
                latency_ms = (time.time() - self._inject_t) * 1000
                self._response_times.append(latency_ms)
                # One latency sample per injection; reset until next inject.
                self._inject_t = None

    def inject_transcript(self, text: str = "Hello Salty, what time is it?") -> None:
        """Publish a synthetic final transcript to trigger LLM."""
        if not self._has_social:
            return
        from saltybot_social_msgs.msg import SpeechTranscript
        msg = SpeechTranscript()
        msg.header.stamp = self.get_clock().now().to_msg()
        msg.text = text
        msg.speaker_id = "test_user"
        msg.confidence = 0.99
        msg.audio_duration = 1.2
        msg.is_partial = False
        # Stamp the injection time before publishing so the response callback
        # always sees a non-None start time.
        with self._lock:
            self._inject_t = time.time()
        self._transcript_pub.publish(msg)

    def get_response_latencies(self) -> List[float]:
        """Snapshot of inject->response latencies in ms."""
        with self._lock:
            return list(self._response_times)

    def get_state_transitions(self) -> List[tuple]:
        """Snapshot of recorded (from_state, to_state, elapsed_ms) tuples."""
        with self._lock:
            return list(self._state_transitions)

    def clear(self) -> None:
        """Reset all recorded timing data."""
        with self._lock:
            self._response_times.clear()
            self._state_transitions.clear()
            self._inject_t = None
class TestPipelineLatency(unittest.TestCase):
    """End-to-end pipeline latency tests.

    A single LatencyProbe node is shared by all tests in this class; tests
    must call ``self.probe.clear()`` before taking new measurements.
    """

    @classmethod
    def setUpClass(cls):
        # Reuse an existing rclpy context if the session fixture created one.
        if not rclpy.ok():
            rclpy.init()
        cls.probe = LatencyProbe()

    @classmethod
    def tearDownClass(cls):
        cls.probe.destroy_node()

    def _spin_for(self, duration_s: float) -> None:
        # Pump the probe's callbacks for duration_s seconds of wall time.
        deadline = time.time() + duration_s
        while time.time() < deadline:
            rclpy.spin_once(self.probe, timeout_sec=0.05)

    def test_orchestrator_state_transitions_logged(self):
        """Verify orchestrator transitions are captured by the probe."""
        self.probe.clear()
        self._spin_for(3.0)  # wait for state messages
        # Inject a transcript to trigger IDLE→LISTENING→THINKING
        if self.probe._has_social:
            self.probe.inject_transcript()
            self._spin_for(5.0)
        transitions = self.probe.get_state_transitions()
        # If the stack is running with conversation_node, we should see transitions.
        # In CI without LLM, orchestrator stays IDLE — that's OK.
        print(f"\n[Latency] State transitions captured: {len(transitions)}")
        for frm, to, ms in transitions:
            # NOTE(review): separator between states likely lost in an
            # encoding mangle — output reads e.g. "idlethinking"; confirm.
            print(f" {frm}{to}: {ms:.0f}ms")

    def test_llm_response_latency_sla(self):
        """LLM must produce first ConversationResponse within 500ms p95."""
        if os.environ.get("SOCIAL_TEST_FULL", "0") != "1":
            self.skipTest("Set SOCIAL_TEST_FULL=1 to test with LLM running")
        if not self.probe._has_social:
            self.skipTest("saltybot_social_msgs not available")
        self.probe.clear()
        latencies = []
        for i in range(N_PROBE_RUNS):
            self.probe.clear()
            self.probe.inject_transcript(f"Hello, run number {i}")
            # Wait up to 5s for response
            deadline = time.time() + 5.0
            while time.time() < deadline:
                rclpy.spin_once(self.probe, timeout_sec=0.05)
                lats = self.probe.get_response_latencies()
                if lats:
                    latencies.extend(lats)
                    break
            time.sleep(1.0)  # cooldown between runs
        if not latencies:
            self.fail("No LLM responses received in any of the test runs")
        p50 = _percentile(latencies, 50)
        p95 = _percentile(latencies, 95)
        print(f"\n[Latency] LLM response: n={len(latencies)}, "
              f"p50={p50:.0f}ms, p95={p95:.0f}ms")
        self.assertLess(
            p95, SLA_LLM_TTFT_P95_MS,
            f"LLM p95 latency {p95:.0f}ms exceeds SLA {SLA_LLM_TTFT_P95_MS:.0f}ms"
        )

    def test_e2e_state_machine_latency(self):
        """IDLE→THINKING transition must complete within 1200ms p95."""
        if not self.probe._has_social:
            self.skipTest("saltybot_social_msgs not available")
        self.probe.clear()
        thinking_latencies = []
        for _ in range(N_PROBE_RUNS):
            self.probe.clear()
            # Wait for IDLE state
            def is_idle():
                rclpy.spin_once(self.probe, timeout_sec=0.1)
                return self.probe._last_state == "idle"
            poll_until(is_idle, timeout_s=5.0)
            # Inject transcript and measure time to THINKING
            t0 = time.time()
            self.probe.inject_transcript()
            def is_thinking():
                rclpy.spin_once(self.probe, timeout_sec=0.1)
                return self.probe._last_state in ("thinking", "speaking")
            reached = poll_until(is_thinking, timeout_s=5.0)
            if reached:
                elapsed_ms = (time.time() - t0) * 1000
                thinking_latencies.append(elapsed_ms)
            time.sleep(2.0)  # allow pipeline to return to IDLE
        if not thinking_latencies:
            # In CI without full stack, this is expected
            print("[Latency] No THINKING transitions — LLM/speech may not be running")
            return
        p95 = _percentile(thinking_latencies, 95)
        print(f"\n[Latency] Inject→THINKING: n={len(thinking_latencies)}, "
              f"p95={p95:.0f}ms")
        self.assertLess(
            p95, SLA_E2E_P95_MS,
            f"E2E p95 {p95:.0f}ms exceeds SLA {SLA_E2E_P95_MS:.0f}ms"
        )
class TestLatencyReport(unittest.TestCase):
    """Generate a latency report from orchestrator state data."""

    @classmethod
    def setUpClass(cls):
        # Reuse an existing rclpy context when the session fixture made one.
        if not rclpy.ok():
            rclpy.init()
        cls.node = rclpy.create_node("latency_report_probe")

    @classmethod
    def tearDownClass(cls):
        cls.node.destroy_node()

    def test_orchestrator_reports_latency_stats(self):
        """Orchestrator state JSON must include latency field when profiling enabled."""
        from std_msgs.msg import String
        messages = []
        subscription = self.node.create_subscription(
            String, "/social/orchestrator/state",
            lambda msg: messages.append(msg.data), QoSProfile(depth=10)
        )
        stop_at = time.time() + 5.0
        while not messages and time.time() < stop_at:
            rclpy.spin_once(self.node, timeout_sec=0.1)
        self.node.destroy_subscription(subscription)
        if not messages:
            self.skipTest("Orchestrator not running")
        state = json.loads(messages[-1])
        self.assertIn("latency", state,
                      "Orchestrator state JSON missing 'latency' field")
        self.assertIn("ts", state)
        print(f"\n[Latency] Orchestrator report: {json.dumps(state['latency'], indent=2)}")

    def test_latency_sla_table(self):
        """Print SLA table for documentation — always passes."""
        print("\n[Latency SLA Table]")
        sla_rows = (
            ("wake_word → transcript (STT)", SLA_STT_P95_MS),
            ("transcript → LLM first token", SLA_LLM_TTFT_P95_MS),
            ("LLM token → TTS first chunk", SLA_TTS_FIRST_P95_MS),
            ("end-to-end (wake → speaker)", SLA_E2E_P95_MS),
        )
        for stage, sla in sla_rows:
            print(f" {stage:<40} p95 < {sla:.0f}ms")

View File

@ -0,0 +1,208 @@
"""test_launch.py — Launch integration test: verify all social-bot nodes start.
Uses launch_testing framework to:
1. Start social_test.launch.py
2. Verify all expected nodes appear in the graph within 30s
3. Verify no node exits with an error code
4. Verify key topics are advertising (even if no messages yet)
Run with:
pytest test_launch.py -v
ros2 launch launch_testing launch_test.py <path>/test_launch.py
"""
import os
import sys
import time
import unittest
import launch
import launch_ros
import launch_testing
import launch_testing.actions
import launch_testing.markers
import pytest
import rclpy
from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch.actions import IncludeLaunchDescription, TimerAction
from launch.launch_description_sources import PythonLaunchDescriptionSource
from launch_ros.actions import Node
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from saltybot_social_tests.test_helpers import NodeChecker, poll_until
# ── Nodes that must be alive within 30s in default CI mode ───────────────────
REQUIRED_NODES = [
    "orchestrator_node",
    "face_recognition_node",
    "tracking_fusion_node",
    "mock_sensor_pub",
]
# Nodes only required in full mode (SOCIAL_TEST_FULL=1)
FULL_MODE_NODES = [
    "speech_pipeline_node",
    "conversation_node",
    "tts_node",
]
# Maximum seconds to wait for REQUIRED_NODES to appear in the graph.
NODE_STARTUP_TIMEOUT_S = 30.0
# ── launch_testing fixture ────────────────────────────────────────────────────
@pytest.mark.launch_test
def generate_test_description():
    """Return the LaunchDescription to use for the launch test."""
    tests_pkg = get_package_share_directory("saltybot_social_tests")
    launch_file = os.path.join(tests_pkg, "launch", "social_test.launch.py")
    # CI mode: bring the stack up without speech/LLM/TTS (no GPU models).
    ci_arguments = {
        "enable_speech": "false",
        "enable_llm": "false",
        "enable_tts": "false",
    }
    stack = IncludeLaunchDescription(
        PythonLaunchDescriptionSource(launch_file),
        launch_arguments=ci_arguments.items(),
    )
    # ReadyToTest signals launch_testing that setup is done.
    return LaunchDescription([stack, launch_testing.actions.ReadyToTest()])
# ── Test cases ────────────────────────────────────────────────────────────────
class TestSocialBotLaunch(unittest.TestCase):
    """Integration tests that run against the launched social-bot stack.

    One probe node is shared by all tests in this class; launch_testing runs
    these while the stack from generate_test_description() is up.
    """

    @classmethod
    def setUpClass(cls):
        rclpy.init()
        cls.node = rclpy.create_node("social_launch_test_probe")
        cls.checker = NodeChecker(cls.node)

    @classmethod
    def tearDownClass(cls):
        cls.node.destroy_node()
        rclpy.shutdown()

    def _spin_briefly(self, duration_s: float = 0.5) -> None:
        # Pump the probe so graph/topic discovery and messages arrive.
        deadline = time.time() + duration_s
        while time.time() < deadline:
            rclpy.spin_once(self.node, timeout_sec=0.1)

    # ── Test 1: All required nodes alive within 30s ───────────────────────────
    def test_required_nodes_start(self):
        """All required nodes must appear in the graph within 30 seconds."""
        results = self.checker.wait_for_nodes(
            REQUIRED_NODES, timeout_s=NODE_STARTUP_TIMEOUT_S
        )
        missing = [n for n, found in results.items() if not found]
        self.assertEqual(
            missing, [],
            f"Nodes did not start within {NODE_STARTUP_TIMEOUT_S}s: {missing}"
        )

    # ── Test 2: Expected topics are advertised ────────────────────────────────
    def test_expected_topics_advertised(self):
        """Key topics must exist in the topic graph."""
        self._spin_briefly(2.0)
        required_topics = [
            "/social/orchestrator/state",
            "/social/faces/detections",
            "/social/tracking/fused_target",
            # mock publishers
            "/camera/color/image_raw",
            "/uwb/target",
        ]
        all_topics = {name for name, _ in self.node.get_topic_names_and_types()}
        missing = [t for t in required_topics if t not in all_topics]
        self.assertEqual(
            missing, [],
            f"Topics not advertised: {missing}\nAll topics: {sorted(all_topics)}"
        )

    # ── Test 3: Orchestrator publishes state ──────────────────────────────────
    def test_orchestrator_state_publishes(self):
        """orchestrator_node must publish /social/orchestrator/state within 5s."""
        from std_msgs.msg import String
        received = []
        sub = self.node.create_subscription(
            String, "/social/orchestrator/state",
            lambda msg: received.append(msg.data), 10
        )
        deadline = time.time() + 5.0
        while time.time() < deadline and not received:
            rclpy.spin_once(self.node, timeout_sec=0.2)
        self.node.destroy_subscription(sub)
        self.assertTrue(
            len(received) > 0,
            "orchestrator_node did not publish /social/orchestrator/state within 5s"
        )
        # Validate JSON structure
        import json
        state = json.loads(received[0])
        self.assertIn("state", state)
        self.assertIn(state["state"],
                      ["idle", "listening", "thinking", "speaking", "throttled"])

    # ── Test 4: Face detection topic advertised at correct QoS ───────────────
    def test_face_detection_topic_qos(self):
        """Face detection topic must exist and accept BEST_EFFORT subscribers."""
        from rclpy.qos import QoSProfile, QoSReliabilityPolicy
        from saltybot_social_msgs.msg import FaceDetectionArray
        received = []
        # BEST_EFFORT is compatible with both reliable and best-effort publishers.
        qos = QoSProfile(reliability=QoSReliabilityPolicy.BEST_EFFORT, depth=10)
        sub = self.node.create_subscription(
            FaceDetectionArray, "/social/faces/detections",
            lambda msg: received.append(True), qos
        )
        deadline = time.time() + 5.0
        while time.time() < deadline and not received:
            rclpy.spin_once(self.node, timeout_sec=0.2)
        self.node.destroy_subscription(sub)
        self.assertTrue(
            len(received) > 0,
            "/social/faces/detections received no messages within 5s"
        )

    # ── Test 5: No node exits with error immediately ──────────────────────────
    def test_no_immediate_node_exits(self):
        """All required nodes should still be alive after 5s (no instant crash)."""
        time.sleep(2.0)
        self._spin_briefly(1.0)
        results = self.checker.wait_for_nodes(REQUIRED_NODES, timeout_s=2.0)
        crashed = [n for n, found in results.items() if not found]
        self.assertEqual(
            crashed, [],
            f"Nodes crashed or exited: {crashed}"
        )
# ── Post-shutdown checks (run after the launch is torn down) ──────────────────
@launch_testing.post_shutdown_test()
class TestSocialBotShutdownBehavior(unittest.TestCase):
    """Tests that run after the launch has been shut down."""

    def test_processes_exited_cleanly(self, proc_info):
        """All launched processes must exit with code 0 or SIGTERM (-15).

        ``proc_info`` is injected by launch_testing with the exit records of
        every launched process.
        """
        # Allow SIGTERM (returncode -15) as a graceful shutdown
        # NOTE(review): EXIT_OK is presumably 0, making the first entry
        # redundant — harmless, but worth confirming against launch_testing.
        launch_testing.asserts.assertExitCodes(
            proc_info,
            allowable_exit_codes=[0, -15, launch_testing.asserts.EXIT_OK],
        )

View File

@ -0,0 +1,423 @@
"""test_services.py — Verify social-bot ROS2 services are available and respond.
Services tested:
/social/enroll (saltybot_social_msgs/EnrollPerson)
/social/persons/list (saltybot_social_msgs/ListPersons)
/social/persons/delete (saltybot_social_msgs/DeletePerson)
/social/persons/update (saltybot_social_msgs/UpdatePerson)
/social/query_mood (saltybot_social_msgs/QueryMood)
/social/nav/set_mode (saltybot_social_msgs/SetNavMode, with std_srvs/Trigger fallback)
Each test:
1. Verifies the service is available (wait_for_service <= 10s)
2. Calls the service with a valid request
3. Asserts response is non-None and contains expected fields
"""
from __future__ import annotations
import os
import sys
import time
import unittest
import rclpy
from rclpy.node import Node
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from saltybot_social_tests.test_helpers import ServiceChecker
SERVICE_AVAILABILITY_TIMEOUT_S = 10.0
class TestSocialServices(unittest.TestCase):
    """Integration tests for social-bot ROS2 service interface.

    Each test either verifies a service is advertised within the availability
    timeout, or calls it with a valid request and checks the response shape.
    Tests skip (not fail) when saltybot_social_msgs is not installed.
    """

    @classmethod
    def setUpClass(cls):
        # Re-use an already-initialized rclpy context when another suite in
        # the same process initialized it first.
        if not rclpy.ok():
            rclpy.init()
        cls.node = rclpy.create_node("service_test_probe")
        cls.checker = ServiceChecker(cls.node)

    @classmethod
    def tearDownClass(cls):
        cls.node.destroy_node()

    # ── Enroll service ─────────────────────────────────────────────────────────
    def test_enroll_service_available(self):
        """/social/enroll must be available within 10s."""
        try:
            from saltybot_social_msgs.srv import EnrollPerson
        except ImportError:
            self.skipTest("saltybot_social_msgs not installed")
        available = self.checker.is_available(
            "/social/enroll", EnrollPerson,
            timeout_s=SERVICE_AVAILABILITY_TIMEOUT_S
        )
        self.assertTrue(available, "/social/enroll service not available")

    def test_enroll_service_responds(self):
        """EnrollPerson service must respond with success or graceful failure."""
        try:
            from saltybot_social_msgs.srv import EnrollPerson
        except ImportError:
            self.skipTest("saltybot_social_msgs not installed")
        req = EnrollPerson.Request()
        req.name = "TestPerson_CI"
        req.n_samples = 1  # minimal for CI
        req.mode = "face"
        response = self.checker.call_service(
            "/social/enroll", EnrollPerson, req,
            timeout_s=SERVICE_AVAILABILITY_TIMEOUT_S
        )
        self.assertIsNotNone(response, "/social/enroll did not respond")
        # Response can be success=False (no camera data) — that's fine
        # We just need a response, not success
        self.assertTrue(
            hasattr(response, "success"),
            "EnrollPerson response missing 'success' field"
        )

    # ── List persons service ───────────────────────────────────────────────────
    def test_list_persons_service_available(self):
        """/social/persons/list must be available within 10s."""
        try:
            from saltybot_social_msgs.srv import ListPersons
        except ImportError:
            self.skipTest("saltybot_social_msgs not installed")
        available = self.checker.is_available(
            "/social/persons/list", ListPersons,
            timeout_s=SERVICE_AVAILABILITY_TIMEOUT_S
        )
        self.assertTrue(available, "/social/persons/list service not available")

    def test_list_persons_service_responds(self):
        """/social/persons/list must return a list (possibly empty)."""
        try:
            from saltybot_social_msgs.srv import ListPersons
        except ImportError:
            self.skipTest("saltybot_social_msgs not installed")
        req = ListPersons.Request()
        response = self.checker.call_service(
            "/social/persons/list", ListPersons, req,
            timeout_s=SERVICE_AVAILABILITY_TIMEOUT_S
        )
        self.assertIsNotNone(response, "/social/persons/list did not respond")
        self.assertTrue(
            hasattr(response, "persons"),
            "ListPersons response missing 'persons' field"
        )
        self.assertIsInstance(response.persons, list)

    # ── Delete person service ─────────────────────────────────────────────────
    def test_delete_person_service_available(self):
        """/social/persons/delete must be available."""
        try:
            from saltybot_social_msgs.srv import DeletePerson
        except ImportError:
            self.skipTest("saltybot_social_msgs not installed")
        available = self.checker.is_available(
            "/social/persons/delete", DeletePerson,
            timeout_s=SERVICE_AVAILABILITY_TIMEOUT_S
        )
        self.assertTrue(available, "/social/persons/delete not available")

    def test_delete_nonexistent_person_returns_graceful_error(self):
        """Deleting a non-existent person must not crash — graceful False."""
        try:
            from saltybot_social_msgs.srv import DeletePerson
        except ImportError:
            self.skipTest("saltybot_social_msgs not installed")
        req = DeletePerson.Request()
        req.person_id = 99999  # definitely doesn't exist
        response = self.checker.call_service(
            "/social/persons/delete", DeletePerson, req,
            timeout_s=SERVICE_AVAILABILITY_TIMEOUT_S
        )
        self.assertIsNotNone(response)
        # Should return success=False, not crash
        self.assertFalse(
            response.success,
            "Deleting non-existent person should return success=False"
        )

    # ── Update person service ─────────────────────────────────────────────────
    def test_update_person_service_available(self):
        """/social/persons/update must be available."""
        try:
            from saltybot_social_msgs.srv import UpdatePerson
        except ImportError:
            self.skipTest("saltybot_social_msgs not installed")
        available = self.checker.is_available(
            "/social/persons/update", UpdatePerson,
            timeout_s=SERVICE_AVAILABILITY_TIMEOUT_S
        )
        self.assertTrue(available, "/social/persons/update not available")

    # ── Mood query service ────────────────────────────────────────────────────
    def test_query_mood_service_available(self):
        """/social/query_mood must be available (personality_node)."""
        try:
            from saltybot_social_msgs.srv import QueryMood
        except ImportError:
            self.skipTest("saltybot_social_msgs not installed")
        available = self.checker.is_available(
            "/social/query_mood", QueryMood,
            timeout_s=SERVICE_AVAILABILITY_TIMEOUT_S
        )
        self.assertTrue(available, "/social/query_mood not available")

    def test_query_mood_returns_valid_state(self):
        """QueryMood must return a valid mood with valence in [-1, 1]."""
        try:
            from saltybot_social_msgs.srv import QueryMood
        except ImportError:
            self.skipTest("saltybot_social_msgs not installed")
        req = QueryMood.Request()
        response = self.checker.call_service(
            "/social/query_mood", QueryMood, req,
            timeout_s=SERVICE_AVAILABILITY_TIMEOUT_S
        )
        self.assertIsNotNone(response, "/social/query_mood did not respond")
        # Field checks are defensive: only validate valence when the
        # response actually carries a mood with that field.
        if hasattr(response, "mood"):
            mood = response.mood
            if hasattr(mood, "valence"):
                self.assertGreaterEqual(mood.valence, -1.0)
                self.assertLessEqual(mood.valence, 1.0)
# ── Nav set-mode service ──────────────────────────────────────────────────────
class TestNavSetModeService(unittest.TestCase):
    """/social/nav/set_mode — change follow mode at runtime.

    Uses the custom saltybot_social_msgs/SetNavMode service when available,
    otherwise falls back to std_srvs/Trigger as a reachability canary. Tests
    that need the ``mode`` request field skip under the fallback.
    """

    # Valid follow modes understood by social_nav_node
    VALID_MODES = ["shadow", "lead", "side", "orbit", "loose", "tight", "stop"]

    @classmethod
    def setUpClass(cls):
        if not rclpy.ok():
            rclpy.init()
        cls.node = rclpy.create_node("nav_set_mode_test_probe")
        cls.checker = ServiceChecker(cls.node)

    @classmethod
    def tearDownClass(cls):
        cls.node.destroy_node()

    def _try_import_set_mode_srv(self):
        """Return the SetNavMode srv type, falling back to std_srvs.

        Returns:
            (srv_type, variant) where variant is "custom", "trigger",
            or (None, None) when no suitable type can be imported.
        """
        try:
            # Custom service (future): saltybot_social_msgs/srv/SetNavMode
            from saltybot_social_msgs.srv import SetNavMode
            return SetNavMode, "custom"
        except ImportError:
            pass
        try:
            # Generic string service fallback (std_srvs doesn't have SetString
            # in Humble; use rcl_interfaces/srv/SetParameters as canary)
            from std_srvs.srv import Trigger
            return Trigger, "trigger"
        except ImportError:
            return None, None

    def test_nav_set_mode_service_available(self):
        """/social/nav/set_mode must be available within 10s."""
        srv_type, variant = self._try_import_set_mode_srv()
        if srv_type is None:
            self.skipTest("No suitable srv type found for /social/nav/set_mode")
        available = self.checker.is_available(
            "/social/nav/set_mode", srv_type,
            timeout_s=SERVICE_AVAILABILITY_TIMEOUT_S
        )
        self.assertTrue(
            available,
            "/social/nav/set_mode service not available within "
            f"{SERVICE_AVAILABILITY_TIMEOUT_S}s"
        )

    def test_nav_set_mode_shadow_responds(self):
        """Setting mode 'shadow' must return a response (not crash)."""
        srv_type, variant = self._try_import_set_mode_srv()
        if srv_type is None:
            self.skipTest("No suitable srv type for /social/nav/set_mode")
        if variant == "custom":
            req = srv_type.Request()
            req.mode = "shadow"
        else:
            req = srv_type.Request()  # Trigger: no payload
        response = self.checker.call_service(
            "/social/nav/set_mode", srv_type, req,
            timeout_s=SERVICE_AVAILABILITY_TIMEOUT_S
        )
        self.assertIsNotNone(
            response, "/social/nav/set_mode did not respond to 'shadow' mode"
        )
        if variant == "custom":
            self.assertTrue(
                hasattr(response, "success"),
                "SetNavMode response missing 'success' field"
            )

    def test_nav_set_mode_invalid_mode_returns_graceful_error(self):
        """Sending an invalid mode string must not crash the node."""
        srv_type, variant = self._try_import_set_mode_srv()
        if srv_type is None or variant != "custom":
            self.skipTest("Custom SetNavMode srv required for invalid-mode test")
        req = srv_type.Request()
        req.mode = "teleport_to_moon"  # definitely invalid
        response = self.checker.call_service(
            "/social/nav/set_mode", srv_type, req,
            timeout_s=SERVICE_AVAILABILITY_TIMEOUT_S
        )
        self.assertIsNotNone(
            response, "/social/nav/set_mode crashed on invalid mode"
        )
        # Must return success=False for unknown mode, not crash
        self.assertFalse(
            response.success,
            "Invalid mode 'teleport_to_moon' should return success=False"
        )

    def test_nav_set_mode_cycles_all_valid_modes(self):
        """Cycle through all valid modes and verify each returns a response."""
        srv_type, variant = self._try_import_set_mode_srv()
        if srv_type is None or variant != "custom":
            self.skipTest("Custom SetNavMode srv required for mode cycling test")
        for mode in self.VALID_MODES:
            req = srv_type.Request()
            req.mode = mode
            response = self.checker.call_service(
                "/social/nav/set_mode", srv_type, req,
                timeout_s=SERVICE_AVAILABILITY_TIMEOUT_S
            )
            self.assertIsNotNone(
                response, f"/social/nav/set_mode did not respond to mode '{mode}'"
            )

    def test_nav_mode_topic_reflects_service_call(self):
        """/social/nav/mode topic must reflect the mode set via the service."""
        from std_msgs.msg import String
        srv_type, variant = self._try_import_set_mode_srv()
        if srv_type is None or variant != "custom":
            self.skipTest("Custom SetNavMode srv required for mode-reflect test")
        # Subscribe to mode topic
        received_modes = []
        sub = self.node.create_subscription(
            String, "/social/nav/mode",
            lambda msg: received_modes.append(msg.data), 10
        )
        # Request LEAD mode
        req = srv_type.Request()
        req.mode = "lead"
        response = self.checker.call_service(
            "/social/nav/set_mode", srv_type, req,
            timeout_s=SERVICE_AVAILABILITY_TIMEOUT_S
        )
        if response is None or not response.success:
            self.node.destroy_subscription(sub)
            self.skipTest("/social/nav/set_mode did not accept 'lead' mode")
        # Wait for mode topic to update. rclpy is already imported at module
        # scope; the previous local `import rclpy as _rclpy` alias was
        # redundant (flake8-visible) and has been removed.
        deadline = time.time() + 3.0
        while time.time() < deadline and "lead" not in received_modes:
            rclpy.spin_once(self.node, timeout_sec=0.1)
        self.node.destroy_subscription(sub)
        self.assertIn(
            "lead", received_modes,
            "/social/nav/mode did not reflect 'lead' after service call"
        )
class TestServiceResponseTimes(unittest.TestCase):
    """Verify services respond within acceptable latency."""

    SLA_MS = 500.0  # services must respond within 500ms

    @classmethod
    def setUpClass(cls):
        if not rclpy.ok():
            rclpy.init()
        cls.node = rclpy.create_node("service_latency_probe")
        cls.checker = ServiceChecker(cls.node)

    @classmethod
    def tearDownClass(cls):
        cls.node.destroy_node()

    def _measure_service_latency(self, svc_name: str, srv_type, request) -> float:
        """Return response time in ms, or -1 on failure.

        Fix vs. previous version: the elapsed time is captured as soon as the
        future completes, BEFORE destroying the client — the old code stopped
        the clock after ``destroy_client``, charging client teardown to the
        service's SLA. The client is now also destroyed in a ``finally`` so
        it cannot leak if spinning raises.
        """
        client = self.node.create_client(srv_type, svc_name)
        try:
            if not client.wait_for_service(timeout_sec=10.0):
                return -1.0
            t0 = time.perf_counter()
            future = client.call_async(request)
            deadline = time.time() + 5.0
            while not future.done() and time.time() < deadline:
                rclpy.spin_once(self.node, timeout_sec=0.05)
            if future.done():
                return (time.perf_counter() - t0) * 1000
            return -1.0
        finally:
            self.node.destroy_client(client)

    def test_list_persons_response_time(self):
        """ListPersons must respond within 500ms."""
        try:
            from saltybot_social_msgs.srv import ListPersons
        except ImportError:
            self.skipTest("saltybot_social_msgs not installed")
        latency_ms = self._measure_service_latency(
            "/social/persons/list", ListPersons, ListPersons.Request()
        )
        self.assertGreater(latency_ms, 0,
                           "/social/persons/list did not respond")
        self.assertLess(
            latency_ms, self.SLA_MS,
            f"/social/persons/list response time {latency_ms:.0f}ms > {self.SLA_MS}ms SLA"
        )

    def test_query_mood_response_time(self):
        """QueryMood must respond within 500ms."""
        try:
            from saltybot_social_msgs.srv import QueryMood
        except ImportError:
            self.skipTest("saltybot_social_msgs not installed")
        latency_ms = self._measure_service_latency(
            "/social/query_mood", QueryMood, QueryMood.Request()
        )
        self.assertGreater(latency_ms, 0,
                           "/social/query_mood did not respond")
        self.assertLess(
            latency_ms, self.SLA_MS,
            f"/social/query_mood response time {latency_ms:.0f}ms > {self.SLA_MS}ms SLA"
        )

View File

@ -0,0 +1,227 @@
"""test_shutdown.py — Graceful shutdown verification for social-bot stack.
Tests:
1. Clean shutdown: all nodes terminate within 10s of SIGTERM
2. No zombie processes remain after shutdown
3. GPU memory returns to near-baseline after shutdown
4. No file descriptor leaks (via /proc on Linux)
5. No orphaned rosbag / pycuda / audio processes
6. PyAudio streams properly closed (no held /dev/snd device)
These tests are designed to run AFTER the stack has been shut down.
They can also be run standalone to verify post-shutdown state.
"""
from __future__ import annotations
import os
import subprocess
import sys
import time
import unittest
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from saltybot_social_tests.test_helpers import (
GpuMemoryChecker, count_zombie_processes, get_process_pids
)
# ── Shutdown timing constants ─────────────────────────────────────────────────
MAX_SHUTDOWN_S = 10.0 # nodes must stop within 10s of receiving SIGTERM
BASELINE_DRIFT_MB = 300.0 # GPU memory allowed to drift above pre-launch baseline
class TestGracefulShutdown(unittest.TestCase):
    """Verify clean shutdown with no zombies or resource leaks."""

    # ── Test 1: No zombie processes ────────────────────────────────────────────
    def test_no_zombie_ros_processes(self):
        """No zombie (defunct) ROS2 processes after stack shutdown."""
        count = count_zombie_processes("ros")
        self.assertEqual(
            count, 0,
            f"Found {count} zombie ROS2 processes after shutdown"
        )

    def test_no_zombie_python_processes(self):
        """No zombie Python processes (from node children) after shutdown."""
        count = count_zombie_processes("python")
        self.assertEqual(
            count, 0,
            f"Found {count} zombie Python processes after shutdown"
        )

    # ── Test 2: All social-bot nodes terminated ───────────────────────────────
    def test_orchestrator_node_terminated(self):
        """orchestrator_node must not have any live PIDs after shutdown."""
        pids = get_process_pids("orchestrator_node")
        self.assertEqual(
            pids, [],
            f"orchestrator_node still running with PIDs: {pids}"
        )

    def test_speech_pipeline_node_terminated(self):
        """speech_pipeline_node must not have any live PIDs after shutdown."""
        pids = get_process_pids("speech_pipeline_node")
        self.assertEqual(
            pids, [],
            f"speech_pipeline_node still running: {pids}"
        )

    def test_face_recognition_node_terminated(self):
        """face_recognition_node must not have any live PIDs after shutdown."""
        pids = get_process_pids("face_recognition_node")
        self.assertEqual(
            pids, [],
            f"face_recognition_node still running: {pids}"
        )

    def test_conversation_node_terminated(self):
        """conversation_node must not have any live PIDs after shutdown."""
        pids = get_process_pids("conversation_node")
        self.assertEqual(pids, [],
                         f"conversation_node still running: {pids}")

    def test_tts_node_terminated(self):
        """tts_node must not have any live PIDs after shutdown."""
        pids = get_process_pids("tts_node")
        self.assertEqual(pids, [],
                         f"tts_node still running: {pids}")

    # ── Test 3: GPU memory released ───────────────────────────────────────────
    def test_gpu_memory_released_after_shutdown(self):
        """GPU used memory must not be significantly above baseline.

        Requires GPU_BASELINE_MB (MB, captured pre-launch) in the environment;
        skips otherwise.
        """
        if not GpuMemoryChecker.gpu_available():
            self.skipTest("No GPU available")
        if os.environ.get("GPU_BASELINE_MB", "") == "":
            self.skipTest("Set GPU_BASELINE_MB env var to enable this test")
        baseline_mb = float(os.environ["GPU_BASELINE_MB"])
        current_used = GpuMemoryChecker.used_mb()
        drift = current_used - baseline_mb
        print(f"[Shutdown] GPU: baseline={baseline_mb:.0f}MB, "
              f"now={current_used:.0f}MB, drift={drift:+.0f}MB")
        self.assertLess(
            drift, BASELINE_DRIFT_MB,
            f"GPU memory {drift:+.0f}MB above baseline after shutdown — possible leak"
        )

    # ── Test 4: Audio device released ─────────────────────────────────────────
    def test_audio_device_released(self):
        """No process should hold /dev/snd after shutdown (Linux only).

        Fixes vs. previous version: the fuser timeout is handled explicitly
        instead of escaping as an unhandled TimeoutExpired, and the
        /proc/<pid>/cmdline read uses a context manager so the file handle
        cannot leak.
        """
        if not os.path.exists("/dev/snd"):
            self.skipTest("/dev/snd not present (not Linux or no sound card)")
        try:
            # fuser returns exit code 0 if device is in use
            result = subprocess.run(
                ["fuser", "/dev/snd"],
                capture_output=True, timeout=5
            )
        except FileNotFoundError:
            self.skipTest("fuser command not found")
        except subprocess.TimeoutExpired:
            self.fail("fuser /dev/snd timed out after 5s")
        holders = result.stdout.decode().strip()
        if not holders:
            return  # device not held by anyone — nothing to assert
        # Check if any of those PIDs are our social-bot nodes
        social_patterns = [
            "speech_pipeline", "tts_node", "mock_sensor", "pyaudio"
        ]
        held_by_social = False
        for pid_str in holders.split():
            try:
                pid = int(pid_str)
                with open(f"/proc/{pid}/cmdline") as fh:
                    cmdline = fh.read()
            except Exception:
                # PID vanished, non-numeric token, or permission denied —
                # best-effort scan, keep going.
                continue
            if any(p in cmdline for p in social_patterns):
                held_by_social = True
                break
        self.assertFalse(
            held_by_social,
            f"Audio device /dev/snd held by social-bot process after shutdown: {holders}"
        )

    # ── Test 5: No orphaned subprocesses ──────────────────────────────────────
    def test_no_orphaned_pycuda_processes(self):
        """No orphaned pycuda/CUDA processes left from TRT inference.

        Informational only: MPS may legitimately be running before/after the
        stack, so this reports rather than asserts. (The previous version
        assigned the PID list to an unused local, which flake8 flags.)
        """
        pids = get_process_pids("nvidia-cuda-mps")
        if pids:
            print(f"\n[INFO] nvidia-cuda-mps processes present: {pids}")

    def test_ros_daemon_still_healthy(self):
        """ros2 daemon should still be healthy after stack shutdown."""
        try:
            result = subprocess.run(
                ["ros2", "daemon", "status"],
                capture_output=True, timeout=10, text=True
            )
            # Should output something about the daemon status
            # A crashed daemon would affect subsequent tests
            self.assertNotIn("error", result.stdout.lower(),
                             f"ros2 daemon unhealthy: {result.stdout}")
        except FileNotFoundError:
            self.skipTest("ros2 command not found — not in ROS environment")
        except subprocess.TimeoutExpired:
            self.fail("ros2 daemon status timed out")
class TestShutdownTiming(unittest.TestCase):
    """Test that nodes respond to SIGTERM within the shutdown window."""

    def test_shutdown_timeout_constant_is_reasonable(self):
        """Shutdown timeout must be >= 5s (some nodes need time to flush)."""
        self.assertGreaterEqual(MAX_SHUTDOWN_S, 5.0)
        self.assertLessEqual(
            MAX_SHUTDOWN_S, 30.0,
            "Shutdown timeout > 30s is too long for CI"
        )

    def test_no_ros_nodes_running_at_test_start(self):
        """Verify we're running in isolation (no leftover nodes from prev run)."""
        social_nodes = [
            "orchestrator_node", "face_recognition_node",
            "speech_pipeline_node", "conversation_node", "tts_node",
            "tracking_fusion_node", "social_nav_node",
        ]
        leftovers = [
            (name, get_process_pids(name))
            for name in social_nodes
            if get_process_pids(name)
        ]
        # Warn but don't fail — stack may be legitimately running
        if leftovers:
            print(f"\n[WARNING] Nodes still running from previous session: "
                  f"{leftovers}")
class TestPostShutdownPure(unittest.TestCase):
    """Pure-Python post-shutdown checks (no ROS2 required)."""

    def test_gpu_budget_constants_valid(self):
        """MAX_SHUTDOWN_S and BASELINE_DRIFT_MB must be positive."""
        self.assertGreater(MAX_SHUTDOWN_S, 0)
        self.assertGreater(BASELINE_DRIFT_MB, 0)

    def test_zombie_counter_returns_int(self):
        """count_zombie_processes must return a non-negative int."""
        zombies = count_zombie_processes("nonexistent_process_xyz")
        self.assertIsInstance(zombies, int)
        self.assertGreaterEqual(zombies, 0)

    def test_pid_list_returns_list(self):
        """get_process_pids must return a list."""
        pids = get_process_pids("nonexistent_process_xyz")
        self.assertIsInstance(pids, list)

    def test_gpu_checker_no_gpu_returns_none(self):
        """GpuMemoryChecker returns None gracefully when no GPU."""
        # This test always passes — we just verify no exception is raised
        # (On machines with GPU, this returns a value; without GPU, returns None)
        free = GpuMemoryChecker.free_mb()
        if free is not None:
            self.assertGreater(free, 0)

View File

@ -0,0 +1,174 @@
"""test_topic_rates.py — Verify social-bot topics publish at expected rates.
Tests measure actual Hz over a 3-second window and assert:
/social/faces/detections >= 10 Hz (face detector, camera@15Hz input)
/social/tracking/fused_target >= 8 Hz (tracking fusion)
/social/orchestrator/state >= 1 Hz (orchestrator heartbeat)
/camera/color/image_raw >= 12 Hz (mock sensor publisher)
/uwb/target >= 8 Hz (mock UWB)
/social/persons >= 3 Hz (person state tracker)
Run as part of a live stack (requires social_test.launch.py to be running):
pytest test_topic_rates.py -v --stack-running
Or run standalone with mock sensor pub only for camera/uwb tests.
"""
from __future__ import annotations
import os
import sys
import time
import unittest
import pytest
import rclpy
from rclpy.node import Node
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from saltybot_social_tests.test_helpers import TopicRateChecker, poll_until
# ── Rate requirements ─────────────────────────────────────────────────────────
# (spec_id, topic, msg_type_path, min_hz, warm_up_s)
RATE_SPECS = [
# Mock-published topics — should always pass
("camera_image", "/camera/color/image_raw", "sensor_msgs.msg.Image", 12.0, 2.0),
("uwb_target", "/uwb/target", "geometry_msgs.msg.PoseStamped", 8.0, 2.0),
# Social-bot topics
("faces_detections", "/social/faces/detections", "saltybot_social_msgs.msg.FaceDetectionArray", 10.0, 5.0),
("fused_target", "/social/tracking/fused_target", "saltybot_social_msgs.msg.FusedTarget", 8.0, 5.0),
("orchestrator_state", "/social/orchestrator/state", "std_msgs.msg.String", 1.0, 3.0),
("persons", "/social/persons", "saltybot_social_msgs.msg.PersonStateArray", 3.0, 5.0),
]
MEASUREMENT_WINDOW_S = 3.0
def _import_msg_type(path: str):
"""Dynamically import a message class from dotted path like 'std_msgs.msg.String'."""
parts = path.rsplit(".", 1)
module = __import__(parts[0], fromlist=[parts[1]])
return getattr(module, parts[1])
class TestTopicRates(unittest.TestCase):
    """Verify all social-bot topics publish at ≥ minimum required rate."""

    @classmethod
    def setUpClass(cls):
        # Fix: guard init like every other suite in this change set —
        # an unconditional rclpy.init() raises when the default context was
        # already initialized by a previous test module in the same process.
        if not rclpy.ok():
            rclpy.init()
        cls.node = rclpy.create_node("topic_rate_test_probe")
        cls.checkers: dict = {}
        # Create all rate checkers; a missing msg type (package not built)
        # downgrades that topic's test to a skip rather than failing setup.
        for spec_id, topic, msg_path, min_hz, warm_up in RATE_SPECS:
            try:
                msg_type = _import_msg_type(msg_path)
                cls.checkers[spec_id] = TopicRateChecker(
                    cls.node, topic, msg_type, window_s=MEASUREMENT_WINDOW_S
                )
            except Exception as e:
                print(f"[WARN] Could not create checker for {spec_id}: {e}")

    @classmethod
    def tearDownClass(cls):
        cls.node.destroy_node()
        # Subsequent suites guard their init with rclpy.ok(), so shutting
        # down here is safe.
        rclpy.shutdown()

    def _spin_for(self, duration_s: float) -> None:
        """Spin the probe node's executor for duration_s seconds."""
        deadline = time.time() + duration_s
        while time.time() < deadline:
            rclpy.spin_once(self.node, timeout_sec=0.05)

    def _check_rate(self, spec_id: str, min_hz: float, warm_up_s: float) -> None:
        """Generic rate check: warm up, measure, assert."""
        checker = self.checkers.get(spec_id)
        if checker is None:
            self.skipTest(f"Checker for {spec_id} not available (missing msg type)")
        # Warm-up period
        self._spin_for(warm_up_s)
        # Measurement window
        self._spin_for(MEASUREMENT_WINDOW_S + 0.5)
        hz = checker.measured_hz()
        count = checker.message_count()
        self.assertGreater(
            hz, min_hz * 0.7,  # 30% tolerance for scheduling jitter
            f"{spec_id}: measured {hz:.2f} Hz < {min_hz:.1f} Hz minimum "
            f"(received {count} messages total)"
        )

    # ── Individual test methods (one per topic) ────────────────────────────────
    def test_camera_image_rate(self):
        """Mock camera must publish at ≥12Hz."""
        self._check_rate("camera_image", 12.0, warm_up_s=2.0)

    def test_uwb_target_rate(self):
        """Mock UWB must publish at ≥8Hz."""
        self._check_rate("uwb_target", 8.0, warm_up_s=2.0)

    def test_face_detection_rate(self):
        """Face detection must publish at ≥10Hz given 15Hz camera input."""
        self._check_rate("faces_detections", 10.0, warm_up_s=5.0)

    def test_tracking_fusion_rate(self):
        """Tracking fusion must publish fused_target at ≥8Hz."""
        self._check_rate("fused_target", 8.0, warm_up_s=5.0)

    def test_orchestrator_state_rate(self):
        """Orchestrator must publish state at ≥1Hz (config: 2Hz)."""
        self._check_rate("orchestrator_state", 1.0, warm_up_s=3.0)

    def test_persons_rate(self):
        """Person state tracker must publish at ≥3Hz."""
        self._check_rate("persons", 3.0, warm_up_s=5.0)
class TestTopicPresence(unittest.TestCase):
    """Verify all expected topics exist in the topic graph (not rate)."""

    EXPECTED_TOPICS = [
        "/social/orchestrator/state",
        "/social/faces/detections",
        "/social/faces/embeddings",
        "/social/tracking/fused_target",
        "/social/persons",
        "/social/speech/vad_state",
        "/social/speech/transcript",
        "/social/conversation/response",
        "/social/nav/mode",
        "/camera/color/image_raw",
        "/uwb/target",
        "/cmd_vel",
    ]

    @classmethod
    def setUpClass(cls):
        if not rclpy.ok():
            rclpy.init()
        cls.node = rclpy.create_node("topic_presence_test_probe")
        # Spin until the orchestrator state topic registers (used as a
        # readiness sentinel) or 10s elapse.
        give_up_at = time.time() + 10.0
        while time.time() < give_up_at:
            rclpy.spin_once(cls.node, timeout_sec=0.2)
            advertised = {name for name, _ in cls.node.get_topic_names_and_types()}
            if "/social/orchestrator/state" in advertised:
                break

    @classmethod
    def tearDownClass(cls):
        cls.node.destroy_node()

    def test_all_expected_topics_advertised(self):
        """Every expected topic must appear in the topic graph."""
        advertised = {name for name, _ in self.node.get_topic_names_and_types()}
        absent = [topic for topic in self.EXPECTED_TOPICS if topic not in advertised]
        self.assertEqual(
            absent, [],
            f"Topics not advertised: {absent}"
        )