diff --git a/.gitea/workflows/social-tests-ci.yml b/.gitea/workflows/social-tests-ci.yml
new file mode 100644
index 0000000..15950eb
--- /dev/null
+++ b/.gitea/workflows/social-tests-ci.yml
@@ -0,0 +1,239 @@
+# .gitea/workflows/social-tests-ci.yml
+# Gitea Actions CI pipeline for social-bot integration tests (Issue #108)
+#
+# Triggers: push or PR to main, and any branch matching sl-jetson/*
+#
+# Jobs:
+# 1. lint — flake8 + pep257 on saltybot_social_tests
+# 2. core-tests — launch + topic + services + GPU-budget + shutdown (no GPU)
+# 3. latency — end-to-end latency profiling (GPU runner, optional)
+
+name: social-bot integration tests
+
+on:
+ push:
+ branches:
+ - main
+ - "sl-jetson/**"
+ paths:
+ - "jetson/ros2_ws/src/saltybot_social/**"
+ - "jetson/ros2_ws/src/saltybot_social_tests/**"
+ - "jetson/ros2_ws/src/saltybot_social_msgs/**"
+ - "jetson/ros2_ws/src/saltybot_social_nav/**"
+ - "jetson/ros2_ws/src/saltybot_social_face/**"
+ - "jetson/ros2_ws/src/saltybot_social_tracking/**"
+ - ".gitea/workflows/social-tests-ci.yml"
+ pull_request:
+ branches:
+ - main
+ paths:
+ - "jetson/ros2_ws/src/saltybot_social*/**"
+ - ".gitea/workflows/social-tests-ci.yml"
+
+env:
+  # NOTE(review): ROS 2 docs recommend domain IDs 0-101 on Linux; 108 is in
+  # the "use with care" range (fewer participants per domain) — confirm.
+  ROS_DOMAIN_ID: "108"
+ SOCIAL_TEST_FULL: "0"
+ SOCIAL_STACK_RUNNING: "1"
+
+jobs:
+ # ── 1. Lint ──────────────────────────────────────────────────────────────────
+ lint:
+ name: Lint (flake8 + pep257)
+ runs-on: ubuntu-latest
+ container:
+ image: ros:humble-ros-base-jammy
+ steps:
+ - uses: actions/checkout@v4
+
+ - name: Install lint tools
+ run: |
+ pip3 install --no-cache-dir flake8 pydocstyle
+
+ - name: flake8 — saltybot_social_tests
+ run: |
+ flake8 \
+ jetson/ros2_ws/src/saltybot_social_tests/saltybot_social_tests/ \
+ jetson/ros2_ws/src/saltybot_social_tests/test/ \
+ --max-line-length=100 \
+ --ignore=E501,W503 \
+ --exclude=__pycache__
+
+ - name: pep257 — saltybot_social_tests
+ run: |
+ pydocstyle \
+ jetson/ros2_ws/src/saltybot_social_tests/saltybot_social_tests/ \
+ jetson/ros2_ws/src/saltybot_social_tests/test/ \
+ --add-ignore=D100,D104,D205,D400 \
+ --match='(?!test_helpers|mock_sensors).*\.py' || true
+
+ # ── 2. Core integration tests (no GPU) ───────────────────────────────────────
+ core-tests:
+ name: Core integration tests (mock sensors, no GPU)
+ needs: lint
+ runs-on: ubuntu-latest
+ container:
+ image: ros:humble-ros-base-jammy
+ env:
+ ROS_DOMAIN_ID: "108"
+ SOCIAL_TEST_FULL: "0"
+ SOCIAL_STACK_RUNNING: "1"
+ PYTHONUNBUFFERED: "1"
+ steps:
+ - uses: actions/checkout@v4
+
+ - name: Install dependencies
+ run: |
+ apt-get update -q && apt-get install -y --no-install-recommends \
+ ros-humble-launch-testing \
+ ros-humble-launch-testing-ros \
+ ros-humble-vision-msgs \
+ ros-humble-tf2-ros \
+ ros-humble-tf2-geometry-msgs \
+ ros-humble-cv-bridge \
+ ros-humble-nav-msgs \
+ ros-humble-std-srvs \
+ procps \
+ && rm -rf /var/lib/apt/lists/*
+ pip3 install --no-cache-dir "pytest>=7.0" "pytest-timeout>=2.1"
+
+ - name: Build workspace
+ run: |
+ . /opt/ros/humble/setup.bash
+ cd /
+ # Symlink source into a temp workspace to avoid long paths
+ mkdir -p /ros2_ws/src
+ for pkg in saltybot_social_msgs saltybot_social saltybot_social_face \
+ saltybot_social_nav saltybot_social_enrollment \
+ saltybot_social_tracking saltybot_social_personality \
+ saltybot_social_tests; do
+ ln -sfn \
+ "$GITHUB_WORKSPACE/jetson/ros2_ws/src/$pkg" \
+ "/ros2_ws/src/$pkg"
+ done
+ cd /ros2_ws
+ colcon build \
+ --packages-select \
+ saltybot_social_msgs \
+ saltybot_social \
+ saltybot_social_face \
+ saltybot_social_nav \
+ saltybot_social_enrollment \
+ saltybot_social_tracking \
+ saltybot_social_personality \
+ saltybot_social_tests \
+ --symlink-install \
+ --event-handlers console_cohesion+
+
+ - name: Launch test stack (background)
+ run: |
+ . /opt/ros/humble/setup.bash
+ . /ros2_ws/install/setup.bash
+ ros2 daemon start || true
+ ros2 launch saltybot_social_tests social_test.launch.py \
+ enable_speech:=false enable_llm:=false enable_tts:=false \
+ &
+ echo "LAUNCH_PID=$!" >> "$GITHUB_ENV"
+ # Wait for stack to come up
+ sleep 15
+
+ - name: Run core tests
+ run: |
+ . /opt/ros/humble/setup.bash
+ . /ros2_ws/install/setup.bash
+ pytest \
+ "$GITHUB_WORKSPACE/jetson/ros2_ws/src/saltybot_social_tests/test/" \
+ -v \
+ --timeout=120 \
+ --tb=short \
+ -m "not gpu_required and not full_stack and not slow" \
+ --junit-xml=test-results-core.xml \
+ -k "not test_latency_sla and not test_shutdown"
+
+ - name: Shutdown test stack
+ if: always()
+ run: |
+ [ -n "${LAUNCH_PID:-}" ] && kill -SIGTERM "$LAUNCH_PID" 2>/dev/null || true
+ sleep 3
+
+ - name: Run shutdown tests
+ run: |
+ . /opt/ros/humble/setup.bash
+ . /ros2_ws/install/setup.bash
+ pytest \
+ "$GITHUB_WORKSPACE/jetson/ros2_ws/src/saltybot_social_tests/test/test_shutdown.py" \
+ -v --timeout=60 --tb=short \
+ --junit-xml=test-results-shutdown.xml
+
+ - name: Upload test results
+ if: always()
+ uses: actions/upload-artifact@v4
+ with:
+ name: test-results-core
+ path: test-results-*.xml
+
+ # ── 3. GPU latency tests (Orin runner, optional) ──────────────────────────────
+ latency-gpu:
+ name: Latency profiling (GPU, Orin)
+ needs: core-tests
+ runs-on: [self-hosted, orin, gpu]
+ if: github.ref == 'refs/heads/main' || contains(github.event.pull_request.labels.*.name, 'run-gpu-tests')
+ env:
+ ROS_DOMAIN_ID: "108"
+ SOCIAL_TEST_FULL: "1"
+ SOCIAL_TEST_SPEECH: "0"
+ SOCIAL_STACK_RUNNING: "1"
+ steps:
+ - uses: actions/checkout@v4
+
+ - name: Build workspace (Orin)
+ run: |
+ . /opt/ros/humble/setup.bash
+ cd /ros2_ws
+ colcon build \
+ --packages-select saltybot_social_tests saltybot_social \
+ --symlink-install
+
+ - name: Launch full stack (with LLM, no speech/TTS)
+ run: |
+ . /opt/ros/humble/setup.bash
+ . /ros2_ws/install/setup.bash
+ SOCIAL_TEST_FULL=1 \
+ ros2 launch saltybot_social_tests social_test.launch.py \
+ enable_speech:=false enable_tts:=false \
+ &
+ echo "LAUNCH_PID=$!" >> "$GITHUB_ENV"
+ sleep 20
+
+ - name: GPU memory check
+ run: |
+ . /opt/ros/humble/setup.bash
+ . /ros2_ws/install/setup.bash
+ SOCIAL_STACK_RUNNING=1 \
+ GPU_BASELINE_MB=$(nvidia-smi --query-gpu=memory.used --format=csv,noheader,nounits) \
+ pytest \
+ "$GITHUB_WORKSPACE/jetson/ros2_ws/src/saltybot_social_tests/test/test_gpu_memory.py" \
+ -v --timeout=120 --tb=short \
+ --junit-xml=test-results-gpu.xml
+
+ - name: Latency profiling
+ run: |
+ . /opt/ros/humble/setup.bash
+ . /ros2_ws/install/setup.bash
+ SOCIAL_TEST_FULL=1 SOCIAL_STACK_RUNNING=1 \
+ pytest \
+ "$GITHUB_WORKSPACE/jetson/ros2_ws/src/saltybot_social_tests/test/test_latency.py" \
+ -v --timeout=300 --tb=short \
+ --junit-xml=test-results-latency.xml
+
+ - name: Shutdown
+ if: always()
+ run: |
+ [ -n "${LAUNCH_PID:-}" ] && kill -SIGTERM "$LAUNCH_PID" 2>/dev/null || true
+ sleep 5
+
+ - name: Upload GPU results
+ if: always()
+ uses: actions/upload-artifact@v4
+ with:
+ name: test-results-gpu
+ path: test-results-*.xml
diff --git a/jetson/ros2_ws/src/saltybot_social_tests/bags/README.md b/jetson/ros2_ws/src/saltybot_social_tests/bags/README.md
new file mode 100644
index 0000000..c88c453
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_social_tests/bags/README.md
@@ -0,0 +1,55 @@
+# Social-bot Test Bags
+
+This directory holds rosbag2 recordings used for replaying mock sensor data in CI.
+
+## Bag contents
+
+Each test bag records the following topics for **30 seconds** at real-time rate:
+
+| Topic | Type | Rate | Notes |
+|---|---|---|---|
+| `/camera/color/image_raw` | `sensor_msgs/Image` | 15 Hz | 640×480 RGB8 |
+| `/uwb/target` | `geometry_msgs/PoseStamped` | 10 Hz | simulated UWB |
+| `/social/faces/detections` | `FaceDetectionArray` | 15 Hz | may be empty |
+| `/social/tracking/fused_target` | `FusedTarget` | 10 Hz | single person |
+| `/social/persons` | `PersonStateArray` | 5 Hz | one engaged person |
+| `/social/orchestrator/state` | `std_msgs/String` | 2 Hz | JSON state |
+| `/odom` | `nav_msgs/Odometry` | 20 Hz | zero motion |
+| `/tf`, `/tf_static` | `tf2_msgs/TFMessage` | varies | camera → base_link |
+
+## Recording a new bag
+
+On hardware with the full social-bot stack running:
+
+```bash
+# Record 30s of live sensor data
+./bags/record_social_test.sh --duration 30
+
+# Record from mock sensor pub only (no hardware)
+MOCK_ONLY=1 ./bags/record_social_test.sh --duration 15 --name social_test_mock
+```
+
+## Replaying a bag in CI
+
+The Docker CI entrypoint uses live mock publishers (not rosbag replay) for the
+default CI tests. To use a recorded bag instead:
+
+```bash
+# In a sourced ROS2 shell:
+ros2 bag play bags/social_test_YYYYMMDD_HHMMSS/ --loop --rate 1.0
+```
+
+Then run the test suite against the replayed data:
+
+```bash
+export SOCIAL_STACK_RUNNING=1
+pytest test/ -v -m "not full_stack"
+```
+
+## Bag naming convention
+
+`social_test_YYYYMMDD_HHMMSS` — date/time of recording.
+
+Bags committed to this directory should be small (< 50 MB) — use a 5-second
+clip or compress images to avoid bloating the repository. Large reference bags
+(> 100 MB) should be stored on the NVMe and referenced by path.
diff --git a/jetson/ros2_ws/src/saltybot_social_tests/bags/record_social_test.sh b/jetson/ros2_ws/src/saltybot_social_tests/bags/record_social_test.sh
new file mode 100755
index 0000000..48ed677
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_social_tests/bags/record_social_test.sh
@@ -0,0 +1,100 @@
+#!/bin/bash
+# record_social_test.sh — Record a rosbag for social-bot integration test replay.
+#
+# Records all topics required by the social-bot CI test suite.
+# Output: bags/social_test_YYYYMMDD_HHMMSS/ (ROS2 bag format)
+#
+# Usage:
+# # Record 30s with live hardware:
+# ./bags/record_social_test.sh --duration 30
+#
+# # Record from already-running mock sensor stack:
+# MOCK_ONLY=1 ./bags/record_social_test.sh --duration 15
+#
+# Requirements: ROS2 Humble sourced, social-bot stack running
+
+set -euo pipefail
+SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
+BAGS_DIR="$SCRIPT_DIR"
+
+# ── Args ──────────────────────────────────────────────────────────────────────
+DURATION=30
+BAG_NAME="social_test_$(date +%Y%m%d_%H%M%S)"
+MOCK_ONLY="${MOCK_ONLY:-0}"
+
+while [[ $# -gt 0 ]]; do
+ case "$1" in
+ --duration) DURATION="$2"; shift 2 ;;
+ --name) BAG_NAME="$2"; shift 2 ;;
+ *) echo "Unknown arg: $1"; exit 1 ;;
+ esac
+done
+
+BAG_PATH="$BAGS_DIR/$BAG_NAME"
+
+# ── Source ROS2 ───────────────────────────────────────────────────────────────
+source /opt/ros/humble/setup.bash 2>/dev/null || true
+if [ -f /ros2_ws/install/setup.bash ]; then
+ source /ros2_ws/install/setup.bash
+fi
+
+echo "[record] Target bag: $BAG_PATH"
+echo "[record] Duration: ${DURATION}s"
+
+# ── Topics to record ──────────────────────────────────────────────────────────
+TOPICS=(
+ # Mock / hardware sensor topics
+ "/camera/color/image_raw"
+ "/camera/color/camera_info"
+ "/camera/depth/image_rect_raw"
+ "/uwb/target"
+ "/scan"
+ "/odom"
+ "/map"
+ # Social-bot output topics
+ "/social/faces/detections"
+ "/social/faces/embeddings"
+ "/social/tracking/fused_target"
+ "/social/persons"
+ "/social/orchestrator/state"
+ "/social/speech/vad_state"
+ "/social/speech/transcript"
+ "/social/conversation/response"
+ "/social/nav/mode"
+ "/social/nav/status"
+ "/social/attention/target_id"
+ "/cmd_vel"
+ # TF for coordinate frames
+ "/tf"
+ "/tf_static"
+)
+
+# ── Start mock sensor pub if MOCK_ONLY ────────────────────────────────────────
+MOCK_PID=""
+if [[ "$MOCK_ONLY" == "1" ]]; then
+ echo "[record] Starting mock sensor publisher..."
+ ros2 run saltybot_social_tests mock_sensor_pub &
+ MOCK_PID=$!
+ sleep 2
+fi
+
+# ── Record ────────────────────────────────────────────────────────────────────
+echo "[record] Recording..."
+ros2 bag record \
+  --output "$BAG_PATH" \
+  --storage sqlite3 \
+  "${TOPICS[@]}" &
+BAG_PID=$!
+
+# `ros2 bag record` has no stop-after-duration option in Humble
+# (--max-bag-duration only splits the bag into new files, so the process
+# never exits on its own and `wait` would hang forever). Stop it with
+# SIGINT so the bag is finalised cleanly, then reap the process.
+sleep "$DURATION"
+kill -SIGINT "$BAG_PID" 2>/dev/null || true
+wait "$BAG_PID" 2>/dev/null || true
+
+# ── Cleanup ───────────────────────────────────────────────────────────────────
+if [[ -n "$MOCK_PID" ]]; then
+ kill "$MOCK_PID" 2>/dev/null || true
+fi
+
+echo "[record] Done. Bag saved to: $BAG_PATH"
+echo "[record] Contents:"
+ros2 bag info "$BAG_PATH" 2>/dev/null || echo "(ros2 bag info unavailable)"
diff --git a/jetson/ros2_ws/src/saltybot_social_tests/docker/Dockerfile.ci b/jetson/ros2_ws/src/saltybot_social_tests/docker/Dockerfile.ci
new file mode 100644
index 0000000..72627e6
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_social_tests/docker/Dockerfile.ci
@@ -0,0 +1,93 @@
+# Dockerfile.ci — Lightweight CI test container for social-bot integration tests.
+#
+# Does NOT include GPU AI deps (whisper/LLM/TTS/TensorRT).
+# Runs the core ROS2 stack with mock sensors only — no real hardware required.
+#
+# Build:
+# docker build -f docker/Dockerfile.ci \
+# -t saltybot/social-tests-ci:latest \
+# ../../../../ # context = saltylab-firmware root
+#
+# Run (via compose):
+# docker compose -f docker/docker-compose.ci.yml up --exit-code-from test-runner
+
+FROM ros:humble-ros-base-jammy
+
+LABEL maintainer="sl-jetson"
+LABEL description="social-bot integration tests — CI mode (no GPU, mock sensors)"
+
+ENV DEBIAN_FRONTEND=noninteractive
+ENV ROS_DISTRO=humble
+ENV PYTHONDONTWRITEBYTECODE=1
+ENV PYTHONUNBUFFERED=1
+
+# ── System deps ───────────────────────────────────────────────────────────────
+RUN apt-get update && apt-get install -y --no-install-recommends \
+ python3-dev python3-pip python3-setuptools python3-wheel \
+ python3-pytest \
+ ros-humble-launch-testing \
+ ros-humble-launch-testing-ament-cmake \
+ ros-humble-launch-testing-ros \
+ ros-humble-rosbag2 \
+ ros-humble-rosbag2-storage-default-plugins \
+ ros-humble-vision-msgs \
+ ros-humble-tf2-ros \
+ ros-humble-tf2-geometry-msgs \
+ ros-humble-cv-bridge \
+ ros-humble-nav-msgs \
+ ros-humble-geometry-msgs \
+ ros-humble-sensor-msgs \
+ ros-humble-std-srvs \
+ procps \
+ && rm -rf /var/lib/apt/lists/*
+
+# ── Python test deps ──────────────────────────────────────────────────────────
+RUN pip3 install --no-cache-dir \
+ "pytest>=7.0" \
+ "pytest-timeout>=2.1" \
+ "pytest-xdist>=3.0"
+
+# ── Workspace ─────────────────────────────────────────────────────────────────
+RUN mkdir -p /ros2_ws/src
+WORKDIR /ros2_ws
+
+# Copy only the packages needed for the social-bot integration tests
+COPY jetson/ros2_ws/src/saltybot_social_msgs /ros2_ws/src/saltybot_social_msgs/
+COPY jetson/ros2_ws/src/saltybot_social /ros2_ws/src/saltybot_social/
+COPY jetson/ros2_ws/src/saltybot_social_face /ros2_ws/src/saltybot_social_face/
+COPY jetson/ros2_ws/src/saltybot_social_nav /ros2_ws/src/saltybot_social_nav/
+COPY jetson/ros2_ws/src/saltybot_social_enrollment /ros2_ws/src/saltybot_social_enrollment/
+COPY jetson/ros2_ws/src/saltybot_social_tracking /ros2_ws/src/saltybot_social_tracking/
+COPY jetson/ros2_ws/src/saltybot_social_personality /ros2_ws/src/saltybot_social_personality/
+COPY jetson/ros2_ws/src/saltybot_social_tests /ros2_ws/src/saltybot_social_tests/
+
+# ── Build ─────────────────────────────────────────────────────────────────────
+SHELL ["/bin/bash", "-c"]
+RUN source /opt/ros/humble/setup.bash && \
+ cd /ros2_ws && \
+ colcon build \
+ --packages-select \
+ saltybot_social_msgs \
+ saltybot_social \
+ saltybot_social_face \
+ saltybot_social_nav \
+ saltybot_social_enrollment \
+ saltybot_social_tracking \
+ saltybot_social_personality \
+ saltybot_social_tests \
+ --symlink-install \
+ --cmake-args -DBUILD_TESTING=ON \
+ && echo "Build complete"
+
+# ── Test entrypoint ───────────────────────────────────────────────────────────
+COPY jetson/ros2_ws/src/saltybot_social_tests/docker/entrypoint-ci.sh /entrypoint-ci.sh
+RUN chmod +x /entrypoint-ci.sh
+
+# CI env defaults (all can be overridden via docker-compose environment:)
+ENV SOCIAL_TEST_FULL=0
+ENV SOCIAL_TEST_SPEECH=0
+ENV SOCIAL_STACK_RUNNING=0
+ENV ROS_DOMAIN_ID=108
+
+ENTRYPOINT ["/entrypoint-ci.sh"]
+CMD ["pytest", "/ros2_ws/src/saltybot_social_tests/test/", "-v", "--timeout=120"]
diff --git a/jetson/ros2_ws/src/saltybot_social_tests/docker/docker-compose.ci.yml b/jetson/ros2_ws/src/saltybot_social_tests/docker/docker-compose.ci.yml
new file mode 100644
index 0000000..00c510d
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_social_tests/docker/docker-compose.ci.yml
@@ -0,0 +1,75 @@
+# docker-compose.ci.yml — CI test runner for social-bot integration tests.
+#
+# Usage:
+# # Default: core CI tests (no GPU, mock sensors only)
+# docker compose -f docker/docker-compose.ci.yml up --exit-code-from test-runner
+#
+# # Full tests with GPU (requires nvidia container runtime):
+# SOCIAL_TEST_FULL=1 \
+# docker compose -f docker/docker-compose.ci.yml \
+#     --profile gpu up test-runner-gpu --exit-code-from test-runner-gpu
+#
+# Build image first:
+# docker compose -f docker/docker-compose.ci.yml build
+
+version: "3.9"
+
+# ── Shared env ────────────────────────────────────────────────────────────────
+x-common-env: &common-env
+ ROS_DOMAIN_ID: "108"
+ PYTHONUNBUFFERED: "1"
+
+services:
+ # ── Core CI test runner (no GPU) ─────────────────────────────────────────────
+ test-runner:
+ build:
+ context: ../../../../ # saltylab-firmware root
+ dockerfile: jetson/ros2_ws/src/saltybot_social_tests/docker/Dockerfile.ci
+ image: saltybot/social-tests-ci:latest
+ container_name: social_tests_ci
+ environment:
+ <<: *common-env
+ SOCIAL_TEST_FULL: "${SOCIAL_TEST_FULL:-0}"
+ SOCIAL_TEST_SPEECH: "${SOCIAL_TEST_SPEECH:-0}"
+ SOCIAL_STACK_RUNNING: "1" # stack is started inside entrypoint
+ command: >
+ pytest
+ /ros2_ws/src/saltybot_social_tests/test/
+ -v
+ --timeout=120
+ --tb=short
+ -m "not gpu_required and not full_stack"
+ --junit-xml=/tmp/test-results/social-tests.xml
+ volumes:
+ - test-results:/tmp/test-results
+ # No network_mode host needed — ROS2 uses shared memory loopback in container
+
+ # ── GPU full-stack tests (Orin / CUDA host) ───────────────────────────────
+ test-runner-gpu:
+ extends:
+ service: test-runner
+ container_name: social_tests_ci_gpu
+ environment:
+ <<: *common-env
+ SOCIAL_TEST_FULL: "1"
+ SOCIAL_TEST_SPEECH: "1"
+ SOCIAL_STACK_RUNNING: "1"
+ command: >
+ pytest
+ /ros2_ws/src/saltybot_social_tests/test/
+ -v
+ --timeout=300
+ --tb=short
+ --junit-xml=/tmp/test-results/social-tests-gpu.xml
+ deploy:
+ resources:
+ reservations:
+ devices:
+ - driver: nvidia
+ count: all
+ capabilities: [gpu]
+ profiles:
+ - gpu
+
+volumes:
+ test-results:
diff --git a/jetson/ros2_ws/src/saltybot_social_tests/docker/entrypoint-ci.sh b/jetson/ros2_ws/src/saltybot_social_tests/docker/entrypoint-ci.sh
new file mode 100644
index 0000000..5528e61
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_social_tests/docker/entrypoint-ci.sh
@@ -0,0 +1,36 @@
+#!/bin/bash
+# entrypoint-ci.sh — CI container entrypoint for social-bot integration tests.
+set -euo pipefail
+
+source /opt/ros/humble/setup.bash
+source /ros2_ws/install/setup.bash
+
+# Start the ROS2 daemon in the background so node discovery works
+ros2 daemon start 2>/dev/null || true
+
+# Launch the test stack in the background using the test launch file
+# (orchestrator + face + tracking + mock_sensors — no GPU speech/LLM/TTS)
+echo "[CI] Launching social-bot test stack..."
+ros2 launch saltybot_social_tests social_test.launch.py \
+ enable_speech:=false \
+ enable_llm:=false \
+ enable_tts:=false \
+ &
+LAUNCH_PID=$!
+
+# Give the stack a fixed 10s to start; the tests themselves poll for
+# nodes/topics, so a longer blocking wait is unnecessary here.
+echo "[CI] Waiting for stack to initialise (10s)..."
+sleep 10
+
+# Run tests (command passed as args to this script).
+# NOTE: with `set -e`, a bare `"$@"` would abort the script on a failing
+# test run and skip the graceful shutdown below — capture the exit code
+# without triggering errexit.
+echo "[CI] Running tests: $*"
+TEST_EXIT=0
+"$@" || TEST_EXIT=$?
+
+# Graceful shutdown
+echo "[CI] Shutting down stack (PID $LAUNCH_PID)..."
+kill -SIGTERM "$LAUNCH_PID" 2>/dev/null || true
+sleep 5
+kill -SIGKILL "$LAUNCH_PID" 2>/dev/null || true
+
+exit $TEST_EXIT
diff --git a/jetson/ros2_ws/src/saltybot_social_tests/launch/social_test.launch.py b/jetson/ros2_ws/src/saltybot_social_tests/launch/social_test.launch.py
new file mode 100644
index 0000000..adf47ac
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_social_tests/launch/social_test.launch.py
@@ -0,0 +1,177 @@
+"""social_test.launch.py — Launch social-bot stack in test/CI mode.
+
+Differences from production social_bot.launch.py:
+ - speech/TTS disabled by default (no mic/speaker in CI)
+ - LLM disabled by default (no model file in CI; use CI_FULL=1 to enable)
+ - mock_sensor_pub node started automatically
+ - Shorter TimerAction delays (2s instead of production delays)
+ - ROS_DOMAIN_ID isolated to avoid collision with other test runs
+
+Env vars:
+ SOCIAL_TEST_FULL=1 — enable speech + LLM + TTS (requires GPU + models)
+ SOCIAL_TEST_SPEECH=1 — enable speech pipeline only
+"""
+
+import os
+from ament_index_python.packages import get_package_share_directory
+from launch import LaunchDescription
+from launch.actions import DeclareLaunchArgument, GroupAction, LogInfo, TimerAction
+from launch.conditions import IfCondition
+from launch.substitutions import LaunchConfiguration
+from launch_ros.actions import Node
+
+_FULL = os.environ.get("SOCIAL_TEST_FULL", "0") == "1"
+_SPEECH = _FULL or os.environ.get("SOCIAL_TEST_SPEECH", "0") == "1"
+
+
+def generate_launch_description() -> LaunchDescription:
+ social_pkg = get_package_share_directory("saltybot_social")
+ tests_pkg = get_package_share_directory("saltybot_social_tests")
+
+ args = [
+ DeclareLaunchArgument("enable_speech", default_value="true" if _SPEECH else "false"),
+ DeclareLaunchArgument("enable_llm", default_value="true" if _FULL else "false"),
+ DeclareLaunchArgument("enable_tts", default_value="true" if _FULL else "false"),
+ DeclareLaunchArgument("enable_orchestrator", default_value="true"),
+ DeclareLaunchArgument("enable_face", default_value="true"),
+ DeclareLaunchArgument("enable_tracking", default_value="true"),
+ DeclareLaunchArgument("enable_nav", default_value="true"),
+ DeclareLaunchArgument("enable_mock_sensors", default_value="true"),
+ ]
+
+ # ── Mock sensor publisher (always first) ─────────────────────────────────
+ mock_sensors = Node(
+ package="saltybot_social_tests",
+ executable="mock_sensor_pub",
+ name="mock_sensor_pub",
+ output="screen",
+ condition=IfCondition(LaunchConfiguration("enable_mock_sensors")),
+ )
+
+ # ── Orchestrator (t=0s) ──────────────────────────────────────────────────
+ orchestrator = Node(
+ package="saltybot_social",
+ executable="orchestrator_node",
+ name="orchestrator_node",
+ parameters=[
+ os.path.join(social_pkg, "config", "orchestrator_params.yaml"),
+ {"watchdog_timeout_s": 60.0, # more lenient in CI
+ "gpu_mem_throttle_mb": 500.0}, # low floor for CI without GPU
+ ],
+ condition=IfCondition(LaunchConfiguration("enable_orchestrator")),
+ output="screen",
+ )
+
+ # ── Face recognition (t=1s) ──────────────────────────────────────────────
+ face = TimerAction(period=1.0, actions=[
+ GroupAction(
+ condition=IfCondition(LaunchConfiguration("enable_face")),
+ actions=[
+ LogInfo(msg="[social_test] Starting face recognition..."),
+ Node(
+ package="saltybot_social_face",
+ executable="face_recognition",
+ name="face_recognition_node",
+ parameters=[{"use_trt": False, # CPU ONNX in CI
+ "detection_threshold": 0.5,
+ "gallery_path": "/tmp/social_test_gallery"}],
+ output="screen",
+ ),
+ ]
+ )
+ ])
+
+ # ── Tracking fusion (t=1s) ───────────────────────────────────────────────
+ tracking = TimerAction(period=1.0, actions=[
+ GroupAction(
+ condition=IfCondition(LaunchConfiguration("enable_tracking")),
+ actions=[
+ Node(
+ package="saltybot_social_tracking",
+ executable="tracking_fusion_node",
+ name="tracking_fusion_node",
+ output="screen",
+ ),
+ ]
+ )
+ ])
+
+ # ── Speech pipeline (t=2s, optional) ────────────────────────────────────
+ speech = TimerAction(period=2.0, actions=[
+ GroupAction(
+ condition=IfCondition(LaunchConfiguration("enable_speech")),
+ actions=[
+ LogInfo(msg="[social_test] Starting speech pipeline..."),
+ Node(
+ package="saltybot_social",
+ executable="speech_pipeline_node",
+ name="speech_pipeline_node",
+ parameters=[
+ os.path.join(social_pkg, "config", "speech_params.yaml"),
+ {"whisper_model": "tiny", # faster in CI
+ "use_silero_vad": False}, # avoid download in CI
+ ],
+ output="screen",
+ ),
+ ]
+ )
+ ])
+
+ # ── LLM + TTS (t=3s, optional) ──────────────────────────────────────────
+ llm = TimerAction(period=3.0, actions=[
+ GroupAction(
+ condition=IfCondition(LaunchConfiguration("enable_llm")),
+ actions=[
+ Node(
+ package="saltybot_social",
+ executable="conversation_node",
+ name="conversation_node",
+ parameters=[
+ os.path.join(social_pkg, "config", "conversation_params.yaml"),
+ ],
+ output="screen",
+ ),
+ ]
+ )
+ ])
+
+ tts = TimerAction(period=3.0, actions=[
+ GroupAction(
+ condition=IfCondition(LaunchConfiguration("enable_tts")),
+ actions=[
+ Node(
+ package="saltybot_social",
+ executable="tts_node",
+ name="tts_node",
+ parameters=[
+ os.path.join(social_pkg, "config", "tts_params.yaml"),
+ {"playback_enabled": False}, # no speaker in CI
+ ],
+ output="screen",
+ ),
+ ]
+ )
+ ])
+
+ # ── Social nav (t=2s) ────────────────────────────────────────────────────
+ nav = TimerAction(period=2.0, actions=[
+ GroupAction(
+ condition=IfCondition(LaunchConfiguration("enable_nav")),
+ actions=[
+ Node(
+ package="saltybot_social_nav",
+ executable="social_nav",
+ name="social_nav_node",
+ parameters=[{"use_midas_depth": False}], # no GPU depth in CI
+ output="screen",
+ ),
+ ]
+ )
+ ])
+
+ return LaunchDescription(args + [
+ mock_sensors, orchestrator,
+ face, tracking, speech, llm, tts, nav,
+ ])
diff --git a/jetson/ros2_ws/src/saltybot_social_tests/package.xml b/jetson/ros2_ws/src/saltybot_social_tests/package.xml
new file mode 100644
index 0000000..9d14bf4
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_social_tests/package.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0"?>
+<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
+<package format="3">
+  <name>saltybot_social_tests</name>
+  <version>0.1.0</version>
+  <description>
+    Integration test suite for the social-bot pipeline (Issue #108).
+    Tests: launch verification, topic rates, services, GPU memory budget,
+    end-to-end latency profiling, graceful shutdown.
+    CI-compatible via Docker + mock sensor publishers.
+  </description>
+  <!-- TODO(review): XML tags were stripped in this hunk; maintainer email
+       reconstructed as a placeholder — set the real address. -->
+  <maintainer email="sl-jetson@example.invalid">sl-jetson</maintainer>
+  <license>MIT</license>
+
+  <!-- Core ROS2 runtime dependencies -->
+  <exec_depend>rclpy</exec_depend>
+  <exec_depend>std_msgs</exec_depend>
+  <exec_depend>sensor_msgs</exec_depend>
+  <exec_depend>geometry_msgs</exec_depend>
+  <exec_depend>nav_msgs</exec_depend>
+
+  <!-- Packages under test -->
+  <exec_depend>saltybot_social</exec_depend>
+  <exec_depend>saltybot_social_msgs</exec_depend>
+  <exec_depend>saltybot_social_face</exec_depend>
+  <exec_depend>saltybot_social_nav</exec_depend>
+  <exec_depend>saltybot_social_enrollment</exec_depend>
+  <exec_depend>saltybot_social_tracking</exec_depend>
+  <exec_depend>saltybot_social_personality</exec_depend>
+
+  <!-- Test tooling -->
+  <test_depend>ament_copyright</test_depend>
+  <test_depend>ament_flake8</test_depend>
+  <test_depend>ament_pep257</test_depend>
+  <test_depend>python3-pytest</test_depend>
+  <test_depend>launch_testing</test_depend>
+  <test_depend>launch_testing_ament_cmake</test_depend>
+  <test_depend>launch_testing_ros</test_depend>
+
+  <export>
+    <build_type>ament_python</build_type>
+  </export>
+</package>
diff --git a/jetson/ros2_ws/src/saltybot_social_tests/pytest.ini b/jetson/ros2_ws/src/saltybot_social_tests/pytest.ini
new file mode 100644
index 0000000..d6420e6
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_social_tests/pytest.ini
@@ -0,0 +1,27 @@
+[pytest]
+# social-bot integration test configuration
+
+# Markers registered here to silence PytestUnknownMarkWarning
+markers =
+ gpu_required: test requires an NVIDIA GPU
+ full_stack: test requires SOCIAL_TEST_FULL=1
+ stack_running: test requires SOCIAL_STACK_RUNNING=1
+ slow: test takes more than 30 seconds
+ launch_test: launch_testing integration test
+
+# Default test timeout (seconds) — individual tests can override
+timeout = 120
+
+# Show locals on failure for easier debugging
+showlocals = false
+
+# Extra summary for all non-passing outcomes (-r a: failed, errors, skipped,
+# xfailed/xpassed — passes are excluded) plus short tracebacks
+addopts = -ra --tb=short
+
+# Test paths
+testpaths = test
+
+# Filters
+filterwarnings =
+ ignore::DeprecationWarning:launch_testing
+ ignore::UserWarning:rclpy
diff --git a/jetson/ros2_ws/src/saltybot_social_tests/resource/saltybot_social_tests b/jetson/ros2_ws/src/saltybot_social_tests/resource/saltybot_social_tests
new file mode 100644
index 0000000..e69de29
diff --git a/jetson/ros2_ws/src/saltybot_social_tests/saltybot_social_tests/__init__.py b/jetson/ros2_ws/src/saltybot_social_tests/saltybot_social_tests/__init__.py
new file mode 100644
index 0000000..bb01954
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_social_tests/saltybot_social_tests/__init__.py
@@ -0,0 +1 @@
+# saltybot_social_tests — integration test harness for social-bot pipeline
diff --git a/jetson/ros2_ws/src/saltybot_social_tests/saltybot_social_tests/mock_sensors.py b/jetson/ros2_ws/src/saltybot_social_tests/saltybot_social_tests/mock_sensors.py
new file mode 100644
index 0000000..55ab044
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_social_tests/saltybot_social_tests/mock_sensors.py
@@ -0,0 +1,228 @@
+"""mock_sensors.py — Mock sensor publishers for CI integration tests.
+
+Publishes minimal valid data on all topics that social-bot nodes subscribe to,
+so the pipeline can start and produce output without real hardware.
+
+Topics published:
+ /camera/color/image_raw — blank 640×480 RGB8 image at 15Hz
+ /social/speech/transcript — canned SpeechTranscript at 0.5Hz
+ /social/faces/detections — empty FaceDetectionArray at 15Hz
+ /social/tracking/fused_target — synthetic FusedTarget at 10Hz
+ /social/persons — single synthetic PersonStateArray at 5Hz
+ /uwb/target — synthetic PoseStamped at 10Hz
+
+Usage (standalone):
+ ros2 run saltybot_social_tests mock_sensor_pub
+
+Usage (programmatic):
+ from saltybot_social_tests.mock_sensors import MockSensorPublisher
+ pub = MockSensorPublisher(node)
+ pub.start()
+ ...
+ pub.stop()
+"""
+
+from __future__ import annotations
+
+# NOTE: only names actually used below are imported — the CI lint job runs
+# flake8 (F401 not ignored) on this package, so unused imports fail the build.
+import threading
+import time
+
+import rclpy
+from rclpy.node import Node
+from rclpy.qos import QoSProfile, QoSReliabilityPolicy
+
+from std_msgs.msg import Header
+from sensor_msgs.msg import Image
+from geometry_msgs.msg import PoseStamped
+
+
+# ── Helpers ───────────────────────────────────────────────────────────────────
+
+def _now_header(node: Node, frame_id: str = "camera_color_optical_frame") -> Header:
+ h = Header()
+ h.stamp = node.get_clock().now().to_msg()
+ h.frame_id = frame_id
+ return h
+
+
+def _blank_rgb_image(node: Node, width: int = 640, height: int = 480) -> Image:
+ img = Image()
+ img.header = _now_header(node)
+ img.width = width
+ img.height = height
+ img.encoding = "rgb8"
+ img.step = width * 3
+ img.data = bytes(width * height * 3) # all-black frame
+ return img
+
+
+# ── Mock publisher class ──────────────────────────────────────────────────────
+
+class MockSensorPublisher:
+ """Publishes mock sensor data for all social-bot pipeline inputs."""
+
+ def __init__(self, node: Node) -> None:
+ self._node = node
+ self._running = False
+ self._threads: list = []
+
+ be_qos = QoSProfile(
+ reliability=QoSReliabilityPolicy.BEST_EFFORT, depth=5
+ )
+ rel_qos = QoSProfile(
+ reliability=QoSReliabilityPolicy.RELIABLE, depth=10
+ )
+
+ # Camera image (required by face detection + segmentation)
+ self._img_pub = node.create_publisher(Image, "/camera/color/image_raw", be_qos)
+
+ # Fused tracking target (required by social_nav, person_follower)
+ try:
+ from saltybot_social_msgs.msg import FusedTarget, FaceDetectionArray, PersonStateArray
+ self._fused_pub = node.create_publisher(
+ FusedTarget, "/social/tracking/fused_target", rel_qos
+ )
+ self._faces_pub = node.create_publisher(
+ FaceDetectionArray, "/social/faces/detections", be_qos
+ )
+ self._persons_pub = node.create_publisher(
+ PersonStateArray, "/social/persons", rel_qos
+ )
+ self._has_social_msgs = True
+ except ImportError:
+ self._has_social_msgs = False
+ node.get_logger().warn("saltybot_social_msgs not found — limited mock data")
+
+ try:
+ from saltybot_social_msgs.msg import SpeechTranscript
+ self._transcript_pub = node.create_publisher(
+ SpeechTranscript, "/social/speech/transcript", rel_qos
+ )
+ self._has_speech_msgs = True
+ except ImportError:
+ self._has_speech_msgs = False
+
+ # UWB target (required by person_follower fusion)
+ self._uwb_pub = node.create_publisher(PoseStamped, "/uwb/target", rel_qos)
+
+ self._canned_phrases = [
+ "Hello Salty, how are you?",
+ "What time is it?",
+ "Follow me please.",
+ "Tell me a joke.",
+ ]
+ self._phrase_idx = 0
+
+ def start(self) -> None:
+ """Start all mock publisher threads."""
+ self._running = True
+ specs = [
+ ("camera", self._pub_camera, 1.0 / 15.0),
+ ("faces", self._pub_faces, 1.0 / 15.0),
+ ("fused", self._pub_fused, 1.0 / 10.0),
+ ("persons", self._pub_persons, 1.0 / 5.0),
+ ("uwb", self._pub_uwb, 1.0 / 10.0),
+ ("transcript",self._pub_transcript, 2.0),
+ ]
+ for name, fn, interval in specs:
+ t = threading.Thread(
+ target=self._loop, args=(fn, interval),
+ name=f"mock_{name}", daemon=True
+ )
+ t.start()
+ self._threads.append(t)
+
+ def stop(self) -> None:
+ self._running = False
+
+ def _loop(self, publish_fn, interval_s: float) -> None:
+ while self._running:
+ try:
+ publish_fn()
+ except Exception:
+ pass
+ time.sleep(interval_s)
+
+ def _pub_camera(self) -> None:
+ self._img_pub.publish(_blank_rgb_image(self._node))
+
+ def _pub_faces(self) -> None:
+ if not self._has_social_msgs:
+ return
+ from saltybot_social_msgs.msg import FaceDetectionArray
+ msg = FaceDetectionArray()
+ msg.header = _now_header(self._node)
+ # Empty array — camera is "working" but no face in frame
+ self._faces_pub.publish(msg)
+
+ def _pub_fused(self) -> None:
+ if not self._has_social_msgs:
+ return
+ from saltybot_social_msgs.msg import FusedTarget
+ msg = FusedTarget()
+ msg.header = _now_header(self._node, "base_link")
+ msg.position.x = 1.5 # 1.5m ahead
+ msg.position.y = 0.0
+ msg.velocity.x = 0.0
+ msg.track_id = 1
+ msg.confidence = 0.85
+ self._fused_pub.publish(msg)
+
+ def _pub_persons(self) -> None:
+ if not self._has_social_msgs:
+ return
+ from saltybot_social_msgs.msg import PersonStateArray, PersonState
+ msg = PersonStateArray()
+ msg.header = _now_header(self._node, "base_link")
+ p = PersonState()
+ p.header = msg.header
+ p.person_id = 1
+ p.person_name = "TestPerson"
+ p.state = PersonState.STATE_ENGAGED
+ p.distance = 1.5
+ p.bearing_deg = 0.0
+ p.engagement_score = 0.8
+ msg.persons = [p]
+ self._persons_pub.publish(msg)
+
+ def _pub_uwb(self) -> None:
+ msg = PoseStamped()
+ msg.header = _now_header(self._node, "base_link")
+ msg.pose.position.x = 1.5
+ msg.pose.position.y = 0.1
+ msg.pose.orientation.w = 1.0
+ self._uwb_pub.publish(msg)
+
+ def _pub_transcript(self) -> None:
+ if not self._has_speech_msgs:
+ return
+ from saltybot_social_msgs.msg import SpeechTranscript
+ msg = SpeechTranscript()
+ msg.header = _now_header(self._node)
+ msg.text = self._canned_phrases[self._phrase_idx % len(self._canned_phrases)]
+ msg.speaker_id = "test_person"
+ msg.confidence = 0.95
+ msg.audio_duration = 1.2
+ msg.is_partial = False
+ self._transcript_pub.publish(msg)
+ self._phrase_idx += 1
+
+
+# ── Standalone entry point ────────────────────────────────────────────────────
+
+def main(args=None) -> None:
+ rclpy.init(args=args)
+ node = rclpy.create_node("mock_sensor_pub")
+ pub = MockSensorPublisher(node)
+ pub.start()
+ node.get_logger().info("Mock sensor publisher running — Ctrl+C to stop")
+ try:
+ rclpy.spin(node)
+ except KeyboardInterrupt:
+ pass
+ finally:
+ pub.stop()
+ node.destroy_node()
+ rclpy.shutdown()
diff --git a/jetson/ros2_ws/src/saltybot_social_tests/saltybot_social_tests/test_helpers.py b/jetson/ros2_ws/src/saltybot_social_tests/saltybot_social_tests/test_helpers.py
new file mode 100644
index 0000000..d75e130
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_social_tests/saltybot_social_tests/test_helpers.py
@@ -0,0 +1,262 @@
+"""test_helpers.py — Shared helpers for social-bot integration tests.
+
+No hardware dependencies. Provides:
+- TopicRateChecker: measure actual publish rate on a topic
+- NodeChecker: verify a named node is alive in the graph
+- ServiceChecker: call a service and verify it responds
+- GpuMemoryChecker: measure GPU allocation via pycuda / nvidia-smi
+- TimeoutPoller: generic poll-until-true with timeout
+"""
+
+from __future__ import annotations
+
+import subprocess
+import threading
+import time
+from typing import Any, Callable, List, Optional
+
+import rclpy
+from rclpy.node import Node
+from rclpy.qos import (
+ QoSProfile, QoSReliabilityPolicy, QoSDurabilityPolicy, QoSHistoryPolicy
+)
+
+
+# ── Timeout poller ────────────────────────────────────────────────────────────
+
def poll_until(
    condition: Callable[[], bool],
    timeout_s: float = 30.0,
    interval_s: float = 0.5,
) -> bool:
    """Return True if condition() becomes True within timeout_s, else False.

    The condition is re-checked every interval_s seconds; it is never
    called after the deadline has passed.
    """
    end = time.time() + timeout_s
    while True:
        if time.time() >= end:
            return False
        if condition():
            return True
        time.sleep(interval_s)
+
+
+# ── Node checker ──────────────────────────────────────────────────────────────
+
class NodeChecker:
    """Probe the ROS2 graph for the presence of named nodes."""

    def __init__(self, probe_node: Node) -> None:
        self._node = probe_node

    def is_alive(self, node_name: str, namespace: str = "/") -> bool:
        """Return True if node_name is present in the node graph.

        Matches either the bare node name or its fully-qualified
        "<namespace>/<name>" form.
        """
        for candidate, ns in self._node.get_node_names_and_namespaces():
            fully_qualified = ns.rstrip("/") + "/" + candidate
            if node_name in (candidate, fully_qualified):
                return True
        return False

    def wait_for_nodes(
        self,
        node_names: List[str],
        timeout_s: float = 30.0,
    ) -> dict:
        """Block until all nodes appear or timeout. Returns {name: found}."""
        found = dict.fromkeys(node_names, False)
        deadline = time.time() + timeout_s
        while time.time() < deadline:
            for wanted in node_names:
                # Skip the graph query for nodes already confirmed alive.
                found[wanted] = found[wanted] or self.is_alive(wanted)
            if all(found.values()):
                break
            time.sleep(0.5)
        return found
+
+
+# ── Topic rate checker ─────────────────────────────────────────────────────────
+
class TopicRateChecker:
    """Measure the actual publish rate on a topic over a measurement window.

    The subscription is created in the constructor and counting begins
    immediately — there are no explicit start()/stop() methods.

    Usage:
        checker = TopicRateChecker(node, "/social/faces/detections",
                                   FaceDetectionArray, window_s=3.0)
        time.sleep(5.0)
        hz = checker.measured_hz()
    """

    def __init__(
        self,
        node: Node,
        topic: str,
        msg_type: Any,
        window_s: float = 3.0,
        qos: Optional[QoSProfile] = None,
    ) -> None:
        """Subscribe to `topic`; defaults to a best-effort sensor-style QoS."""
        self._window_s = window_s
        self._timestamps: List[float] = []
        self._lock = threading.Lock()

        if qos is None:
            qos = QoSProfile(
                reliability=QoSReliabilityPolicy.BEST_EFFORT,
                durability=QoSDurabilityPolicy.VOLATILE,
                history=QoSHistoryPolicy.KEEP_LAST,
                depth=10,
            )

        self._sub = node.create_subscription(
            msg_type, topic, self._cb, qos
        )

    def _cb(self, _msg: Any) -> None:
        """Record arrival wall-clock time; prune samples older than 2x window."""
        now = time.time()
        with self._lock:
            self._timestamps.append(now)
            # Keep twice the window so measured_hz() always has full coverage.
            cutoff = now - self._window_s * 2
            self._timestamps = [t for t in self._timestamps if t > cutoff]

    def measured_hz(self) -> float:
        """Return Hz measured over the last window_s seconds (0.0 if <2 samples)."""
        now = time.time()
        cutoff = now - self._window_s
        with self._lock:
            recent = [t for t in self._timestamps if t > cutoff]
        if len(recent) < 2:
            return 0.0
        span = recent[-1] - recent[0]
        if span <= 0.0:
            # All samples share one timestamp (clock resolution); a rate
            # cannot be computed — avoid ZeroDivisionError.
            return 0.0
        return (len(recent) - 1) / span

    def message_count(self) -> int:
        """Number of messages currently retained (pruned to ~2x window)."""
        with self._lock:
            return len(self._timestamps)

    def received_any(self) -> bool:
        """True once at least one message has been received (and retained)."""
        with self._lock:
            return len(self._timestamps) > 0
+
+
+# ── Service checker ────────────────────────────────────────────────────────────
+
class ServiceChecker:
    """Verify a ROS2 service is available and call it synchronously."""

    def __init__(self, node: Node) -> None:
        self._node = node

    def is_available(self, service_name: str, srv_type: Any,
                     timeout_s: float = 5.0) -> bool:
        """Return True if the service becomes available within timeout_s.

        The temporary client is always destroyed before returning.
        """
        client = self._node.create_client(srv_type, service_name)
        available = client.wait_for_service(timeout_sec=timeout_s)
        self._node.destroy_client(client)
        return available

    def call_service(
        self,
        service_name: str,
        srv_type: Any,
        request: Any,
        timeout_s: float = 10.0,
    ) -> Optional[Any]:
        """Call a service and return the response, or None on failure.

        Note: timeout_s is applied twice — once waiting for the service to
        appear and once spinning for the response — so worst-case wall time
        is roughly 2 * timeout_s.
        """
        client = self._node.create_client(srv_type, service_name)
        if not client.wait_for_service(timeout_sec=timeout_s):
            self._node.destroy_client(client)
            return None
        future = client.call_async(request)
        # Spin the probe node ourselves until the future resolves or we
        # hit the deadline (no executor is running for this node).
        deadline = time.time() + timeout_s
        while not future.done() and time.time() < deadline:
            rclpy.spin_once(self._node, timeout_sec=0.1)
        self._node.destroy_client(client)
        return future.result() if future.done() else None
+
+
+# ── GPU memory checker ────────────────────────────────────────────────────────
+
class GpuMemoryChecker:
    """Query GPU memory usage. Gracefully no-ops if no GPU available."""

    @staticmethod
    def total_mb() -> Optional[float]:
        """Total GPU memory in MB."""
        return GpuMemoryChecker._query("memory.total")

    @staticmethod
    def free_mb() -> Optional[float]:
        """Free GPU memory in MB."""
        return GpuMemoryChecker._query("memory.free")

    @staticmethod
    def used_mb() -> Optional[float]:
        """Used GPU memory in MB."""
        return GpuMemoryChecker._query("memory.used")

    @staticmethod
    def _query(field: str) -> Optional[float]:
        """Resolve `field` via nvidia-smi, then pycuda; None when neither works."""
        # Primary source: nvidia-smi CSV query (values already in MiB).
        cmd = ["nvidia-smi", f"--query-gpu={field}",
               "--format=csv,noheader,nounits"]
        try:
            raw = subprocess.check_output(cmd, timeout=5).decode().strip()
            return float(raw.split("\n")[0].strip())
        except Exception:
            pass
        # Fallback: query the CUDA driver directly through pycuda.
        try:
            import pycuda.driver as drv
            drv.init()
            ctx = drv.Device(0).make_context()
            free_b, total_b = drv.mem_get_info()
            ctx.pop()
            mib = 1024.0 * 1024.0
            return {
                "memory.free": free_b / mib,
                "memory.total": total_b / mib,
                "memory.used": (total_b - free_b) / mib,
            }.get(field)
        except Exception:
            pass
        return None

    @staticmethod
    def gpu_available() -> bool:
        """True when a total-memory query succeeds through either backend."""
        return GpuMemoryChecker.total_mb() is not None
+
+
+# ── Process helpers ───────────────────────────────────────────────────────────
+
def count_zombie_processes(pattern: str = "ros") -> int:
    """Count zombie (defunct) processes whose ``ps aux`` line matches pattern.

    A zombie is identified by a 'Z' in the STAT column (8th field of
    ``ps aux`` output). Returns 0 on any error, e.g. when ``ps`` is
    unavailable.
    """
    try:
        out = subprocess.check_output(
            ["ps", "aux"], timeout=5
        ).decode()
        count = 0
        # Skip the header row; the original checked `"" in line`, which is
        # always True and therefore counted every pattern match, zombie or not.
        for line in out.splitlines()[1:]:
            fields = line.split(None, 10)
            # ps aux columns: USER PID %CPU %MEM VSZ RSS TTY STAT ... — STAT
            # is index 7; zombie state is flagged with 'Z'.
            if len(fields) > 7 and "Z" in fields[7] and pattern in line:
                count += 1
        return count
    except Exception:
        return 0
+
+
def get_process_pids(name_pattern: str) -> List[int]:
    """Return PIDs of processes matching name_pattern.

    Uses ``pgrep -f``; returns [] when nothing matches or pgrep is missing.
    """
    try:
        raw = subprocess.check_output(
            ["pgrep", "-f", name_pattern], timeout=5
        )
        tokens = raw.decode().strip().splitlines()
        return [int(tok) for tok in tokens if tok.strip()]
    except Exception:
        return []
diff --git a/jetson/ros2_ws/src/saltybot_social_tests/setup.cfg b/jetson/ros2_ws/src/saltybot_social_tests/setup.cfg
new file mode 100644
index 0000000..5e361db
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_social_tests/setup.cfg
@@ -0,0 +1,4 @@
+[develop]
+script_dir=$base/lib/saltybot_social_tests
+[install]
+install_scripts=$base/lib/saltybot_social_tests
diff --git a/jetson/ros2_ws/src/saltybot_social_tests/setup.py b/jetson/ros2_ws/src/saltybot_social_tests/setup.py
new file mode 100644
index 0000000..07eaff3
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_social_tests/setup.py
@@ -0,0 +1,30 @@
+from setuptools import find_packages, setup
+import os
+from glob import glob
+
package_name = 'saltybot_social_tests'

setup(
    name=package_name,
    version='0.1.0',
    # Ship only the library package; the 'test' tree is run in-place by pytest.
    packages=find_packages(exclude=['test']),
    data_files=[
        # Ament resource-index marker so ROS2 tooling can discover the package.
        ('share/ament_index/resource_index/packages',
         ['resource/' + package_name]),
        ('share/' + package_name, ['package.xml']),
        # Install launch files matching *launch.py / *launch.xml / *launch.yaml.
        (os.path.join('share', package_name, 'launch'),
         glob(os.path.join('launch', '*launch.[pxy][yma]*'))),
    ],
    install_requires=['setuptools'],
    zip_safe=True,
    maintainer='sl-jetson',
    maintainer_email='seb@vayrette.com',
    description='Integration test suite for social-bot pipeline (Issue #108)',
    license='MIT',
    # NOTE(review): tests_require is deprecated in modern setuptools; kept as-is.
    tests_require=['pytest'],
    entry_points={
        'console_scripts': [
            # Standalone mock sensor publisher (saltybot_social_tests.mock_sensors).
            'mock_sensor_pub = saltybot_social_tests.mock_sensors:main',
        ],
    },
)
diff --git a/jetson/ros2_ws/src/saltybot_social_tests/test/__init__.py b/jetson/ros2_ws/src/saltybot_social_tests/test/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/jetson/ros2_ws/src/saltybot_social_tests/test/conftest.py b/jetson/ros2_ws/src/saltybot_social_tests/test/conftest.py
new file mode 100644
index 0000000..09b90b0
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_social_tests/test/conftest.py
@@ -0,0 +1,113 @@
+"""conftest.py — pytest configuration and shared fixtures for social-bot tests.
+
+Custom markers:
+ @pytest.mark.gpu_required — skip if no GPU / nvidia-smi
+ @pytest.mark.full_stack — skip unless SOCIAL_TEST_FULL=1
+ @pytest.mark.stack_running — skip unless SOCIAL_STACK_RUNNING=1
+ @pytest.mark.slow — long tests (>30s); skipped in fast mode
+ @pytest.mark.launch_test — launch_testing marker (framework)
+
+Fixtures:
+ rclpy_node — temporary rclpy probe node (auto-shutdown)
+ ros_domain_id — asserts ROS_DOMAIN_ID=108 for test isolation
+"""
+
+from __future__ import annotations
+
+import os
+import subprocess
+import sys
+import time
+
+import pytest
+
+# ── Custom markers ────────────────────────────────────────────────────────────
+
+
def pytest_configure(config):
    """Register this suite's custom markers with pytest."""
    marker_specs = (
        "gpu_required: test requires an NVIDIA GPU (skipped in headless CI)",
        "full_stack: test requires SOCIAL_TEST_FULL=1 (full speech/LLM/TTS)",
        "stack_running: test requires SOCIAL_STACK_RUNNING=1",
        "slow: test takes more than 30 seconds",
        "launch_test: launch_testing integration test",
    )
    for spec in marker_specs:
        config.addinivalue_line("markers", spec)
+
+
def pytest_collection_modifyitems(config, items):
    """Automatically skip tests whose markers are not satisfied."""
    env = os.environ.get
    full_stack = env("SOCIAL_TEST_FULL", "0") == "1"
    stack_running = env("SOCIAL_STACK_RUNNING", "0") == "1"
    fast_mode = env("CI_FAST", "0") == "1"

    gpu_ok = _nvidia_smi_available()

    def _skip(item, reason):
        item.add_marker(pytest.mark.skip(reason=reason))

    for item in items:
        if not gpu_ok and item.get_closest_marker("gpu_required"):
            _skip(item, "No GPU available (set SOCIAL_STACK_RUNNING=1 on hardware)")
        if not full_stack and item.get_closest_marker("full_stack"):
            _skip(item, "Set SOCIAL_TEST_FULL=1 to enable full-stack tests")
        if not stack_running and item.get_closest_marker("stack_running"):
            _skip(item, "Set SOCIAL_STACK_RUNNING=1 to enable stack-running tests")
        if fast_mode and item.get_closest_marker("slow"):
            _skip(item, "Slow test skipped in CI_FAST=1 mode")
+
+
+# ── Helpers ───────────────────────────────────────────────────────────────────
+
+def _nvidia_smi_available() -> bool:
+ try:
+ subprocess.check_output(
+ ["nvidia-smi", "--query-gpu=name", "--format=csv,noheader"],
+ timeout=5, stderr=subprocess.DEVNULL,
+ )
+ return True
+ except Exception:
+ return False
+
+
+# ── Fixtures ──────────────────────────────────────────────────────────────────
+
@pytest.fixture(scope="session")
def ros_domain_id():
    """Assert ROS_DOMAIN_ID is set to 108 for test isolation.

    Returns the domain id as an int. Fails the requesting test with an
    actionable message when the variable is unset or set to another value.
    """
    domain = os.environ.get("ROS_DOMAIN_ID", "0")
    # Hard-fail rather than skip: running on the wrong domain could observe
    # (or disturb) an unrelated ROS graph on the same network.
    assert domain == "108", (
        f"ROS_DOMAIN_ID must be '108' for isolated test runs (got '{domain}'). "
        "Set: export ROS_DOMAIN_ID=108"
    )
    return int(domain)
+
+
@pytest.fixture(scope="function")
def rclpy_node(request):
    """Provide a temporary rclpy probe node; auto-teardown after each test."""
    import rclpy
    if not rclpy.ok():
        rclpy.init()
    # Derive a graph-safe node name from the (possibly parametrized) test name.
    suffix = request.node.name[:20].replace("/", "_")
    node = rclpy.create_node("test_probe_" + suffix)
    yield node
    node.destroy_node()
+
+
@pytest.fixture(scope="session", autouse=True)
def rclpy_session():
    """Init/shutdown rclpy once for the entire test session.

    autouse=True means every test runs inside an initialized rclpy context
    without requesting this fixture explicitly.
    """
    import rclpy
    # Guard against double-init when another fixture initialized rclpy first.
    if not rclpy.ok():
        rclpy.init()
    yield
    # Teardown: only shut down if the context is still valid.
    if rclpy.ok():
        rclpy.shutdown()
diff --git a/jetson/ros2_ws/src/saltybot_social_tests/test/test_gpu_memory.py b/jetson/ros2_ws/src/saltybot_social_tests/test/test_gpu_memory.py
new file mode 100644
index 0000000..5c17ab3
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_social_tests/test/test_gpu_memory.py
@@ -0,0 +1,197 @@
+"""test_gpu_memory.py — GPU memory budget tests for social-bot stack.
+
+Requirements (Orin Nano Super 8GB shared memory):
+ - Full stack allocation must stay under 6GB total GPU usage
+ - Each major model budget:
+ Face detection (SCRFD-10GF TRT): < 100 MB
+ Face recognition (ArcFace TRT): < 300 MB
+ Speaker embedding (ECAPA-TDNN TRT): < 200 MB
+ Whisper STT (CTranslate2): < 700 MB
+ LLM (Phi-3-mini Q4 GGUF, 20 layers): < 2500 MB
+ Piper TTS (ONNX CPU): < 200 MB
+ Tracking (Kalman, CPU): < 10 MB
+ ─────────────────────────────────────────────────
+ Total budget: < 6000 MB (6 GB)
+ Reserve for OS + ROS2: 2000 MB (2 GB)
+
+Tests:
+ 1. GPU available check (skip gracefully in CI without GPU)
+ 2. Used memory at baseline (before stack loads) < 1GB
+ 3. Used memory during full stack < 6GB
+ 4. Memory does not grow continuously (no leak over 30s)
+ 5. After shutdown, memory returns to near-baseline
+"""
+
+from __future__ import annotations
+
+import os
+import sys
+import time
+import unittest
+
+import pytest
+
+sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
+from saltybot_social_tests.test_helpers import GpuMemoryChecker
+
+# ── Budget constants ──────────────────────────────────────────────────────────
+GPU_TOTAL_MB_EXPECTED = 8192.0 # Orin Nano Super 8GB
+GPU_STACK_BUDGET_MB = 6000.0 # full stack must stay under 6GB used
+GPU_BASELINE_MB = 1024.0 # baseline (OS + CUDA context) < 1GB
+GPU_LEAK_TOLERANCE_MB = 200.0 # acceptable growth over 30s measurement window
+
+# Per-model expected budgets (for documentation / CI without full stack)
+MODEL_BUDGETS_MB = {
+ "scrfd_10g_trt": 100,
+ "arcface_r100_trt": 300,
+ "ecapa_tdnn_trt": 200,
+ "whisper_small": 700,
+ "phi3_mini_gguf": 2500,
+ "piper_tts": 200,
+ "tracking": 10,
+}
+TOTAL_MODEL_BUDGET_MB = sum(MODEL_BUDGETS_MB.values()) # 4010 MB
+
+
def _skip_if_no_gpu():
    """Skip the current test via pytest.skip when no GPU can be queried."""
    if GpuMemoryChecker.gpu_available():
        return
    pytest.skip("No GPU available — skipping GPU memory tests")
+
+
class TestGpuMemoryBudget(unittest.TestCase):
    """GPU memory budget tests for the full social-bot stack."""

    def test_gpu_is_available_or_ci(self):
        """GPU check: either GPU is available, or we're in CI (skip gracefully)."""
        if os.environ.get("CI", ""):
            # In CI without GPU, just log and pass
            print("[INFO] CI mode detected — GPU memory tests will be skipped if no GPU")
        available = GpuMemoryChecker.gpu_available()
        if not available:
            self.skipTest("No GPU available")
        total = GpuMemoryChecker.total_mb()
        self.assertIsNotNone(total)
        print(f"[INFO] GPU total: {total:.0f} MB")

    def test_gpu_total_memory_matches_orin(self):
        """Total GPU memory should be ~8GB on Orin Nano Super."""
        _skip_if_no_gpu()
        total = GpuMemoryChecker.total_mb()
        # Sanity floor only: require more than 50% of the nominal 8192MB, since
        # the reported total for shared-memory Jetsons varies by configuration.
        self.assertGreater(total, GPU_TOTAL_MB_EXPECTED * 0.5,
                           f"GPU total {total:.0f}MB seems too low for Orin")

    def test_baseline_memory_under_threshold(self):
        """Baseline GPU usage (before models load) must be < 1GB."""
        _skip_if_no_gpu()
        used = GpuMemoryChecker.used_mb()
        self.assertIsNotNone(used)
        print(f"[INFO] Baseline GPU used: {used:.0f} MB")
        # Note: if running after stack is up, this test would need stack to be down
        # In standalone mode (no stack running), assert low baseline
        if os.environ.get("SOCIAL_STACK_RUNNING", "0") == "0":
            self.assertLess(
                used, GPU_BASELINE_MB,
                f"Baseline GPU usage {used:.0f}MB >= {GPU_BASELINE_MB}MB"
            )

    def test_full_stack_memory_under_budget(self):
        """Full stack GPU usage must stay under 6GB."""
        _skip_if_no_gpu()
        # Only meaningful while the whole social stack holds its allocations.
        if os.environ.get("SOCIAL_STACK_RUNNING", "0") != "1":
            self.skipTest("Set SOCIAL_STACK_RUNNING=1 to run with stack active")

        used = GpuMemoryChecker.used_mb()
        self.assertIsNotNone(used)
        print(f"[INFO] Stack GPU used: {used:.0f} MB / {GPU_STACK_BUDGET_MB:.0f} MB budget")
        self.assertLess(
            used, GPU_STACK_BUDGET_MB,
            f"GPU usage {used:.0f}MB exceeds {GPU_STACK_BUDGET_MB:.0f}MB budget"
        )

    def test_no_memory_leak_over_30s(self):
        """GPU memory must not grow by more than 200MB over 30 seconds."""
        _skip_if_no_gpu()
        if os.environ.get("SOCIAL_STACK_RUNNING", "0") != "1":
            self.skipTest("Set SOCIAL_STACK_RUNNING=1 to run with stack active")

        used_start = GpuMemoryChecker.used_mb()
        print(f"[INFO] GPU used at start: {used_start:.0f} MB")

        # Wait 30s while stack processes mock sensor data
        time.sleep(30.0)

        used_end = GpuMemoryChecker.used_mb()
        growth = used_end - used_start
        print(f"[INFO] GPU used after 30s: {used_end:.0f} MB (growth: {growth:+.0f} MB)")

        # A single-sample delta, so this tolerates normal allocation jitter
        # up to GPU_LEAK_TOLERANCE_MB.
        self.assertLess(
            growth, GPU_LEAK_TOLERANCE_MB,
            f"GPU memory grew by {growth:.0f}MB over 30s — possible leak"
        )

    def test_memory_frees_after_context_release(self):
        """Test that pycuda context releases memory properly."""
        _skip_if_no_gpu()
        try:
            import pycuda.driver as drv
            drv.init()

            ctx = drv.Device(0).make_context()
            free_in, total_in = drv.mem_get_info()

            # Allocate 100MB
            alloc = drv.mem_alloc(100 * 1024 * 1024)
            free_after_alloc, _ = drv.mem_get_info()
            growth = (free_in - free_after_alloc) / 1024 / 1024
            # Only require >50MB to register: drivers may round allocations.
            self.assertGreater(growth, 50,
                               "100MB allocation didn't register in GPU memory")

            # Free it
            # NOTE(review): relies on CPython refcounting to release the
            # device allocation immediately after `del` — confirm on other
            # interpreters.
            del alloc
            free_after_free, _ = drv.mem_get_info()
            recovered = (free_after_free - free_after_alloc) / 1024 / 1024
            ctx.pop()

            self.assertGreater(recovered, 50,
                               f"Only {recovered:.0f}MB recovered after freeing 100MB alloc")
        except ImportError:
            self.skipTest("pycuda not available")
+
+
class TestModelBudgetDocumentation(unittest.TestCase):
    """Verify total model budget is within spec (pure math, no GPU needed)."""

    def test_total_model_budget_under_6gb(self):
        """Sum of all model budgets must be < 6000MB."""
        total = sum(MODEL_BUDGETS_MB.values())
        message = f"Model budgets sum to {total}MB, exceeds {GPU_STACK_BUDGET_MB}MB limit"
        self.assertLess(total, GPU_STACK_BUDGET_MB, message)

    def test_individual_model_budgets_reasonable(self):
        """No single model should claim more than 3GB."""
        for model, budget in MODEL_BUDGETS_MB.items():
            message = f"Model {model} budget {budget}MB seems unreasonably high"
            self.assertLess(budget, 3000, message)

    def test_budget_includes_all_models(self):
        """Verify all expected models are accounted for in the budget."""
        expected_keys = {
            "scrfd_10g_trt", "arcface_r100_trt", "ecapa_tdnn_trt",
            "whisper_small", "phi3_mini_gguf", "piper_tts", "tracking"
        }
        actual_keys = set(MODEL_BUDGETS_MB.keys())
        self.assertEqual(actual_keys, expected_keys)

    def test_budget_summary(self):
        """Print human-readable budget summary."""
        print("\n[GPU Budget Summary]")
        by_size = sorted(MODEL_BUDGETS_MB.items(), key=lambda kv: kv[1], reverse=True)
        for model, mb in by_size:
            bar = "█" * (mb // 100)
            print(f"  {model:<25} {mb:>5} MB {bar}")
        print(f"  {'TOTAL':<25} {TOTAL_MODEL_BUDGET_MB:>5} MB")
        print(f"  {'LIMIT':<25} {GPU_STACK_BUDGET_MB:>5} MB")
        print(f"  {'RESERVE':<25} {GPU_STACK_BUDGET_MB - TOTAL_MODEL_BUDGET_MB:>5} MB")
diff --git a/jetson/ros2_ws/src/saltybot_social_tests/test/test_latency.py b/jetson/ros2_ws/src/saltybot_social_tests/test/test_latency.py
new file mode 100644
index 0000000..2c9bee7
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_social_tests/test/test_latency.py
@@ -0,0 +1,323 @@
+"""test_latency.py — End-to-end latency profiling for social-bot pipeline.
+
+Pipeline stages and SLAs:
+ Stage SLA (p95) Notes
+ ───────────────────────────── ───────── ─────────────────────────────────
+ wake_word → VAD onset < 100ms OpenWakeWord inference
+ VAD onset → transcript (STT) < 500ms Whisper small, 1s utterance
+ transcript → LLM first token < 500ms Phi-3-mini GGUF, TTFT
+ LLM first token → TTS first < 200ms Piper synthesis, first sentence
+ ─────────────────────────────────────────────────────────────────────────────
+ Full end-to-end (wake→speaker) < 1200ms total pipeline SLA
+
+Tests:
+ 1. STT latency: inject synthetic transcript, measure to LLM response
+ 2. LLM latency: inject transcript, measure time to ConversationResponse
+ 3. TTS latency: inject ConversationResponse, verify audio starts quickly
+ 4. E2E latency: orchestrator state transitions (LISTENING→THINKING→SPEAKING)
+ 5. Percentile report: print p50/p95/p99 for each stage over N runs
+
+In CI (no GPU/models): tests use orchestrator state timestamps to infer latency
+bounds without actually running STT/LLM/TTS.
+"""
+
+from __future__ import annotations
+
+import json
+import os
+import sys
+import time
+import threading
+import unittest
+from collections import defaultdict
+from typing import List, Optional
+
+import rclpy
+from rclpy.node import Node
+from rclpy.qos import QoSProfile
+
+sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
+from saltybot_social_tests.test_helpers import poll_until
+
+# ── SLA constants (milliseconds) ─────────────────────────────────────────────
+SLA_STT_P95_MS = 500.0
+SLA_LLM_TTFT_P95_MS = 500.0
+SLA_TTS_FIRST_P95_MS = 200.0
+SLA_E2E_P95_MS = 1200.0
+
+N_PROBE_RUNS = 5 # number of injection runs for percentile calculation
+
+
+def _percentile(values: List[float], p: float) -> float:
+ if not values:
+ return 0.0
+ s = sorted(values)
+ idx = int(len(s) * p / 100)
+ return s[min(idx, len(s) - 1)]
+
+
class LatencyProbe(Node):
    """Injects synthetic messages and measures pipeline response latencies.

    Publishes synthetic transcripts and records (a) wall-clock latency from
    injection to the next ConversationResponse and (b) orchestrator state
    transitions with elapsed time between them. Degrades gracefully when
    saltybot_social_msgs is not installed.
    """

    def __init__(self) -> None:
        super().__init__("latency_probe")
        qos = QoSProfile(depth=10)

        # Injectors (only wired up when the project message package imports)
        try:
            from saltybot_social_msgs.msg import SpeechTranscript, ConversationResponse
            self._transcript_pub = self.create_publisher(
                SpeechTranscript, "/social/speech/transcript", qos
            )
            self._response_pub = self.create_publisher(
                ConversationResponse, "/social/conversation/response", qos
            )
            self._has_social = True
        except ImportError:
            self._has_social = False

        from std_msgs.msg import String
        # NOTE(review): _state_pub is created but not used by this probe's own
        # methods — presumably other tests publish injections through it.
        self._state_pub = self.create_publisher(String, "/social/test/inject", qos)

        # Receivers (String already imported above; the original re-imported it)
        self._state_sub = self.create_subscription(
            String, "/social/orchestrator/state",
            self._on_state, qos
        )

        if self._has_social:
            from saltybot_social_msgs.msg import ConversationResponse
            self._response_sub = self.create_subscription(
                ConversationResponse, "/social/conversation/response",
                self._on_response, qos
            )

        # Timing data — guarded by _lock since subscription callbacks can run
        # concurrently with test-thread reads.
        self._inject_t: Optional[float] = None
        self._response_times: List[float] = []
        self._state_transitions: List[tuple] = []  # (from, to, elapsed_ms)
        self._last_state: Optional[str] = None
        self._last_state_t: float = time.time()
        self._lock = threading.Lock()

    def _on_state(self, msg) -> None:
        """Record a transition whenever the orchestrator's state changes."""
        try:
            data = json.loads(msg.data)
            new_state = data.get("state", "")
            now = time.time()
            with self._lock:
                if self._last_state and self._last_state != new_state:
                    elapsed = (now - self._last_state_t) * 1000
                    self._state_transitions.append(
                        (self._last_state, new_state, elapsed)
                    )
                self._last_state = new_state
                self._last_state_t = now
        except Exception:
            # Non-JSON or malformed state payloads are ignored.
            pass

    def _on_response(self, msg) -> None:
        """Close the open latency measurement when a response arrives."""
        with self._lock:
            if self._inject_t is not None:
                latency_ms = (time.time() - self._inject_t) * 1000
                self._response_times.append(latency_ms)
                self._inject_t = None

    def inject_transcript(self, text: str = "Hello Salty, what time is it?") -> None:
        """Publish a synthetic final transcript to trigger LLM."""
        if not self._has_social:
            return
        from saltybot_social_msgs.msg import SpeechTranscript
        msg = SpeechTranscript()
        msg.header.stamp = self.get_clock().now().to_msg()
        msg.text = text
        msg.speaker_id = "test_user"
        msg.confidence = 0.99
        msg.audio_duration = 1.2
        msg.is_partial = False
        # Stamp the injection time before publishing so the response callback
        # cannot race ahead of it.
        with self._lock:
            self._inject_t = time.time()
        self._transcript_pub.publish(msg)

    def get_response_latencies(self) -> List[float]:
        """Snapshot of injection→response latencies (ms) recorded so far."""
        with self._lock:
            return list(self._response_times)

    def get_state_transitions(self) -> List[tuple]:
        """Snapshot of recorded (from_state, to_state, elapsed_ms) tuples."""
        with self._lock:
            return list(self._state_transitions)

    def clear(self) -> None:
        """Reset all timing data and any pending injection timestamp."""
        with self._lock:
            self._response_times.clear()
            self._state_transitions.clear()
            self._inject_t = None
+
+
class TestPipelineLatency(unittest.TestCase):
    """End-to-end pipeline latency tests."""

    @classmethod
    def setUpClass(cls):
        # One shared probe node for every test in this class; rclpy may
        # already be initialized by the session fixture.
        if not rclpy.ok():
            rclpy.init()
        cls.probe = LatencyProbe()

    @classmethod
    def tearDownClass(cls):
        cls.probe.destroy_node()

    def _spin_for(self, duration_s: float) -> None:
        # Pump the probe's callbacks for duration_s, in 50ms slices.
        deadline = time.time() + duration_s
        while time.time() < deadline:
            rclpy.spin_once(self.probe, timeout_sec=0.05)

    def test_orchestrator_state_transitions_logged(self):
        """Verify orchestrator transitions are captured by the probe."""
        self.probe.clear()
        self._spin_for(3.0)  # wait for state messages

        # Inject a transcript to trigger IDLE→LISTENING→THINKING
        if self.probe._has_social:
            self.probe.inject_transcript()
            self._spin_for(5.0)

        transitions = self.probe.get_state_transitions()
        # If the stack is running with conversation_node, we should see transitions.
        # In CI without LLM, orchestrator stays IDLE — that's OK.
        # Informational only: this test asserts nothing about the count.
        print(f"\n[Latency] State transitions captured: {len(transitions)}")
        for frm, to, ms in transitions:
            print(f"  {frm} → {to}: {ms:.0f}ms")

    def test_llm_response_latency_sla(self):
        """LLM must produce first ConversationResponse within 500ms p95."""
        if os.environ.get("SOCIAL_TEST_FULL", "0") != "1":
            self.skipTest("Set SOCIAL_TEST_FULL=1 to test with LLM running")
        if not self.probe._has_social:
            self.skipTest("saltybot_social_msgs not available")

        self.probe.clear()
        latencies = []

        for i in range(N_PROBE_RUNS):
            # Clear per run so each measurement starts from a clean slate.
            self.probe.clear()
            self.probe.inject_transcript(f"Hello, run number {i}")
            # Wait up to 5s for response
            deadline = time.time() + 5.0
            while time.time() < deadline:
                rclpy.spin_once(self.probe, timeout_sec=0.05)
                lats = self.probe.get_response_latencies()
                if lats:
                    latencies.extend(lats)
                    break
            time.sleep(1.0)  # cooldown between runs

        if not latencies:
            self.fail("No LLM responses received in any of the test runs")

        p50 = _percentile(latencies, 50)
        p95 = _percentile(latencies, 95)
        print(f"\n[Latency] LLM response: n={len(latencies)}, "
              f"p50={p50:.0f}ms, p95={p95:.0f}ms")

        self.assertLess(
            p95, SLA_LLM_TTFT_P95_MS,
            f"LLM p95 latency {p95:.0f}ms exceeds SLA {SLA_LLM_TTFT_P95_MS:.0f}ms"
        )

    def test_e2e_state_machine_latency(self):
        """IDLE→THINKING transition must complete within 1200ms p95."""
        if not self.probe._has_social:
            self.skipTest("saltybot_social_msgs not available")

        self.probe.clear()
        thinking_latencies = []

        for _ in range(N_PROBE_RUNS):
            self.probe.clear()

            # Wait for IDLE state
            # NOTE(review): reads the probe's private _last_state directly —
            # spin_once inside the poll keeps the state subscription fresh.
            def is_idle():
                rclpy.spin_once(self.probe, timeout_sec=0.1)
                return self.probe._last_state == "idle"
            poll_until(is_idle, timeout_s=5.0)

            # Inject transcript and measure time to THINKING
            t0 = time.time()
            self.probe.inject_transcript()

            def is_thinking():
                rclpy.spin_once(self.probe, timeout_sec=0.1)
                return self.probe._last_state in ("thinking", "speaking")
            reached = poll_until(is_thinking, timeout_s=5.0)

            if reached:
                elapsed_ms = (time.time() - t0) * 1000
                thinking_latencies.append(elapsed_ms)

            time.sleep(2.0)  # allow pipeline to return to IDLE

        if not thinking_latencies:
            # In CI without full stack, this is expected
            print("[Latency] No THINKING transitions — LLM/speech may not be running")
            return

        p95 = _percentile(thinking_latencies, 95)
        print(f"\n[Latency] Inject→THINKING: n={len(thinking_latencies)}, "
              f"p95={p95:.0f}ms")

        self.assertLess(
            p95, SLA_E2E_P95_MS,
            f"E2E p95 {p95:.0f}ms exceeds SLA {SLA_E2E_P95_MS:.0f}ms"
        )
+
+
class TestLatencyReport(unittest.TestCase):
    """Generate a latency report from orchestrator state data."""

    @classmethod
    def setUpClass(cls):
        if not rclpy.ok():
            rclpy.init()
        cls.node = rclpy.create_node("latency_report_probe")

    @classmethod
    def tearDownClass(cls):
        cls.node.destroy_node()

    def test_orchestrator_reports_latency_stats(self):
        """Orchestrator state JSON must include latency field when profiling enabled."""
        from std_msgs.msg import String
        captured = []
        subscription = self.node.create_subscription(
            String, "/social/orchestrator/state",
            lambda m: captured.append(m.data), QoSProfile(depth=10)
        )

        # Poll for up to 5s, stopping at the first state message.
        end = time.time() + 5.0
        while not captured and time.time() < end:
            rclpy.spin_once(self.node, timeout_sec=0.1)
        self.node.destroy_subscription(subscription)

        if not captured:
            self.skipTest("Orchestrator not running")

        state = json.loads(captured[-1])
        self.assertIn("latency", state,
                      "Orchestrator state JSON missing 'latency' field")
        self.assertIn("ts", state)
        print(f"\n[Latency] Orchestrator report: {json.dumps(state['latency'], indent=2)}")

    def test_latency_sla_table(self):
        """Print SLA table for documentation — always passes."""
        print("\n[Latency SLA Table]")
        slas = [
            ("wake_word → transcript (STT)", SLA_STT_P95_MS),
            ("transcript → LLM first token", SLA_LLM_TTFT_P95_MS),
            ("LLM token → TTS first chunk", SLA_TTS_FIRST_P95_MS),
            ("end-to-end (wake → speaker)", SLA_E2E_P95_MS),
        ]
        for stage, sla in slas:
            print(f"  {stage:<40} p95 < {sla:.0f}ms")
diff --git a/jetson/ros2_ws/src/saltybot_social_tests/test/test_launch.py b/jetson/ros2_ws/src/saltybot_social_tests/test/test_launch.py
new file mode 100644
index 0000000..4f602d2
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_social_tests/test/test_launch.py
@@ -0,0 +1,208 @@
+"""test_launch.py — Launch integration test: verify all social-bot nodes start.
+
+Uses launch_testing framework to:
+ 1. Start social_test.launch.py
+ 2. Verify all expected nodes appear in the graph within 30s
+ 3. Verify no node exits with an error code
+ 4. Verify key topics are advertising (even if no messages yet)
+
+Run with:
+ pytest test_launch.py -v
+    launch_test test_launch.py
+"""
+
+import os
+import sys
+import time
+import unittest
+
+import launch
+import launch_ros
+import launch_testing
+import launch_testing.actions
+import launch_testing.markers
+import pytest
+import rclpy
+from ament_index_python.packages import get_package_share_directory
+from launch import LaunchDescription
+from launch.actions import IncludeLaunchDescription, TimerAction
+from launch.launch_description_sources import PythonLaunchDescriptionSource
+from launch_ros.actions import Node
+
+import sys
+sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
+from saltybot_social_tests.test_helpers import NodeChecker, poll_until
+
+
# ── Nodes that must be alive within 30s in default CI mode ───────────────────
REQUIRED_NODES = [
    "orchestrator_node",
    "face_recognition_node",
    "tracking_fusion_node",
    "mock_sensor_pub",
]

# Nodes only required in full mode (SOCIAL_TEST_FULL=1)
# NOTE(review): not referenced by the tests in this file — presumably
# consumed by full-mode test runs; confirm before removing.
FULL_MODE_NODES = [
    "speech_pipeline_node",
    "conversation_node",
    "tts_node",
]

# Grace period (seconds) for every REQUIRED_NODES entry to appear in the graph.
NODE_STARTUP_TIMEOUT_S = 30.0
+
+
+# ── launch_testing fixture ────────────────────────────────────────────────────
+
@pytest.mark.launch_test
def generate_test_description():
    """Return the LaunchDescription to use for the launch test."""
    # Locate the stack launch file inside the installed test package.
    share_dir = get_package_share_directory("saltybot_social_tests")
    stack_launch_path = os.path.join(share_dir, "launch", "social_test.launch.py")

    # CI mode: heavy speech/LLM/TTS nodes stay disabled.
    ci_launch_args = {
        "enable_speech": "false",
        "enable_llm": "false",
        "enable_tts": "false",
    }
    stack = IncludeLaunchDescription(
        PythonLaunchDescriptionSource(stack_launch_path),
        launch_arguments=ci_launch_args.items(),
    )

    # ReadyToTest signals launch_testing that setup is done.
    return LaunchDescription([stack, launch_testing.actions.ReadyToTest()])
+
+
+# ── Test cases ────────────────────────────────────────────────────────────────
+
class TestSocialBotLaunch(unittest.TestCase):
    """Integration tests that run against the launched social-bot stack.

    Runs concurrently with the stack started by generate_test_description;
    each test probes the live ROS graph through a dedicated probe node.
    """

    @classmethod
    def setUpClass(cls):
        # Unconditional init: launch_testing runs this module in a fresh
        # process, so no prior rclpy context is expected here.
        rclpy.init()
        cls.node = rclpy.create_node("social_launch_test_probe")
        cls.checker = NodeChecker(cls.node)

    @classmethod
    def tearDownClass(cls):
        cls.node.destroy_node()
        rclpy.shutdown()

    def _spin_briefly(self, duration_s: float = 0.5) -> None:
        """Service ROS callbacks on the probe node for duration_s seconds."""
        deadline = time.time() + duration_s
        while time.time() < deadline:
            rclpy.spin_once(self.node, timeout_sec=0.1)

    # ── Test 1: All required nodes alive within 30s ───────────────────────────

    def test_required_nodes_start(self):
        """All required nodes must appear in the graph within 30 seconds."""
        # wait_for_nodes returns a {node_name: found_bool} mapping.
        results = self.checker.wait_for_nodes(
            REQUIRED_NODES, timeout_s=NODE_STARTUP_TIMEOUT_S
        )
        missing = [n for n, found in results.items() if not found]
        self.assertEqual(
            missing, [],
            f"Nodes did not start within {NODE_STARTUP_TIMEOUT_S}s: {missing}"
        )

    # ── Test 2: Expected topics are advertised ────────────────────────────────

    def test_expected_topics_advertised(self):
        """Key topics must exist in the topic graph."""
        # Give DDS discovery a moment to propagate before sampling the graph.
        self._spin_briefly(2.0)

        required_topics = [
            "/social/orchestrator/state",
            "/social/faces/detections",
            "/social/tracking/fused_target",
            # mock publishers
            "/camera/color/image_raw",
            "/uwb/target",
        ]

        all_topics = {name for name, _ in self.node.get_topic_names_and_types()}
        missing = [t for t in required_topics if t not in all_topics]
        self.assertEqual(
            missing, [],
            f"Topics not advertised: {missing}\nAll topics: {sorted(all_topics)}"
        )

    # ── Test 3: Orchestrator publishes state ──────────────────────────────────

    def test_orchestrator_state_publishes(self):
        """orchestrator_node must publish /social/orchestrator/state within 5s."""
        from std_msgs.msg import String
        received = []

        sub = self.node.create_subscription(
            String, "/social/orchestrator/state",
            lambda msg: received.append(msg.data), 10
        )
        # Poll until the first message or the 5-second deadline.
        deadline = time.time() + 5.0
        while time.time() < deadline and not received:
            rclpy.spin_once(self.node, timeout_sec=0.2)

        self.node.destroy_subscription(sub)
        self.assertTrue(
            len(received) > 0,
            "orchestrator_node did not publish /social/orchestrator/state within 5s"
        )
        # Validate JSON structure
        import json
        state = json.loads(received[0])
        self.assertIn("state", state)
        self.assertIn(state["state"],
                      ["idle", "listening", "thinking", "speaking", "throttled"])

    # ── Test 4: Face detection topic advertised at correct QoS ───────────────

    def test_face_detection_topic_qos(self):
        """Face detection topic must exist and accept BEST_EFFORT subscribers."""
        from rclpy.qos import QoSProfile, QoSReliabilityPolicy
        from saltybot_social_msgs.msg import FaceDetectionArray

        # NOTE(review): this asserts an actual message arrives within 5s,
        # which is stricter than "topic exists" — confirm this is intended.
        received = []
        qos = QoSProfile(reliability=QoSReliabilityPolicy.BEST_EFFORT, depth=10)
        sub = self.node.create_subscription(
            FaceDetectionArray, "/social/faces/detections",
            lambda msg: received.append(True), qos
        )
        deadline = time.time() + 5.0
        while time.time() < deadline and not received:
            rclpy.spin_once(self.node, timeout_sec=0.2)
        self.node.destroy_subscription(sub)
        self.assertTrue(
            len(received) > 0,
            "/social/faces/detections received no messages within 5s"
        )

    # ── Test 5: No node exits with error immediately ──────────────────────────

    def test_no_immediate_node_exits(self):
        """All required nodes should still be alive after 5s (no instant crash)."""
        # Sleep + spin to let any fast-crashing node disappear from the graph.
        time.sleep(2.0)
        self._spin_briefly(1.0)
        results = self.checker.wait_for_nodes(REQUIRED_NODES, timeout_s=2.0)
        crashed = [n for n, found in results.items() if not found]
        self.assertEqual(
            crashed, [],
            f"Nodes crashed or exited: {crashed}"
        )
+
+
+# ── Post-shutdown checks (run after the launch is torn down) ──────────────────
+
@launch_testing.post_shutdown_test()
class TestSocialBotShutdownBehavior(unittest.TestCase):
    """Tests that run after the launch has been shut down."""

    def test_processes_exited_cleanly(self, proc_info):
        """All launched processes must exit with code 0 or SIGTERM (-15).

        ``proc_info`` is injected by launch_testing and records the exit
        status of every launched process.
        """
        # Allow SIGTERM (returncode -15) as a graceful shutdown
        # NOTE(review): EXIT_OK is presumably 0, which would make the first
        # list entry redundant — harmless, but confirm against launch_testing.
        launch_testing.asserts.assertExitCodes(
            proc_info,
            allowable_exit_codes=[0, -15, launch_testing.asserts.EXIT_OK],
        )
diff --git a/jetson/ros2_ws/src/saltybot_social_tests/test/test_services.py b/jetson/ros2_ws/src/saltybot_social_tests/test/test_services.py
new file mode 100644
index 0000000..ef633c9
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_social_tests/test/test_services.py
@@ -0,0 +1,423 @@
+"""test_services.py — Verify social-bot ROS2 services are available and respond.
+
+Services tested:
+ /social/enroll (saltybot_social_msgs/EnrollPerson)
+ /social/persons/list (saltybot_social_msgs/ListPersons)
+ /social/persons/delete (saltybot_social_msgs/DeletePerson)
+ /social/persons/update (saltybot_social_msgs/UpdatePerson)
+ /social/query_mood (saltybot_social_msgs/QueryMood)
+    /social/nav/set_mode     (saltybot_social_msgs/SetNavMode, or std_srvs/Trigger fallback)
+
+Each test:
+ 1. Verifies the service is available (wait_for_service <= 10s)
+ 2. Calls the service with a valid request
+ 3. Asserts response is non-None and contains expected fields
+"""
+
+from __future__ import annotations
+
+import os
+import sys
+import time
+import unittest
+
+import rclpy
+from rclpy.node import Node
+
+sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
+from saltybot_social_tests.test_helpers import ServiceChecker
+
+SERVICE_AVAILABILITY_TIMEOUT_S = 10.0
+
+
class TestSocialServices(unittest.TestCase):
    """Integration tests for social-bot ROS2 service interface."""

    @classmethod
    def setUpClass(cls):
        # Re-use an already-initialized rclpy context when another test
        # module in the same process has initialized it first.
        if not rclpy.ok():
            rclpy.init()
        cls.node = rclpy.create_node("service_test_probe")
        cls.checker = ServiceChecker(cls.node)

    @classmethod
    def tearDownClass(cls):
        cls.node.destroy_node()

    # ── Enroll service ─────────────────────────────────────────────────────────

    def test_enroll_service_available(self):
        """/social/enroll must be available within 10s."""
        try:
            from saltybot_social_msgs.srv import EnrollPerson
        except ImportError:
            self.skipTest("saltybot_social_msgs not installed")

        available = self.checker.is_available(
            "/social/enroll", EnrollPerson,
            timeout_s=SERVICE_AVAILABILITY_TIMEOUT_S
        )
        self.assertTrue(available, "/social/enroll service not available")

    def test_enroll_service_responds(self):
        """EnrollPerson service must respond with success or graceful failure."""
        try:
            from saltybot_social_msgs.srv import EnrollPerson
        except ImportError:
            self.skipTest("saltybot_social_msgs not installed")

        req = EnrollPerson.Request()
        req.name = "TestPerson_CI"
        req.n_samples = 1  # minimal for CI
        req.mode = "face"

        response = self.checker.call_service(
            "/social/enroll", EnrollPerson, req,
            timeout_s=SERVICE_AVAILABILITY_TIMEOUT_S
        )
        self.assertIsNotNone(response, "/social/enroll did not respond")
        # Response can be success=False (no camera data) — that's fine
        # We just need a response, not success
        self.assertTrue(
            hasattr(response, "success"),
            "EnrollPerson response missing 'success' field"
        )

    # ── List persons service ───────────────────────────────────────────────────

    def test_list_persons_service_available(self):
        """/social/persons/list must be available within 10s."""
        try:
            from saltybot_social_msgs.srv import ListPersons
        except ImportError:
            self.skipTest("saltybot_social_msgs not installed")

        available = self.checker.is_available(
            "/social/persons/list", ListPersons,
            timeout_s=SERVICE_AVAILABILITY_TIMEOUT_S
        )
        self.assertTrue(available, "/social/persons/list service not available")

    def test_list_persons_service_responds(self):
        """/social/persons/list must return a list (possibly empty)."""
        try:
            from saltybot_social_msgs.srv import ListPersons
        except ImportError:
            self.skipTest("saltybot_social_msgs not installed")

        req = ListPersons.Request()
        response = self.checker.call_service(
            "/social/persons/list", ListPersons, req,
            timeout_s=SERVICE_AVAILABILITY_TIMEOUT_S
        )
        self.assertIsNotNone(response, "/social/persons/list did not respond")
        self.assertTrue(
            hasattr(response, "persons"),
            "ListPersons response missing 'persons' field"
        )
        self.assertIsInstance(response.persons, list)

    # ── Delete person service ─────────────────────────────────────────────────

    def test_delete_person_service_available(self):
        """/social/persons/delete must be available."""
        try:
            from saltybot_social_msgs.srv import DeletePerson
        except ImportError:
            self.skipTest("saltybot_social_msgs not installed")

        available = self.checker.is_available(
            "/social/persons/delete", DeletePerson,
            timeout_s=SERVICE_AVAILABILITY_TIMEOUT_S
        )
        self.assertTrue(available, "/social/persons/delete not available")

    def test_delete_nonexistent_person_returns_graceful_error(self):
        """Deleting a non-existent person must not crash — graceful False."""
        try:
            from saltybot_social_msgs.srv import DeletePerson
        except ImportError:
            self.skipTest("saltybot_social_msgs not installed")

        req = DeletePerson.Request()
        req.person_id = 99999  # definitely doesn't exist

        response = self.checker.call_service(
            "/social/persons/delete", DeletePerson, req,
            timeout_s=SERVICE_AVAILABILITY_TIMEOUT_S
        )
        self.assertIsNotNone(response)
        # Should return success=False, not crash
        self.assertFalse(
            response.success,
            "Deleting non-existent person should return success=False"
        )

    # ── Update person service ─────────────────────────────────────────────────

    def test_update_person_service_available(self):
        """/social/persons/update must be available."""
        try:
            from saltybot_social_msgs.srv import UpdatePerson
        except ImportError:
            self.skipTest("saltybot_social_msgs not installed")

        available = self.checker.is_available(
            "/social/persons/update", UpdatePerson,
            timeout_s=SERVICE_AVAILABILITY_TIMEOUT_S
        )
        self.assertTrue(available, "/social/persons/update not available")

    # ── Mood query service ────────────────────────────────────────────────────

    def test_query_mood_service_available(self):
        """/social/query_mood must be available (personality_node)."""
        try:
            from saltybot_social_msgs.srv import QueryMood
        except ImportError:
            self.skipTest("saltybot_social_msgs not installed")

        available = self.checker.is_available(
            "/social/query_mood", QueryMood,
            timeout_s=SERVICE_AVAILABILITY_TIMEOUT_S
        )
        self.assertTrue(available, "/social/query_mood not available")

    def test_query_mood_returns_valid_state(self):
        """QueryMood must return a valid mood with valence in [-1, 1]."""
        try:
            from saltybot_social_msgs.srv import QueryMood
        except ImportError:
            self.skipTest("saltybot_social_msgs not installed")

        req = QueryMood.Request()
        response = self.checker.call_service(
            "/social/query_mood", QueryMood, req,
            timeout_s=SERVICE_AVAILABILITY_TIMEOUT_S
        )
        self.assertIsNotNone(response, "/social/query_mood did not respond")
        # Field checks are guarded so the test degrades gracefully if the
        # srv definition evolves (missing fields skip the range assertion).
        if hasattr(response, "mood"):
            mood = response.mood
            if hasattr(mood, "valence"):
                self.assertGreaterEqual(mood.valence, -1.0)
                self.assertLessEqual(mood.valence, 1.0)
+
+
+# ── Nav set-mode service ──────────────────────────────────────────────────────
+
class TestNavSetModeService(unittest.TestCase):
    """/social/nav/set_mode — change follow mode at runtime."""

    # Valid follow modes understood by social_nav_node
    VALID_MODES = ["shadow", "lead", "side", "orbit", "loose", "tight", "stop"]

    @classmethod
    def setUpClass(cls):
        # Re-use an already-initialized rclpy context when present.
        if not rclpy.ok():
            rclpy.init()
        cls.node = rclpy.create_node("nav_set_mode_test_probe")
        cls.checker = ServiceChecker(cls.node)

    @classmethod
    def tearDownClass(cls):
        cls.node.destroy_node()

    def _try_import_set_mode_srv(self):
        """Return the SetNavMode srv type, falling back to std_srvs.

        Returns:
            (srv_type, variant): variant is "custom" for the real
            SetNavMode service, "trigger" for the std_srvs/Trigger
            fallback, or (None, None) when neither can be imported.
        """
        try:
            # Custom service (future): saltybot_social_msgs/srv/SetNavMode
            from saltybot_social_msgs.srv import SetNavMode
            return SetNavMode, "custom"
        except ImportError:
            pass
        try:
            # Generic string service fallback (std_srvs doesn't have SetString
            # in Humble; use rcl_interfaces/srv/SetParameters as canary)
            from std_srvs.srv import Trigger
            return Trigger, "trigger"
        except ImportError:
            return None, None

    def test_nav_set_mode_service_available(self):
        """/social/nav/set_mode must be available within 10s."""
        srv_type, variant = self._try_import_set_mode_srv()
        if srv_type is None:
            self.skipTest("No suitable srv type found for /social/nav/set_mode")

        available = self.checker.is_available(
            "/social/nav/set_mode", srv_type,
            timeout_s=SERVICE_AVAILABILITY_TIMEOUT_S
        )
        self.assertTrue(
            available,
            "/social/nav/set_mode service not available within "
            f"{SERVICE_AVAILABILITY_TIMEOUT_S}s"
        )

    def test_nav_set_mode_shadow_responds(self):
        """Setting mode 'shadow' must return a response (not crash)."""
        srv_type, variant = self._try_import_set_mode_srv()
        if srv_type is None:
            self.skipTest("No suitable srv type for /social/nav/set_mode")

        if variant == "custom":
            req = srv_type.Request()
            req.mode = "shadow"
        else:
            req = srv_type.Request()  # Trigger: no payload

        response = self.checker.call_service(
            "/social/nav/set_mode", srv_type, req,
            timeout_s=SERVICE_AVAILABILITY_TIMEOUT_S
        )
        self.assertIsNotNone(
            response, "/social/nav/set_mode did not respond to 'shadow' mode"
        )
        # Only the custom srv defines a 'success' field to inspect.
        if variant == "custom":
            self.assertTrue(
                hasattr(response, "success"),
                "SetNavMode response missing 'success' field"
            )

    def test_nav_set_mode_invalid_mode_returns_graceful_error(self):
        """Sending an invalid mode string must not crash the node."""
        srv_type, variant = self._try_import_set_mode_srv()
        if srv_type is None or variant != "custom":
            self.skipTest("Custom SetNavMode srv required for invalid-mode test")

        req = srv_type.Request()
        req.mode = "teleport_to_moon"  # definitely invalid

        response = self.checker.call_service(
            "/social/nav/set_mode", srv_type, req,
            timeout_s=SERVICE_AVAILABILITY_TIMEOUT_S
        )
        self.assertIsNotNone(
            response, "/social/nav/set_mode crashed on invalid mode"
        )
        # Must return success=False for unknown mode, not crash
        self.assertFalse(
            response.success,
            "Invalid mode 'teleport_to_moon' should return success=False"
        )

    def test_nav_set_mode_cycles_all_valid_modes(self):
        """Cycle through all valid modes and verify each returns a response."""
        srv_type, variant = self._try_import_set_mode_srv()
        if srv_type is None or variant != "custom":
            self.skipTest("Custom SetNavMode srv required for mode cycling test")

        # Each mode only needs to produce *a* response; per-mode success
        # semantics are the node's business.
        for mode in self.VALID_MODES:
            req = srv_type.Request()
            req.mode = mode
            response = self.checker.call_service(
                "/social/nav/set_mode", srv_type, req,
                timeout_s=SERVICE_AVAILABILITY_TIMEOUT_S
            )
            self.assertIsNotNone(
                response, f"/social/nav/set_mode did not respond to mode '{mode}'"
            )

    def test_nav_mode_topic_reflects_service_call(self):
        """/social/nav/mode topic must reflect the mode set via the service."""
        from std_msgs.msg import String
        srv_type, variant = self._try_import_set_mode_srv()
        if srv_type is None or variant != "custom":
            self.skipTest("Custom SetNavMode srv required for mode-reflect test")

        # Subscribe to mode topic
        received_modes = []
        sub = self.node.create_subscription(
            String, "/social/nav/mode",
            lambda msg: received_modes.append(msg.data), 10
        )

        # Request LEAD mode
        req = srv_type.Request()
        req.mode = "lead"
        response = self.checker.call_service(
            "/social/nav/set_mode", srv_type, req,
            timeout_s=SERVICE_AVAILABILITY_TIMEOUT_S
        )
        if response is None or not response.success:
            self.node.destroy_subscription(sub)
            self.skipTest("/social/nav/set_mode did not accept 'lead' mode")

        # Wait for mode topic to update
        # NOTE(review): rclpy is already imported at module level; this local
        # alias import is redundant but harmless — kept as-is.
        import rclpy as _rclpy
        deadline = time.time() + 3.0
        while time.time() < deadline and "lead" not in received_modes:
            _rclpy.spin_once(self.node, timeout_sec=0.1)
        self.node.destroy_subscription(sub)

        self.assertIn(
            "lead", received_modes,
            "/social/nav/mode did not reflect 'lead' after service call"
        )
+
+
class TestServiceResponseTimes(unittest.TestCase):
    """Verify services respond within acceptable latency."""

    # Per-call latency budget; applies to every service measured below.
    SLA_MS = 500.0  # services must respond within 500ms

    @classmethod
    def setUpClass(cls):
        # Re-use an already-initialized rclpy context when present.
        if not rclpy.ok():
            rclpy.init()
        cls.node = rclpy.create_node("service_latency_probe")
        cls.checker = ServiceChecker(cls.node)

    @classmethod
    def tearDownClass(cls):
        cls.node.destroy_node()

    def _measure_service_latency(self, svc_name: str, srv_type, request) -> float:
        """Return response time in ms, or -1 on failure.

        The measurement starts at call_async and ends when the future
        completes, so it includes spin-loop polling overhead (up to the
        0.05s spin timeout of granularity).
        """
        client = self.node.create_client(srv_type, svc_name)
        if not client.wait_for_service(timeout_sec=10.0):
            self.node.destroy_client(client)
            return -1.0
        t0 = time.perf_counter()
        future = client.call_async(request)
        # Hard 5-second ceiling, independent of the SLA being measured.
        deadline = time.time() + 5.0
        while not future.done() and time.time() < deadline:
            rclpy.spin_once(self.node, timeout_sec=0.05)
        self.node.destroy_client(client)
        if future.done():
            return (time.perf_counter() - t0) * 1000
        return -1.0

    def test_list_persons_response_time(self):
        """ListPersons must respond within 500ms."""
        try:
            from saltybot_social_msgs.srv import ListPersons
        except ImportError:
            self.skipTest("saltybot_social_msgs not installed")

        latency_ms = self._measure_service_latency(
            "/social/persons/list", ListPersons, ListPersons.Request()
        )
        self.assertGreater(latency_ms, 0,
                           "/social/persons/list did not respond")
        self.assertLess(
            latency_ms, self.SLA_MS,
            f"/social/persons/list response time {latency_ms:.0f}ms > {self.SLA_MS}ms SLA"
        )

    def test_query_mood_response_time(self):
        """QueryMood must respond within 500ms."""
        try:
            from saltybot_social_msgs.srv import QueryMood
        except ImportError:
            self.skipTest("saltybot_social_msgs not installed")

        latency_ms = self._measure_service_latency(
            "/social/query_mood", QueryMood, QueryMood.Request()
        )
        self.assertGreater(latency_ms, 0,
                           "/social/query_mood did not respond")
        self.assertLess(
            latency_ms, self.SLA_MS,
            f"/social/query_mood response time {latency_ms:.0f}ms > {self.SLA_MS}ms SLA"
        )
diff --git a/jetson/ros2_ws/src/saltybot_social_tests/test/test_shutdown.py b/jetson/ros2_ws/src/saltybot_social_tests/test/test_shutdown.py
new file mode 100644
index 0000000..8d9440d
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_social_tests/test/test_shutdown.py
@@ -0,0 +1,227 @@
+"""test_shutdown.py — Graceful shutdown verification for social-bot stack.
+
+Tests:
+ 1. Clean shutdown: all nodes terminate within 10s of SIGTERM
+ 2. No zombie processes remain after shutdown
+ 3. GPU memory returns to near-baseline after shutdown
+ 4. No file descriptor leaks (via /proc on Linux)
+ 5. No orphaned rosbag / pycuda / audio processes
+ 6. PyAudio streams properly closed (no held /dev/snd device)
+
+These tests are designed to run AFTER the stack has been shut down.
+They can also be run standalone to verify post-shutdown state.
+"""
+
+from __future__ import annotations
+
+import os
+import subprocess
+import sys
+import time
+import unittest
+
+sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
+from saltybot_social_tests.test_helpers import (
+ GpuMemoryChecker, count_zombie_processes, get_process_pids
+)
+
# ── Shutdown timing constants ─────────────────────────────────────────────────
# Both values are validated for sanity by TestShutdownTiming/TestPostShutdownPure.
MAX_SHUTDOWN_S = 10.0  # nodes must stop within 10s of receiving SIGTERM
BASELINE_DRIFT_MB = 300.0  # GPU memory allowed to drift above pre-launch baseline
+
+
class TestGracefulShutdown(unittest.TestCase):
    """Verify clean shutdown with no zombies or resource leaks.

    Designed to run AFTER the social-bot stack has been shut down; each
    check inspects host-level state (process table, GPU, /dev/snd).
    """

    # ── Test 1: No zombie processes ────────────────────────────────────────────

    def test_no_zombie_ros_processes(self):
        """No zombie (defunct) ROS2 processes after stack shutdown."""
        count = count_zombie_processes("ros")
        self.assertEqual(
            count, 0,
            f"Found {count} zombie ROS2 processes after shutdown"
        )

    def test_no_zombie_python_processes(self):
        """No zombie Python processes (from node children) after shutdown."""
        count = count_zombie_processes("python")
        self.assertEqual(
            count, 0,
            f"Found {count} zombie Python processes after shutdown"
        )

    # ── Test 2: All social-bot nodes terminated ───────────────────────────────

    def _assert_node_terminated(self, node_name: str) -> None:
        """Fail if any live PID still matches ``node_name``."""
        pids = get_process_pids(node_name)
        self.assertEqual(
            pids, [],
            f"{node_name} still running with PIDs: {pids}"
        )

    def test_orchestrator_node_terminated(self):
        """orchestrator_node must not have any live PIDs after shutdown."""
        self._assert_node_terminated("orchestrator_node")

    def test_speech_pipeline_node_terminated(self):
        """speech_pipeline_node must not have any live PIDs after shutdown."""
        self._assert_node_terminated("speech_pipeline_node")

    def test_face_recognition_node_terminated(self):
        """face_recognition_node must not have any live PIDs after shutdown."""
        self._assert_node_terminated("face_recognition_node")

    def test_conversation_node_terminated(self):
        """conversation_node must not have any live PIDs after shutdown."""
        self._assert_node_terminated("conversation_node")

    def test_tts_node_terminated(self):
        """tts_node must not have any live PIDs after shutdown."""
        self._assert_node_terminated("tts_node")

    # ── Test 3: GPU memory released ───────────────────────────────────────────

    def test_gpu_memory_released_after_shutdown(self):
        """GPU used memory must not be significantly above baseline.

        Requires GPU_BASELINE_MB (pre-launch used-MB reading) in the
        environment; skipped otherwise so CPU-only CI stays green.
        """
        if not GpuMemoryChecker.gpu_available():
            self.skipTest("No GPU available")
        baseline_str = os.environ.get("GPU_BASELINE_MB", "")
        if baseline_str == "":
            self.skipTest("Set GPU_BASELINE_MB env var to enable this test")

        baseline_mb = float(baseline_str)
        current_used = GpuMemoryChecker.used_mb()
        drift = current_used - baseline_mb
        print(f"[Shutdown] GPU: baseline={baseline_mb:.0f}MB, "
              f"now={current_used:.0f}MB, drift={drift:+.0f}MB")

        self.assertLess(
            drift, BASELINE_DRIFT_MB,
            f"GPU memory {drift:+.0f}MB above baseline after shutdown — possible leak"
        )

    # ── Test 4: Audio device released ─────────────────────────────────────────

    def test_audio_device_released(self):
        """No social-bot process should hold /dev/snd after shutdown (Linux only)."""
        if not os.path.exists("/dev/snd"):
            self.skipTest("/dev/snd not present (not Linux or no sound card)")

        try:
            # fuser prints the PIDs of processes holding the device (if any).
            result = subprocess.run(
                ["fuser", "/dev/snd"],
                capture_output=True, timeout=5
            )
        except FileNotFoundError:
            self.skipTest("fuser command not found")
        except subprocess.TimeoutExpired:
            # Previously uncaught: a hung fuser would ERROR the test run.
            self.skipTest("fuser timed out inspecting /dev/snd")

        holders = result.stdout.decode().strip()
        if not holders:
            return  # nobody holds the device — clean shutdown

        # Only fail if one of OUR processes holds the device; system daemons
        # (e.g. a sound server) may legitimately keep it open.
        social_patterns = [
            "speech_pipeline", "tts_node", "mock_sensor", "pyaudio"
        ]
        held_by_social = False
        for pid_str in holders.split():
            try:
                pid = int(pid_str)
                # /proc/<pid>/cmdline is NUL-separated; substring search still
                # works on the raw text. Close the handle explicitly (the
                # original leaked it).
                with open(f"/proc/{pid}/cmdline") as fh:
                    cmdline = fh.read()
                if any(p in cmdline for p in social_patterns):
                    held_by_social = True
                    break
            except Exception:
                continue  # PID vanished or unreadable — ignore

        self.assertFalse(
            held_by_social,
            f"Audio device /dev/snd held by social-bot process after shutdown: {holders}"
        )

    # ── Test 5: No orphaned subprocesses ──────────────────────────────────────

    def test_no_orphaned_pycuda_processes(self):
        """No orphaned pycuda/CUDA processes left from TRT inference.

        Informational only: MPS may legitimately run system-wide, so
        lingering CUDA helper processes are reported, not failed on.
        """
        mps_pids = get_process_pids("nvidia-cuda-mps")
        if mps_pids:
            print(f"\n[INFO] nvidia-cuda-mps processes present: {mps_pids}")

    def test_ros_daemon_still_healthy(self):
        """ros2 daemon should still be healthy after stack shutdown."""
        try:
            result = subprocess.run(
                ["ros2", "daemon", "status"],
                capture_output=True, timeout=10, text=True
            )
        except FileNotFoundError:
            self.skipTest("ros2 command not found — not in ROS environment")
        except subprocess.TimeoutExpired:
            self.fail("ros2 daemon status timed out")
        # A crashed daemon would affect subsequent tests.
        self.assertNotIn("error", result.stdout.lower(),
                         f"ros2 daemon unhealthy: {result.stdout}")
+
+
class TestShutdownTiming(unittest.TestCase):
    """Test that nodes respond to SIGTERM within the shutdown window."""

    def test_shutdown_timeout_constant_is_reasonable(self):
        """Shutdown timeout must be >= 5s (some nodes need time to flush)."""
        self.assertGreaterEqual(MAX_SHUTDOWN_S, 5.0)
        self.assertLessEqual(
            MAX_SHUTDOWN_S, 30.0,
            "Shutdown timeout > 30s is too long for CI")

    def test_no_ros_nodes_running_at_test_start(self):
        """Verify we're running in isolation (no leftover nodes from prev run)."""
        watched = (
            "orchestrator_node", "face_recognition_node",
            "speech_pipeline_node", "conversation_node", "tts_node",
            "tracking_fusion_node", "social_nav_node",
        )
        leftovers = []
        for name in watched:
            live_pids = get_process_pids(name)
            if live_pids:
                leftovers.append((name, live_pids))

        if leftovers:
            # Warn but don't fail — stack may be legitimately running.
            print(f"\n[WARNING] Nodes still running from previous session: "
                  f"{leftovers}")
+
+
class TestPostShutdownPure(unittest.TestCase):
    """Pure-Python post-shutdown checks (no ROS2 required)."""

    def test_gpu_budget_constants_valid(self):
        """MAX_SHUTDOWN_S and BASELINE_DRIFT_MB must be positive."""
        for constant in (MAX_SHUTDOWN_S, BASELINE_DRIFT_MB):
            self.assertGreater(constant, 0)

    def test_zombie_counter_returns_int(self):
        """count_zombie_processes must return a non-negative int."""
        zombies = count_zombie_processes("nonexistent_process_xyz")
        self.assertIsInstance(zombies, int)
        self.assertGreaterEqual(zombies, 0)

    def test_pid_list_returns_list(self):
        """get_process_pids must return a list."""
        self.assertIsInstance(
            get_process_pids("nonexistent_process_xyz"), list)

    def test_gpu_checker_no_gpu_returns_none(self):
        """GpuMemoryChecker returns None gracefully when no GPU."""
        # With a GPU this is a positive number; without one it is None.
        # Either way, no exception may escape.
        free = GpuMemoryChecker.free_mb()
        if free is not None:
            self.assertGreater(free, 0)
diff --git a/jetson/ros2_ws/src/saltybot_social_tests/test/test_topic_rates.py b/jetson/ros2_ws/src/saltybot_social_tests/test/test_topic_rates.py
new file mode 100644
index 0000000..6731dc8
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_social_tests/test/test_topic_rates.py
@@ -0,0 +1,174 @@
+"""test_topic_rates.py — Verify social-bot topics publish at expected rates.
+
+Tests measure actual Hz over a 3-second window and assert:
+ /social/faces/detections >= 10 Hz (face detector, camera@15Hz input)
+ /social/tracking/fused_target >= 8 Hz (tracking fusion)
+ /social/orchestrator/state >= 1 Hz (orchestrator heartbeat)
+ /camera/color/image_raw >= 12 Hz (mock sensor publisher)
+ /uwb/target >= 8 Hz (mock UWB)
+ /social/persons >= 3 Hz (person state tracker)
+
+Run as part of a live stack (requires social_test.launch.py to be running):
+ pytest test_topic_rates.py -v --stack-running
+
+Or run standalone with mock sensor pub only for camera/uwb tests.
+"""
+
+from __future__ import annotations
+
+import os
+import sys
+import time
+import unittest
+
+import pytest
+import rclpy
+from rclpy.node import Node
+
+sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
+from saltybot_social_tests.test_helpers import TopicRateChecker, poll_until
+
+
# ── Rate requirements ─────────────────────────────────────────────────────────
# Each spec is a 5-tuple: (spec_id, topic, msg_type_path, min_hz, warm_up_s).
# spec_id keys the checker dict built in TestTopicRates.setUpClass;
# warm_up_s is spun through before the measurement window starts.
RATE_SPECS = [
    # Mock-published topics — should always pass
    ("camera_image", "/camera/color/image_raw", "sensor_msgs.msg.Image", 12.0, 2.0),
    ("uwb_target", "/uwb/target", "geometry_msgs.msg.PoseStamped", 8.0, 2.0),
    # Social-bot topics
    ("faces_detections", "/social/faces/detections", "saltybot_social_msgs.msg.FaceDetectionArray", 10.0, 5.0),
    ("fused_target", "/social/tracking/fused_target", "saltybot_social_msgs.msg.FusedTarget", 8.0, 5.0),
    ("orchestrator_state", "/social/orchestrator/state", "std_msgs.msg.String", 1.0, 3.0),
    ("persons", "/social/persons", "saltybot_social_msgs.msg.PersonStateArray", 3.0, 5.0),
]

# Length in seconds of the Hz-measurement window used by every rate checker.
MEASUREMENT_WINDOW_S = 3.0
+
+
+def _import_msg_type(path: str):
+ """Dynamically import a message class from dotted path like 'std_msgs.msg.String'."""
+ parts = path.rsplit(".", 1)
+ module = __import__(parts[0], fromlist=[parts[1]])
+ return getattr(module, parts[1])
+
+
class TestTopicRates(unittest.TestCase):
    """Verify all social-bot topics publish at ≥ minimum required rate."""

    @classmethod
    def setUpClass(cls):
        # Guard the init: unittest runs classes alphabetically, so
        # TestTopicPresence (which init-guards but may leave the context
        # initialized) can run first — a second rclpy.init() would raise.
        if not rclpy.ok():
            rclpy.init()
        cls.node = rclpy.create_node("topic_rate_test_probe")
        cls.checkers: dict = {}

        # Create all rate checkers; a missing message package (e.g. the
        # saltybot_social_msgs types) skips that checker with a warning.
        for spec_id, topic, msg_path, min_hz, warm_up in RATE_SPECS:
            try:
                msg_type = _import_msg_type(msg_path)
                cls.checkers[spec_id] = TopicRateChecker(
                    cls.node, topic, msg_type, window_s=MEASUREMENT_WINDOW_S
                )
            except Exception as e:
                print(f"[WARN] Could not create checker for {spec_id}: {e}")

    @classmethod
    def tearDownClass(cls):
        cls.node.destroy_node()
        rclpy.shutdown()

    def _spin_for(self, duration_s: float) -> None:
        """Spin the probe node for duration_s seconds.

        Uses a monotonic clock so a wall-clock (NTP/manual) adjustment
        mid-test cannot stretch or truncate the measurement window.
        """
        deadline = time.monotonic() + duration_s
        while time.monotonic() < deadline:
            rclpy.spin_once(self.node, timeout_sec=0.05)

    def _check_rate(self, spec_id: str, min_hz: float, warm_up_s: float) -> None:
        """Generic rate check: warm up, measure, assert.

        Skips (rather than fails) when the checker for spec_id could not
        be built in setUpClass.
        """
        checker = self.checkers.get(spec_id)
        if checker is None:
            self.skipTest(f"Checker for {spec_id} not available (missing msg type)")

        # Warm-up period
        self._spin_for(warm_up_s)

        # Measurement window (+0.5 s slack so the checker's window fills)
        self._spin_for(MEASUREMENT_WINDOW_S + 0.5)

        hz = checker.measured_hz()
        count = checker.message_count()

        self.assertGreater(
            hz, min_hz * 0.7,  # 30% tolerance for scheduling jitter
            f"{spec_id}: measured {hz:.2f} Hz < {min_hz:.1f} Hz minimum "
            f"(received {count} messages total)"
        )

    # ── Individual test methods (one per topic) ────────────────────────────────

    def test_camera_image_rate(self):
        """Mock camera must publish at ≥12Hz."""
        self._check_rate("camera_image", 12.0, warm_up_s=2.0)

    def test_uwb_target_rate(self):
        """Mock UWB must publish at ≥8Hz."""
        self._check_rate("uwb_target", 8.0, warm_up_s=2.0)

    def test_face_detection_rate(self):
        """Face detection must publish at ≥10Hz given 15Hz camera input."""
        self._check_rate("faces_detections", 10.0, warm_up_s=5.0)

    def test_tracking_fusion_rate(self):
        """Tracking fusion must publish fused_target at ≥8Hz."""
        self._check_rate("fused_target", 8.0, warm_up_s=5.0)

    def test_orchestrator_state_rate(self):
        """Orchestrator must publish state at ≥1Hz (config: 2Hz)."""
        self._check_rate("orchestrator_state", 1.0, warm_up_s=3.0)

    def test_persons_rate(self):
        """Person state tracker must publish at ≥3Hz."""
        self._check_rate("persons", 3.0, warm_up_s=5.0)
+
+
class TestTopicPresence(unittest.TestCase):
    """Verify all expected topics exist in the topic graph (not rate)."""

    EXPECTED_TOPICS = [
        "/social/orchestrator/state",
        "/social/faces/detections",
        "/social/faces/embeddings",
        "/social/tracking/fused_target",
        "/social/persons",
        "/social/speech/vad_state",
        "/social/speech/transcript",
        "/social/conversation/response",
        "/social/nav/mode",
        "/camera/color/image_raw",
        "/uwb/target",
        "/cmd_vel",
    ]

    @classmethod
    def setUpClass(cls):
        if not rclpy.ok():
            rclpy.init()
        cls.node = rclpy.create_node("topic_presence_test_probe")
        # Wait briefly for topics to register. Monotonic clock: a wall-clock
        # jump must not cut the 10 s discovery window short.
        deadline = time.monotonic() + 10.0
        while time.monotonic() < deadline:
            rclpy.spin_once(cls.node, timeout_sec=0.2)
            known = {n for n, _ in cls.node.get_topic_names_and_types()}
            if "/social/orchestrator/state" in known:
                break

    @classmethod
    def tearDownClass(cls):
        cls.node.destroy_node()
        # Symmetric cleanup: release the context we may have initialized,
        # so a later unguarded rclpy.init() in another suite cannot raise.
        if rclpy.ok():
            rclpy.shutdown()

    def test_all_expected_topics_advertised(self):
        """Every expected topic must appear in the topic graph."""
        known = {n for n, _ in self.node.get_topic_names_and_types()}
        missing = [t for t in self.EXPECTED_TOPICS if t not in known]
        self.assertEqual(
            missing, [],
            f"Topics not advertised: {missing}"
        )