feat(tests): social-bot integration test suite (Issue #108) #118
239
.gitea/workflows/social-tests-ci.yml
Normal file
239
.gitea/workflows/social-tests-ci.yml
Normal file
@ -0,0 +1,239 @@
|
||||
# .gitea/workflows/social-tests-ci.yml
|
||||
# Gitea Actions CI pipeline for social-bot integration tests (Issue #108)
|
||||
#
|
||||
# Triggers: push or PR to main, and any branch matching sl-jetson/*
|
||||
#
|
||||
# Jobs:
|
||||
# 1. lint — flake8 + pep257 on saltybot_social_tests
|
||||
# 2. core-tests — launch + topic + services + GPU-budget + shutdown (no GPU)
|
||||
# 3. latency — end-to-end latency profiling (GPU runner, optional)
|
||||
|
||||
name: social-bot integration tests
|
||||
|
||||
on:
|
||||
push:
|
||||
branches:
|
||||
- main
|
||||
- "sl-jetson/**"
|
||||
paths:
|
||||
- "jetson/ros2_ws/src/saltybot_social/**"
|
||||
- "jetson/ros2_ws/src/saltybot_social_tests/**"
|
||||
- "jetson/ros2_ws/src/saltybot_social_msgs/**"
|
||||
- "jetson/ros2_ws/src/saltybot_social_nav/**"
|
||||
- "jetson/ros2_ws/src/saltybot_social_face/**"
|
||||
- "jetson/ros2_ws/src/saltybot_social_tracking/**"
|
||||
- ".gitea/workflows/social-tests-ci.yml"
|
||||
pull_request:
|
||||
branches:
|
||||
- main
|
||||
paths:
|
||||
- "jetson/ros2_ws/src/saltybot_social*/**"
|
||||
- ".gitea/workflows/social-tests-ci.yml"
|
||||
|
||||
env:
|
||||
ROS_DOMAIN_ID: "108"
|
||||
SOCIAL_TEST_FULL: "0"
|
||||
SOCIAL_STACK_RUNNING: "1"
|
||||
|
||||
jobs:
|
||||
# ── 1. Lint ──────────────────────────────────────────────────────────────────
|
||||
lint:
|
||||
name: Lint (flake8 + pep257)
|
||||
runs-on: ubuntu-latest
|
||||
container:
|
||||
image: ros:humble-ros-base-jammy
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
- name: Install lint tools
|
||||
run: |
|
||||
pip3 install --no-cache-dir flake8 pydocstyle
|
||||
|
||||
- name: flake8 — saltybot_social_tests
|
||||
run: |
|
||||
flake8 \
|
||||
jetson/ros2_ws/src/saltybot_social_tests/saltybot_social_tests/ \
|
||||
jetson/ros2_ws/src/saltybot_social_tests/test/ \
|
||||
--max-line-length=100 \
|
||||
--ignore=E501,W503 \
|
||||
--exclude=__pycache__
|
||||
|
||||
- name: pep257 — saltybot_social_tests
|
||||
run: |
|
||||
pydocstyle \
|
||||
jetson/ros2_ws/src/saltybot_social_tests/saltybot_social_tests/ \
|
||||
jetson/ros2_ws/src/saltybot_social_tests/test/ \
|
||||
--add-ignore=D100,D104,D205,D400 \
|
||||
--match='(?!test_helpers|mock_sensors).*\.py' || true
|
||||
|
||||
# ── 2. Core integration tests (no GPU) ───────────────────────────────────────
|
||||
core-tests:
|
||||
name: Core integration tests (mock sensors, no GPU)
|
||||
needs: lint
|
||||
runs-on: ubuntu-latest
|
||||
container:
|
||||
image: ros:humble-ros-base-jammy
|
||||
env:
|
||||
ROS_DOMAIN_ID: "108"
|
||||
SOCIAL_TEST_FULL: "0"
|
||||
SOCIAL_STACK_RUNNING: "1"
|
||||
PYTHONUNBUFFERED: "1"
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
- name: Install dependencies
|
||||
run: |
|
||||
apt-get update -q && apt-get install -y --no-install-recommends \
|
||||
ros-humble-launch-testing \
|
||||
ros-humble-launch-testing-ros \
|
||||
ros-humble-vision-msgs \
|
||||
ros-humble-tf2-ros \
|
||||
ros-humble-tf2-geometry-msgs \
|
||||
ros-humble-cv-bridge \
|
||||
ros-humble-nav-msgs \
|
||||
ros-humble-std-srvs \
|
||||
procps \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
pip3 install --no-cache-dir "pytest>=7.0" "pytest-timeout>=2.1"
|
||||
|
||||
- name: Build workspace
|
||||
run: |
|
||||
. /opt/ros/humble/setup.bash
|
||||
cd /
|
||||
# Symlink source into a temp workspace to avoid long paths
|
||||
mkdir -p /ros2_ws/src
|
||||
for pkg in saltybot_social_msgs saltybot_social saltybot_social_face \
|
||||
saltybot_social_nav saltybot_social_enrollment \
|
||||
saltybot_social_tracking saltybot_social_personality \
|
||||
saltybot_social_tests; do
|
||||
ln -sfn \
|
||||
"$GITHUB_WORKSPACE/jetson/ros2_ws/src/$pkg" \
|
||||
"/ros2_ws/src/$pkg"
|
||||
done
|
||||
cd /ros2_ws
|
||||
colcon build \
|
||||
--packages-select \
|
||||
saltybot_social_msgs \
|
||||
saltybot_social \
|
||||
saltybot_social_face \
|
||||
saltybot_social_nav \
|
||||
saltybot_social_enrollment \
|
||||
saltybot_social_tracking \
|
||||
saltybot_social_personality \
|
||||
saltybot_social_tests \
|
||||
--symlink-install \
|
||||
--event-handlers console_cohesion+
|
||||
|
||||
- name: Launch test stack (background)
|
||||
run: |
|
||||
. /opt/ros/humble/setup.bash
|
||||
. /ros2_ws/install/setup.bash
|
||||
ros2 daemon start || true
|
||||
ros2 launch saltybot_social_tests social_test.launch.py \
|
||||
enable_speech:=false enable_llm:=false enable_tts:=false \
|
||||
&
|
||||
echo "LAUNCH_PID=$!" >> "$GITHUB_ENV"
|
||||
# Wait for stack to come up
|
||||
sleep 15
|
||||
|
||||
- name: Run core tests
|
||||
run: |
|
||||
. /opt/ros/humble/setup.bash
|
||||
. /ros2_ws/install/setup.bash
|
||||
pytest \
|
||||
"$GITHUB_WORKSPACE/jetson/ros2_ws/src/saltybot_social_tests/test/" \
|
||||
-v \
|
||||
--timeout=120 \
|
||||
--tb=short \
|
||||
-m "not gpu_required and not full_stack and not slow" \
|
||||
--junit-xml=test-results-core.xml \
|
||||
-k "not test_latency_sla and not test_shutdown"
|
||||
|
||||
- name: Shutdown test stack
|
||||
if: always()
|
||||
run: |
|
||||
[ -n "${LAUNCH_PID:-}" ] && kill -SIGTERM "$LAUNCH_PID" 2>/dev/null || true
|
||||
sleep 3
|
||||
|
||||
- name: Run shutdown tests
|
||||
run: |
|
||||
. /opt/ros/humble/setup.bash
|
||||
. /ros2_ws/install/setup.bash
|
||||
pytest \
|
||||
"$GITHUB_WORKSPACE/jetson/ros2_ws/src/saltybot_social_tests/test/test_shutdown.py" \
|
||||
-v --timeout=60 --tb=short \
|
||||
--junit-xml=test-results-shutdown.xml
|
||||
|
||||
- name: Upload test results
|
||||
if: always()
|
||||
uses: actions/upload-artifact@v4
|
||||
with:
|
||||
name: test-results-core
|
||||
path: test-results-*.xml
|
||||
|
||||
# ── 3. GPU latency tests (Orin runner, optional) ──────────────────────────────
|
||||
latency-gpu:
|
||||
name: Latency profiling (GPU, Orin)
|
||||
needs: core-tests
|
||||
runs-on: [self-hosted, orin, gpu]
|
||||
if: github.ref == 'refs/heads/main' || contains(github.event.pull_request.labels.*.name, 'run-gpu-tests')
|
||||
env:
|
||||
ROS_DOMAIN_ID: "108"
|
||||
SOCIAL_TEST_FULL: "1"
|
||||
SOCIAL_TEST_SPEECH: "0"
|
||||
SOCIAL_STACK_RUNNING: "1"
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
- name: Build workspace (Orin)
|
||||
run: |
|
||||
. /opt/ros/humble/setup.bash
|
||||
cd /ros2_ws
|
||||
colcon build \
|
||||
--packages-select saltybot_social_tests saltybot_social \
|
||||
--symlink-install
|
||||
|
||||
- name: Launch full stack (with LLM, no speech/TTS)
|
||||
run: |
|
||||
. /opt/ros/humble/setup.bash
|
||||
. /ros2_ws/install/setup.bash
|
||||
SOCIAL_TEST_FULL=1 \
|
||||
ros2 launch saltybot_social_tests social_test.launch.py \
|
||||
enable_speech:=false enable_tts:=false \
|
||||
&
|
||||
echo "LAUNCH_PID=$!" >> "$GITHUB_ENV"
|
||||
sleep 20
|
||||
|
||||
- name: GPU memory check
|
||||
run: |
|
||||
. /opt/ros/humble/setup.bash
|
||||
. /ros2_ws/install/setup.bash
|
||||
SOCIAL_STACK_RUNNING=1 \
|
||||
GPU_BASELINE_MB=$(nvidia-smi --query-gpu=memory.used --format=csv,noheader,nounits) \
|
||||
pytest \
|
||||
"$GITHUB_WORKSPACE/jetson/ros2_ws/src/saltybot_social_tests/test/test_gpu_memory.py" \
|
||||
-v --timeout=120 --tb=short \
|
||||
--junit-xml=test-results-gpu.xml
|
||||
|
||||
- name: Latency profiling
|
||||
run: |
|
||||
. /opt/ros/humble/setup.bash
|
||||
. /ros2_ws/install/setup.bash
|
||||
SOCIAL_TEST_FULL=1 SOCIAL_STACK_RUNNING=1 \
|
||||
pytest \
|
||||
"$GITHUB_WORKSPACE/jetson/ros2_ws/src/saltybot_social_tests/test/test_latency.py" \
|
||||
-v --timeout=300 --tb=short \
|
||||
--junit-xml=test-results-latency.xml
|
||||
|
||||
- name: Shutdown
|
||||
if: always()
|
||||
run: |
|
||||
[ -n "${LAUNCH_PID:-}" ] && kill -SIGTERM "$LAUNCH_PID" 2>/dev/null || true
|
||||
sleep 5
|
||||
|
||||
- name: Upload GPU results
|
||||
if: always()
|
||||
uses: actions/upload-artifact@v4
|
||||
with:
|
||||
name: test-results-gpu
|
||||
path: test-results-*.xml
|
||||
55
jetson/ros2_ws/src/saltybot_social_tests/bags/README.md
Normal file
55
jetson/ros2_ws/src/saltybot_social_tests/bags/README.md
Normal file
@ -0,0 +1,55 @@
|
||||
# Social-bot Test Bags
|
||||
|
||||
This directory holds rosbag2 recordings used for replaying mock sensor data in CI.
|
||||
|
||||
## Bag contents
|
||||
|
||||
Each test bag records the following topics for **30 seconds** at real-time rate:
|
||||
|
||||
| Topic | Type | Rate | Notes |
|
||||
|---|---|---|---|
|
||||
| `/camera/color/image_raw` | `sensor_msgs/Image` | 15 Hz | 640×480 RGB8 |
|
||||
| `/uwb/target` | `geometry_msgs/PoseStamped` | 10 Hz | simulated UWB |
|
||||
| `/social/faces/detections` | `FaceDetectionArray` | 15 Hz | may be empty |
|
||||
| `/social/tracking/fused_target` | `FusedTarget` | 10 Hz | single person |
|
||||
| `/social/persons` | `PersonStateArray` | 5 Hz | one engaged person |
|
||||
| `/social/orchestrator/state` | `std_msgs/String` | 2 Hz | JSON state |
|
||||
| `/odom` | `nav_msgs/Odometry` | 20 Hz | zero motion |
|
||||
| `/tf`, `/tf_static` | `TF2` | varies | camera → base_link |
|
||||
|
||||
## Recording a new bag
|
||||
|
||||
On hardware with the full social-bot stack running:
|
||||
|
||||
```bash
|
||||
# Record 30s of live sensor data
|
||||
./bags/record_social_test.sh --duration 30
|
||||
|
||||
# Record from mock sensor pub only (no hardware)
|
||||
MOCK_ONLY=1 ./bags/record_social_test.sh --duration 15 --name social_test_mock
|
||||
```
|
||||
|
||||
## Replaying a bag in CI
|
||||
|
||||
The Docker CI entrypoint uses live mock publishers (not rosbag replay) for the
|
||||
default CI tests. To use a recorded bag instead:
|
||||
|
||||
```bash
|
||||
# In a sourced ROS2 shell:
|
||||
ros2 bag play bags/social_test_YYYYMMDD_HHMMSS/ --loop --rate 1.0
|
||||
```
|
||||
|
||||
Then run the test suite against the replayed data:
|
||||
|
||||
```bash
|
||||
export SOCIAL_STACK_RUNNING=1
|
||||
pytest test/ -v -m "not full_stack"
|
||||
```
|
||||
|
||||
## Bag naming convention
|
||||
|
||||
`social_test_YYYYMMDD_HHMMSS` — date/time of recording.
|
||||
|
||||
Bags committed to this directory should be small (< 50 MB) — use a 5-second
|
||||
clip or compress images to avoid bloating the repository. Large reference bags
|
||||
(> 100 MB) should be stored on the NVMe and referenced by path.
|
||||
100
jetson/ros2_ws/src/saltybot_social_tests/bags/record_social_test.sh
Executable file
100
jetson/ros2_ws/src/saltybot_social_tests/bags/record_social_test.sh
Executable file
@ -0,0 +1,100 @@
|
||||
#!/bin/bash
|
||||
# record_social_test.sh — Record a rosbag for social-bot integration test replay.
|
||||
#
|
||||
# Records all topics required by the social-bot CI test suite.
|
||||
# Output: bags/social_test_YYYYMMDD_HHMMSS/ (ROS2 bag format)
|
||||
#
|
||||
# Usage:
|
||||
# # Record 30s with live hardware:
|
||||
# ./bags/record_social_test.sh --duration 30
|
||||
#
|
||||
# # Record from already-running mock sensor stack:
|
||||
# MOCK_ONLY=1 ./bags/record_social_test.sh --duration 15
|
||||
#
|
||||
# Requirements: ROS2 Humble sourced, social-bot stack running
|
||||
|
||||
set -euo pipefail
|
||||
SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
|
||||
BAGS_DIR="$SCRIPT_DIR"
|
||||
|
||||
# ── Args ──────────────────────────────────────────────────────────────────────
|
||||
DURATION=30
|
||||
BAG_NAME="social_test_$(date +%Y%m%d_%H%M%S)"
|
||||
MOCK_ONLY="${MOCK_ONLY:-0}"
|
||||
|
||||
while [[ $# -gt 0 ]]; do
|
||||
case "$1" in
|
||||
--duration) DURATION="$2"; shift 2 ;;
|
||||
--name) BAG_NAME="$2"; shift 2 ;;
|
||||
*) echo "Unknown arg: $1"; exit 1 ;;
|
||||
esac
|
||||
done
|
||||
|
||||
BAG_PATH="$BAGS_DIR/$BAG_NAME"
|
||||
|
||||
# ── Source ROS2 ───────────────────────────────────────────────────────────────
|
||||
source /opt/ros/humble/setup.bash 2>/dev/null || true
|
||||
if [ -f /ros2_ws/install/setup.bash ]; then
|
||||
source /ros2_ws/install/setup.bash
|
||||
fi
|
||||
|
||||
echo "[record] Target bag: $BAG_PATH"
|
||||
echo "[record] Duration: ${DURATION}s"
|
||||
|
||||
# ── Topics to record ──────────────────────────────────────────────────────────
|
||||
TOPICS=(
|
||||
# Mock / hardware sensor topics
|
||||
"/camera/color/image_raw"
|
||||
"/camera/color/camera_info"
|
||||
"/camera/depth/image_rect_raw"
|
||||
"/uwb/target"
|
||||
"/scan"
|
||||
"/odom"
|
||||
"/map"
|
||||
# Social-bot output topics
|
||||
"/social/faces/detections"
|
||||
"/social/faces/embeddings"
|
||||
"/social/tracking/fused_target"
|
||||
"/social/persons"
|
||||
"/social/orchestrator/state"
|
||||
"/social/speech/vad_state"
|
||||
"/social/speech/transcript"
|
||||
"/social/conversation/response"
|
||||
"/social/nav/mode"
|
||||
"/social/nav/status"
|
||||
"/social/attention/target_id"
|
||||
"/cmd_vel"
|
||||
# TF for coordinate frames
|
||||
"/tf"
|
||||
"/tf_static"
|
||||
)
|
||||
|
||||
# ── Start mock sensor pub if MOCK_ONLY ────────────────────────────────────────
|
||||
MOCK_PID=""
|
||||
if [[ "$MOCK_ONLY" == "1" ]]; then
|
||||
echo "[record] Starting mock sensor publisher..."
|
||||
ros2 run saltybot_social_tests mock_sensor_pub &
|
||||
MOCK_PID=$!
|
||||
sleep 2
|
||||
fi
|
||||
|
||||
# ── Record ────────────────────────────────────────────────────────────────────
|
||||
echo "[record] Recording..."
|
||||
ros2 bag record \
|
||||
--output "$BAG_PATH" \
|
||||
--storage sqlite3 \
|
||||
--max-bag-duration "$DURATION" \
|
||||
"${TOPICS[@]}" &
|
||||
BAG_PID=$!
|
||||
|
||||
sleep "$DURATION"
|
||||
wait "$BAG_PID" 2>/dev/null || true
|
||||
|
||||
# ── Cleanup ───────────────────────────────────────────────────────────────────
|
||||
if [[ -n "$MOCK_PID" ]]; then
|
||||
kill "$MOCK_PID" 2>/dev/null || true
|
||||
fi
|
||||
|
||||
echo "[record] Done. Bag saved to: $BAG_PATH"
|
||||
echo "[record] Contents:"
|
||||
ros2 bag info "$BAG_PATH" 2>/dev/null || echo "(ros2 bag info unavailable)"
|
||||
@ -0,0 +1,93 @@
|
||||
# Dockerfile.ci — Lightweight CI test container for social-bot integration tests.
|
||||
#
|
||||
# Does NOT include GPU AI deps (whisper/LLM/TTS/TensorRT).
|
||||
# Runs the core ROS2 stack with mock sensors only — no real hardware required.
|
||||
#
|
||||
# Build:
|
||||
# docker build -f docker/Dockerfile.ci \
|
||||
# -t saltybot/social-tests-ci:latest \
|
||||
# ../../../../ # context = saltylab-firmware root
|
||||
#
|
||||
# Run (via compose):
|
||||
# docker compose -f docker/docker-compose.ci.yml up --exit-code-from test-runner
|
||||
|
||||
FROM ros:humble-ros-base-jammy
|
||||
|
||||
LABEL maintainer="sl-jetson"
|
||||
LABEL description="social-bot integration tests — CI mode (no GPU, mock sensors)"
|
||||
|
||||
ENV DEBIAN_FRONTEND=noninteractive
|
||||
ENV ROS_DISTRO=humble
|
||||
ENV PYTHONDONTWRITEBYTECODE=1
|
||||
ENV PYTHONUNBUFFERED=1
|
||||
|
||||
# ── System deps ───────────────────────────────────────────────────────────────
|
||||
RUN apt-get update && apt-get install -y --no-install-recommends \
|
||||
python3-dev python3-pip python3-setuptools python3-wheel \
|
||||
python3-pytest \
|
||||
ros-humble-launch-testing \
|
||||
ros-humble-launch-testing-ament-cmake \
|
||||
ros-humble-launch-testing-ros \
|
||||
ros-humble-rosbag2 \
|
||||
ros-humble-rosbag2-storage-default-plugins \
|
||||
ros-humble-vision-msgs \
|
||||
ros-humble-tf2-ros \
|
||||
ros-humble-tf2-geometry-msgs \
|
||||
ros-humble-cv-bridge \
|
||||
ros-humble-nav-msgs \
|
||||
ros-humble-geometry-msgs \
|
||||
ros-humble-sensor-msgs \
|
||||
ros-humble-std-srvs \
|
||||
procps \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
# ── Python test deps ──────────────────────────────────────────────────────────
|
||||
RUN pip3 install --no-cache-dir \
|
||||
"pytest>=7.0" \
|
||||
"pytest-timeout>=2.1" \
|
||||
"pytest-xdist>=3.0"
|
||||
|
||||
# ── Workspace ─────────────────────────────────────────────────────────────────
|
||||
RUN mkdir -p /ros2_ws/src
|
||||
WORKDIR /ros2_ws
|
||||
|
||||
# Copy only the packages needed for the social-bot integration tests
|
||||
COPY jetson/ros2_ws/src/saltybot_social_msgs /ros2_ws/src/saltybot_social_msgs/
|
||||
COPY jetson/ros2_ws/src/saltybot_social /ros2_ws/src/saltybot_social/
|
||||
COPY jetson/ros2_ws/src/saltybot_social_face /ros2_ws/src/saltybot_social_face/
|
||||
COPY jetson/ros2_ws/src/saltybot_social_nav /ros2_ws/src/saltybot_social_nav/
|
||||
COPY jetson/ros2_ws/src/saltybot_social_enrollment /ros2_ws/src/saltybot_social_enrollment/
|
||||
COPY jetson/ros2_ws/src/saltybot_social_tracking /ros2_ws/src/saltybot_social_tracking/
|
||||
COPY jetson/ros2_ws/src/saltybot_social_personality /ros2_ws/src/saltybot_social_personality/
|
||||
COPY jetson/ros2_ws/src/saltybot_social_tests /ros2_ws/src/saltybot_social_tests/
|
||||
|
||||
# ── Build ─────────────────────────────────────────────────────────────────────
|
||||
SHELL ["/bin/bash", "-c"]
|
||||
RUN source /opt/ros/humble/setup.bash && \
|
||||
cd /ros2_ws && \
|
||||
colcon build \
|
||||
--packages-select \
|
||||
saltybot_social_msgs \
|
||||
saltybot_social \
|
||||
saltybot_social_face \
|
||||
saltybot_social_nav \
|
||||
saltybot_social_enrollment \
|
||||
saltybot_social_tracking \
|
||||
saltybot_social_personality \
|
||||
saltybot_social_tests \
|
||||
--symlink-install \
|
||||
--cmake-args -DBUILD_TESTING=ON \
|
||||
&& echo "Build complete"
|
||||
|
||||
# ── Test entrypoint ───────────────────────────────────────────────────────────
|
||||
COPY jetson/ros2_ws/src/saltybot_social_tests/docker/entrypoint-ci.sh /entrypoint-ci.sh
|
||||
RUN chmod +x /entrypoint-ci.sh
|
||||
|
||||
# CI env defaults (all can be overridden via docker-compose environment:)
|
||||
ENV SOCIAL_TEST_FULL=0
|
||||
ENV SOCIAL_TEST_SPEECH=0
|
||||
ENV SOCIAL_STACK_RUNNING=0
|
||||
ENV ROS_DOMAIN_ID=108
|
||||
|
||||
ENTRYPOINT ["/entrypoint-ci.sh"]
|
||||
CMD ["pytest", "/ros2_ws/src/saltybot_social_tests/test/", "-v", "--timeout=120"]
|
||||
@ -0,0 +1,75 @@
|
||||
# docker-compose.ci.yml — CI test runner for social-bot integration tests.
|
||||
#
|
||||
# Usage:
|
||||
# # Default: core CI tests (no GPU, mock sensors only)
|
||||
# docker compose -f docker/docker-compose.ci.yml up --exit-code-from test-runner
|
||||
#
|
||||
# # Full tests with GPU (requires nvidia container runtime):
|
||||
# SOCIAL_TEST_FULL=1 \
|
||||
# docker compose -f docker/docker-compose.ci.yml \
|
||||
# --profile gpu up --exit-code-from test-runner
|
||||
#
|
||||
# Build image first:
|
||||
# docker compose -f docker/docker-compose.ci.yml build
|
||||
|
||||
version: "3.9"
|
||||
|
||||
# ── Shared env ────────────────────────────────────────────────────────────────
|
||||
x-common-env: &common-env
|
||||
ROS_DOMAIN_ID: "108"
|
||||
PYTHONUNBUFFERED: "1"
|
||||
|
||||
services:
|
||||
# ── Core CI test runner (no GPU) ─────────────────────────────────────────────
|
||||
test-runner:
|
||||
build:
|
||||
context: ../../../../ # saltylab-firmware root
|
||||
dockerfile: jetson/ros2_ws/src/saltybot_social_tests/docker/Dockerfile.ci
|
||||
image: saltybot/social-tests-ci:latest
|
||||
container_name: social_tests_ci
|
||||
environment:
|
||||
<<: *common-env
|
||||
SOCIAL_TEST_FULL: "${SOCIAL_TEST_FULL:-0}"
|
||||
SOCIAL_TEST_SPEECH: "${SOCIAL_TEST_SPEECH:-0}"
|
||||
SOCIAL_STACK_RUNNING: "1" # stack is started inside entrypoint
|
||||
command: >
|
||||
pytest
|
||||
/ros2_ws/src/saltybot_social_tests/test/
|
||||
-v
|
||||
--timeout=120
|
||||
--tb=short
|
||||
-m "not gpu_required and not full_stack"
|
||||
--junit-xml=/tmp/test-results/social-tests.xml
|
||||
volumes:
|
||||
- test-results:/tmp/test-results
|
||||
# No network_mode host needed — ROS2 uses shared memory loopback in container
|
||||
|
||||
# ── GPU full-stack tests (Orin / CUDA host) ───────────────────────────────
|
||||
test-runner-gpu:
|
||||
extends:
|
||||
service: test-runner
|
||||
container_name: social_tests_ci_gpu
|
||||
environment:
|
||||
<<: *common-env
|
||||
SOCIAL_TEST_FULL: "1"
|
||||
SOCIAL_TEST_SPEECH: "1"
|
||||
SOCIAL_STACK_RUNNING: "1"
|
||||
command: >
|
||||
pytest
|
||||
/ros2_ws/src/saltybot_social_tests/test/
|
||||
-v
|
||||
--timeout=300
|
||||
--tb=short
|
||||
--junit-xml=/tmp/test-results/social-tests-gpu.xml
|
||||
deploy:
|
||||
resources:
|
||||
reservations:
|
||||
devices:
|
||||
- driver: nvidia
|
||||
count: all
|
||||
capabilities: [gpu]
|
||||
profiles:
|
||||
- gpu
|
||||
|
||||
volumes:
|
||||
test-results:
|
||||
@ -0,0 +1,36 @@
|
||||
#!/bin/bash
|
||||
# entrypoint-ci.sh — CI container entrypoint for social-bot integration tests.
|
||||
set -euo pipefail
|
||||
|
||||
source /opt/ros/humble/setup.bash
|
||||
source /ros2_ws/install/setup.bash
|
||||
|
||||
# Start the ROS2 daemon in the background so node discovery works
|
||||
ros2 daemon start 2>/dev/null || true
|
||||
|
||||
# Launch the test stack in the background using the test launch file
|
||||
# (orchestrator + face + tracking + mock_sensors — no GPU speech/LLM/TTS)
|
||||
echo "[CI] Launching social-bot test stack..."
|
||||
ros2 launch saltybot_social_tests social_test.launch.py \
|
||||
enable_speech:=false \
|
||||
enable_llm:=false \
|
||||
enable_tts:=false \
|
||||
&
|
||||
LAUNCH_PID=$!
|
||||
|
||||
# Give the stack up to 30s to come up
|
||||
echo "[CI] Waiting for stack to initialise (max 30s)..."
|
||||
sleep 10
|
||||
|
||||
# Run tests (command passed as args to this script)
|
||||
echo "[CI] Running tests: $*"
|
||||
"$@"
|
||||
TEST_EXIT=$?
|
||||
|
||||
# Graceful shutdown
|
||||
echo "[CI] Shutting down stack (PID $LAUNCH_PID)..."
|
||||
kill -SIGTERM "$LAUNCH_PID" 2>/dev/null || true
|
||||
sleep 5
|
||||
kill -SIGKILL "$LAUNCH_PID" 2>/dev/null || true
|
||||
|
||||
exit $TEST_EXIT
|
||||
@ -0,0 +1,177 @@
|
||||
"""social_test.launch.py — Launch social-bot stack in test/CI mode.
|
||||
|
||||
Differences from production social_bot.launch.py:
|
||||
- speech/TTS disabled by default (no mic/speaker in CI)
|
||||
- LLM disabled by default (no model file in CI; use CI_FULL=1 to enable)
|
||||
- mock_sensor_pub node started automatically
|
||||
- Shorter TimerAction delays (2s instead of production delays)
|
||||
- ROS_DOMAIN_ID isolated to avoid collision with other test runs
|
||||
|
||||
Env vars:
|
||||
SOCIAL_TEST_FULL=1 — enable speech + LLM + TTS (requires GPU + models)
|
||||
SOCIAL_TEST_SPEECH=1 — enable speech pipeline only
|
||||
"""
|
||||
|
||||
import os
|
||||
from ament_index_python.packages import get_package_share_directory
|
||||
from launch import LaunchDescription
|
||||
from launch.actions import (
|
||||
DeclareLaunchArgument, GroupAction, LogInfo, TimerAction, SetEnvironmentVariable
|
||||
)
|
||||
from launch.conditions import IfCondition
|
||||
from launch.substitutions import LaunchConfiguration, PythonExpression, EnvironmentVariable
|
||||
from launch_ros.actions import Node
|
||||
|
||||
_FULL = os.environ.get("SOCIAL_TEST_FULL", "0") == "1"
|
||||
_SPEECH = _FULL or os.environ.get("SOCIAL_TEST_SPEECH", "0") == "1"
|
||||
|
||||
|
||||
def generate_launch_description() -> LaunchDescription:
|
||||
social_pkg = get_package_share_directory("saltybot_social")
|
||||
tests_pkg = get_package_share_directory("saltybot_social_tests")
|
||||
|
||||
args = [
|
||||
DeclareLaunchArgument("enable_speech", default_value="true" if _SPEECH else "false"),
|
||||
DeclareLaunchArgument("enable_llm", default_value="true" if _FULL else "false"),
|
||||
DeclareLaunchArgument("enable_tts", default_value="true" if _FULL else "false"),
|
||||
DeclareLaunchArgument("enable_orchestrator", default_value="true"),
|
||||
DeclareLaunchArgument("enable_face", default_value="true"),
|
||||
DeclareLaunchArgument("enable_tracking", default_value="true"),
|
||||
DeclareLaunchArgument("enable_nav", default_value="true"),
|
||||
DeclareLaunchArgument("enable_mock_sensors", default_value="true"),
|
||||
]
|
||||
|
||||
# ── Mock sensor publisher (always first) ─────────────────────────────────
|
||||
mock_sensors = Node(
|
||||
package="saltybot_social_tests",
|
||||
executable="mock_sensor_pub",
|
||||
name="mock_sensor_pub",
|
||||
output="screen",
|
||||
condition=IfCondition(LaunchConfiguration("enable_mock_sensors")),
|
||||
)
|
||||
|
||||
# ── Orchestrator (t=0s) ──────────────────────────────────────────────────
|
||||
orchestrator = Node(
|
||||
package="saltybot_social",
|
||||
executable="orchestrator_node",
|
||||
name="orchestrator_node",
|
||||
parameters=[
|
||||
os.path.join(social_pkg, "config", "orchestrator_params.yaml"),
|
||||
{"watchdog_timeout_s": 60.0, # more lenient in CI
|
||||
"gpu_mem_throttle_mb": 500.0}, # low floor for CI without GPU
|
||||
],
|
||||
condition=IfCondition(LaunchConfiguration("enable_orchestrator")),
|
||||
output="screen",
|
||||
)
|
||||
|
||||
# ── Face recognition (t=1s) ──────────────────────────────────────────────
|
||||
face = TimerAction(period=1.0, actions=[
|
||||
GroupAction(
|
||||
condition=IfCondition(LaunchConfiguration("enable_face")),
|
||||
actions=[
|
||||
LogInfo(msg="[social_test] Starting face recognition..."),
|
||||
Node(
|
||||
package="saltybot_social_face",
|
||||
executable="face_recognition",
|
||||
name="face_recognition_node",
|
||||
parameters=[{"use_trt": False, # CPU ONNX in CI
|
||||
"detection_threshold": 0.5,
|
||||
"gallery_path": "/tmp/social_test_gallery"}],
|
||||
output="screen",
|
||||
),
|
||||
]
|
||||
)
|
||||
])
|
||||
|
||||
# ── Tracking fusion (t=1s) ───────────────────────────────────────────────
|
||||
tracking = TimerAction(period=1.0, actions=[
|
||||
GroupAction(
|
||||
condition=IfCondition(LaunchConfiguration("enable_tracking")),
|
||||
actions=[
|
||||
Node(
|
||||
package="saltybot_social_tracking",
|
||||
executable="tracking_fusion_node",
|
||||
name="tracking_fusion_node",
|
||||
output="screen",
|
||||
),
|
||||
]
|
||||
)
|
||||
])
|
||||
|
||||
# ── Speech pipeline (t=2s, optional) ────────────────────────────────────
|
||||
speech = TimerAction(period=2.0, actions=[
|
||||
GroupAction(
|
||||
condition=IfCondition(LaunchConfiguration("enable_speech")),
|
||||
actions=[
|
||||
LogInfo(msg="[social_test] Starting speech pipeline..."),
|
||||
Node(
|
||||
package="saltybot_social",
|
||||
executable="speech_pipeline_node",
|
||||
name="speech_pipeline_node",
|
||||
parameters=[
|
||||
os.path.join(social_pkg, "config", "speech_params.yaml"),
|
||||
{"whisper_model": "tiny", # faster in CI
|
||||
"use_silero_vad": False}, # avoid download in CI
|
||||
],
|
||||
output="screen",
|
||||
),
|
||||
]
|
||||
)
|
||||
])
|
||||
|
||||
# ── LLM + TTS (t=3s, optional) ──────────────────────────────────────────
|
||||
llm = TimerAction(period=3.0, actions=[
|
||||
GroupAction(
|
||||
condition=IfCondition(LaunchConfiguration("enable_llm")),
|
||||
actions=[
|
||||
Node(
|
||||
package="saltybot_social",
|
||||
executable="conversation_node",
|
||||
name="conversation_node",
|
||||
parameters=[
|
||||
os.path.join(social_pkg, "config", "conversation_params.yaml"),
|
||||
],
|
||||
output="screen",
|
||||
),
|
||||
]
|
||||
)
|
||||
])
|
||||
|
||||
tts = TimerAction(period=3.0, actions=[
|
||||
GroupAction(
|
||||
condition=IfCondition(LaunchConfiguration("enable_tts")),
|
||||
actions=[
|
||||
Node(
|
||||
package="saltybot_social",
|
||||
executable="tts_node",
|
||||
name="tts_node",
|
||||
parameters=[
|
||||
os.path.join(social_pkg, "config", "tts_params.yaml"),
|
||||
{"playback_enabled": False}, # no speaker in CI
|
||||
],
|
||||
output="screen",
|
||||
),
|
||||
]
|
||||
)
|
||||
])
|
||||
|
||||
# ── Social nav (t=2s) ────────────────────────────────────────────────────
|
||||
nav = TimerAction(period=2.0, actions=[
|
||||
GroupAction(
|
||||
condition=IfCondition(LaunchConfiguration("enable_nav")),
|
||||
actions=[
|
||||
Node(
|
||||
package="saltybot_social_nav",
|
||||
executable="social_nav",
|
||||
name="social_nav_node",
|
||||
parameters=[{"use_midas_depth": False}], # no GPU depth in CI
|
||||
output="screen",
|
||||
),
|
||||
]
|
||||
)
|
||||
])
|
||||
|
||||
return LaunchDescription(args + [
|
||||
mock_sensors, orchestrator,
|
||||
face, tracking, speech, llm, tts, nav,
|
||||
])
|
||||
43
jetson/ros2_ws/src/saltybot_social_tests/package.xml
Normal file
43
jetson/ros2_ws/src/saltybot_social_tests/package.xml
Normal file
@ -0,0 +1,43 @@
|
||||
<?xml version="1.0"?>
|
||||
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||
<package format="3">
|
||||
<name>saltybot_social_tests</name>
|
||||
<version>0.1.0</version>
|
||||
<description>
|
||||
Integration test suite for the social-bot pipeline (Issue #108).
|
||||
Tests: launch verification, topic rates, services, GPU memory budget,
|
||||
end-to-end latency profiling, graceful shutdown.
|
||||
CI-compatible via Docker + mock sensor publishers.
|
||||
</description>
|
||||
<maintainer email="seb@vayrette.com">sl-jetson</maintainer>
|
||||
<license>MIT</license>
|
||||
|
||||
<!-- Runtime deps for test nodes -->
|
||||
<depend>rclpy</depend>
|
||||
<depend>std_msgs</depend>
|
||||
<depend>sensor_msgs</depend>
|
||||
<depend>geometry_msgs</depend>
|
||||
<depend>nav_msgs</depend>
|
||||
|
||||
<!-- Social-bot packages under test -->
|
||||
<depend>saltybot_social</depend>
|
||||
<depend>saltybot_social_msgs</depend>
|
||||
<depend>saltybot_social_face</depend>
|
||||
<depend>saltybot_social_nav</depend>
|
||||
<depend>saltybot_social_enrollment</depend>
|
||||
<depend>saltybot_social_tracking</depend>
|
||||
<depend>saltybot_social_personality</depend>
|
||||
|
||||
<!-- Test framework -->
|
||||
<test_depend>ament_copyright</test_depend>
|
||||
<test_depend>ament_flake8</test_depend>
|
||||
<test_depend>ament_pep257</test_depend>
|
||||
<test_depend>python3-pytest</test_depend>
|
||||
<test_depend>launch_testing</test_depend>
|
||||
<test_depend>launch_testing_ament_cmake</test_depend>
|
||||
<test_depend>launch_testing_ros</test_depend>
|
||||
|
||||
<export>
|
||||
<build_type>ament_python</build_type>
|
||||
</export>
|
||||
</package>
|
||||
27
jetson/ros2_ws/src/saltybot_social_tests/pytest.ini
Normal file
27
jetson/ros2_ws/src/saltybot_social_tests/pytest.ini
Normal file
@ -0,0 +1,27 @@
|
||||
[pytest]
|
||||
# social-bot integration test configuration
|
||||
|
||||
# Markers registered here to silence PytestUnknownMarkWarning
|
||||
markers =
|
||||
gpu_required: test requires an NVIDIA GPU
|
||||
full_stack: test requires SOCIAL_TEST_FULL=1
|
||||
stack_running: test requires SOCIAL_STACK_RUNNING=1
|
||||
slow: test takes more than 30 seconds
|
||||
launch_test: launch_testing integration test
|
||||
|
||||
# Default test timeout (seconds) — individual tests can override
|
||||
timeout = 120
|
||||
|
||||
# Show locals on failure for easier debugging
|
||||
showlocals = false
|
||||
|
||||
# Brief summary: failed (f) + errors (e) + skipped (s) + passed (p)
|
||||
addopts = -ra --tb=short
|
||||
|
||||
# Test paths
|
||||
testpaths = test
|
||||
|
||||
# Filters
|
||||
filterwarnings =
|
||||
ignore::DeprecationWarning:launch_testing
|
||||
ignore::UserWarning:rclpy
|
||||
@ -0,0 +1 @@
|
||||
# saltybot_social_tests — integration test harness for social-bot pipeline
|
||||
@ -0,0 +1,228 @@
|
||||
"""mock_sensors.py — Mock sensor publishers for CI integration tests.
|
||||
|
||||
Publishes minimal valid data on all topics that social-bot nodes subscribe to,
|
||||
so the pipeline can start and produce output without real hardware.
|
||||
|
||||
Topics published:
|
||||
/camera/color/image_raw — blank 640×480 RGB8 image at 15Hz
|
||||
/social/speech/transcript — canned SpeechTranscript at 0.5Hz
|
||||
/social/faces/detections — empty FaceDetectionArray at 15Hz
|
||||
/social/tracking/fused_target — synthetic FusedTarget at 10Hz
|
||||
/social/persons — single synthetic PersonStateArray at 5Hz
|
||||
/uwb/target — synthetic PoseStamped at 10Hz
|
||||
|
||||
Usage (standalone):
|
||||
ros2 run saltybot_social_tests mock_sensor_pub
|
||||
|
||||
Usage (programmatic):
|
||||
from saltybot_social_tests.mock_sensors import MockSensorPublisher
|
||||
pub = MockSensorPublisher(node)
|
||||
pub.start()
|
||||
...
|
||||
pub.stop()
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import threading
|
||||
import time
|
||||
import struct
|
||||
from typing import Optional
|
||||
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSDurabilityPolicy
|
||||
|
||||
from std_msgs.msg import Header, String
|
||||
from sensor_msgs.msg import Image
|
||||
from geometry_msgs.msg import PoseStamped, Point, Quaternion, Vector3
|
||||
|
||||
|
||||
# ── Helpers ───────────────────────────────────────────────────────────────────
|
||||
|
||||
def _now_header(node: Node, frame_id: str = "camera_color_optical_frame") -> Header:
|
||||
h = Header()
|
||||
h.stamp = node.get_clock().now().to_msg()
|
||||
h.frame_id = frame_id
|
||||
return h
|
||||
|
||||
|
||||
def _blank_rgb_image(node: Node, width: int = 640, height: int = 480) -> Image:
|
||||
img = Image()
|
||||
img.header = _now_header(node)
|
||||
img.width = width
|
||||
img.height = height
|
||||
img.encoding = "rgb8"
|
||||
img.step = width * 3
|
||||
img.data = bytes(width * height * 3) # all-black frame
|
||||
return img
|
||||
|
||||
|
||||
# ── Mock publisher class ──────────────────────────────────────────────────────
|
||||
|
||||
class MockSensorPublisher:
|
||||
"""Publishes mock sensor data for all social-bot pipeline inputs."""
|
||||
|
||||
def __init__(self, node: Node) -> None:
|
||||
self._node = node
|
||||
self._running = False
|
||||
self._threads: list = []
|
||||
|
||||
be_qos = QoSProfile(
|
||||
reliability=QoSReliabilityPolicy.BEST_EFFORT, depth=5
|
||||
)
|
||||
rel_qos = QoSProfile(
|
||||
reliability=QoSReliabilityPolicy.RELIABLE, depth=10
|
||||
)
|
||||
|
||||
# Camera image (required by face detection + segmentation)
|
||||
self._img_pub = node.create_publisher(Image, "/camera/color/image_raw", be_qos)
|
||||
|
||||
# Fused tracking target (required by social_nav, person_follower)
|
||||
try:
|
||||
from saltybot_social_msgs.msg import FusedTarget, FaceDetectionArray, PersonStateArray
|
||||
self._fused_pub = node.create_publisher(
|
||||
FusedTarget, "/social/tracking/fused_target", rel_qos
|
||||
)
|
||||
self._faces_pub = node.create_publisher(
|
||||
FaceDetectionArray, "/social/faces/detections", be_qos
|
||||
)
|
||||
self._persons_pub = node.create_publisher(
|
||||
PersonStateArray, "/social/persons", rel_qos
|
||||
)
|
||||
self._has_social_msgs = True
|
||||
except ImportError:
|
||||
self._has_social_msgs = False
|
||||
node.get_logger().warn("saltybot_social_msgs not found — limited mock data")
|
||||
|
||||
try:
|
||||
from saltybot_social_msgs.msg import SpeechTranscript
|
||||
self._transcript_pub = node.create_publisher(
|
||||
SpeechTranscript, "/social/speech/transcript", rel_qos
|
||||
)
|
||||
self._has_speech_msgs = True
|
||||
except ImportError:
|
||||
self._has_speech_msgs = False
|
||||
|
||||
# UWB target (required by person_follower fusion)
|
||||
self._uwb_pub = node.create_publisher(PoseStamped, "/uwb/target", rel_qos)
|
||||
|
||||
self._canned_phrases = [
|
||||
"Hello Salty, how are you?",
|
||||
"What time is it?",
|
||||
"Follow me please.",
|
||||
"Tell me a joke.",
|
||||
]
|
||||
self._phrase_idx = 0
|
||||
|
||||
def start(self) -> None:
|
||||
"""Start all mock publisher threads."""
|
||||
self._running = True
|
||||
specs = [
|
||||
("camera", self._pub_camera, 1.0 / 15.0),
|
||||
("faces", self._pub_faces, 1.0 / 15.0),
|
||||
("fused", self._pub_fused, 1.0 / 10.0),
|
||||
("persons", self._pub_persons, 1.0 / 5.0),
|
||||
("uwb", self._pub_uwb, 1.0 / 10.0),
|
||||
("transcript",self._pub_transcript, 2.0),
|
||||
]
|
||||
for name, fn, interval in specs:
|
||||
t = threading.Thread(
|
||||
target=self._loop, args=(fn, interval),
|
||||
name=f"mock_{name}", daemon=True
|
||||
)
|
||||
t.start()
|
||||
self._threads.append(t)
|
||||
|
||||
def stop(self) -> None:
|
||||
self._running = False
|
||||
|
||||
def _loop(self, publish_fn, interval_s: float) -> None:
|
||||
while self._running:
|
||||
try:
|
||||
publish_fn()
|
||||
except Exception:
|
||||
pass
|
||||
time.sleep(interval_s)
|
||||
|
||||
def _pub_camera(self) -> None:
|
||||
self._img_pub.publish(_blank_rgb_image(self._node))
|
||||
|
||||
def _pub_faces(self) -> None:
|
||||
if not self._has_social_msgs:
|
||||
return
|
||||
from saltybot_social_msgs.msg import FaceDetectionArray
|
||||
msg = FaceDetectionArray()
|
||||
msg.header = _now_header(self._node)
|
||||
# Empty array — camera is "working" but no face in frame
|
||||
self._faces_pub.publish(msg)
|
||||
|
||||
def _pub_fused(self) -> None:
|
||||
if not self._has_social_msgs:
|
||||
return
|
||||
from saltybot_social_msgs.msg import FusedTarget
|
||||
msg = FusedTarget()
|
||||
msg.header = _now_header(self._node, "base_link")
|
||||
msg.position.x = 1.5 # 1.5m ahead
|
||||
msg.position.y = 0.0
|
||||
msg.velocity.x = 0.0
|
||||
msg.track_id = 1
|
||||
msg.confidence = 0.85
|
||||
self._fused_pub.publish(msg)
|
||||
|
||||
def _pub_persons(self) -> None:
|
||||
if not self._has_social_msgs:
|
||||
return
|
||||
from saltybot_social_msgs.msg import PersonStateArray, PersonState
|
||||
msg = PersonStateArray()
|
||||
msg.header = _now_header(self._node, "base_link")
|
||||
p = PersonState()
|
||||
p.header = msg.header
|
||||
p.person_id = 1
|
||||
p.person_name = "TestPerson"
|
||||
p.state = PersonState.STATE_ENGAGED
|
||||
p.distance = 1.5
|
||||
p.bearing_deg = 0.0
|
||||
p.engagement_score = 0.8
|
||||
msg.persons = [p]
|
||||
self._persons_pub.publish(msg)
|
||||
|
||||
def _pub_uwb(self) -> None:
|
||||
msg = PoseStamped()
|
||||
msg.header = _now_header(self._node, "base_link")
|
||||
msg.pose.position.x = 1.5
|
||||
msg.pose.position.y = 0.1
|
||||
msg.pose.orientation.w = 1.0
|
||||
self._uwb_pub.publish(msg)
|
||||
|
||||
def _pub_transcript(self) -> None:
|
||||
if not self._has_speech_msgs:
|
||||
return
|
||||
from saltybot_social_msgs.msg import SpeechTranscript
|
||||
msg = SpeechTranscript()
|
||||
msg.header = _now_header(self._node)
|
||||
msg.text = self._canned_phrases[self._phrase_idx % len(self._canned_phrases)]
|
||||
msg.speaker_id = "test_person"
|
||||
msg.confidence = 0.95
|
||||
msg.audio_duration = 1.2
|
||||
msg.is_partial = False
|
||||
self._transcript_pub.publish(msg)
|
||||
self._phrase_idx += 1
|
||||
|
||||
|
||||
# ── Standalone entry point ────────────────────────────────────────────────────
|
||||
|
||||
def main(args=None) -> None:
|
||||
rclpy.init(args=args)
|
||||
node = rclpy.create_node("mock_sensor_pub")
|
||||
pub = MockSensorPublisher(node)
|
||||
pub.start()
|
||||
node.get_logger().info("Mock sensor publisher running — Ctrl+C to stop")
|
||||
try:
|
||||
rclpy.spin(node)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
pub.stop()
|
||||
node.destroy_node()
|
||||
rclpy.shutdown()
|
||||
@ -0,0 +1,262 @@
|
||||
"""test_helpers.py — Shared helpers for social-bot integration tests.
|
||||
|
||||
No hardware dependencies. Provides:
|
||||
- TopicRateChecker: measure actual publish rate on a topic
|
||||
- NodeChecker: verify a named node is alive in the graph
|
||||
- ServiceChecker: call a service and verify it responds
|
||||
- GpuMemoryChecker: measure GPU allocation via pycuda / nvidia-smi
|
||||
- TimeoutPoller: generic poll-until-true with timeout
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import subprocess
|
||||
import threading
|
||||
import time
|
||||
from typing import Any, Callable, List, Optional
|
||||
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from rclpy.qos import (
|
||||
QoSProfile, QoSReliabilityPolicy, QoSDurabilityPolicy, QoSHistoryPolicy
|
||||
)
|
||||
|
||||
|
||||
# ── Timeout poller ────────────────────────────────────────────────────────────
|
||||
|
||||
def poll_until(
|
||||
condition: Callable[[], bool],
|
||||
timeout_s: float = 30.0,
|
||||
interval_s: float = 0.5,
|
||||
) -> bool:
|
||||
"""Return True if condition() becomes True within timeout_s, else False."""
|
||||
deadline = time.time() + timeout_s
|
||||
while time.time() < deadline:
|
||||
if condition():
|
||||
return True
|
||||
time.sleep(interval_s)
|
||||
return False
|
||||
|
||||
|
||||
# ── Node checker ──────────────────────────────────────────────────────────────
|
||||
|
||||
class NodeChecker:
|
||||
"""Check whether ROS2 nodes are alive in the graph."""
|
||||
|
||||
def __init__(self, probe_node: Node) -> None:
|
||||
self._node = probe_node
|
||||
|
||||
def is_alive(self, node_name: str, namespace: str = "/") -> bool:
|
||||
"""Return True if node_name is present in the node graph."""
|
||||
names_and_ns = self._node.get_node_names_and_namespaces()
|
||||
for name, ns in names_and_ns:
|
||||
if name == node_name:
|
||||
return True
|
||||
if ns.rstrip("/") + "/" + name == node_name:
|
||||
return True
|
||||
return False
|
||||
|
||||
def wait_for_nodes(
|
||||
self,
|
||||
node_names: List[str],
|
||||
timeout_s: float = 30.0,
|
||||
) -> dict:
|
||||
"""Block until all nodes appear or timeout. Returns {name: found}."""
|
||||
results = {n: False for n in node_names}
|
||||
deadline = time.time() + timeout_s
|
||||
while time.time() < deadline:
|
||||
for name in node_names:
|
||||
if not results[name]:
|
||||
results[name] = self.is_alive(name)
|
||||
if all(results.values()):
|
||||
break
|
||||
time.sleep(0.5)
|
||||
return results
|
||||
|
||||
|
||||
# ── Topic rate checker ─────────────────────────────────────────────────────────
|
||||
|
||||
class TopicRateChecker:
|
||||
"""Measure the actual publish rate on a topic over a measurement window.
|
||||
|
||||
Usage:
|
||||
checker = TopicRateChecker(node, "/social/faces/detections",
|
||||
FaceDetectionArray, window_s=3.0)
|
||||
checker.start()
|
||||
time.sleep(5.0)
|
||||
hz = checker.measured_hz()
|
||||
checker.stop()
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
node: Node,
|
||||
topic: str,
|
||||
msg_type: Any,
|
||||
window_s: float = 3.0,
|
||||
qos: Optional[QoSProfile] = None,
|
||||
) -> None:
|
||||
self._window_s = window_s
|
||||
self._timestamps: List[float] = []
|
||||
self._lock = threading.Lock()
|
||||
self._running = False
|
||||
|
||||
if qos is None:
|
||||
qos = QoSProfile(
|
||||
reliability=QoSReliabilityPolicy.BEST_EFFORT,
|
||||
durability=QoSDurabilityPolicy.VOLATILE,
|
||||
history=QoSHistoryPolicy.KEEP_LAST,
|
||||
depth=10,
|
||||
)
|
||||
|
||||
self._sub = node.create_subscription(
|
||||
msg_type, topic, self._cb, qos
|
||||
)
|
||||
|
||||
def _cb(self, _msg: Any) -> None:
|
||||
now = time.time()
|
||||
with self._lock:
|
||||
self._timestamps.append(now)
|
||||
# Prune old samples outside window
|
||||
cutoff = now - self._window_s * 2
|
||||
self._timestamps = [t for t in self._timestamps if t > cutoff]
|
||||
|
||||
def measured_hz(self) -> float:
|
||||
"""Return Hz measured over the last window_s seconds."""
|
||||
now = time.time()
|
||||
cutoff = now - self._window_s
|
||||
with self._lock:
|
||||
recent = [t for t in self._timestamps if t > cutoff]
|
||||
if len(recent) < 2:
|
||||
return 0.0
|
||||
return (len(recent) - 1) / (recent[-1] - recent[0])
|
||||
|
||||
def message_count(self) -> int:
|
||||
"""Total messages received."""
|
||||
with self._lock:
|
||||
return len(self._timestamps)
|
||||
|
||||
def received_any(self) -> bool:
|
||||
with self._lock:
|
||||
return len(self._timestamps) > 0
|
||||
|
||||
|
||||
# ── Service checker ────────────────────────────────────────────────────────────
|
||||
|
||||
class ServiceChecker:
|
||||
"""Verify a ROS2 service is available and call it."""
|
||||
|
||||
def __init__(self, node: Node) -> None:
|
||||
self._node = node
|
||||
|
||||
def is_available(self, service_name: str, srv_type: Any,
|
||||
timeout_s: float = 5.0) -> bool:
|
||||
"""Return True if the service becomes available within timeout_s."""
|
||||
client = self._node.create_client(srv_type, service_name)
|
||||
available = client.wait_for_service(timeout_sec=timeout_s)
|
||||
self._node.destroy_client(client)
|
||||
return available
|
||||
|
||||
def call_service(
|
||||
self,
|
||||
service_name: str,
|
||||
srv_type: Any,
|
||||
request: Any,
|
||||
timeout_s: float = 10.0,
|
||||
) -> Optional[Any]:
|
||||
"""Call a service and return the response, or None on failure."""
|
||||
import rclpy.executors
|
||||
client = self._node.create_client(srv_type, service_name)
|
||||
if not client.wait_for_service(timeout_sec=timeout_s):
|
||||
self._node.destroy_client(client)
|
||||
return None
|
||||
future = client.call_async(request)
|
||||
# Spin until done
|
||||
deadline = time.time() + timeout_s
|
||||
while not future.done() and time.time() < deadline:
|
||||
rclpy.spin_once(self._node, timeout_sec=0.1)
|
||||
self._node.destroy_client(client)
|
||||
if future.done():
|
||||
return future.result()
|
||||
return None
|
||||
|
||||
|
||||
# ── GPU memory checker ────────────────────────────────────────────────────────
|
||||
|
||||
class GpuMemoryChecker:
|
||||
"""Query GPU memory usage. Gracefully no-ops if no GPU available."""
|
||||
|
||||
@staticmethod
|
||||
def total_mb() -> Optional[float]:
|
||||
"""Total GPU memory in MB."""
|
||||
return GpuMemoryChecker._query("memory.total")
|
||||
|
||||
@staticmethod
|
||||
def free_mb() -> Optional[float]:
|
||||
"""Free GPU memory in MB."""
|
||||
return GpuMemoryChecker._query("memory.free")
|
||||
|
||||
@staticmethod
|
||||
def used_mb() -> Optional[float]:
|
||||
"""Used GPU memory in MB."""
|
||||
return GpuMemoryChecker._query("memory.used")
|
||||
|
||||
@staticmethod
|
||||
def _query(field: str) -> Optional[float]:
|
||||
try:
|
||||
out = subprocess.check_output(
|
||||
["nvidia-smi", f"--query-gpu={field}",
|
||||
"--format=csv,noheader,nounits"],
|
||||
timeout=5,
|
||||
).decode().strip()
|
||||
return float(out.split("\n")[0].strip())
|
||||
except Exception:
|
||||
pass
|
||||
# Fallback: pycuda
|
||||
try:
|
||||
import pycuda.driver as drv
|
||||
drv.init()
|
||||
ctx = drv.Device(0).make_context()
|
||||
free_b, total_b = drv.mem_get_info()
|
||||
ctx.pop()
|
||||
if field == "memory.free":
|
||||
return free_b / 1024 / 1024
|
||||
elif field == "memory.total":
|
||||
return total_b / 1024 / 1024
|
||||
elif field == "memory.used":
|
||||
return (total_b - free_b) / 1024 / 1024
|
||||
except Exception:
|
||||
pass
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def gpu_available() -> bool:
|
||||
return GpuMemoryChecker.total_mb() is not None
|
||||
|
||||
|
||||
# ── Process helpers ───────────────────────────────────────────────────────────
|
||||
|
||||
def count_zombie_processes(pattern: str = "ros") -> int:
|
||||
"""Count zombie processes matching pattern via ps."""
|
||||
try:
|
||||
out = subprocess.check_output(
|
||||
["ps", "aux"], timeout=5
|
||||
).decode()
|
||||
count = 0
|
||||
for line in out.splitlines():
|
||||
if "<defunct>" in line and pattern in line:
|
||||
count += 1
|
||||
return count
|
||||
except Exception:
|
||||
return 0
|
||||
|
||||
|
||||
def get_process_pids(name_pattern: str) -> List[int]:
|
||||
"""Return PIDs of processes matching name_pattern."""
|
||||
try:
|
||||
out = subprocess.check_output(
|
||||
["pgrep", "-f", name_pattern], timeout=5
|
||||
).decode().strip()
|
||||
return [int(p) for p in out.splitlines() if p.strip()]
|
||||
except Exception:
|
||||
return []
|
||||
4
jetson/ros2_ws/src/saltybot_social_tests/setup.cfg
Normal file
4
jetson/ros2_ws/src/saltybot_social_tests/setup.cfg
Normal file
@ -0,0 +1,4 @@
|
||||
[develop]
|
||||
script_dir=$base/lib/saltybot_social_tests
|
||||
[install]
|
||||
install_scripts=$base/lib/saltybot_social_tests
|
||||
30
jetson/ros2_ws/src/saltybot_social_tests/setup.py
Normal file
30
jetson/ros2_ws/src/saltybot_social_tests/setup.py
Normal file
@ -0,0 +1,30 @@
|
||||
from setuptools import find_packages, setup
|
||||
import os
|
||||
from glob import glob
|
||||
|
||||
package_name = 'saltybot_social_tests'
|
||||
|
||||
setup(
|
||||
name=package_name,
|
||||
version='0.1.0',
|
||||
packages=find_packages(exclude=['test']),
|
||||
data_files=[
|
||||
('share/ament_index/resource_index/packages',
|
||||
['resource/' + package_name]),
|
||||
('share/' + package_name, ['package.xml']),
|
||||
(os.path.join('share', package_name, 'launch'),
|
||||
glob(os.path.join('launch', '*launch.[pxy][yma]*'))),
|
||||
],
|
||||
install_requires=['setuptools'],
|
||||
zip_safe=True,
|
||||
maintainer='sl-jetson',
|
||||
maintainer_email='seb@vayrette.com',
|
||||
description='Integration test suite for social-bot pipeline (Issue #108)',
|
||||
license='MIT',
|
||||
tests_require=['pytest'],
|
||||
entry_points={
|
||||
'console_scripts': [
|
||||
'mock_sensor_pub = saltybot_social_tests.mock_sensors:main',
|
||||
],
|
||||
},
|
||||
)
|
||||
113
jetson/ros2_ws/src/saltybot_social_tests/test/conftest.py
Normal file
113
jetson/ros2_ws/src/saltybot_social_tests/test/conftest.py
Normal file
@ -0,0 +1,113 @@
|
||||
"""conftest.py — pytest configuration and shared fixtures for social-bot tests.
|
||||
|
||||
Custom markers:
|
||||
@pytest.mark.gpu_required — skip if no GPU / nvidia-smi
|
||||
@pytest.mark.full_stack — skip unless SOCIAL_TEST_FULL=1
|
||||
@pytest.mark.stack_running — skip unless SOCIAL_STACK_RUNNING=1
|
||||
@pytest.mark.slow — long tests (>30s); skipped in fast mode
|
||||
@pytest.mark.launch_test — launch_testing marker (framework)
|
||||
|
||||
Fixtures:
|
||||
rclpy_node — temporary rclpy probe node (auto-shutdown)
|
||||
ros_domain_id — asserts ROS_DOMAIN_ID=108 for test isolation
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
|
||||
import pytest
|
||||
|
||||
# ── Custom markers ────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def pytest_configure(config):
|
||||
config.addinivalue_line("markers",
|
||||
"gpu_required: test requires an NVIDIA GPU (skipped in headless CI)")
|
||||
config.addinivalue_line("markers",
|
||||
"full_stack: test requires SOCIAL_TEST_FULL=1 (full speech/LLM/TTS)")
|
||||
config.addinivalue_line("markers",
|
||||
"stack_running: test requires SOCIAL_STACK_RUNNING=1")
|
||||
config.addinivalue_line("markers",
|
||||
"slow: test takes more than 30 seconds")
|
||||
config.addinivalue_line("markers",
|
||||
"launch_test: launch_testing integration test")
|
||||
|
||||
|
||||
def pytest_collection_modifyitems(config, items):
|
||||
"""Automatically skip tests whose markers are not satisfied."""
|
||||
full_stack = os.environ.get("SOCIAL_TEST_FULL", "0") == "1"
|
||||
stack_running = os.environ.get("SOCIAL_STACK_RUNNING", "0") == "1"
|
||||
fast_mode = os.environ.get("CI_FAST", "0") == "1"
|
||||
|
||||
gpu_ok = _nvidia_smi_available()
|
||||
|
||||
for item in items:
|
||||
if item.get_closest_marker("gpu_required") and not gpu_ok:
|
||||
item.add_marker(pytest.mark.skip(
|
||||
reason="No GPU available (set SOCIAL_STACK_RUNNING=1 on hardware)"
|
||||
))
|
||||
if item.get_closest_marker("full_stack") and not full_stack:
|
||||
item.add_marker(pytest.mark.skip(
|
||||
reason="Set SOCIAL_TEST_FULL=1 to enable full-stack tests"
|
||||
))
|
||||
if item.get_closest_marker("stack_running") and not stack_running:
|
||||
item.add_marker(pytest.mark.skip(
|
||||
reason="Set SOCIAL_STACK_RUNNING=1 to enable stack-running tests"
|
||||
))
|
||||
if item.get_closest_marker("slow") and fast_mode:
|
||||
item.add_marker(pytest.mark.skip(
|
||||
reason="Slow test skipped in CI_FAST=1 mode"
|
||||
))
|
||||
|
||||
|
||||
# ── Helpers ───────────────────────────────────────────────────────────────────
|
||||
|
||||
def _nvidia_smi_available() -> bool:
|
||||
try:
|
||||
subprocess.check_output(
|
||||
["nvidia-smi", "--query-gpu=name", "--format=csv,noheader"],
|
||||
timeout=5, stderr=subprocess.DEVNULL,
|
||||
)
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
# ── Fixtures ──────────────────────────────────────────────────────────────────
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def ros_domain_id():
|
||||
"""Assert ROS_DOMAIN_ID is set to 108 for test isolation."""
|
||||
domain = os.environ.get("ROS_DOMAIN_ID", "0")
|
||||
assert domain == "108", (
|
||||
f"ROS_DOMAIN_ID must be '108' for isolated test runs (got '{domain}'). "
|
||||
"Set: export ROS_DOMAIN_ID=108"
|
||||
)
|
||||
return int(domain)
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def rclpy_node(request):
|
||||
"""Provide a temporary rclpy probe node; auto-teardown after each test."""
|
||||
import rclpy
|
||||
if not rclpy.ok():
|
||||
rclpy.init()
|
||||
node_name = f"test_probe_{request.node.name[:20].replace('/', '_')}"
|
||||
node = rclpy.create_node(node_name)
|
||||
yield node
|
||||
node.destroy_node()
|
||||
|
||||
|
||||
@pytest.fixture(scope="session", autouse=True)
|
||||
def rclpy_session():
|
||||
"""Init/shutdown rclpy once for the entire test session."""
|
||||
import rclpy
|
||||
if not rclpy.ok():
|
||||
rclpy.init()
|
||||
yield
|
||||
if rclpy.ok():
|
||||
rclpy.shutdown()
|
||||
197
jetson/ros2_ws/src/saltybot_social_tests/test/test_gpu_memory.py
Normal file
197
jetson/ros2_ws/src/saltybot_social_tests/test/test_gpu_memory.py
Normal file
@ -0,0 +1,197 @@
|
||||
"""test_gpu_memory.py — GPU memory budget tests for social-bot stack.
|
||||
|
||||
Requirements (Orin Nano Super 8GB shared memory):
|
||||
- Full stack allocation must stay under 6GB total GPU usage
|
||||
- Each major model budget:
|
||||
Face detection (SCRFD-10GF TRT): < 100 MB
|
||||
Face recognition (ArcFace TRT): < 300 MB
|
||||
Speaker embedding (ECAPA-TDNN TRT): < 200 MB
|
||||
Whisper STT (CTranslate2): < 700 MB
|
||||
LLM (Phi-3-mini Q4 GGUF, 20 layers): < 2500 MB
|
||||
Piper TTS (ONNX CPU): < 200 MB
|
||||
Tracking (Kalman, CPU): < 10 MB
|
||||
─────────────────────────────────────────────────
|
||||
Total budget: < 6000 MB (6 GB)
|
||||
Reserve for OS + ROS2: 2000 MB (2 GB)
|
||||
|
||||
Tests:
|
||||
1. GPU available check (skip gracefully in CI without GPU)
|
||||
2. Used memory at baseline (before stack loads) < 1GB
|
||||
3. Used memory during full stack < 6GB
|
||||
4. Memory does not grow continuously (no leak over 30s)
|
||||
5. After shutdown, memory returns to near-baseline
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import unittest
|
||||
|
||||
import pytest
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
|
||||
from saltybot_social_tests.test_helpers import GpuMemoryChecker
|
||||
|
||||
# ── Budget constants ──────────────────────────────────────────────────────────
|
||||
GPU_TOTAL_MB_EXPECTED = 8192.0 # Orin Nano Super 8GB
|
||||
GPU_STACK_BUDGET_MB = 6000.0 # full stack must stay under 6GB used
|
||||
GPU_BASELINE_MB = 1024.0 # baseline (OS + CUDA context) < 1GB
|
||||
GPU_LEAK_TOLERANCE_MB = 200.0 # acceptable growth over 30s measurement window
|
||||
|
||||
# Per-model expected budgets (for documentation / CI without full stack)
|
||||
MODEL_BUDGETS_MB = {
|
||||
"scrfd_10g_trt": 100,
|
||||
"arcface_r100_trt": 300,
|
||||
"ecapa_tdnn_trt": 200,
|
||||
"whisper_small": 700,
|
||||
"phi3_mini_gguf": 2500,
|
||||
"piper_tts": 200,
|
||||
"tracking": 10,
|
||||
}
|
||||
TOTAL_MODEL_BUDGET_MB = sum(MODEL_BUDGETS_MB.values()) # 4010 MB
|
||||
|
||||
|
||||
def _skip_if_no_gpu():
|
||||
if not GpuMemoryChecker.gpu_available():
|
||||
pytest.skip("No GPU available — skipping GPU memory tests")
|
||||
|
||||
|
||||
class TestGpuMemoryBudget(unittest.TestCase):
|
||||
"""GPU memory budget tests for the full social-bot stack."""
|
||||
|
||||
def test_gpu_is_available_or_ci(self):
|
||||
"""GPU check: either GPU is available, or we're in CI (skip gracefully)."""
|
||||
if os.environ.get("CI", ""):
|
||||
# In CI without GPU, just log and pass
|
||||
print("[INFO] CI mode detected — GPU memory tests will be skipped if no GPU")
|
||||
available = GpuMemoryChecker.gpu_available()
|
||||
if not available:
|
||||
self.skipTest("No GPU available")
|
||||
total = GpuMemoryChecker.total_mb()
|
||||
self.assertIsNotNone(total)
|
||||
print(f"[INFO] GPU total: {total:.0f} MB")
|
||||
|
||||
def test_gpu_total_memory_matches_orin(self):
|
||||
"""Total GPU memory should be ~8GB on Orin Nano Super."""
|
||||
_skip_if_no_gpu()
|
||||
total = GpuMemoryChecker.total_mb()
|
||||
# Allow ±10% from 8192MB (shared memory varies by config)
|
||||
self.assertGreater(total, GPU_TOTAL_MB_EXPECTED * 0.5,
|
||||
f"GPU total {total:.0f}MB seems too low for Orin")
|
||||
|
||||
def test_baseline_memory_under_threshold(self):
|
||||
"""Baseline GPU usage (before models load) must be < 1GB."""
|
||||
_skip_if_no_gpu()
|
||||
used = GpuMemoryChecker.used_mb()
|
||||
self.assertIsNotNone(used)
|
||||
print(f"[INFO] Baseline GPU used: {used:.0f} MB")
|
||||
# Note: if running after stack is up, this test would need stack to be down
|
||||
# In standalone mode (no stack running), assert low baseline
|
||||
if os.environ.get("SOCIAL_STACK_RUNNING", "0") == "0":
|
||||
self.assertLess(
|
||||
used, GPU_BASELINE_MB,
|
||||
f"Baseline GPU usage {used:.0f}MB >= {GPU_BASELINE_MB}MB"
|
||||
)
|
||||
|
||||
def test_full_stack_memory_under_budget(self):
|
||||
"""Full stack GPU usage must stay under 6GB."""
|
||||
_skip_if_no_gpu()
|
||||
if os.environ.get("SOCIAL_STACK_RUNNING", "0") != "1":
|
||||
self.skipTest("Set SOCIAL_STACK_RUNNING=1 to run with stack active")
|
||||
|
||||
used = GpuMemoryChecker.used_mb()
|
||||
self.assertIsNotNone(used)
|
||||
print(f"[INFO] Stack GPU used: {used:.0f} MB / {GPU_STACK_BUDGET_MB:.0f} MB budget")
|
||||
self.assertLess(
|
||||
used, GPU_STACK_BUDGET_MB,
|
||||
f"GPU usage {used:.0f}MB exceeds {GPU_STACK_BUDGET_MB:.0f}MB budget"
|
||||
)
|
||||
|
||||
def test_no_memory_leak_over_30s(self):
|
||||
"""GPU memory must not grow by more than 200MB over 30 seconds."""
|
||||
_skip_if_no_gpu()
|
||||
if os.environ.get("SOCIAL_STACK_RUNNING", "0") != "1":
|
||||
self.skipTest("Set SOCIAL_STACK_RUNNING=1 to run with stack active")
|
||||
|
||||
used_start = GpuMemoryChecker.used_mb()
|
||||
print(f"[INFO] GPU used at start: {used_start:.0f} MB")
|
||||
|
||||
# Wait 30s while stack processes mock sensor data
|
||||
time.sleep(30.0)
|
||||
|
||||
used_end = GpuMemoryChecker.used_mb()
|
||||
growth = used_end - used_start
|
||||
print(f"[INFO] GPU used after 30s: {used_end:.0f} MB (growth: {growth:+.0f} MB)")
|
||||
|
||||
self.assertLess(
|
||||
growth, GPU_LEAK_TOLERANCE_MB,
|
||||
f"GPU memory grew by {growth:.0f}MB over 30s — possible leak"
|
||||
)
|
||||
|
||||
def test_memory_frees_after_context_release(self):
|
||||
"""Test that pycuda context releases memory properly."""
|
||||
_skip_if_no_gpu()
|
||||
try:
|
||||
import pycuda.driver as drv
|
||||
drv.init()
|
||||
|
||||
ctx = drv.Device(0).make_context()
|
||||
free_in, total_in = drv.mem_get_info()
|
||||
|
||||
# Allocate 100MB
|
||||
alloc = drv.mem_alloc(100 * 1024 * 1024)
|
||||
free_after_alloc, _ = drv.mem_get_info()
|
||||
growth = (free_in - free_after_alloc) / 1024 / 1024
|
||||
self.assertGreater(growth, 50,
|
||||
"100MB allocation didn't register in GPU memory")
|
||||
|
||||
# Free it
|
||||
del alloc
|
||||
free_after_free, _ = drv.mem_get_info()
|
||||
recovered = (free_after_free - free_after_alloc) / 1024 / 1024
|
||||
ctx.pop()
|
||||
|
||||
self.assertGreater(recovered, 50,
|
||||
f"Only {recovered:.0f}MB recovered after freeing 100MB alloc")
|
||||
except ImportError:
|
||||
self.skipTest("pycuda not available")
|
||||
|
||||
|
||||
class TestModelBudgetDocumentation(unittest.TestCase):
|
||||
"""Verify total model budget is within spec (pure math, no GPU needed)."""
|
||||
|
||||
def test_total_model_budget_under_6gb(self):
|
||||
"""Sum of all model budgets must be < 6000MB."""
|
||||
total = sum(MODEL_BUDGETS_MB.values())
|
||||
self.assertLess(
|
||||
total, GPU_STACK_BUDGET_MB,
|
||||
f"Model budgets sum to {total}MB, exceeds {GPU_STACK_BUDGET_MB}MB limit"
|
||||
)
|
||||
|
||||
def test_individual_model_budgets_reasonable(self):
|
||||
"""No single model should claim more than 3GB."""
|
||||
for model, budget in MODEL_BUDGETS_MB.items():
|
||||
self.assertLess(
|
||||
budget, 3000,
|
||||
f"Model {model} budget {budget}MB seems unreasonably high"
|
||||
)
|
||||
|
||||
def test_budget_includes_all_models(self):
|
||||
"""Verify all expected models are accounted for in the budget."""
|
||||
expected_keys = {
|
||||
"scrfd_10g_trt", "arcface_r100_trt", "ecapa_tdnn_trt",
|
||||
"whisper_small", "phi3_mini_gguf", "piper_tts", "tracking"
|
||||
}
|
||||
self.assertEqual(set(MODEL_BUDGETS_MB.keys()), expected_keys)
|
||||
|
||||
def test_budget_summary(self):
|
||||
"""Print human-readable budget summary."""
|
||||
print("\n[GPU Budget Summary]")
|
||||
for model, mb in sorted(MODEL_BUDGETS_MB.items(), key=lambda x: -x[1]):
|
||||
bar = "█" * (mb // 100)
|
||||
print(f" {model:<25} {mb:>5} MB {bar}")
|
||||
print(f" {'TOTAL':<25} {TOTAL_MODEL_BUDGET_MB:>5} MB")
|
||||
print(f" {'LIMIT':<25} {GPU_STACK_BUDGET_MB:>5} MB")
|
||||
print(f" {'RESERVE':<25} {GPU_STACK_BUDGET_MB - TOTAL_MODEL_BUDGET_MB:>5} MB")
|
||||
323
jetson/ros2_ws/src/saltybot_social_tests/test/test_latency.py
Normal file
323
jetson/ros2_ws/src/saltybot_social_tests/test/test_latency.py
Normal file
@ -0,0 +1,323 @@
|
||||
"""test_latency.py — End-to-end latency profiling for social-bot pipeline.
|
||||
|
||||
Pipeline stages and SLAs:
|
||||
Stage SLA (p95) Notes
|
||||
───────────────────────────── ───────── ─────────────────────────────────
|
||||
wake_word → VAD onset < 100ms OpenWakeWord inference
|
||||
VAD onset → transcript (STT) < 500ms Whisper small, 1s utterance
|
||||
transcript → LLM first token < 500ms Phi-3-mini GGUF, TTFT
|
||||
LLM first token → TTS first < 200ms Piper synthesis, first sentence
|
||||
─────────────────────────────────────────────────────────────────────────────
|
||||
Full end-to-end (wake→speaker) < 1200ms total pipeline SLA
|
||||
|
||||
Tests:
|
||||
1. STT latency: inject synthetic transcript, measure to LLM response
|
||||
2. LLM latency: inject transcript, measure time to ConversationResponse
|
||||
3. TTS latency: inject ConversationResponse, verify audio starts quickly
|
||||
4. E2E latency: orchestrator state transitions (LISTENING→THINKING→SPEAKING)
|
||||
5. Percentile report: print p50/p95/p99 for each stage over N runs
|
||||
|
||||
In CI (no GPU/models): tests use orchestrator state timestamps to infer latency
|
||||
bounds without actually running STT/LLM/TTS.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import threading
|
||||
import unittest
|
||||
from collections import defaultdict
|
||||
from typing import List, Optional
|
||||
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from rclpy.qos import QoSProfile
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
|
||||
from saltybot_social_tests.test_helpers import poll_until
|
||||
|
||||
# ── SLA constants (milliseconds) ─────────────────────────────────────────────
|
||||
SLA_STT_P95_MS = 500.0
|
||||
SLA_LLM_TTFT_P95_MS = 500.0
|
||||
SLA_TTS_FIRST_P95_MS = 200.0
|
||||
SLA_E2E_P95_MS = 1200.0
|
||||
|
||||
N_PROBE_RUNS = 5 # number of injection runs for percentile calculation
|
||||
|
||||
|
||||
def _percentile(values: List[float], p: float) -> float:
|
||||
if not values:
|
||||
return 0.0
|
||||
s = sorted(values)
|
||||
idx = int(len(s) * p / 100)
|
||||
return s[min(idx, len(s) - 1)]
|
||||
|
||||
|
||||
class LatencyProbe(Node):
    """Injects synthetic messages and measures pipeline response latencies.

    Publishes synthetic transcripts to trigger the conversation pipeline and
    records (a) inject→response latencies and (b) orchestrator state
    transitions, both guarded by a lock because ROS callbacks may run on an
    executor thread while tests read the data.
    """

    def __init__(self) -> None:
        super().__init__("latency_probe")
        qos = QoSProfile(depth=10)

        # Injectors — only available when the project message package is
        # installed; otherwise the probe degrades to state-watching only.
        try:
            from saltybot_social_msgs.msg import SpeechTranscript, ConversationResponse
            self._transcript_pub = self.create_publisher(
                SpeechTranscript, "/social/speech/transcript", qos
            )
            self._response_pub = self.create_publisher(
                ConversationResponse, "/social/conversation/response", qos
            )
            self._has_social = True
        except ImportError:
            self._has_social = False

        from std_msgs.msg import String
        self._state_pub = self.create_publisher(String, "/social/test/inject", qos)

        # Receivers
        # (String is re-imported here; the second import is redundant but harmless.)
        from std_msgs.msg import String
        self._state_sub = self.create_subscription(
            String, "/social/orchestrator/state",
            self._on_state, qos
        )

        if self._has_social:
            from saltybot_social_msgs.msg import ConversationResponse
            self._response_sub = self.create_subscription(
                ConversationResponse, "/social/conversation/response",
                self._on_response, qos
            )

        # Timing data — all reads/writes go through self._lock.
        self._inject_t: Optional[float] = None          # wall time of last inject, cleared on response
        self._response_times: List[float] = []          # inject→response latencies, ms
        self._state_transitions: List[tuple] = []  # (from, to, elapsed_ms)
        self._last_state: Optional[str] = None          # most recent orchestrator state string
        self._last_state_t: float = time.time()         # wall time the last state was entered
        self._lock = threading.Lock()

    def _on_state(self, msg) -> None:
        """Record a state transition whenever the orchestrator's JSON state changes."""
        try:
            data = json.loads(msg.data)
            new_state = data.get("state", "")
            now = time.time()
            with self._lock:
                # Only log genuine transitions, not repeated publishes of one state.
                if self._last_state and self._last_state != new_state:
                    elapsed = (now - self._last_state_t) * 1000
                    self._state_transitions.append(
                        (self._last_state, new_state, elapsed)
                    )
                self._last_state = new_state
                self._last_state_t = now
        except Exception:
            # Best-effort probe: malformed state JSON is ignored rather than
            # failing the whole latency run.
            pass

    def _on_response(self, msg) -> None:
        """Capture the inject→response latency for the most recent injection."""
        with self._lock:
            if self._inject_t is not None:
                latency_ms = (time.time() - self._inject_t) * 1000
                self._response_times.append(latency_ms)
                # Clear so only the FIRST response per injection is timed.
                self._inject_t = None

    def inject_transcript(self, text: str = "Hello Salty, what time is it?") -> None:
        """Publish a synthetic final transcript to trigger LLM.

        No-op when saltybot_social_msgs is unavailable.  Records the
        injection timestamp (under the lock) just before publishing.
        """
        if not self._has_social:
            return
        from saltybot_social_msgs.msg import SpeechTranscript
        msg = SpeechTranscript()
        msg.header.stamp = self.get_clock().now().to_msg()
        msg.text = text
        msg.speaker_id = "test_user"
        msg.confidence = 0.99
        msg.audio_duration = 1.2
        msg.is_partial = False  # final transcript, not a streaming partial
        with self._lock:
            self._inject_t = time.time()
        self._transcript_pub.publish(msg)

    def get_response_latencies(self) -> List[float]:
        """Return a snapshot copy of recorded inject→response latencies (ms)."""
        with self._lock:
            return list(self._response_times)

    def get_state_transitions(self) -> List[tuple]:
        """Return a snapshot copy of (from, to, elapsed_ms) transitions."""
        with self._lock:
            return list(self._state_transitions)

    def clear(self) -> None:
        """Reset all timing data between probe runs."""
        with self._lock:
            self._response_times.clear()
            self._state_transitions.clear()
            self._inject_t = None
|
||||
|
||||
|
||||
class TestPipelineLatency(unittest.TestCase):
    """End-to-end pipeline latency tests.

    Uses a shared LatencyProbe node; each test spins it manually via
    rclpy.spin_once so callbacks run on the test thread.
    """

    @classmethod
    def setUpClass(cls):
        # Guarded init: another test module may already hold the context.
        if not rclpy.ok():
            rclpy.init()
        cls.probe = LatencyProbe()

    @classmethod
    def tearDownClass(cls):
        cls.probe.destroy_node()

    def _spin_for(self, duration_s: float) -> None:
        """Process probe callbacks for roughly duration_s wall-clock seconds."""
        deadline = time.time() + duration_s
        while time.time() < deadline:
            rclpy.spin_once(self.probe, timeout_sec=0.05)

    def test_orchestrator_state_transitions_logged(self):
        """Verify orchestrator transitions are captured by the probe.

        Informational only — prints what was seen; makes no assertion so it
        also passes in CI where the orchestrator never leaves IDLE.
        """
        self.probe.clear()
        self._spin_for(3.0)  # wait for state messages

        # Inject a transcript to trigger IDLE→LISTENING→THINKING
        if self.probe._has_social:
            self.probe.inject_transcript()
            self._spin_for(5.0)

        transitions = self.probe.get_state_transitions()
        # If the stack is running with conversation_node, we should see transitions.
        # In CI without LLM, orchestrator stays IDLE — that's OK.
        print(f"\n[Latency] State transitions captured: {len(transitions)}")
        for frm, to, ms in transitions:
            print(f"  {frm} → {to}: {ms:.0f}ms")

    def test_llm_response_latency_sla(self):
        """LLM must produce first ConversationResponse within 500ms p95.

        Only meaningful with the real LLM running, so it is gated behind
        SOCIAL_TEST_FULL=1 and the presence of saltybot_social_msgs.
        """
        if os.environ.get("SOCIAL_TEST_FULL", "0") != "1":
            self.skipTest("Set SOCIAL_TEST_FULL=1 to test with LLM running")
        if not self.probe._has_social:
            self.skipTest("saltybot_social_msgs not available")

        self.probe.clear()
        latencies = []

        for i in range(N_PROBE_RUNS):
            self.probe.clear()
            self.probe.inject_transcript(f"Hello, run number {i}")
            # Wait up to 5s for response
            deadline = time.time() + 5.0
            while time.time() < deadline:
                rclpy.spin_once(self.probe, timeout_sec=0.05)
                lats = self.probe.get_response_latencies()
                if lats:
                    latencies.extend(lats)
                    break
            time.sleep(1.0)  # cooldown between runs

        if not latencies:
            self.fail("No LLM responses received in any of the test runs")

        p50 = _percentile(latencies, 50)
        p95 = _percentile(latencies, 95)
        print(f"\n[Latency] LLM response: n={len(latencies)}, "
              f"p50={p50:.0f}ms, p95={p95:.0f}ms")

        self.assertLess(
            p95, SLA_LLM_TTFT_P95_MS,
            f"LLM p95 latency {p95:.0f}ms exceeds SLA {SLA_LLM_TTFT_P95_MS:.0f}ms"
        )

    def test_e2e_state_machine_latency(self):
        """IDLE→THINKING transition must complete within 1200ms p95.

        Measures inject→THINKING via the probe's state tracking; in CI
        without the full stack, zero transitions are tolerated.
        """
        if not self.probe._has_social:
            self.skipTest("saltybot_social_msgs not available")

        self.probe.clear()
        thinking_latencies = []

        for _ in range(N_PROBE_RUNS):
            self.probe.clear()

            # Wait for IDLE state (each predicate also pumps callbacks).
            def is_idle():
                rclpy.spin_once(self.probe, timeout_sec=0.1)
                return self.probe._last_state == "idle"
            poll_until(is_idle, timeout_s=5.0)

            # Inject transcript and measure time to THINKING
            t0 = time.time()
            self.probe.inject_transcript()

            # "speaking" also counts — the pipeline may pass through THINKING
            # faster than the poll interval.
            def is_thinking():
                rclpy.spin_once(self.probe, timeout_sec=0.1)
                return self.probe._last_state in ("thinking", "speaking")
            reached = poll_until(is_thinking, timeout_s=5.0)

            if reached:
                elapsed_ms = (time.time() - t0) * 1000
                thinking_latencies.append(elapsed_ms)

            time.sleep(2.0)  # allow pipeline to return to IDLE

        if not thinking_latencies:
            # In CI without full stack, this is expected
            print("[Latency] No THINKING transitions — LLM/speech may not be running")
            return

        p95 = _percentile(thinking_latencies, 95)
        print(f"\n[Latency] Inject→THINKING: n={len(thinking_latencies)}, "
              f"p95={p95:.0f}ms")

        self.assertLess(
            p95, SLA_E2E_P95_MS,
            f"E2E p95 {p95:.0f}ms exceeds SLA {SLA_E2E_P95_MS:.0f}ms"
        )
|
||||
|
||||
|
||||
class TestLatencyReport(unittest.TestCase):
    """Generate a latency report from orchestrator state data."""

    @classmethod
    def setUpClass(cls):
        # Guarded init: the context may already be up from a sibling module.
        if not rclpy.ok():
            rclpy.init()
        cls.node = rclpy.create_node("latency_report_probe")

    @classmethod
    def tearDownClass(cls):
        cls.node.destroy_node()

    def test_orchestrator_reports_latency_stats(self):
        """Orchestrator state JSON must include latency field when profiling enabled.

        Subscribes to /social/orchestrator/state for up to 5s; skips (rather
        than fails) when the orchestrator is not running at all.
        """
        from std_msgs.msg import String
        received = []
        qos = QoSProfile(depth=10)
        sub = self.node.create_subscription(
            String, "/social/orchestrator/state",
            lambda msg: received.append(msg.data), qos
        )

        deadline = time.time() + 5.0
        while time.time() < deadline and not received:
            rclpy.spin_once(self.node, timeout_sec=0.1)
        self.node.destroy_subscription(sub)

        if not received:
            self.skipTest("Orchestrator not running")

        # Validate the most recent state message only.
        state = json.loads(received[-1])
        self.assertIn("latency", state,
                      "Orchestrator state JSON missing 'latency' field")
        self.assertIn("ts", state)
        print(f"\n[Latency] Orchestrator report: {json.dumps(state['latency'], indent=2)}")

    def test_latency_sla_table(self):
        """Print SLA table for documentation — always passes."""
        print("\n[Latency SLA Table]")
        slas = [
            ("wake_word → transcript (STT)", SLA_STT_P95_MS),
            ("transcript → LLM first token", SLA_LLM_TTFT_P95_MS),
            ("LLM token → TTS first chunk", SLA_TTS_FIRST_P95_MS),
            ("end-to-end (wake → speaker)", SLA_E2E_P95_MS),
        ]
        for stage, sla in slas:
            print(f"  {stage:<40} p95 < {sla:.0f}ms")
|
||||
208
jetson/ros2_ws/src/saltybot_social_tests/test/test_launch.py
Normal file
208
jetson/ros2_ws/src/saltybot_social_tests/test/test_launch.py
Normal file
@ -0,0 +1,208 @@
|
||||
"""test_launch.py — Launch integration test: verify all social-bot nodes start.
|
||||
|
||||
Uses launch_testing framework to:
|
||||
1. Start social_test.launch.py
|
||||
2. Verify all expected nodes appear in the graph within 30s
|
||||
3. Verify no node exits with an error code
|
||||
4. Verify key topics are advertising (even if no messages yet)
|
||||
|
||||
Run with:
|
||||
pytest test_launch.py -v
|
||||
ros2 launch launch_testing launch_test.py <path>/test_launch.py
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import unittest
|
||||
|
||||
import launch
|
||||
import launch_ros
|
||||
import launch_testing
|
||||
import launch_testing.actions
|
||||
import launch_testing.markers
|
||||
import pytest
|
||||
import rclpy
|
||||
from ament_index_python.packages import get_package_share_directory
|
||||
from launch import LaunchDescription
|
||||
from launch.actions import IncludeLaunchDescription, TimerAction
|
||||
from launch.launch_description_sources import PythonLaunchDescriptionSource
|
||||
from launch_ros.actions import Node
|
||||
|
||||
import sys
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
|
||||
from saltybot_social_tests.test_helpers import NodeChecker, poll_until
|
||||
|
||||
|
||||
# ── Nodes that must be alive within 30s in default CI mode ───────────────────
|
||||
REQUIRED_NODES = [
|
||||
"orchestrator_node",
|
||||
"face_recognition_node",
|
||||
"tracking_fusion_node",
|
||||
"mock_sensor_pub",
|
||||
]
|
||||
|
||||
# Nodes only required in full mode (SOCIAL_TEST_FULL=1)
|
||||
FULL_MODE_NODES = [
|
||||
"speech_pipeline_node",
|
||||
"conversation_node",
|
||||
"tts_node",
|
||||
]
|
||||
|
||||
NODE_STARTUP_TIMEOUT_S = 30.0
|
||||
|
||||
|
||||
# ── launch_testing fixture ────────────────────────────────────────────────────
|
||||
|
||||
@pytest.mark.launch_test
def generate_test_description():
    """Build the LaunchDescription that launch_testing will run.

    Includes social_test.launch.py from the saltybot_social_tests share
    directory with speech/LLM/TTS disabled (CI mode), then appends a
    ReadyToTest action so the framework knows setup is complete.
    """
    share_dir = get_package_share_directory("saltybot_social_tests")
    launch_path = os.path.join(share_dir, "launch", "social_test.launch.py")

    # CI mode: the heavyweight GPU stages are not launched.
    ci_arguments = {
        "enable_speech": "false",
        "enable_llm": "false",
        "enable_tts": "false",
    }
    stack = IncludeLaunchDescription(
        PythonLaunchDescriptionSource(launch_path),
        launch_arguments=ci_arguments.items(),
    )

    # ReadyToTest signals launch_testing that setup is done.
    return LaunchDescription([stack, launch_testing.actions.ReadyToTest()])
|
||||
|
||||
|
||||
# ── Test cases ────────────────────────────────────────────────────────────────
|
||||
|
||||
class TestSocialBotLaunch(unittest.TestCase):
    """Integration tests that run against the launched social-bot stack.

    A probe node inspects the graph started by generate_test_description;
    tests cover node liveness, advertised topics, and first published output.
    """

    @classmethod
    def setUpClass(cls):
        # Guard the init: calling rclpy.init() on an already-initialised
        # context raises.  This also matches the `if not rclpy.ok()`
        # pattern used by every other probe class in this test suite.
        if not rclpy.ok():
            rclpy.init()
        cls.node = rclpy.create_node("social_launch_test_probe")
        cls.checker = NodeChecker(cls.node)

    @classmethod
    def tearDownClass(cls):
        cls.node.destroy_node()
        # Mirror the guarded init: only shut down a context that is still up.
        if rclpy.ok():
            rclpy.shutdown()

    def _spin_briefly(self, duration_s: float = 0.5) -> None:
        """Process probe callbacks for roughly duration_s seconds."""
        deadline = time.time() + duration_s
        while time.time() < deadline:
            rclpy.spin_once(self.node, timeout_sec=0.1)

    # ── Test 1: All required nodes alive within 30s ───────────────────────────

    def test_required_nodes_start(self):
        """All required nodes must appear in the graph within 30 seconds."""
        results = self.checker.wait_for_nodes(
            REQUIRED_NODES, timeout_s=NODE_STARTUP_TIMEOUT_S
        )
        missing = [n for n, found in results.items() if not found]
        self.assertEqual(
            missing, [],
            f"Nodes did not start within {NODE_STARTUP_TIMEOUT_S}s: {missing}"
        )

    # ── Test 2: Expected topics are advertised ────────────────────────────────

    def test_expected_topics_advertised(self):
        """Key topics must exist in the topic graph."""
        self._spin_briefly(2.0)

        required_topics = [
            "/social/orchestrator/state",
            "/social/faces/detections",
            "/social/tracking/fused_target",
            # mock publishers
            "/camera/color/image_raw",
            "/uwb/target",
        ]

        all_topics = {name for name, _ in self.node.get_topic_names_and_types()}
        missing = [t for t in required_topics if t not in all_topics]
        self.assertEqual(
            missing, [],
            f"Topics not advertised: {missing}\nAll topics: {sorted(all_topics)}"
        )

    # ── Test 3: Orchestrator publishes state ──────────────────────────────────

    def test_orchestrator_state_publishes(self):
        """orchestrator_node must publish /social/orchestrator/state within 5s."""
        from std_msgs.msg import String
        received = []

        sub = self.node.create_subscription(
            String, "/social/orchestrator/state",
            lambda msg: received.append(msg.data), 10
        )
        deadline = time.time() + 5.0
        while time.time() < deadline and not received:
            rclpy.spin_once(self.node, timeout_sec=0.2)

        self.node.destroy_subscription(sub)
        self.assertTrue(
            len(received) > 0,
            "orchestrator_node did not publish /social/orchestrator/state within 5s"
        )
        # Validate JSON structure of the first state message.
        import json
        state = json.loads(received[0])
        self.assertIn("state", state)
        self.assertIn(state["state"],
                      ["idle", "listening", "thinking", "speaking", "throttled"])

    # ── Test 4: Face detection topic advertised at correct QoS ───────────────

    def test_face_detection_topic_qos(self):
        """Face detection topic must exist and accept BEST_EFFORT subscribers."""
        from rclpy.qos import QoSProfile, QoSReliabilityPolicy
        from saltybot_social_msgs.msg import FaceDetectionArray

        received = []
        qos = QoSProfile(reliability=QoSReliabilityPolicy.BEST_EFFORT, depth=10)
        sub = self.node.create_subscription(
            FaceDetectionArray, "/social/faces/detections",
            lambda msg: received.append(True), qos
        )
        deadline = time.time() + 5.0
        while time.time() < deadline and not received:
            rclpy.spin_once(self.node, timeout_sec=0.2)
        self.node.destroy_subscription(sub)
        self.assertTrue(
            len(received) > 0,
            "/social/faces/detections received no messages within 5s"
        )

    # ── Test 5: No node exits with error immediately ──────────────────────────

    def test_no_immediate_node_exits(self):
        """All required nodes should still be alive after 5s (no instant crash)."""
        time.sleep(2.0)
        self._spin_briefly(1.0)
        results = self.checker.wait_for_nodes(REQUIRED_NODES, timeout_s=2.0)
        crashed = [n for n, found in results.items() if not found]
        self.assertEqual(
            crashed, [],
            f"Nodes crashed or exited: {crashed}"
        )
|
||||
|
||||
|
||||
# ── Post-shutdown checks (run after the launch is torn down) ──────────────────
|
||||
|
||||
# Runs only after launch_testing has torn down the launched processes.
@launch_testing.post_shutdown_test()
class TestSocialBotShutdownBehavior(unittest.TestCase):
    """Tests that run after the launch has been shut down."""

    def test_processes_exited_cleanly(self, proc_info):
        """All launched processes must exit with code 0 or SIGTERM (-15)."""
        # Allow SIGTERM (returncode -15) as a graceful shutdown.
        # NOTE(review): EXIT_OK is presumably 0, which would make it redundant
        # with the literal 0 here — confirm against launch_testing.asserts.
        launch_testing.asserts.assertExitCodes(
            proc_info,
            allowable_exit_codes=[0, -15, launch_testing.asserts.EXIT_OK],
        )
|
||||
423
jetson/ros2_ws/src/saltybot_social_tests/test/test_services.py
Normal file
423
jetson/ros2_ws/src/saltybot_social_tests/test/test_services.py
Normal file
@ -0,0 +1,423 @@
|
||||
"""test_services.py — Verify social-bot ROS2 services are available and respond.
|
||||
|
||||
Services tested:
|
||||
/social/enroll (saltybot_social_msgs/EnrollPerson)
|
||||
/social/persons/list (saltybot_social_msgs/ListPersons)
|
||||
/social/persons/delete (saltybot_social_msgs/DeletePerson)
|
||||
/social/persons/update (saltybot_social_msgs/UpdatePerson)
|
||||
/social/query_mood (saltybot_social_msgs/QueryMood)
|
||||
/social/nav/set_mode (std_srvs/SetString or saltybot_social_msgs/SetNavMode)
|
||||
|
||||
Each test:
|
||||
1. Verifies the service is available (wait_for_service <= 10s)
|
||||
2. Calls the service with a valid request
|
||||
3. Asserts response is non-None and contains expected fields
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import unittest
|
||||
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
|
||||
from saltybot_social_tests.test_helpers import ServiceChecker
|
||||
|
||||
# Maximum time (seconds) to wait for each service to become available/respond.
SERVICE_AVAILABILITY_TIMEOUT_S = 10.0
|
||||
|
||||
|
||||
class TestSocialServices(unittest.TestCase):
    """Integration tests for social-bot ROS2 service interface.

    Every test skips (rather than fails) when saltybot_social_msgs is not
    installed, so the suite is runnable in environments without the project
    message package.
    """

    @classmethod
    def setUpClass(cls):
        # Guarded init: the context may already be up from a sibling module.
        if not rclpy.ok():
            rclpy.init()
        cls.node = rclpy.create_node("service_test_probe")
        cls.checker = ServiceChecker(cls.node)

    @classmethod
    def tearDownClass(cls):
        cls.node.destroy_node()

    # ── Enroll service ─────────────────────────────────────────────────────────

    def test_enroll_service_available(self):
        """/social/enroll must be available within 10s."""
        try:
            from saltybot_social_msgs.srv import EnrollPerson
        except ImportError:
            self.skipTest("saltybot_social_msgs not installed")

        available = self.checker.is_available(
            "/social/enroll", EnrollPerson,
            timeout_s=SERVICE_AVAILABILITY_TIMEOUT_S
        )
        self.assertTrue(available, "/social/enroll service not available")

    def test_enroll_service_responds(self):
        """EnrollPerson service must respond with success or graceful failure."""
        try:
            from saltybot_social_msgs.srv import EnrollPerson
        except ImportError:
            self.skipTest("saltybot_social_msgs not installed")

        req = EnrollPerson.Request()
        req.name = "TestPerson_CI"
        req.n_samples = 1  # minimal for CI
        req.mode = "face"

        response = self.checker.call_service(
            "/social/enroll", EnrollPerson, req,
            timeout_s=SERVICE_AVAILABILITY_TIMEOUT_S
        )
        self.assertIsNotNone(response, "/social/enroll did not respond")
        # Response can be success=False (no camera data) — that's fine
        # We just need a response, not success
        self.assertTrue(
            hasattr(response, "success"),
            "EnrollPerson response missing 'success' field"
        )

    # ── List persons service ───────────────────────────────────────────────────

    def test_list_persons_service_available(self):
        """/social/persons/list must be available within 10s."""
        try:
            from saltybot_social_msgs.srv import ListPersons
        except ImportError:
            self.skipTest("saltybot_social_msgs not installed")

        available = self.checker.is_available(
            "/social/persons/list", ListPersons,
            timeout_s=SERVICE_AVAILABILITY_TIMEOUT_S
        )
        self.assertTrue(available, "/social/persons/list service not available")

    def test_list_persons_service_responds(self):
        """/social/persons/list must return a list (possibly empty)."""
        try:
            from saltybot_social_msgs.srv import ListPersons
        except ImportError:
            self.skipTest("saltybot_social_msgs not installed")

        req = ListPersons.Request()
        response = self.checker.call_service(
            "/social/persons/list", ListPersons, req,
            timeout_s=SERVICE_AVAILABILITY_TIMEOUT_S
        )
        self.assertIsNotNone(response, "/social/persons/list did not respond")
        self.assertTrue(
            hasattr(response, "persons"),
            "ListPersons response missing 'persons' field"
        )
        self.assertIsInstance(response.persons, list)

    # ── Delete person service ─────────────────────────────────────────────────

    def test_delete_person_service_available(self):
        """/social/persons/delete must be available."""
        try:
            from saltybot_social_msgs.srv import DeletePerson
        except ImportError:
            self.skipTest("saltybot_social_msgs not installed")

        available = self.checker.is_available(
            "/social/persons/delete", DeletePerson,
            timeout_s=SERVICE_AVAILABILITY_TIMEOUT_S
        )
        self.assertTrue(available, "/social/persons/delete not available")

    def test_delete_nonexistent_person_returns_graceful_error(self):
        """Deleting a non-existent person must not crash — graceful False."""
        try:
            from saltybot_social_msgs.srv import DeletePerson
        except ImportError:
            self.skipTest("saltybot_social_msgs not installed")

        req = DeletePerson.Request()
        req.person_id = 99999  # definitely doesn't exist

        response = self.checker.call_service(
            "/social/persons/delete", DeletePerson, req,
            timeout_s=SERVICE_AVAILABILITY_TIMEOUT_S
        )
        self.assertIsNotNone(response)
        # Should return success=False, not crash
        self.assertFalse(
            response.success,
            "Deleting non-existent person should return success=False"
        )

    # ── Update person service ─────────────────────────────────────────────────

    def test_update_person_service_available(self):
        """/social/persons/update must be available."""
        try:
            from saltybot_social_msgs.srv import UpdatePerson
        except ImportError:
            self.skipTest("saltybot_social_msgs not installed")

        available = self.checker.is_available(
            "/social/persons/update", UpdatePerson,
            timeout_s=SERVICE_AVAILABILITY_TIMEOUT_S
        )
        self.assertTrue(available, "/social/persons/update not available")

    # ── Mood query service ────────────────────────────────────────────────────

    def test_query_mood_service_available(self):
        """/social/query_mood must be available (personality_node)."""
        try:
            from saltybot_social_msgs.srv import QueryMood
        except ImportError:
            self.skipTest("saltybot_social_msgs not installed")

        available = self.checker.is_available(
            "/social/query_mood", QueryMood,
            timeout_s=SERVICE_AVAILABILITY_TIMEOUT_S
        )
        self.assertTrue(available, "/social/query_mood not available")

    def test_query_mood_returns_valid_state(self):
        """QueryMood must return a valid mood with valence in [-1, 1]."""
        try:
            from saltybot_social_msgs.srv import QueryMood
        except ImportError:
            self.skipTest("saltybot_social_msgs not installed")

        req = QueryMood.Request()
        response = self.checker.call_service(
            "/social/query_mood", QueryMood, req,
            timeout_s=SERVICE_AVAILABILITY_TIMEOUT_S
        )
        self.assertIsNotNone(response, "/social/query_mood did not respond")
        # Field checks are defensive: only validate valence if the response
        # actually carries a mood with that attribute.
        if hasattr(response, "mood"):
            mood = response.mood
            if hasattr(mood, "valence"):
                self.assertGreaterEqual(mood.valence, -1.0)
                self.assertLessEqual(mood.valence, 1.0)
|
||||
|
||||
|
||||
# ── Nav set-mode service ──────────────────────────────────────────────────────
|
||||
|
||||
class TestNavSetModeService(unittest.TestCase):
    """/social/nav/set_mode — change follow mode at runtime.

    Resolves the service type at runtime: prefers the project's custom
    SetNavMode srv, falling back to std_srvs Trigger when it is absent.
    """

    # Valid follow modes understood by social_nav_node
    VALID_MODES = ["shadow", "lead", "side", "orbit", "loose", "tight", "stop"]

    @classmethod
    def setUpClass(cls):
        # Guarded init: the context may already be up from a sibling module.
        if not rclpy.ok():
            rclpy.init()
        cls.node = rclpy.create_node("nav_set_mode_test_probe")
        cls.checker = ServiceChecker(cls.node)

    @classmethod
    def tearDownClass(cls):
        cls.node.destroy_node()

    def _try_import_set_mode_srv(self):
        """Return the SetNavMode srv type, falling back to std_srvs.

        Returns a (srv_type, variant) pair where variant is "custom" for the
        project srv, "trigger" for the std_srvs fallback, or (None, None) if
        neither import succeeds.
        """
        try:
            # Custom service (future): saltybot_social_msgs/srv/SetNavMode
            from saltybot_social_msgs.srv import SetNavMode
            return SetNavMode, "custom"
        except ImportError:
            pass
        try:
            # Generic string service fallback (std_srvs doesn't have SetString
            # in Humble; use rcl_interfaces/srv/SetParameters as canary)
            from std_srvs.srv import Trigger
            return Trigger, "trigger"
        except ImportError:
            return None, None

    def test_nav_set_mode_service_available(self):
        """/social/nav/set_mode must be available within 10s."""
        srv_type, variant = self._try_import_set_mode_srv()
        if srv_type is None:
            self.skipTest("No suitable srv type found for /social/nav/set_mode")

        available = self.checker.is_available(
            "/social/nav/set_mode", srv_type,
            timeout_s=SERVICE_AVAILABILITY_TIMEOUT_S
        )
        self.assertTrue(
            available,
            "/social/nav/set_mode service not available within "
            f"{SERVICE_AVAILABILITY_TIMEOUT_S}s"
        )

    def test_nav_set_mode_shadow_responds(self):
        """Setting mode 'shadow' must return a response (not crash)."""
        srv_type, variant = self._try_import_set_mode_srv()
        if srv_type is None:
            self.skipTest("No suitable srv type for /social/nav/set_mode")

        if variant == "custom":
            req = srv_type.Request()
            req.mode = "shadow"
        else:
            req = srv_type.Request()  # Trigger: no payload

        response = self.checker.call_service(
            "/social/nav/set_mode", srv_type, req,
            timeout_s=SERVICE_AVAILABILITY_TIMEOUT_S
        )
        self.assertIsNotNone(
            response, "/social/nav/set_mode did not respond to 'shadow' mode"
        )
        # 'success' is only guaranteed on the custom srv response.
        if variant == "custom":
            self.assertTrue(
                hasattr(response, "success"),
                "SetNavMode response missing 'success' field"
            )

    def test_nav_set_mode_invalid_mode_returns_graceful_error(self):
        """Sending an invalid mode string must not crash the node."""
        srv_type, variant = self._try_import_set_mode_srv()
        if srv_type is None or variant != "custom":
            self.skipTest("Custom SetNavMode srv required for invalid-mode test")

        req = srv_type.Request()
        req.mode = "teleport_to_moon"  # definitely invalid

        response = self.checker.call_service(
            "/social/nav/set_mode", srv_type, req,
            timeout_s=SERVICE_AVAILABILITY_TIMEOUT_S
        )
        self.assertIsNotNone(
            response, "/social/nav/set_mode crashed on invalid mode"
        )
        # Must return success=False for unknown mode, not crash
        self.assertFalse(
            response.success,
            "Invalid mode 'teleport_to_moon' should return success=False"
        )

    def test_nav_set_mode_cycles_all_valid_modes(self):
        """Cycle through all valid modes and verify each returns a response."""
        srv_type, variant = self._try_import_set_mode_srv()
        if srv_type is None or variant != "custom":
            self.skipTest("Custom SetNavMode srv required for mode cycling test")

        for mode in self.VALID_MODES:
            req = srv_type.Request()
            req.mode = mode
            response = self.checker.call_service(
                "/social/nav/set_mode", srv_type, req,
                timeout_s=SERVICE_AVAILABILITY_TIMEOUT_S
            )
            self.assertIsNotNone(
                response, f"/social/nav/set_mode did not respond to mode '{mode}'"
            )

    def test_nav_mode_topic_reflects_service_call(self):
        """/social/nav/mode topic must reflect the mode set via the service."""
        from std_msgs.msg import String
        srv_type, variant = self._try_import_set_mode_srv()
        if srv_type is None or variant != "custom":
            self.skipTest("Custom SetNavMode srv required for mode-reflect test")

        # Subscribe to mode topic BEFORE issuing the service call so the
        # resulting publish is not missed.
        received_modes = []
        sub = self.node.create_subscription(
            String, "/social/nav/mode",
            lambda msg: received_modes.append(msg.data), 10
        )

        # Request LEAD mode
        req = srv_type.Request()
        req.mode = "lead"
        response = self.checker.call_service(
            "/social/nav/set_mode", srv_type, req,
            timeout_s=SERVICE_AVAILABILITY_TIMEOUT_S
        )
        if response is None or not response.success:
            self.node.destroy_subscription(sub)
            self.skipTest("/social/nav/set_mode did not accept 'lead' mode")

        # Wait for mode topic to update.
        # (rclpy is re-imported under an alias here; the module-level import
        # already provides it, so this alias is redundant but harmless.)
        import rclpy as _rclpy
        deadline = time.time() + 3.0
        while time.time() < deadline and "lead" not in received_modes:
            _rclpy.spin_once(self.node, timeout_sec=0.1)
        self.node.destroy_subscription(sub)

        self.assertIn(
            "lead", received_modes,
            "/social/nav/mode did not reflect 'lead' after service call"
        )
|
||||
|
||||
|
||||
class TestServiceResponseTimes(unittest.TestCase):
    """Verify services respond within acceptable latency."""

    SLA_MS = 500.0  # services must respond within 500ms

    @classmethod
    def setUpClass(cls):
        # Guarded init: the context may already be up from a sibling module.
        if not rclpy.ok():
            rclpy.init()
        cls.node = rclpy.create_node("service_latency_probe")
        cls.checker = ServiceChecker(cls.node)

    @classmethod
    def tearDownClass(cls):
        cls.node.destroy_node()

    def _measure_service_latency(self, svc_name: str, srv_type, request) -> float:
        """Return response time in ms, or -1 on failure.

        Waits up to 10s for the service to exist, then calls it
        asynchronously and spins for up to 5s for the future to complete.
        The latency is measured with time.perf_counter around the call.
        """
        client = self.node.create_client(srv_type, svc_name)
        if not client.wait_for_service(timeout_sec=10.0):
            self.node.destroy_client(client)
            return -1.0
        t0 = time.perf_counter()
        future = client.call_async(request)
        deadline = time.time() + 5.0
        while not future.done() and time.time() < deadline:
            rclpy.spin_once(self.node, timeout_sec=0.05)
        self.node.destroy_client(client)
        if future.done():
            return (time.perf_counter() - t0) * 1000
        return -1.0

    def test_list_persons_response_time(self):
        """ListPersons must respond within 500ms."""
        try:
            from saltybot_social_msgs.srv import ListPersons
        except ImportError:
            self.skipTest("saltybot_social_msgs not installed")

        latency_ms = self._measure_service_latency(
            "/social/persons/list", ListPersons, ListPersons.Request()
        )
        # -1 signals no response at all; distinguish that from a slow one.
        self.assertGreater(latency_ms, 0,
                           "/social/persons/list did not respond")
        self.assertLess(
            latency_ms, self.SLA_MS,
            f"/social/persons/list response time {latency_ms:.0f}ms > {self.SLA_MS}ms SLA"
        )

    def test_query_mood_response_time(self):
        """QueryMood must respond within 500ms."""
        try:
            from saltybot_social_msgs.srv import QueryMood
        except ImportError:
            self.skipTest("saltybot_social_msgs not installed")

        latency_ms = self._measure_service_latency(
            "/social/query_mood", QueryMood, QueryMood.Request()
        )
        self.assertGreater(latency_ms, 0,
                           "/social/query_mood did not respond")
        self.assertLess(
            latency_ms, self.SLA_MS,
            f"/social/query_mood response time {latency_ms:.0f}ms > {self.SLA_MS}ms SLA"
        )
|
||||
227
jetson/ros2_ws/src/saltybot_social_tests/test/test_shutdown.py
Normal file
227
jetson/ros2_ws/src/saltybot_social_tests/test/test_shutdown.py
Normal file
@ -0,0 +1,227 @@
|
||||
"""test_shutdown.py — Graceful shutdown verification for social-bot stack.
|
||||
|
||||
Tests:
|
||||
1. Clean shutdown: all nodes terminate within 10s of SIGTERM
|
||||
2. No zombie processes remain after shutdown
|
||||
3. GPU memory returns to near-baseline after shutdown
|
||||
4. No file descriptor leaks (via /proc on Linux)
|
||||
5. No orphaned rosbag / pycuda / audio processes
|
||||
6. PyAudio streams properly closed (no held /dev/snd device)
|
||||
|
||||
These tests are designed to run AFTER the stack has been shut down.
|
||||
They can also be run standalone to verify post-shutdown state.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
import unittest
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
|
||||
from saltybot_social_tests.test_helpers import (
|
||||
GpuMemoryChecker, count_zombie_processes, get_process_pids
|
||||
)
|
||||
|
||||
# ── Shutdown timing constants ─────────────────────────────────────────────────
# Upper bound on how long any node may take to exit after SIGTERM (seconds);
# checked for sanity by TestShutdownTiming.
MAX_SHUTDOWN_S = 10.0  # nodes must stop within 10s of receiving SIGTERM
# Maximum MB the post-shutdown GPU usage may sit above GPU_BASELINE_MB before
# it is treated as a leak.
BASELINE_DRIFT_MB = 300.0  # GPU memory allowed to drift above pre-launch baseline
||||
|
||||
|
||||
class TestGracefulShutdown(unittest.TestCase):
    """Verify clean shutdown with no zombies or resource leaks."""

    # ── Test 1: No zombie processes ────────────────────────────────────────────

    def test_no_zombie_ros_processes(self):
        """No zombie (defunct) ROS2 processes after stack shutdown."""
        count = count_zombie_processes("ros")
        self.assertEqual(
            count, 0,
            f"Found {count} zombie ROS2 processes after shutdown"
        )

    def test_no_zombie_python_processes(self):
        """No zombie Python processes (from node children) after shutdown."""
        count = count_zombie_processes("python")
        self.assertEqual(
            count, 0,
            f"Found {count} zombie Python processes after shutdown"
        )

    # ── Test 2: All social-bot nodes terminated ───────────────────────────────

    def _assert_node_terminated(self, node_name: str) -> None:
        """Assert that no live PIDs remain for ``node_name``."""
        pids = get_process_pids(node_name)
        self.assertEqual(
            pids, [],
            f"{node_name} still running with PIDs: {pids}"
        )

    def test_orchestrator_node_terminated(self):
        """orchestrator_node must not have any live PIDs after shutdown."""
        self._assert_node_terminated("orchestrator_node")

    def test_speech_pipeline_node_terminated(self):
        """speech_pipeline_node must not have any live PIDs after shutdown."""
        self._assert_node_terminated("speech_pipeline_node")

    def test_face_recognition_node_terminated(self):
        """face_recognition_node must not have any live PIDs after shutdown."""
        self._assert_node_terminated("face_recognition_node")

    def test_conversation_node_terminated(self):
        """conversation_node must not have any live PIDs after shutdown."""
        self._assert_node_terminated("conversation_node")

    def test_tts_node_terminated(self):
        """tts_node must not have any live PIDs after shutdown."""
        self._assert_node_terminated("tts_node")

    # ── Test 3: GPU memory released ───────────────────────────────────────────

    def test_gpu_memory_released_after_shutdown(self):
        """GPU used memory must not be significantly above baseline."""
        if not GpuMemoryChecker.gpu_available():
            self.skipTest("No GPU available")
        if os.environ.get("GPU_BASELINE_MB", "") == "":
            self.skipTest("Set GPU_BASELINE_MB env var to enable this test")

        baseline_mb = float(os.environ["GPU_BASELINE_MB"])
        current_used = GpuMemoryChecker.used_mb()
        drift = current_used - baseline_mb
        print(f"[Shutdown] GPU: baseline={baseline_mb:.0f}MB, "
              f"now={current_used:.0f}MB, drift={drift:+.0f}MB")

        self.assertLess(
            drift, BASELINE_DRIFT_MB,
            f"GPU memory {drift:+.0f}MB above baseline after shutdown — possible leak"
        )

    # ── Test 4: Audio device released ─────────────────────────────────────────

    def test_audio_device_released(self):
        """No social-bot process should hold /dev/snd after shutdown (Linux only)."""
        if not os.path.exists("/dev/snd"):
            self.skipTest("/dev/snd not present (not Linux or no sound card)")

        try:
            # fuser prints the PIDs of processes holding the device on stdout.
            result = subprocess.run(
                ["fuser", "/dev/snd"],
                capture_output=True, timeout=5
            )
        except FileNotFoundError:
            self.skipTest("fuser command not found")

        holders = result.stdout.decode().strip()
        if not holders:
            return  # device is free — nothing to check

        # Only fail when a holding PID belongs to a social-bot node;
        # unrelated holders (e.g. a desktop audio daemon) are fine.
        social_patterns = [
            "speech_pipeline", "tts_node", "mock_sensor", "pyaudio"
        ]
        held_by_social = False
        for pid_str in holders.split():
            try:
                pid = int(pid_str)
                # Context manager fixes the file-handle leak of the
                # original bare open() call.
                with open(f"/proc/{pid}/cmdline") as cmdline_file:
                    cmdline = cmdline_file.read()
                if any(p in cmdline for p in social_patterns):
                    held_by_social = True
                    break
            except Exception:
                continue  # PID vanished or unreadable — best-effort scan

        self.assertFalse(
            held_by_social,
            f"Audio device /dev/snd held by social-bot process after shutdown: {holders}"
        )

    # ── Test 5: No orphaned subprocesses ──────────────────────────────────────

    def test_no_orphaned_pycuda_processes(self):
        """Informational: report lingering nvidia-cuda-mps processes.

        MPS may legitimately pre-date the stack, so this check never
        fails — it only logs what it finds for CI triage (the original
        computed the PID list and silently discarded it).
        """
        pids = get_process_pids("nvidia-cuda-mps")
        if pids:
            print(f"[Shutdown] nvidia-cuda-mps still running "
                  f"(may be pre-existing): {pids}")

    def test_ros_daemon_still_healthy(self):
        """ros2 daemon should still be healthy after stack shutdown."""
        try:
            result = subprocess.run(
                ["ros2", "daemon", "status"],
                capture_output=True, timeout=10, text=True
            )
        except FileNotFoundError:
            self.skipTest("ros2 command not found — not in ROS environment")
        except subprocess.TimeoutExpired:
            self.fail("ros2 daemon status timed out")
        # A crashed daemon would affect subsequent tests.
        self.assertNotIn("error", result.stdout.lower(),
                         f"ros2 daemon unhealthy: {result.stdout}")
||||
|
||||
|
||||
class TestShutdownTiming(unittest.TestCase):
    """Test that nodes respond to SIGTERM within the shutdown window."""

    def test_shutdown_timeout_constant_is_reasonable(self):
        """Shutdown timeout must be >= 5s (some nodes need time to flush)."""
        self.assertGreaterEqual(MAX_SHUTDOWN_S, 5.0)
        self.assertLessEqual(MAX_SHUTDOWN_S, 30.0,
                             "Shutdown timeout > 30s is too long for CI")

    def test_no_ros_nodes_running_at_test_start(self):
        """Verify we're running in isolation (no leftover nodes from prev run)."""
        social_nodes = [
            "orchestrator_node", "face_recognition_node",
            "speech_pipeline_node", "conversation_node", "tts_node",
            "tracking_fusion_node", "social_nav_node",
        ]
        already_running = []
        for name in social_nodes:
            live_pids = get_process_pids(name)
            if live_pids:
                already_running.append((name, live_pids))

        if already_running:
            # Warn but don't fail — stack may be legitimately running
            print(f"\n[WARNING] Nodes still running from previous session: "
                  f"{already_running}")
||||
|
||||
|
||||
class TestPostShutdownPure(unittest.TestCase):
    """Pure-Python post-shutdown checks (no ROS2 required)."""

    def test_gpu_budget_constants_valid(self):
        """MAX_SHUTDOWN_S and BASELINE_DRIFT_MB must be positive."""
        for constant in (MAX_SHUTDOWN_S, BASELINE_DRIFT_MB):
            self.assertGreater(constant, 0)

    def test_zombie_counter_returns_int(self):
        """count_zombie_processes must return a non-negative int."""
        zombies = count_zombie_processes("nonexistent_process_xyz")
        self.assertIsInstance(zombies, int)
        self.assertGreaterEqual(zombies, 0)

    def test_pid_list_returns_list(self):
        """get_process_pids must return a list."""
        pids = get_process_pids("nonexistent_process_xyz")
        self.assertIsInstance(pids, list)

    def test_gpu_checker_no_gpu_returns_none(self):
        """GpuMemoryChecker returns None gracefully when no GPU."""
        # With a GPU present this yields a positive value; without one it
        # yields None — either way no exception should escape.
        free = GpuMemoryChecker.free_mb()
        if free is not None:
            self.assertGreater(free, 0)
|
||||
@ -0,0 +1,174 @@
|
||||
"""test_topic_rates.py — Verify social-bot topics publish at expected rates.
|
||||
|
||||
Tests measure actual Hz over a 3-second window and assert:
|
||||
/social/faces/detections >= 10 Hz (face detector, camera@15Hz input)
|
||||
/social/tracking/fused_target >= 8 Hz (tracking fusion)
|
||||
/social/orchestrator/state >= 1 Hz (orchestrator heartbeat)
|
||||
/camera/color/image_raw >= 12 Hz (mock sensor publisher)
|
||||
/uwb/target >= 8 Hz (mock UWB)
|
||||
/social/persons >= 3 Hz (person state tracker)
|
||||
|
||||
Run as part of a live stack (requires social_test.launch.py to be running):
|
||||
pytest test_topic_rates.py -v --stack-running
|
||||
|
||||
Or run standalone with mock sensor pub only for camera/uwb tests.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import unittest
|
||||
|
||||
import pytest
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
|
||||
from saltybot_social_tests.test_helpers import TopicRateChecker, poll_until
|
||||
|
||||
|
||||
# ── Rate requirements ─────────────────────────────────────────────────────────
# Each spec is a 5-tuple: (spec_id, topic, msg_type_path, min_hz, warm_up_s).
# spec_id keys the checker dict built in TestTopicRates.setUpClass; warm_up_s
# is how long to spin before the measurement window starts.
RATE_SPECS = [
    # Mock-published topics — should always pass
    ("camera_image", "/camera/color/image_raw", "sensor_msgs.msg.Image", 12.0, 2.0),
    ("uwb_target", "/uwb/target", "geometry_msgs.msg.PoseStamped", 8.0, 2.0),
    # Social-bot topics
    ("faces_detections", "/social/faces/detections", "saltybot_social_msgs.msg.FaceDetectionArray", 10.0, 5.0),
    ("fused_target", "/social/tracking/fused_target", "saltybot_social_msgs.msg.FusedTarget", 8.0, 5.0),
    ("orchestrator_state", "/social/orchestrator/state", "std_msgs.msg.String", 1.0, 3.0),
    ("persons", "/social/persons", "saltybot_social_msgs.msg.PersonStateArray", 3.0, 5.0),
]

# Length of the Hz-measurement window used by every rate checker (seconds).
MEASUREMENT_WINDOW_S = 3.0
||||
|
||||
|
||||
def _import_msg_type(path: str):
|
||||
"""Dynamically import a message class from dotted path like 'std_msgs.msg.String'."""
|
||||
parts = path.rsplit(".", 1)
|
||||
module = __import__(parts[0], fromlist=[parts[1]])
|
||||
return getattr(module, parts[1])
|
||||
|
||||
|
||||
class TestTopicRates(unittest.TestCase):
    """Verify all social-bot topics publish at ≥ minimum required rate."""

    @classmethod
    def setUpClass(cls):
        # Guard against double-init: rclpy.init() raises if another test
        # class in this process already initialised the context (the other
        # probe classes in this suite use the same guard).
        if not rclpy.ok():
            rclpy.init()
        cls.node = rclpy.create_node("topic_rate_test_probe")
        cls.checkers: dict = {}

        # Create one rate checker per spec; a missing message package is
        # tolerated here and surfaces later as a per-test skip.
        for spec_id, topic, msg_path, min_hz, warm_up in RATE_SPECS:
            try:
                msg_type = _import_msg_type(msg_path)
                cls.checkers[spec_id] = TopicRateChecker(
                    cls.node, topic, msg_type, window_s=MEASUREMENT_WINDOW_S
                )
            except Exception as e:
                print(f"[WARN] Could not create checker for {spec_id}: {e}")

    @classmethod
    def tearDownClass(cls):
        cls.node.destroy_node()
        rclpy.shutdown()

    def _spin_for(self, duration_s: float) -> None:
        """Spin the probe node for ``duration_s`` wall-clock seconds."""
        deadline = time.time() + duration_s
        while time.time() < deadline:
            rclpy.spin_once(self.node, timeout_sec=0.05)

    def _check_rate(self, spec_id: str, min_hz: float, warm_up_s: float) -> None:
        """Generic rate check: warm up, measure, assert."""
        checker = self.checkers.get(spec_id)
        if checker is None:
            self.skipTest(f"Checker for {spec_id} not available (missing msg type)")

        # Warm-up period (lets the publisher finish starting up).
        self._spin_for(warm_up_s)

        # Measurement window (+0.5s margin so the checker window fills).
        self._spin_for(MEASUREMENT_WINDOW_S + 0.5)

        hz = checker.measured_hz()
        count = checker.message_count()

        self.assertGreater(
            hz, min_hz * 0.7,  # 30% tolerance for scheduling jitter
            f"{spec_id}: measured {hz:.2f} Hz < {min_hz:.1f} Hz minimum "
            f"(received {count} messages total)"
        )

    # ── Individual test methods (one per topic) ────────────────────────────────

    def test_camera_image_rate(self):
        """Mock camera must publish at ≥12Hz."""
        self._check_rate("camera_image", 12.0, warm_up_s=2.0)

    def test_uwb_target_rate(self):
        """Mock UWB must publish at ≥8Hz."""
        self._check_rate("uwb_target", 8.0, warm_up_s=2.0)

    def test_face_detection_rate(self):
        """Face detection must publish at ≥10Hz given 15Hz camera input."""
        self._check_rate("faces_detections", 10.0, warm_up_s=5.0)

    def test_tracking_fusion_rate(self):
        """Tracking fusion must publish fused_target at ≥8Hz."""
        self._check_rate("fused_target", 8.0, warm_up_s=5.0)

    def test_orchestrator_state_rate(self):
        """Orchestrator must publish state at ≥1Hz (config: 2Hz)."""
        self._check_rate("orchestrator_state", 1.0, warm_up_s=3.0)

    def test_persons_rate(self):
        """Person state tracker must publish at ≥3Hz."""
        self._check_rate("persons", 3.0, warm_up_s=5.0)
|
||||
|
||||
|
||||
class TestTopicPresence(unittest.TestCase):
    """Verify all expected topics exist in the topic graph (not rate)."""

    EXPECTED_TOPICS = [
        "/social/orchestrator/state",
        "/social/faces/detections",
        "/social/faces/embeddings",
        "/social/tracking/fused_target",
        "/social/persons",
        "/social/speech/vad_state",
        "/social/speech/transcript",
        "/social/conversation/response",
        "/social/nav/mode",
        "/camera/color/image_raw",
        "/uwb/target",
        "/cmd_vel",
    ]

    @classmethod
    def setUpClass(cls):
        if not rclpy.ok():
            rclpy.init()
        cls.node = rclpy.create_node("topic_presence_test_probe")
        # Spin until the orchestrator's state topic shows up (max 10s) so
        # the graph has had time to populate before the tests read it.
        deadline = time.time() + 10.0
        while time.time() < deadline:
            rclpy.spin_once(cls.node, timeout_sec=0.2)
            advertised = {name for name, _ in cls.node.get_topic_names_and_types()}
            if "/social/orchestrator/state" in advertised:
                break

    @classmethod
    def tearDownClass(cls):
        cls.node.destroy_node()

    def test_all_expected_topics_advertised(self):
        """Every expected topic must appear in the topic graph."""
        advertised = {name for name, _ in self.node.get_topic_names_and_types()}
        missing = [topic for topic in self.EXPECTED_TOPICS if topic not in advertised]
        self.assertEqual(
            missing, [],
            f"Topics not advertised: {missing}"
        )
|
||||
Loading…
x
Reference in New Issue
Block a user