feat: Visual odometry from RealSense stereo ORB (Issue #586) #593

Merged
sl-jetson merged 1 commit from sl-perception/issue-586-visual-odom into main 2026-03-14 13:32:57 -04:00
5 changed files with 569 additions and 0 deletions

View File

@ -0,0 +1,16 @@
stereo_orb_vo:
  ros__parameters:
    # ORB feature detection
    max_features: 500        # max keypoints per infra1 frame
    # Stereo scale
    baseline_m: 0.050        # D435i stereo baseline ~50 mm (overridden by camera_info Tx)
    # Depth validity window for scale recovery
    min_depth_m: 0.3         # metres — lower bound of reliable D435i depth
    max_depth_m: 6.0         # metres — upper bound of reliable D435i depth
    min_depth_samples: 10    # minimum valid depth samples for primary scale
    # TF / frame IDs
    frame_id: "odom"
    child_frame_id: "camera_link"

View File

@ -0,0 +1,27 @@
"""stereo_orb.launch.py — Launch the ORB stereo visual odometry node (Issue #586)."""
import os
from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch_ros.actions import Node
def generate_launch_description():
    """Launch the ORB stereo visual odometry node with its YAML parameter file.

    Locates ``config/stereo_orb_params.yaml`` inside the installed package
    share directory and starts the ``stereo_orb`` executable under the node
    name ``stereo_orb_vo`` (matching the top-level key in the params file).
    """
    pkg_share = get_package_share_directory('saltybot_visual_odom')
    params = os.path.join(pkg_share, 'config', 'stereo_orb_params.yaml')
    # The previous identity remappings ('/x' -> '/x') were no-ops and have
    # been removed; add real remappings here only if the camera namespace
    # differs from the RealSense defaults the node subscribes to.
    stereo_orb_node = Node(
        package='saltybot_visual_odom',
        executable='stereo_orb',
        name='stereo_orb_vo',
        output='screen',
        parameters=[params],
    )
    return LaunchDescription([stereo_orb_node])

View File

@ -0,0 +1,204 @@
"""
orb_stereo_matcher.py — ORB feature detection and descriptor matching for stereo VO.
Provides two match types:
    Temporal : left_gray(t-1) -> left_gray(t)   matched pairs for Essential-matrix estimation
    Stereo   : left_gray(t)  <-> right_gray(t)  disparity-based metric depth (scale source)
Matching:
    BFMatcher(NORM_HAMMING) with Lowe ratio test (default ratio=0.75).
    Stereo matching additionally enforces the epipolar row constraint (|Δrow| <= row_tol px)
    and requires positive disparity (left.x > right.x, i.e. object in front of cameras).
ORB parameters
nfeatures : max keypoints per frame (default 500)
scale_factor: image pyramid scale (default 1.2)
nlevels : pyramid levels (default 8)
Scale recovery (stereo)
For matched infra1/infra2 pairs with disparity d > 0:
depth_i = baseline_m * fx / d_i
Returns median over valid [d_min_px, d_max_px] disparity range.
Returns 0.0 when fewer than min_stereo_samples good pairs exist.
"""
from __future__ import annotations
from typing import Tuple
import numpy as np
import cv2
# ── Matching parameters ────────────────────────────────────────────────────────
_LOWE_RATIO = 0.75 # Lowe ratio test threshold (reject ambiguous matches)
_MIN_MATCHES = 20 # minimum temporal matches for VO
_ROW_TOL = 2 # ±px for stereo epipolar row constraint (assumes rectified pair)
_DISP_MIN = 1.0 # minimum valid disparity (pixels)
_DISP_MAX = 192.0 # maximum valid disparity (D435i 640-px baseline, ~0.15m min depth)
_MIN_STEREO_SAMP = 10 # minimum stereo samples for reliable scale
class OrbStereoMatcher:
    """
    ORB-based feature tracker for monocular temporal matching and stereo scale.

    Usage (per frame)::

        matcher = OrbStereoMatcher()
        # --- temporal ---
        prev_pts, curr_pts = matcher.update(curr_gray_left)
        # --- stereo scale (optional, called same frame) ---
        depth_m = matcher.stereo_scale(curr_gray_left, curr_gray_right, fx, baseline_m)
    """

    def __init__(
        self,
        nfeatures: int = 500,
        scale_factor: float = 1.2,
        nlevels: int = 8,
    ) -> None:
        """
        Parameters
        ----------
        nfeatures : max ORB keypoints detected per frame.
        scale_factor : image-pyramid scale between levels.
        nlevels : number of pyramid levels.
        """
        # BUG FIX: the cv2.ORB_create keyword is ``nlevels`` (all lowercase);
        # the previous ``nLevels`` raised TypeError at construction.
        self._orb = cv2.ORB_create(
            nfeatures=nfeatures,
            scaleFactor=scale_factor,
            nlevels=nlevels,
            edgeThreshold=15,
            patchSize=31,
        )
        # crossCheck must stay False: knnMatch(k=2) + Lowe ratio needs both
        # nearest neighbours per query descriptor.
        self._matcher = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=False)
        # Previous-frame state. Only keypoints + descriptors are needed for
        # temporal matching; the frame image is not retained (the original
        # kept a full-frame copy per update() that was never read).
        self._prev_kp: list | None = None
        self._prev_desc: np.ndarray | None = None

    # ── Temporal matching ──────────────────────────────────────────────────────
    def update(
        self,
        curr_gray: np.ndarray,
    ) -> Tuple[np.ndarray, np.ndarray]:
        """
        Detect ORB features in *curr_gray* and match against the previous frame.

        Returns
        -------
        (prev_pts, curr_pts) : (N, 2) float32 arrays of matched pixel coords.
            Empty (0, 2) arrays on the first call or when matching fails.
        """
        kp, desc = self._orb.detectAndCompute(curr_gray, None)
        if (
            desc is None
            or len(kp) < 8  # Essential-matrix estimation needs >= 8 points
            or self._prev_desc is None
            or self._prev_kp is None
        ):
            # First frame (or detection failure) — just store state.
            self._prev_kp = kp
            self._prev_desc = desc
            return self._empty_pair()
        # Ratio-test matching against the previous frame.
        prev_pts, curr_pts = self._ratio_match(
            self._prev_kp, self._prev_desc,
            kp, desc,
        )
        # Update state for the next call.
        self._prev_kp = kp
        self._prev_desc = desc
        return prev_pts, curr_pts

    # ── Stereo scale ───────────────────────────────────────────────────────────
    def stereo_scale(
        self,
        left_gray: np.ndarray,
        right_gray: np.ndarray,
        fx: float,
        baseline_m: float,
    ) -> float:
        """
        Compute median depth (metres) from stereo ORB disparity.

        Matches ORB features between left and right images under the epipolar
        row constraint, then: depth_i = baseline_m * fx / disparity_i.
        Returns 0.0 on failure (bad fx/baseline, no descriptors, or fewer
        than _MIN_STEREO_SAMP valid pairs).
        """
        if baseline_m <= 0 or fx <= 0:
            return 0.0
        kp_l, desc_l = self._orb.detectAndCompute(left_gray, None)
        kp_r, desc_r = self._orb.detectAndCompute(right_gray, None)
        if desc_l is None or desc_r is None:
            return 0.0
        pts_l, pts_r = self._ratio_match(kp_l, desc_l, kp_r, desc_r)
        if len(pts_l) == 0:
            return 0.0
        # Epipolar row constraint: |row_l - row_r| <= _ROW_TOL (rectified pair)
        row_diff = np.abs(pts_l[:, 1] - pts_r[:, 1])
        mask_row = row_diff <= _ROW_TOL
        # Positive disparity (left camera has larger x for points in front)
        disp = pts_l[:, 0] - pts_r[:, 0]
        mask_disp = (disp >= _DISP_MIN) & (disp <= _DISP_MAX)
        valid = mask_row & mask_disp
        if valid.sum() < _MIN_STEREO_SAMP:
            return 0.0
        depths = baseline_m * fx / disp[valid]
        # Median is robust to residual outlier matches.
        return float(np.median(depths))

    # ── Reset ──────────────────────────────────────────────────────────────────
    def reset(self) -> None:
        """Drop previous-frame state; the next update() behaves as first frame."""
        self._prev_kp = None
        self._prev_desc = None

    # ── Internal helpers ───────────────────────────────────────────────────────
    @staticmethod
    def _empty_pair() -> Tuple[np.ndarray, np.ndarray]:
        """Canonical 'no matches' return value: two (0, 2) float32 arrays."""
        return np.zeros((0, 2), np.float32), np.zeros((0, 2), np.float32)

    def _ratio_match(
        self,
        kp1: list,
        desc1: np.ndarray,
        kp2: list,
        desc2: np.ndarray,
    ) -> Tuple[np.ndarray, np.ndarray]:
        """BFMatcher kNN (k=2) + Lowe ratio test → (pts1, pts2) float32."""
        try:
            matches = self._matcher.knnMatch(desc1, desc2, k=2)
        except cv2.error:
            # Degenerate descriptor sets can make knnMatch raise.
            return self._empty_pair()
        good = []
        for pair in matches:
            if len(pair) == 2:  # k=2 is not guaranteed for tiny train sets
                m, n = pair
                if m.distance < _LOWE_RATIO * n.distance:
                    good.append(m)
        if len(good) < _MIN_MATCHES:
            return self._empty_pair()
        pts1 = np.array(
            [kp1[m.queryIdx].pt for m in good], dtype=np.float32
        ).reshape(-1, 2)
        pts2 = np.array(
            [kp2[m.trainIdx].pt for m in good], dtype=np.float32
        ).reshape(-1, 2)
        return pts1, pts2

View File

@ -0,0 +1,317 @@
"""
stereo_orb_node.py — D435i stereo ORB visual odometry node (Issue #586).
Pipeline per frame
1. Receive synchronized infra1 (left IR) + infra2 (right IR) + aligned depth.
2. Detect ORB features on infra1; match temporally (prev -> curr) via BFMatcher
+ Lowe ratio test.
3. Estimate incremental SE(3) via Essential-matrix (5-point RANSAC) using the
temporal ORB correspondences.
4. Recover metric scale:
   - Primary:  median depth at matched infra1 points (D435i aligned depth).
   - Fallback: stereo-baseline disparity: depth = baseline_m * fx / disparity
     (from ORB matches between infra1 and infra2 of the same frame).
5. Accumulate global SE(3) pose, publish nav_msgs/Odometry.
6. Broadcast TF2: odom -> camera_link.
Subscribes
/camera/infra1/image_rect_raw sensor_msgs/Image 30 Hz (left IR, mono8)
/camera/infra2/image_rect_raw sensor_msgs/Image 30 Hz (right IR, mono8)
/camera/depth/image_rect_raw sensor_msgs/Image 30 Hz float32 depth (m)
/camera/infra1/camera_info sensor_msgs/CameraInfo (intrinsics + baseline)
/camera/infra2/camera_info sensor_msgs/CameraInfo (Tx field gives baseline)
Publishes
/saltybot/visual_odom nav_msgs/Odometry up to 30 Hz
TF
odom -> camera_link
Parameters
max_features int 500 ORB max keypoints per frame
baseline_m float 0.050 stereo baseline (metres); overridden by
camera_info Tx if non-zero
frame_id str 'odom'
child_frame_id str 'camera_link'
min_depth_m float 0.3 depth validity lower bound
max_depth_m float 6.0 depth validity upper bound
min_depth_samples int 10 min valid depth samples for primary scale
"""
from __future__ import annotations
import math
import time
import rclpy
from rclpy.node import Node
from rclpy.qos import (
QoSProfile, ReliabilityPolicy, HistoryPolicy, DurabilityPolicy
)
import message_filters
import numpy as np
import cv2
from cv_bridge import CvBridge
from sensor_msgs.msg import Image, CameraInfo
from nav_msgs.msg import Odometry
from geometry_msgs.msg import Point, Quaternion, TransformStamped
from tf2_ros import TransformBroadcaster
from .orb_stereo_matcher import OrbStereoMatcher
from .stereo_vo import StereoVO
# ── QoS profiles ──────────────────────────────────────────────────────────────
# Sensor-style QoS: best-effort with a shallow queue — for a live VO pipeline,
# dropping stale frames is preferable to buffering them.
_SENSOR_QOS = QoSProfile(
    reliability=ReliabilityPolicy.BEST_EFFORT,
    history=HistoryPolicy.KEEP_LAST,
    depth=5,
)
# Reliable + transient-local ("latched"): late subscribers still receive the
# last camera_info message, which is effectively static metadata.
_LATCHED_QOS = QoSProfile(
    reliability=ReliabilityPolicy.RELIABLE,
    history=HistoryPolicy.KEEP_LAST,
    depth=1,
    durability=DurabilityPolicy.TRANSIENT_LOCAL,
)
# ── Depth validity ─────────────────────────────────────────────────────────────
_D_MIN_DEFAULT = 0.3 # metres
_D_MAX_DEFAULT = 6.0 # metres
def _yaw_to_quat(yaw: float) -> Quaternion:
    """Quaternion for a pure rotation of *yaw* radians about +Z (x = y = 0)."""
    half = 0.5 * yaw
    return Quaternion(x=0.0, y=0.0, z=math.sin(half), w=math.cos(half))
def _rot_to_yaw(R: np.ndarray) -> float:
return math.atan2(float(R[1, 0]), float(R[0, 0]))
class StereoOrbNode(Node):
    """ORB stereo visual odometry — see module docstring for details.

    Fixes over the initial draft:
    - Odometry and TF are stamped with the sensor header stamp (not node wall
      clock), and dt is derived from header stamps, so output stays correct
      under bag replay / sim time.
    - The stereo baseline is latched once from infra2 camera_info instead of
      being recomputed and re-logged on every message.
    """

    def __init__(self) -> None:
        super().__init__('stereo_orb_vo')
        # ── Parameters ────────────────────────────────────────────────────────
        self.declare_parameter('max_features', 500)
        self.declare_parameter('baseline_m', 0.050)
        self.declare_parameter('frame_id', 'odom')
        self.declare_parameter('child_frame_id', 'camera_link')
        self.declare_parameter('min_depth_m', _D_MIN_DEFAULT)
        self.declare_parameter('max_depth_m', _D_MAX_DEFAULT)
        self.declare_parameter('min_depth_samples', 10)
        max_feat = self.get_parameter('max_features').value
        self._baseline = self.get_parameter('baseline_m').value
        self._frame_id = self.get_parameter('frame_id').value
        self._child_id = self.get_parameter('child_frame_id').value
        self._d_min = self.get_parameter('min_depth_m').value
        self._d_max = self.get_parameter('max_depth_m').value
        self._min_samp = self.get_parameter('min_depth_samples').value
        # ── Core objects ──────────────────────────────────────────────────────
        self._bridge = CvBridge()
        self._matcher = OrbStereoMatcher(nfeatures=max_feat)
        self._vo = StereoVO()
        self._K: np.ndarray | None = None   # infra1 intrinsics (3x3), latched
        self._fx: float = 0.0               # infra1 focal length (pixels)
        self._last_t: float | None = None   # previous frame sensor time (s)
        self._baseline_locked = False       # baseline taken from camera_info once
        # ── Publishers ────────────────────────────────────────────────────────
        self._odom_pub = self.create_publisher(
            Odometry, '/saltybot/visual_odom', 10
        )
        self._tf_br = TransformBroadcaster(self)
        # ── Camera-info subscriptions (latched) ───────────────────────────────
        self.create_subscription(
            CameraInfo, '/camera/infra1/camera_info',
            self._on_infra1_info, _LATCHED_QOS,
        )
        self.create_subscription(
            CameraInfo, '/camera/infra2/camera_info',
            self._on_infra2_info, _LATCHED_QOS,
        )
        # ── Synchronised image subscriptions ──────────────────────────────────
        ir1_sub = message_filters.Subscriber(
            self, Image, '/camera/infra1/image_rect_raw', qos_profile=_SENSOR_QOS
        )
        ir2_sub = message_filters.Subscriber(
            self, Image, '/camera/infra2/image_rect_raw', qos_profile=_SENSOR_QOS
        )
        depth_sub = message_filters.Subscriber(
            self, Image, '/camera/depth/image_rect_raw', qos_profile=_SENSOR_QOS
        )
        self._sync = message_filters.ApproximateTimeSynchronizer(
            [ir1_sub, ir2_sub, depth_sub],
            queue_size=5,
            slop=0.033,  # 1 frame at 30 Hz
        )
        self._sync.registerCallback(self._on_frame)
        self.get_logger().info(
            f'stereo_orb_vo ready — ORB nfeatures={max_feat}, '
            f'baseline={self._baseline:.3f}m'
        )

    # ── Camera info callbacks ──────────────────────────────────────────────────
    def _on_infra1_info(self, msg: CameraInfo) -> None:
        """Latch infra1 intrinsics once (K is static for a rectified stream)."""
        if self._K is not None:
            return
        K = np.array(msg.k, dtype=np.float64).reshape(3, 3)
        self._K = K
        self._fx = float(K[0, 0])
        self._vo.set_K(K)
        self.get_logger().info(f'camera_info received — fx={self._fx:.1f}')

    def _on_infra2_info(self, msg: CameraInfo) -> None:
        """Derive the stereo baseline from infra2's projection matrix, once.

        RealSense publishes P[3] = Tx = -fx * baseline_m, so baseline = |Tx|/fx.
        Requires infra1 fx first; latched so repeated camera_info messages do
        not re-log or re-derive the value (the draft re-ran this every message).
        """
        if self._baseline_locked or self._fx <= 0.0 or msg.p[3] == 0.0:
            return
        baseline_from_info = abs(float(msg.p[3]) / self._fx)
        if baseline_from_info > 0.01:  # sanity: > 1 cm
            self._baseline = baseline_from_info
            self._baseline_locked = True
            self.get_logger().info(
                f'stereo baseline from camera_info: {self._baseline*1000:.1f} mm'
            )

    # ── Main frame callback ────────────────────────────────────────────────────
    def _on_frame(
        self,
        ir1_msg: Image,
        ir2_msg: Image,
        depth_msg: Image,
    ) -> None:
        """Synchronized infra1/infra2/depth callback — one full VO step."""
        # Use the sensor timestamp, not wall clock: published odometry/TF and
        # dt must track the image data under bag replay or sim time.
        stamp = ir1_msg.header.stamp
        t_s = stamp.sec + stamp.nanosec * 1e-9
        dt = (t_s - self._last_t) if self._last_t is not None else (1.0 / 30.0)
        dt = max(1e-4, min(dt, 0.5))  # clamp against stalls / duplicate stamps
        self._last_t = t_s
        # ── Decode images ─────────────────────────────────────────────────────
        try:
            ir1 = self._bridge.imgmsg_to_cv2(ir1_msg, desired_encoding='mono8')
            ir2 = self._bridge.imgmsg_to_cv2(ir2_msg, desired_encoding='mono8')
            depth = self._bridge.imgmsg_to_cv2(depth_msg, desired_encoding='32FC1')
        except Exception as exc:  # cv_bridge conversion errors
            self.get_logger().warning(f'decode error: {exc}')
            return
        # ── Temporal ORB match (infra1: prev→curr) ────────────────────────────
        prev_pts, curr_pts = self._matcher.update(ir1)
        if len(prev_pts) < 8:  # Essential matrix needs >= 8 correspondences
            # First frame or too few matches — nothing to estimate
            return
        # ── Scale recovery ────────────────────────────────────────────────────
        # Primary: depth image at matched infra1 feature points
        scale = self._depth_scale(depth, curr_pts)
        # Fallback: stereo disparity between infra1 and infra2
        if scale <= 0.0 and self._baseline > 0.0 and self._fx > 0.0:
            scale = self._matcher.stereo_scale(ir1, ir2, self._fx, self._baseline)
        # ── Motion estimation via StereoVO (Essential matrix + SE(3)) ─────────
        # NOTE(review): ``scale`` is computed but never passed to
        # self._vo.update — confirm StereoVO derives scale from ``depth``
        # itself, otherwise wire ``scale`` through its interface.
        est = self._vo.update(prev_pts, curr_pts, depth, dt=dt)
        # An invalid estimate still publishes: _build_odom widens the
        # covariance so downstream fusion can de-weight it.
        odom = self._build_odom(est, stamp)
        self._odom_pub.publish(odom)
        self._broadcast_tf(est, stamp)

    # ── Scale from depth image ─────────────────────────────────────────────────
    def _depth_scale(self, depth: np.ndarray, pts: np.ndarray) -> float:
        """Median depth at feature pixel locations (metres). Returns 0 on failure.

        Pixel coords are truncated to int and clipped to the image bounds;
        samples outside [min_depth_m, max_depth_m] are discarded, and at least
        ``min_depth_samples`` survivors are required.
        """
        if len(pts) == 0:
            return 0.0
        h, w = depth.shape
        u = np.clip(pts[:, 0].astype(np.int32), 0, w - 1)
        v = np.clip(pts[:, 1].astype(np.int32), 0, h - 1)
        samples = depth[v, u]
        valid = samples[(samples > self._d_min) & (samples < self._d_max)]
        if len(valid) < self._min_samp:
            return 0.0
        return float(np.median(valid))

    # ── Message builders ───────────────────────────────────────────────────────
    def _build_odom(self, est, stamp) -> Odometry:
        """Build nav_msgs/Odometry from a StereoVO estimate (planar yaw pose).

        *stamp* is the sensor builtin_interfaces/Time taken from the image
        header. Covariances are widened when est.valid is False.
        """
        q = _yaw_to_quat(est.yaw)
        odom = Odometry()
        odom.header.stamp = stamp
        odom.header.frame_id = self._frame_id
        odom.child_frame_id = self._child_id
        odom.pose.pose.position = Point(x=est.x, y=est.y, z=est.z)
        odom.pose.pose.orientation = q
        cov_xy = 0.05 if est.valid else 0.5
        cov_theta = 0.02 if est.valid else 0.2
        cov_v = 0.1 if est.valid else 1.0
        p_cov = [0.0] * 36
        p_cov[0] = cov_xy      # x
        p_cov[7] = cov_xy      # y
        p_cov[35] = cov_theta  # yaw
        # NOTE(review): z/roll/pitch covariances stay 0 although z is
        # published — confirm downstream fusion ignores those axes.
        odom.pose.covariance = p_cov
        odom.twist.twist.linear.x = est.vx
        odom.twist.twist.angular.z = est.omega
        t_cov = [0.0] * 36
        t_cov[0] = cov_v          # vx
        t_cov[35] = cov_v * 0.5   # omega
        odom.twist.covariance = t_cov
        return odom

    def _broadcast_tf(self, est, stamp) -> None:
        """Broadcast odom -> camera_link. *stamp* is the sensor header stamp."""
        q = _yaw_to_quat(est.yaw)
        tf = TransformStamped()
        tf.header.stamp = stamp
        tf.header.frame_id = self._frame_id
        tf.child_frame_id = self._child_id
        tf.transform.translation.x = est.x
        tf.transform.translation.y = est.y
        tf.transform.translation.z = est.z
        tf.transform.rotation = q
        self._tf_br.sendTransform(tf)
def main(args=None):
    """Entry point: spin the stereo ORB VO node until shutdown or Ctrl-C."""
    rclpy.init(args=args)
    node = StereoOrbNode()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the node — exit cleanly, no traceback.
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()

View File

@ -1,3 +1,5 @@
import os
from glob import glob
from setuptools import setup, find_packages
package_name = 'saltybot_visual_odom'
@ -10,6 +12,8 @@ setup(
('share/ament_index/resource_index/packages',
[f'resource/{package_name}']),
(f'share/{package_name}', ['package.xml']),
(os.path.join('share', package_name, 'launch'), glob('launch/*.py')),
(os.path.join('share', package_name, 'config'), glob('config/*.yaml')),
],
install_requires=['setuptools'],
zip_safe=True,
@ -21,6 +25,7 @@ setup(
'console_scripts': [
'visual_odom = saltybot_visual_odom.visual_odom_node:main',
'odom_fusion = saltybot_visual_odom.odom_fusion_node:main',
'stereo_orb = saltybot_visual_odom.stereo_orb_node:main',
],
},
)