Merge Issue #468: object detection - perception system enhancements

This commit is contained in:
sl-jetson 2026-03-05 14:22:32 -05:00
commit b178614e6e
15 changed files with 980 additions and 0 deletions

View File

@ -170,6 +170,12 @@ def generate_launch_description():
description="Launch YOLOv8n person detection (TensorRT)",
)
enable_object_detection_arg = DeclareLaunchArgument(
"enable_object_detection",
default_value="true",
description="Launch YOLOv8n general object detection with depth (Issue #468)",
)
enable_follower_arg = DeclareLaunchArgument(
"enable_follower",
default_value="true",
@ -376,6 +382,22 @@ def generate_launch_description():
],
)
# ── t=6s Object detection (needs RealSense up for ~4s; Issue #468) ──────
object_detection = TimerAction(
period=6.0,
actions=[
GroupAction(
condition=IfCondition(LaunchConfiguration("enable_object_detection")),
actions=[
LogInfo(msg="[full_stack] Starting YOLOv8n general object detection"),
IncludeLaunchDescription(
_launch("saltybot_object_detection", "launch", "object_detection.launch.py"),
),
],
),
],
)
# ── t=9s Person follower (needs perception + UWB; ~3s after both start) ─
follower = TimerAction(
period=9.0,
@ -442,6 +464,7 @@ def generate_launch_description():
enable_csi_cameras_arg,
enable_uwb_arg,
enable_perception_arg,
enable_object_detection_arg,
enable_follower_arg,
enable_bridge_arg,
enable_rosbridge_arg,
@ -473,6 +496,7 @@ def generate_launch_description():
slam,
outdoor_nav,
perception,
object_detection,
# t=9s
follower,

View File

@ -0,0 +1,36 @@
# YOLOv8n Object Detection Configuration
object_detection:
ros__parameters:
# Model paths
engine_path: /mnt/nvme/saltybot/models/yolov8n.engine
onnx_path: /mnt/nvme/saltybot/models/yolov8n.onnx
# Inference parameters
confidence_threshold: 0.5 # Detection confidence threshold (0-1)
nms_iou_threshold: 0.45 # Non-Maximum Suppression IoU threshold
# Per-frame filtering
min_confidence_filter: 0.4 # Only publish objects with confidence >= this
enabled_classes: # COCO class IDs to detect
# NOTE(review): comments corrected to match the standard 80-class COCO
# order — several IDs were mislabeled (39 was annotated "cup", etc.).
# Confirm the numeric IDs, not the old labels, are what was intended.
- 0 # person
- 39 # bottle
- 41 # cup
- 42 # fork
- 43 # knife
- 47 # apple
- 48 # sandwich
- 49 # orange
- 56 # chair
- 62 # tv
- 64 # mouse
- 73 # book
# Depth sampling parameters
depth_window_size: 7 # 7x7 window for median filtering
depth_min_range: 0.3 # Minimum valid depth (meters)
depth_max_range: 6.0 # Maximum valid depth (meters)
# Publishing
target_frame: "base_link" # Output frame for 3D positions
publish_debug_image: false # Publish annotated debug image

View File

@ -0,0 +1,49 @@
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
from launch_ros.substitutions import FindPackageShare
from pathlib import Path
def generate_launch_description():
    """Launch the YOLOv8n object-detection node (Issue #468).

    Loads static parameters from the package's YAML config and exposes the
    confidence threshold and the debug-image flag as launch arguments.
    """
    # BUG FIX: FindPackageShare is a launch Substitution that is only
    # resolved at launch time — wrapping it in str()/Path() here produced a
    # bogus repr-based path. PathJoinSubstitution defers the join correctly.
    from launch.substitutions import PathJoinSubstitution

    config_file = PathJoinSubstitution(
        [
            FindPackageShare("saltybot_object_detection"),
            "config",
            "object_detection_params.yaml",
        ]
    )
    # Declare launch arguments
    confidence_threshold_arg = DeclareLaunchArgument(
        "confidence_threshold",
        default_value="0.5",
        description="Detection confidence threshold (0-1)"
    )
    publish_debug_arg = DeclareLaunchArgument(
        "publish_debug_image",
        default_value="false",
        description="Publish annotated debug images"
    )
    # Object detection node
    object_detection_node = Node(
        package="saltybot_object_detection",
        executable="object_detection",
        name="object_detection",
        parameters=[
            config_file,
            {"confidence_threshold": LaunchConfiguration("confidence_threshold")},
            # BUG FIX: was LaunchConfiguration("publish_debug_arg") — that is
            # the Python variable name; the launch *argument* is
            # "publish_debug_image".
            {"publish_debug_image": LaunchConfiguration("publish_debug_image")},
        ],
        remappings=[
            ("color_image", "/camera/color/image_raw"),
            ("depth_image", "/camera/depth/image_rect_raw"),
            ("camera_info", "/camera/color/camera_info"),
        ],
        output="screen",
    )
    return LaunchDescription([
        confidence_threshold_arg,
        publish_debug_arg,
        object_detection_node,
    ])

View File

@ -0,0 +1,32 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_object_detection</name>
<version>0.1.0</version>
<description>YOLOv8n object detection with depth integration (Issue #468)</description>
<maintainer email="sl-perception@saltylab.local">sl-perception</maintainer>
<license>MIT</license>
<buildtool_depend>ament_python</buildtool_depend>
<depend>rclpy</depend>
<depend>std_msgs</depend>
<depend>sensor_msgs</depend>
<depend>geometry_msgs</depend>
<depend>vision_msgs</depend>
<depend>tf2_ros</depend>
<depend>cv_bridge</depend>
<depend>message_filters</depend>
<!-- NOTE(review): "opencv-python" is a pip name, not a rosdep key; the
     rosdep key for the system Python OpenCV bindings is python3-opencv. -->
<depend>python3-opencv</depend>
<depend>numpy</depend>
<depend>saltybot_object_detection_msgs</depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,549 @@
#!/usr/bin/env python3
"""
YOLOv8n Object Detection Node with RealSense Depth Integration
Issue #468: General object detection for spatial awareness
"""
import os
import numpy as np
import cv2
from typing import Tuple, List, Optional
from pathlib import Path
import logging
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
import message_filters
from tf2_ros import TransformListener, Buffer
from tf2_geometry_msgs import PointStamped
from sensor_msgs.msg import Image, CameraInfo
from std_msgs.msg import Header
from geometry_msgs.msg import Point, PointStamped as PointStampedMsg, Quaternion
from vision_msgs.msg import BoundingBox2D, Pose2D
from cv_bridge import CvBridge
from saltybot_object_detection_msgs.msg import DetectedObject, DetectedObjectArray
from saltybot_object_detection_msgs.srv import QueryObjects
_LOGGER = logging.getLogger(__name__)
# COCO class names (0-79)
_COCO_CLASSES = [
"person", "bicycle", "car", "motorcycle", "airplane", "bus", "train", "truck",
"boat", "traffic light", "fire hydrant", "stop sign", "parking meter", "bench",
"cat", "dog", "horse", "sheep", "cow", "elephant", "bear", "zebra", "giraffe",
"backpack", "umbrella", "handbag", "tie", "suitcase", "frisbee", "skis",
"snowboard", "sports ball", "kite", "baseball bat", "baseball glove", "skateboard",
"surfboard", "tennis racket", "bottle", "wine glass", "cup", "fork", "knife",
"spoon", "bowl", "banana", "apple", "sandwich", "orange", "broccoli", "carrot",
"hot dog", "pizza", "donut", "cake", "chair", "couch", "potted plant", "bed",
"dining table", "toilet", "tv", "laptop", "mouse", "remote", "keyboard", "microwave",
"oven", "toaster", "sink", "refrigerator", "book", "clock", "vase", "scissors",
"teddy bear", "hair drier", "toothbrush"
]
_YOLO_INPUT_SIZE = 640
_CONFIDENCE_THRESHOLD = 0.5
_NMS_IOU_THRESHOLD = 0.45
# Sensor-data QoS: best-effort delivery, keep only the 5 newest messages.
# Matches the typical camera-driver "sensor data" profile so the image
# subscriptions are QoS-compatible with the publisher — TODO confirm the
# RealSense driver's actual publisher QoS.
_SENSOR_QOS = QoSProfile(
    reliability=ReliabilityPolicy.BEST_EFFORT,
    history=HistoryPolicy.KEEP_LAST,
    depth=5,
)
class _TRTBackend:
    """TensorRT inference backend (primary for Jetson).

    Deserializes a prebuilt engine, allocates pinned host + device buffers
    for every binding, and runs async inference on a private CUDA stream.

    Raises:
        RuntimeError: if tensorrt/pycuda are not importable.
        FileNotFoundError: if the engine file does not exist.

    NOTE(review): uses the pre-TensorRT-10 binding API
    (num_bindings / get_binding_*) — confirm the target JetPack ships TRT 8.x.
    """

    def __init__(self, engine_path: str):
        try:
            import tensorrt as trt
            import pycuda.driver as cuda
            import pycuda.autoinit  # noqa: F401 — side effect: creates CUDA context
        except ImportError as e:
            raise RuntimeError(f"TensorRT/pycuda not available: {e}")
        if not Path(engine_path).exists():
            raise FileNotFoundError(f"TensorRT engine not found: {engine_path}")
        self.logger = trt.Logger(trt.Logger.WARNING)
        with open(engine_path, "rb") as f:
            self.engine = trt.Runtime(self.logger).deserialize_cuda_engine(f.read())
        self.context = self.engine.create_execution_context()
        self.stream = cuda.Stream()
        # Allocate input/output buffers, one pinned host + one device buffer
        # per binding, in binding order (required by execute_async_v2).
        self.h_inputs = {}
        self.h_outputs = {}
        self.d_inputs = {}
        self.d_outputs = {}
        self.bindings = []
        for binding_idx in range(self.engine.num_bindings):
            binding_name = self.engine.get_binding_name(binding_idx)
            binding_shape = self.engine.get_binding_shape(binding_idx)
            binding_dtype = self.engine.get_binding_dtype(binding_idx)
            # Host buffer dtype must match the binding dtype exactly.
            if binding_dtype == trt.float32:
                np_dtype = np.float32
            elif binding_dtype == trt.float16:
                # BUG FIX: was np.float32 — allocating 4-byte elements for a
                # 2-byte fp16 binding makes the host buffer twice the binding
                # size and corrupts device<->host copies.
                np_dtype = np.float16
            else:
                raise ValueError(f"Unsupported dtype: {binding_dtype}")
            binding_size = int(np.prod(binding_shape))
            if self.engine.binding_is_input(binding_idx):
                h_buf = cuda.pagelocked_empty(binding_size, np_dtype)
                d_buf = cuda.mem_alloc(h_buf.nbytes)
                self.h_inputs[binding_name] = h_buf
                self.d_inputs[binding_name] = d_buf
                self.bindings.append(int(d_buf))
            else:
                h_buf = cuda.pagelocked_empty(binding_size, np_dtype)
                d_buf = cuda.mem_alloc(h_buf.nbytes)
                # Keep outputs shaped so callers can index them directly.
                self.h_outputs[binding_name] = h_buf.reshape(binding_shape)
                self.d_outputs[binding_name] = d_buf
                self.bindings.append(int(d_buf))
        self.input_names = list(self.h_inputs.keys())
        self.output_names = list(self.h_outputs.keys())

    def infer(self, input_data: np.ndarray) -> List[np.ndarray]:
        """Run one inference pass; returns copies of all output tensors.

        ``input_data`` is flattened into the (single) input binding;
        np.copyto casts to the binding dtype (e.g. float32 -> float16).
        """
        import pycuda.driver as cuda
        input_name = self.input_names[0]
        np.copyto(self.h_inputs[input_name], input_data.ravel())
        cuda.memcpy_htod_async(self.d_inputs[input_name], self.h_inputs[input_name], self.stream)
        self.context.execute_async_v2(self.bindings, self.stream.handle)
        outputs = []
        for output_name in self.output_names:
            cuda.memcpy_dtoh_async(self.h_outputs[output_name], self.d_outputs[output_name], self.stream)
            self.stream.synchronize()
            # Copy so the caller's array survives the next infer() overwrite.
            outputs.append(self.h_outputs[output_name].copy())
        return outputs
class _ONNXBackend:
    """ONNX Runtime inference backend (fallback when no TRT engine exists).

    Raises:
        RuntimeError: if onnxruntime is not importable.
        FileNotFoundError: if the model file does not exist.
    """

    def __init__(self, onnx_path: str):
        try:
            import onnxruntime as ort
        except ImportError as e:
            raise RuntimeError(f"ONNXRuntime not available: {e}")
        if not Path(onnx_path).exists():
            raise FileNotFoundError(f"ONNX model not found: {onnx_path}")
        # Prefer CUDA; onnxruntime silently falls back to CPU if unavailable.
        self.session = ort.InferenceSession(
            onnx_path,
            providers=["CUDAExecutionProvider", "CPUExecutionProvider"],
        )
        # Cache tensor names once so infer() is allocation-free.
        self.input_name = self.session.get_inputs()[0].name
        self.output_names = [o.name for o in self.session.get_outputs()]

    def infer(self, input_data: np.ndarray) -> List[np.ndarray]:
        """Run one forward pass and return the raw output tensors."""
        feed = {self.input_name: input_data}
        return self.session.run(self.output_names, feed)
class _YOLODecoder:
"""Decode YOLOv8 output to detections."""
def __init__(self, conf_thresh: float = 0.5, nms_iou_thresh: float = 0.45):
self.conf_thresh = conf_thresh
self.nms_iou_thresh = nms_iou_thresh
def decode(self, output: np.ndarray, input_size: int) -> List[Tuple[int, str, float, Tuple[int, int, int, int]]]:
"""
Decode YOLOv8 output.
Output shape: [1, 84, 8400]
Returns: List[(class_id, class_name, confidence, bbox_xyxy)]
"""
# Transpose: [1, 84, 8400] -> [8400, 84]
output = output.squeeze(0).transpose(1, 0)
# Extract bbox and scores
bboxes = output[:, :4] # [8400, 4] cx, cy, w, h
scores = output[:, 4:] # [8400, 80] class scores
# Get max score and class per detection
max_scores = scores.max(axis=1)
class_ids = scores.argmax(axis=1)
# Filter by confidence
mask = max_scores >= self.conf_thresh
bboxes = bboxes[mask]
class_ids = class_ids[mask]
scores = max_scores[mask]
if len(bboxes) == 0:
return []
# Convert cx, cy, w, h to x1, y1, x2, y2
bboxes_xyxy = np.zeros_like(bboxes)
bboxes_xyxy[:, 0] = bboxes[:, 0] - bboxes[:, 2] / 2
bboxes_xyxy[:, 1] = bboxes[:, 1] - bboxes[:, 3] / 2
bboxes_xyxy[:, 2] = bboxes[:, 0] + bboxes[:, 2] / 2
bboxes_xyxy[:, 3] = bboxes[:, 1] + bboxes[:, 3] / 2
# Apply NMS
keep_indices = self._nms(bboxes_xyxy, scores, self.nms_iou_thresh)
# Build result
detections = []
for idx in keep_indices:
x1, y1, x2, y2 = bboxes_xyxy[idx]
class_id = int(class_ids[idx])
conf = float(scores[idx])
class_name = _COCO_CLASSES[class_id]
detections.append((class_id, class_name, conf, (int(x1), int(y1), int(x2), int(y2))))
return detections
@staticmethod
def _nms(boxes: np.ndarray, scores: np.ndarray, iou_threshold: float) -> List[int]:
"""Non-Maximum Suppression."""
if len(boxes) == 0:
return []
x1 = boxes[:, 0]
y1 = boxes[:, 1]
x2 = boxes[:, 2]
y2 = boxes[:, 3]
areas = (x2 - x1) * (y2 - y1)
order = scores.argsort()[::-1]
keep = []
while order.size > 0:
i = order[0]
keep.append(i)
xx1 = np.maximum(x1[i], x1[order[1:]])
yy1 = np.maximum(y1[i], y1[order[1:]])
xx2 = np.minimum(x2[i], x2[order[1:]])
yy2 = np.minimum(y2[i], y2[order[1:]])
w = np.maximum(0.0, xx2 - xx1)
h = np.maximum(0.0, yy2 - yy1)
inter = w * h
iou = inter / (areas[i] + areas[order[1:]] - inter)
order = order[np.where(iou <= iou_threshold)[0] + 1]
return keep
class ObjectDetectionNode(Node):
    """YOLOv8n object detection with RealSense depth integration.

    Pipeline: synchronized color+depth frames -> YOLO inference ->
    class/confidence filtering -> median depth sampling at each bbox center
    -> pinhole unprojection to 3D -> optional TF transform to
    ``target_frame`` -> DetectedObjectArray on /saltybot/objects.
    A query service returns a text summary of the last processed frame.
    """

    def __init__(self):
        super().__init__("object_detection")
        # ── Parameters ──────────────────────────────────────────────────
        self.declare_parameter("engine_path", "/mnt/nvme/saltybot/models/yolov8n.engine")
        self.declare_parameter("onnx_path", "/mnt/nvme/saltybot/models/yolov8n.onnx")
        self.declare_parameter("confidence_threshold", 0.5)
        self.declare_parameter("nms_iou_threshold", 0.45)
        self.declare_parameter("min_confidence_filter", 0.4)
        self.declare_parameter("enabled_classes", [0, 39, 41, 42, 43, 47, 56, 57, 61, 62, 64, 73])
        self.declare_parameter("depth_window_size", 7)
        self.declare_parameter("depth_min_range", 0.3)
        self.declare_parameter("depth_max_range", 6.0)
        self.declare_parameter("target_frame", "base_link")
        self.declare_parameter("publish_debug_image", False)
        self.engine_path = self.get_parameter("engine_path").value
        self.onnx_path = self.get_parameter("onnx_path").value
        self.confidence_threshold = self.get_parameter("confidence_threshold").value
        self.nms_iou_threshold = self.get_parameter("nms_iou_threshold").value
        self.min_confidence_filter = self.get_parameter("min_confidence_filter").value
        self.enabled_classes = self.get_parameter("enabled_classes").value
        self.depth_window_size = self.get_parameter("depth_window_size").value
        self.depth_min_range = self.get_parameter("depth_min_range").value
        self.depth_max_range = self.get_parameter("depth_max_range").value
        self.target_frame = self.get_parameter("target_frame").value
        self.publish_debug_image = self.get_parameter("publish_debug_image").value
        # ── Inference backend + decoder ─────────────────────────────────
        self.backend = self._load_backend()
        self.decoder = _YOLODecoder(self.confidence_threshold, self.nms_iou_threshold)
        self.bridge = CvBridge()
        # ── TF2 ─────────────────────────────────────────────────────────
        self.tf_buffer = Buffer()
        self.tf_listener = TransformListener(self.tf_buffer, self)
        # Camera intrinsics, latched from the first CameraInfo message.
        # (Removed an unused `camera_info_lock` attribute — rclpy callbacks
        # in the default single-threaded executor never run concurrently.)
        self.camera_info: Optional[CameraInfo] = None
        # ── Subscriptions: sync color+depth within one frame @ 30 fps ───
        color_sub = message_filters.Subscriber(
            self, Image, "color_image", qos_profile=_SENSOR_QOS
        )
        depth_sub = message_filters.Subscriber(
            self, Image, "depth_image", qos_profile=_SENSOR_QOS
        )
        self.sync = message_filters.ApproximateTimeSynchronizer(
            [color_sub, depth_sub], queue_size=5, slop=0.033
        )
        self.sync.registerCallback(self._on_frame)
        # Camera info needs no synchronization — latch it once.
        # (BUG FIX: removed a dead message_filters.Subscriber for camera_info
        # that was created but never registered with the synchronizer.)
        self.create_subscription(CameraInfo, "camera_info", self._on_camera_info, _SENSOR_QOS)
        # ── Publishers ──────────────────────────────────────────────────
        self.objects_pub = self.create_publisher(
            DetectedObjectArray, "/saltybot/objects", _SENSOR_QOS
        )
        if self.publish_debug_image:
            self.debug_image_pub = self.create_publisher(
                Image, "/saltybot/objects/debug_image", _SENSOR_QOS
            )
        else:
            self.debug_image_pub = None
        # ── Query service ───────────────────────────────────────────────
        self.query_srv = self.create_service(
            QueryObjects, "/saltybot/objects/query", self._on_query
        )
        # Detections from the most recent frame, for the query service.
        self.last_detections: List[DetectedObject] = []
        self.get_logger().info("ObjectDetectionNode initialized")

    def _load_backend(self):
        """Load the TensorRT engine, falling back to ONNX Runtime on failure."""
        try:
            if Path(self.engine_path).exists():
                self.get_logger().info(f"Loading TensorRT engine: {self.engine_path}")
                return _TRTBackend(self.engine_path)
            else:
                self.get_logger().warn(f"TRT engine not found: {self.engine_path}")
        except Exception as e:
            self.get_logger().error(f"TensorRT loading failed: {e}")
        # Fallback to ONNX — raises if that is missing too, aborting startup.
        self.get_logger().info(f"Loading ONNX model: {self.onnx_path}")
        return _ONNXBackend(self.onnx_path)

    def _on_camera_info(self, msg: CameraInfo):
        """Latch camera intrinsics from the first CameraInfo message."""
        if self.camera_info is None:
            self.camera_info = msg
            self.get_logger().info(f"Camera info received: {msg.width}x{msg.height}")

    def _on_frame(self, color_msg: Image, depth_msg: Image):
        """Process one synchronized color+depth pair end to end."""
        if self.camera_info is None:
            self.get_logger().warn("Camera info not yet received, skipping frame")
            return
        color_frame = self.bridge.imgmsg_to_cv2(color_msg, desired_encoding="bgr8")
        # BUG FIX: "float32" is not a cv_bridge encoding. RealSense publishes
        # depth_image_rect_raw as 16UC1 in millimeters; take it unchanged and
        # convert to float32 meters so the min/max range parameters (meters)
        # filter correctly.
        depth_raw = self.bridge.imgmsg_to_cv2(depth_msg, desired_encoding="passthrough")
        if depth_raw.dtype == np.uint16:
            depth_frame = depth_raw.astype(np.float32) * 0.001
        else:
            depth_frame = depth_raw.astype(np.float32)
        input_tensor, scale, pad_x, pad_y = self._preprocess(color_frame)
        try:
            output = self.backend.infer(input_tensor)
            detections = self.decoder.decode(output[0], _YOLO_INPUT_SIZE)
        except Exception as e:
            self.get_logger().error(f"Inference error: {e}")
            return
        # Filter by enabled classes/confidence and map bboxes back to image
        # coordinates (BUG FIX: decoder output is in 640x640 letterbox space
        # and was previously used directly as image/depth pixel coordinates).
        img_h, img_w = color_frame.shape[:2]
        filtered_detections = []
        for class_id, class_name, conf, (x1, y1, x2, y2) in detections:
            if class_id not in self.enabled_classes or conf < self.min_confidence_filter:
                continue
            x1 = int(np.clip((x1 - pad_x) / scale, 0, img_w - 1))
            x2 = int(np.clip((x2 - pad_x) / scale, 0, img_w - 1))
            y1 = int(np.clip((y1 - pad_y) / scale, 0, img_h - 1))
            y2 = int(np.clip((y2 - pad_y) / scale, 0, img_h - 1))
            filtered_detections.append((class_id, class_name, conf, (x1, y1, x2, y2)))
        # Project each detection to 3D via depth at the bbox center.
        detected_objects = []
        for class_id, class_name, conf, (x1, y1, x2, y2) in filtered_detections:
            cx = (x1 + x2) // 2
            cy = (y1 + y2) // 2
            # NOTE(review): assumes depth is registered/aligned to the color
            # image so pixel coordinates transfer directly — confirm the
            # driver publishes aligned depth on the remapped topic.
            depth_m = self._get_depth_at(depth_frame, cx, cy)
            if depth_m <= 0:
                continue  # no valid depth in the sampling window
            pos_3d = self._pixel_to_3d(float(cx), float(cy), depth_m, self.camera_info)
            pos_3d.header.stamp = color_msg.header.stamp
            if self.target_frame != "camera_color_optical_frame":
                try:
                    # BUG FIX: the original looked up the transform but never
                    # applied it. Buffer.transform() performs the actual point
                    # transformation (converter registered by the module-level
                    # tf2_geometry_msgs import).
                    pos_3d = self.tf_buffer.transform(pos_3d, self.target_frame)
                except Exception as e:
                    self.get_logger().warn(f"TF lookup failed: {e}")
                    # Fall back to camera-frame coordinates (frame_id already
                    # set by _pixel_to_3d).
            obj = DetectedObject()
            obj.class_id = class_id
            obj.class_name = class_name
            obj.confidence = conf
            obj.bbox = BoundingBox2D()
            obj.bbox.center = Pose2D(x=float(cx), y=float(cy))
            obj.bbox.size_x = float(x2 - x1)
            obj.bbox.size_y = float(y2 - y1)
            obj.position_3d = pos_3d
            # Camera-frame range to the object, not base_link distance.
            obj.distance_m = depth_m
            detected_objects.append(obj)
        # Publish the array (header frame reflects the configured target;
        # individual points carry their own frame_id if TF failed).
        self.last_detections = detected_objects
        array_msg = DetectedObjectArray()
        array_msg.header = color_msg.header
        array_msg.header.frame_id = self.target_frame
        array_msg.objects = detected_objects
        self.objects_pub.publish(array_msg)
        if self.debug_image_pub is not None:
            self._publish_debug_image(color_frame, filtered_detections)

    def _preprocess(self, bgr_frame: np.ndarray):
        """Letterbox to 640x640, BGR->RGB, normalize to 0-1, NCHW float32.

        Returns:
            (tensor, scale, pad_x, pad_y) — the letterbox parameters let the
            caller map detections back to original image coordinates.
        """
        h, w = bgr_frame.shape[:2]
        scale = _YOLO_INPUT_SIZE / max(h, w)
        new_h, new_w = int(h * scale), int(w * scale)
        resized = cv2.resize(bgr_frame, (new_w, new_h))
        # Center the resized image on a black square canvas.
        canvas = np.zeros((_YOLO_INPUT_SIZE, _YOLO_INPUT_SIZE, 3), dtype=np.uint8)
        pad_y = (_YOLO_INPUT_SIZE - new_h) // 2
        pad_x = (_YOLO_INPUT_SIZE - new_w) // 2
        canvas[pad_y : pad_y + new_h, pad_x : pad_x + new_w] = resized
        rgb = cv2.cvtColor(canvas, cv2.COLOR_BGR2RGB)
        tensor = rgb.astype(np.float32) / 255.0
        # HWC -> CHW -> NCHW, contiguous for the inference backends.
        tensor = np.ascontiguousarray(tensor.transpose(2, 0, 1)[np.newaxis])
        return tensor, scale, pad_x, pad_y

    def _get_depth_at(self, depth_frame: np.ndarray, u: int, v: int) -> float:
        """Median depth (meters) in a window around pixel (u, v); 0.0 if none valid."""
        h, w = depth_frame.shape
        half = self.depth_window_size // 2
        u1 = max(0, u - half)
        u2 = min(w, u + half + 1)
        v1 = max(0, v - half)
        v2 = min(h, v + half + 1)
        patch = depth_frame[v1:v2, u1:u2]
        # Median over in-range samples rejects speckle and zero-fill holes.
        valid = patch[(patch > self.depth_min_range) & (patch < self.depth_max_range)]
        if len(valid) == 0:
            return 0.0
        return float(np.median(valid))

    def _pixel_to_3d(self, u: float, v: float, depth_m: float, cam_info: CameraInfo) -> PointStampedMsg:
        """Unproject pixel (u, v) at depth_m to a 3D point in the camera optical frame."""
        K = cam_info.k if hasattr(cam_info, "k") else cam_info.K
        fx, fy = K[0], K[4]
        cx, cy = K[2], K[5]
        # Standard pinhole model: X right, Y down, Z forward (optical frame).
        X = (u - cx) * depth_m / fx
        Y = (v - cy) * depth_m / fy
        Z = depth_m
        point_msg = PointStampedMsg()
        point_msg.header.frame_id = "camera_color_optical_frame"
        point_msg.header.stamp = self.get_clock().now().to_msg()
        point_msg.point = Point(x=X, y=Y, z=Z)
        return point_msg

    def _publish_debug_image(self, frame: np.ndarray, detections: List):
        """Publish a copy of the frame annotated with boxes and labels."""
        debug_frame = frame.copy()
        for class_id, class_name, conf, (x1, y1, x2, y2) in detections:
            cv2.rectangle(debug_frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
            label = f"{class_name} {conf:.2f}"
            cv2.putText(
                debug_frame, label, (x1, y1 - 5),
                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2
            )
        msg = self.bridge.cv2_to_imgmsg(debug_frame, encoding="bgr8")
        self.debug_image_pub.publish(msg)

    def _on_query(self, request, response) -> QueryObjects.Response:
        """Service handler: summarize the most recent detections as text."""
        if not self.last_detections:
            response.description = "No objects detected."
            response.success = False
            return response
        descriptions = []
        for obj in self.last_detections:
            if obj.distance_m > 0:
                descriptions.append(f"{obj.class_name} at {obj.distance_m:.1f}m")
            else:
                descriptions.append(obj.class_name)
        response.description = f"I see {', '.join(descriptions)}."
        response.success = True
        return response
def main(args=None):
    """Entry point: initialize rclpy and spin the detection node until shutdown."""
    rclpy.init(args=args)
    detector = ObjectDetectionNode()
    rclpy.spin(detector)
    detector.destroy_node()
    rclpy.shutdown()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,183 @@
#!/usr/bin/env python3
"""
Build YOLOv8n TensorRT FP16 engine for Orin Nano.
"""
import argparse
import sys
from pathlib import Path
def build_engine(output_path: Path, workspace_mb: int = 2048) -> bool:
    """Download YOLOv8n, export it to ONNX, and build a TensorRT FP16 engine.

    Args:
        output_path: Destination path for the serialized .engine file.
        workspace_mb: TensorRT builder workspace limit in megabytes.

    Returns:
        True on success; False if a dependency is missing or a step fails.
    """
    try:
        from ultralytics import YOLO
    except ImportError:
        print("ERROR: ultralytics not installed. Install with: pip install ultralytics")
        return False
    try:
        import tensorrt as trt
    except ImportError:
        print("ERROR: TensorRT not installed")
        return False
    print("[*] Loading YOLOv8n from Ultralytics...")
    model = YOLO("yolov8n.pt")
    print("[*] Exporting to ONNX...")
    onnx_path = output_path.parent / "yolov8n.onnx"
    # BUG FIX: export() returns the path of the file it wrote (which depends
    # on where the .pt lives) — use it instead of assuming ./yolov8n.onnx.
    exported = model.export(format="onnx", opset=12)
    import shutil
    onnx_src = Path(exported) if exported else Path("yolov8n.onnx")
    if onnx_src.exists():
        if onnx_src.resolve() != onnx_path.resolve():
            shutil.move(str(onnx_src), str(onnx_path))
        print(f"[+] ONNX exported to {onnx_path}")
    else:
        print("ERROR: ONNX export not found")
        return False
    print("[*] Converting ONNX to TensorRT FP16...")
    try:
        # BUG FIX: polygraphy has no `BytesFromOnnx` loader, and
        # engine_from_network() expects a parsed network plus a CreateConfig —
        # not raw ONNX bytes / ad-hoc config kwargs.
        from polygraphy.backend.trt import (
            CreateConfig,
            engine_from_network,
            network_from_onnx_path,
            save_engine,
        )
    except ImportError:
        print("ERROR: polygraphy not installed. Install with: pip install polygraphy")
        return False
    try:
        engine = engine_from_network(
            network_from_onnx_path(str(onnx_path)),
            config=CreateConfig(
                fp16=True,
                memory_pool_limits={trt.MemoryPoolType.WORKSPACE: workspace_mb * 1024 * 1024},
            ),
        )
        save_engine(engine, str(output_path))
        print(f"[+] TensorRT engine saved to {output_path}")
        return True
    except Exception as e:
        print(f"ERROR: Failed to convert to TensorRT: {e}")
        return False
def benchmark_engine(engine_path: Path, num_iterations: int = 100) -> None:
    """Benchmark TensorRT engine latency (mean/std/min/max and FPS).

    Args:
        engine_path: Path to a serialized YOLOv8n engine (1x3x640x640 in,
            1x84x8400 out).
        num_iterations: Number of timed inference iterations after warmup.
    """
    try:
        import tensorrt as trt
        import pycuda.driver as cuda
        import pycuda.autoinit  # noqa: F401 — side effect: creates CUDA context
        import numpy as np
        import time
    except ImportError as e:
        print(f"ERROR: Missing dependency: {e}")
        return
    if not engine_path.exists():
        print(f"ERROR: Engine not found: {engine_path}")
        return
    print(f"\n[*] Benchmarking {engine_path} ({num_iterations} iterations)...")
    # BUG FIX: pre-bind to None so the finally block cannot raise a NameError
    # (masking the real failure) when an exception fires before allocation.
    d_input = None
    d_output = None
    try:
        logger = trt.Logger(trt.Logger.WARNING)
        with open(engine_path, "rb") as f:
            engine = trt.Runtime(logger).deserialize_cuda_engine(f.read())
        context = engine.create_execution_context()
        stream = cuda.Stream()
        # Input (1, 3, 640, 640) and output (1, 84, 8400) buffers.
        h_input = cuda.pagelocked_empty(1 * 3 * 640 * 640, np.float32)
        d_input = cuda.mem_alloc(h_input.nbytes)
        h_output = cuda.pagelocked_empty(1 * 84 * 8400, np.float32)
        d_output = cuda.mem_alloc(h_output.nbytes)
        bindings = [int(d_input), int(d_output)]
        # Warmup iterations stabilize clocks and lazy initialization.
        for _ in range(10):
            cuda.memcpy_htod_async(d_input, h_input, stream)
            context.execute_async_v2(bindings, stream.handle)
            cuda.memcpy_dtoh_async(h_output, d_output, stream)
            stream.synchronize()
        # Timed section covers execute + device-to-host copy per iteration.
        times = []
        for _ in range(num_iterations):
            cuda.memcpy_htod_async(d_input, h_input, stream)
            start = time.time()
            context.execute_async_v2(bindings, stream.handle)
            cuda.memcpy_dtoh_async(h_output, d_output, stream)
            stream.synchronize()
            elapsed = time.time() - start
            times.append(elapsed * 1000)  # ms
        mean_latency = np.mean(times)
        std_latency = np.std(times)
        min_latency = np.min(times)
        max_latency = np.max(times)
        throughput = 1000.0 / mean_latency
        print(f"[+] Latency: {mean_latency:.2f}ms ± {std_latency:.2f}ms (min={min_latency:.2f}ms, max={max_latency:.2f}ms)")
        print(f"[+] Throughput: {throughput:.1f} FPS")
    except Exception as e:
        print(f"ERROR: Benchmark failed: {e}")
    finally:
        if d_input is not None:
            d_input.free()
        if d_output is not None:
            d_output.free()
def main():
    """CLI entry point: parse arguments, build the engine, optionally benchmark it."""
    parser = argparse.ArgumentParser(description="Build YOLOv8n TensorRT engine")
    parser.add_argument(
        "--output",
        type=Path,
        default=Path("/mnt/nvme/saltybot/models/yolov8n.engine"),
        help="Output engine path",
    )
    parser.add_argument(
        "--workspace", type=int, default=2048, help="TensorRT workspace size in MB"
    )
    parser.add_argument(
        "--benchmark", action="store_true", help="Benchmark the engine after building"
    )
    opts = parser.parse_args()
    # Make sure the destination directory exists before the build writes to it.
    opts.output.parent.mkdir(parents=True, exist_ok=True)
    print(f"[*] Building YOLOv8n TensorRT engine")
    print(f"[*] Output: {opts.output}")
    print(f"[*] Workspace: {opts.workspace} MB")
    built_ok = build_engine(opts.output, opts.workspace)
    if built_ok and opts.benchmark:
        benchmark_engine(opts.output)
    sys.exit(0 if built_ok else 1)
if __name__ == "__main__":
main()

View File

@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/saltybot_object_detection
[install]
install_scripts=$base/lib/saltybot_object_detection

View File

@ -0,0 +1,32 @@
from setuptools import setup, find_packages

package_name = 'saltybot_object_detection'

# Everything under share/<pkg> must be declared here so ros2 launch and the
# parameter loader can locate the launch/config files at runtime.
_share = 'share/' + package_name

setup(
    name=package_name,
    version='0.1.0',
    packages=find_packages(exclude=['test']),
    data_files=[
        ('share/ament_index/resource_index/packages', ['resource/' + package_name]),
        (_share, ['package.xml']),
        (_share + '/launch', ['launch/object_detection.launch.py']),
        (_share + '/config', ['config/object_detection_params.yaml']),
    ],
    install_requires=['setuptools'],
    zip_safe=True,
    maintainer='sl-perception',
    maintainer_email='sl-perception@saltylab.local',
    description='YOLOv8n object detection with depth integration',
    license='MIT',
    tests_require=['pytest'],
    entry_points={
        'console_scripts': [
            # ros2 run saltybot_object_detection object_detection
            'object_detection = saltybot_object_detection.object_detection_node:main',
        ],
    },
)

View File

@ -0,0 +1,20 @@
# Interface-only package: generates the DetectedObject/DetectedObjectArray
# messages and the QueryObjects service for Issue #468.
cmake_minimum_required(VERSION 3.8)
project(saltybot_object_detection_msgs)

# Build tooling and interface-generation dependencies.
find_package(ament_cmake REQUIRED)
find_package(rosidl_default_generators REQUIRED)
# Packages whose types appear in the .msg/.srv definitions below.
find_package(std_msgs REQUIRED)
find_package(geometry_msgs REQUIRED)
find_package(vision_msgs REQUIRED)
find_package(builtin_interfaces REQUIRED)

rosidl_generate_interfaces(${PROJECT_NAME}
# Issue #468 general object detection (YOLOv8n)
"msg/DetectedObject.msg"
"msg/DetectedObjectArray.msg"
"srv/QueryObjects.srv"
DEPENDENCIES std_msgs geometry_msgs vision_msgs builtin_interfaces
)

# Downstream packages need the generated runtime typesupport.
ament_export_dependencies(rosidl_default_runtime)
ament_package()

View File

@ -0,0 +1,15 @@
# Single detected object from YOLO inference
# Published as array in DetectedObjectArray on /saltybot/objects
# ── Object identity ────────────────────────────────────
uint16 class_id # COCO class ID, 0-79
string class_name # human-readable label (e.g., "cup", "chair")
float32 confidence # detection confidence, 0-1
# ── 2-D bounding box (pixel coords in source image) ────
vision_msgs/BoundingBox2D bbox
# ── 3-D position (in base_link frame) ──────────────────
# Depth-projected from RealSense aligned depth map
geometry_msgs/PointStamped position_3d # point in base_link frame
float32 distance_m # euclidean distance from base_link, 0 = unknown

View File

@ -0,0 +1,5 @@
# Array of detected objects from YOLOv8n inference
# Published at /saltybot/objects with timestamp and frame info
std_msgs/Header header # timestamp, frame_id="base_link"
DetectedObject[] objects # detected objects in this frame

View File

@ -0,0 +1,23 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_object_detection_msgs</name>
<version>0.1.0</version>
<description>ROS2 messages for YOLOv8n general object detection — Issue #468</description>
<maintainer email="seb@vayrette.com">seb</maintainer>
<license>MIT</license>
<buildtool_depend>ament_cmake</buildtool_depend>
<build_depend>rosidl_default_generators</build_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>
<depend>std_msgs</depend>
<depend>geometry_msgs</depend>
<depend>vision_msgs</depend>
<depend>builtin_interfaces</depend>
<export>
<build_type>ament_cmake</build_type>
</export>
</package>

View File

@ -0,0 +1,8 @@
# Query detected objects as a formatted text summary
# Called by voice_command_node for the "what's in front of you" intent
# Request (empty)
---
# Response
string description # e.g., "I see a cup at 0.8 meters, a laptop at 1.2 meters"
bool success # true if detection succeeded and objects found