Merge pull request 'feat: pan/tilt camera head (Issue #384)' (#390) from sl-controls/issue-384-pan-tilt into main

This commit is contained in:
sl-jetson 2026-03-04 12:39:07 -05:00
commit 19017eca37
10 changed files with 604 additions and 0 deletions

View File

@ -0,0 +1,22 @@
pan_tilt:
# Serial communication
port: /dev/ttyUSB0 # UART port for SCS servo bus
baudrate: 1000000 # 1Mbps for SCS/STS protocol
# Camera parameters (Intel RealSense D435i)
image_width: 848 # D435i resolution
image_height: 480 # D435i resolution
fov_h_deg: 87.0 # Horizontal field of view
fov_v_deg: 58.0 # Vertical field of view
# Motor control
max_pan_speed: 100 # Pan servo max speed (0-1000, 0=max)
max_tilt_speed: 100 # Tilt servo max speed (0-1000, 0=max)
# Servo positions
pan_center: 2048 # Center position for pan servo
tilt_center: 2048 # Center position for tilt servo
pan_min: 0 # Minimum pan position
pan_max: 4095 # Maximum pan position
tilt_min: 1024 # Minimum tilt position (avoid upside down)
tilt_max: 3072 # Maximum tilt position (avoid looking too far down)

View File

@ -0,0 +1,51 @@
"""Launch file for pan/tilt servo control node."""
from launch import LaunchDescription
from launch_ros.actions import Node
from launch.substitutions import LaunchConfiguration
from launch.actions import DeclareLaunchArgument
def generate_launch_description():
"""Generate launch description."""
# Declare arguments
port_arg = DeclareLaunchArgument(
"port",
default_value="/dev/ttyUSB0",
description="Serial port for servo bus"
)
baudrate_arg = DeclareLaunchArgument(
"baudrate",
default_value="1000000",
description="Serial baudrate (1Mbps for SCS protocol)"
)
# Create node
pan_tilt_node = Node(
package="saltybot_pan_tilt",
executable="pan_tilt_node",
name="pan_tilt",
parameters=[
{"port": LaunchConfiguration("port")},
{"baudrate": LaunchConfiguration("baudrate")},
{"image_width": 848},
{"image_height": 480},
{"fov_h_deg": 87.0},
{"fov_v_deg": 58.0},
{"max_pan_speed": 100},
{"max_tilt_speed": 100},
{"pan_center": 2048},
{"tilt_center": 2048},
{"pan_min": 0},
{"pan_max": 4095},
{"tilt_min": 1024},
{"tilt_max": 3072},
],
output="screen",
)
return LaunchDescription([
port_arg,
baudrate_arg,
pan_tilt_node,
])

View File

@ -0,0 +1,28 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_pan_tilt</name>
<version>0.1.0</version>
<description>
Pan/tilt camera head servo control for SaltyBot using Waveshare ST3215 servos.
SCS/STS serial bus protocol with daisy-chain servo communication.
</description>
<maintainer email="sl-controls@saltylab.local">sl-controls</maintainer>
<license>MIT</license>
<depend>rclpy</depend>
<depend>geometry_msgs</depend>
<depend>std_msgs</depend>
<depend>sensor_msgs</depend>
<buildtool_depend>ament_python</buildtool_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1 @@
"""SaltyBot pan/tilt servo control package."""

View File

@ -0,0 +1,354 @@
#!/usr/bin/env python3
"""Pan/tilt camera head servo control node.
Controls Waveshare ST3215 servos via SCS/STS serial bus protocol.
Daisy-chain 2 servos (pan + tilt) on single UART at 1Mbps.
Tracks target centroid from detection to keep object centered in D435i FOV.
Subscribed topics:
/saltybot/target_track (Point) - Target centroid in image coords (x, y, confidence)
Published topics:
/pan_tilt/state (PanTiltState) - Servo angles, load, temperature
/pan_tilt/command (PanTiltCommand) - Commanded angles (for monitoring)
"""
import math
import struct
import threading
import time
from typing import Optional
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Point
from std_msgs.msg import Float32MultiArray, String
import serial
class SCSServoBus:
"""SCS/STS serial bus protocol handler (1Mbps daisy-chain)."""
# Servo IDs for daisy-chain
PAN_SERVO_ID = 1
TILT_SERVO_ID = 2
# SCS packet structure
PACKET_HEADER = 0xFFFD # 2-byte header
CMD_WRITE = 0x03
CMD_READ = 0x02
CMD_PING = 0x01
# Common register addresses
REG_ANGLE_LIMIT_MIN = 6
REG_ANGLE_LIMIT_MAX = 8
REG_POSITION_CURRENT = 56 # 2 bytes, 0.24° resolution
REG_LOAD_CURRENT = 62 # 1 byte, -100 to +100
REG_TEMPERATURE = 63 # 1 byte, °C
def __init__(self, port: str, baudrate: int = 1000000):
"""Initialize SCS servo bus.
Args:
port: Serial port (e.g., /dev/ttyUSB0)
baudrate: 1000000 bps for SCS protocol
"""
self.port = port
self.baudrate = baudrate
self.serial = None
self.lock = threading.Lock()
def connect(self) -> bool:
"""Connect to servo bus."""
try:
self.serial = serial.Serial(
port=self.port,
baudrate=self.baudrate,
timeout=0.1,
bytesize=8,
parity=serial.PARITY_NONE,
stopbits=1,
)
return True
except Exception as e:
print(f"Failed to connect to {self.port}: {e}")
return False
def disconnect(self):
"""Disconnect from servo bus."""
if self.serial:
self.serial.close()
self.serial = None
def _checksum(self, data: bytes) -> int:
"""Calculate XOR checksum."""
checksum = 0
for byte in data:
checksum ^= byte
return checksum
def set_position(self, servo_id: int, position: int, speed: int = 100) -> bool:
"""Set servo position (0-4095, 0.24° resolution).
Args:
servo_id: Servo ID (1=pan, 2=tilt)
position: Target position (0-4095)
speed: Movement speed (0-1000, 0=max)
Returns:
True if successful
"""
if not self.serial:
return False
position = max(0, min(4095, position))
speed = max(0, min(1000, speed))
# Build write packet: servo_id, cmd_len, cmd_write, addr, speed_lo, speed_hi, pos_lo, pos_hi
addr = 46 # Position write address
packet = bytearray([
servo_id,
5, # length
self.CMD_WRITE,
addr,
speed & 0xFF,
(speed >> 8) & 0xFF,
position & 0xFF,
(position >> 8) & 0xFF,
])
packet.append(self._checksum(packet))
try:
with self.lock:
self.serial.write(bytes([0xFF, 0xFD])) # Header
self.serial.write(packet)
return True
except Exception as e:
print(f"Error setting position: {e}")
return False
def read_position(self, servo_id: int) -> Optional[int]:
"""Read current servo position.
Returns:
Position (0-4095) or None if error
"""
if not self.serial:
return None
# Build read packet
addr = 56
packet = bytearray([
servo_id,
3, # length
self.CMD_READ,
addr,
2, # read 2 bytes
])
packet.append(self._checksum(packet))
try:
with self.lock:
self.serial.write(bytes([0xFF, 0xFD]))
self.serial.write(packet)
time.sleep(0.01) # Wait for response
# Read response
response = self.serial.read(10)
if len(response) >= 8:
# Response format: FF FD servo_id len error_code data[2] checksum
pos_lo = response[6]
pos_hi = response[7]
position = pos_lo | (pos_hi << 8)
return position
except Exception as e:
print(f"Error reading position: {e}")
return None
def read_temp(self, servo_id: int) -> Optional[int]:
"""Read servo temperature.
Returns:
Temperature (°C) or None if error
"""
if not self.serial:
return None
addr = 63
packet = bytearray([
servo_id,
3,
self.CMD_READ,
addr,
1,
])
packet.append(self._checksum(packet))
try:
with self.lock:
self.serial.write(bytes([0xFF, 0xFD]))
self.serial.write(packet)
time.sleep(0.01)
response = self.serial.read(10)
if len(response) >= 7:
temp = response[6]
return temp
except Exception as e:
print(f"Error reading temperature: {e}")
return None
class PanTiltNode(Node):
"""Pan/tilt servo control with target tracking."""
def __init__(self):
super().__init__("pan_tilt")
# Parameters
self.declare_parameter("port", "/dev/ttyUSB0")
self.declare_parameter("baudrate", 1000000)
self.declare_parameter("image_width", 848) # D435i resolution
self.declare_parameter("image_height", 480)
self.declare_parameter("fov_h_deg", 87.0) # Horizontal FOV
self.declare_parameter("fov_v_deg", 58.0) # Vertical FOV
self.declare_parameter("max_pan_speed", 100) # 0-1000
self.declare_parameter("max_tilt_speed", 100)
self.declare_parameter("pan_center", 2048) # Center position
self.declare_parameter("tilt_center", 2048)
self.declare_parameter("pan_min", 0) # Min/max angles
self.declare_parameter("pan_max", 4095)
self.declare_parameter("tilt_min", 1024)
self.declare_parameter("tilt_max", 3072)
self.port = self.get_parameter("port").value
self.baudrate = self.get_parameter("baudrate").value
self.image_width = self.get_parameter("image_width").value
self.image_height = self.get_parameter("image_height").value
self.fov_h = self.get_parameter("fov_h_deg").value
self.fov_v = self.get_parameter("fov_v_deg").value
self.max_pan_speed = self.get_parameter("max_pan_speed").value
self.max_tilt_speed = self.get_parameter("max_tilt_speed").value
self.pan_center = self.get_parameter("pan_center").value
self.tilt_center = self.get_parameter("tilt_center").value
self.pan_min = self.get_parameter("pan_min").value
self.pan_max = self.get_parameter("pan_max").value
self.tilt_min = self.get_parameter("tilt_min").value
self.tilt_max = self.get_parameter("tilt_max").value
# Initialize servo bus
self.servo_bus = SCSServoBus(self.port, self.baudrate)
if not self.servo_bus.connect():
self.get_logger().error(f"Failed to connect to servo bus on {self.port}")
# State
self.target_x = self.image_width / 2
self.target_y = self.image_height / 2
self.pan_pos = self.pan_center
self.tilt_pos = self.tilt_center
self.pan_load = 0
self.tilt_load = 0
self.pan_temp = 0
self.tilt_temp = 0
# Subscriptions
self.create_subscription(Point, "/saltybot/target_track", self._on_target, 10)
# Publishers
self.pub_state = self.create_publisher(String, "/pan_tilt/state", 10)
self.pub_command = self.create_publisher(Float32MultiArray, "/pan_tilt/command", 10)
# Timer for control loop (30 Hz)
self.create_timer(0.033, self._control_loop)
# Timer for telemetry (1 Hz)
self.create_timer(1.0, self._publish_state)
self.get_logger().info(
f"Pan/tilt initialized: port={self.port}, "
f"FOV={self.fov_h:.0f}° × {self.fov_v:.0f}°"
)
def _on_target(self, msg: Point) -> None:
"""Handle target centroid update."""
# Point.x = pixel X, Point.y = pixel Y, Point.z = confidence
self.target_x = msg.x
self.target_y = msg.y
def _control_loop(self) -> None:
"""Update servo positions to track target."""
# Calculate error from image center
center_x = self.image_width / 2
center_y = self.image_height / 2
error_x = self.target_x - center_x # pixels
error_y = self.target_y - center_y
# Convert pixel error to angle error
pixels_per_deg_h = self.image_width / self.fov_h
pixels_per_deg_v = self.image_height / self.fov_v
angle_error_h = error_x / pixels_per_deg_h
angle_error_v = error_y / pixels_per_deg_v
# Convert angle to servo position (0.24° per count)
pos_error_pan = (angle_error_h / 0.24)
pos_error_tilt = -(angle_error_v / 0.24) # Invert for tilt
# Simple proportional control
self.pan_pos = max(self.pan_min, min(self.pan_max, self.pan_center - pos_error_pan))
self.tilt_pos = max(self.tilt_min, min(self.tilt_max, self.tilt_center + pos_error_tilt))
# Send commands to servos
self.servo_bus.set_position(SCSServoBus.PAN_SERVO_ID, int(self.pan_pos), self.max_pan_speed)
self.servo_bus.set_position(SCSServoBus.TILT_SERVO_ID, int(self.tilt_pos), self.max_tilt_speed)
# Publish command for monitoring
cmd_msg = Float32MultiArray(data=[self.pan_pos, self.tilt_pos])
self.pub_command.publish(cmd_msg)
def _publish_state(self) -> None:
"""Publish servo state telemetry."""
# Read actual positions and temps
pan_actual = self.servo_bus.read_position(SCSServoBus.PAN_SERVO_ID)
tilt_actual = self.servo_bus.read_position(SCSServoBus.TILT_SERVO_ID)
pan_temp = self.servo_bus.read_temp(SCSServoBus.PAN_SERVO_ID)
tilt_temp = self.servo_bus.read_temp(SCSServoBus.TILT_SERVO_ID)
if pan_actual is not None:
self.pan_pos = pan_actual
if tilt_actual is not None:
self.tilt_pos = tilt_actual
if pan_temp is not None:
self.pan_temp = pan_temp
if tilt_temp is not None:
self.tilt_temp = tilt_temp
# Convert positions to degrees
pan_deg = (self.pan_pos - 2048) * 0.24
tilt_deg = (self.tilt_pos - 2048) * 0.24
state_msg = String(
data=f'{{"pan_deg": {pan_deg:.1f}, "tilt_deg": {tilt_deg:.1f}, '
f'"pan_temp_c": {self.pan_temp}, "tilt_temp_c": {self.tilt_temp}}}'
)
self.pub_state.publish(state_msg)
def main(args=None):
rclpy.init(args=args)
node = PanTiltNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.servo_bus.disconnect()
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,5 @@
[develop]
script_dir=$base/lib/saltybot_pan_tilt
[install]
install_scripts=$base/lib/saltybot_pan_tilt

View File

@ -0,0 +1,27 @@
from setuptools import setup
package_name = "saltybot_pan_tilt"
setup(
name=package_name,
version="0.1.0",
packages=[package_name],
data_files=[
("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
(f"share/{package_name}", ["package.xml"]),
(f"share/{package_name}/launch", ["launch/pan_tilt.launch.py"]),
(f"share/{package_name}/config", ["config/pan_tilt_params.yaml"]),
],
install_requires=["setuptools"],
zip_safe=True,
maintainer="sl-controls",
maintainer_email="sl-controls@saltylab.local",
description="Pan/tilt servo control for camera head with target tracking",
license="MIT",
tests_require=["pytest"],
entry_points={
"console_scripts": [
"pan_tilt_node = saltybot_pan_tilt.pan_tilt_node:main",
],
},
)

View File

@ -0,0 +1 @@
"""Tests for saltybot_pan_tilt package."""

View File

@ -0,0 +1,115 @@
"""Unit tests for pan/tilt servo control."""
import unittest
import math
from unittest.mock import Mock, MagicMock, patch
from saltybot_pan_tilt.pan_tilt_node import SCSServoBus
class TestSCSServoBus(unittest.TestCase):
"""Test SCS/STS serial bus protocol handler."""
def test_checksum(self):
"""Test XOR checksum calculation."""
bus = SCSServoBus("/dev/null")
# Test with known data
data = bytes([1, 5, 0x03, 46, 100, 0, 0, 0x10])
expected = 1 ^ 5 ^ 0x03 ^ 46 ^ 100 ^ 0 ^ 0 ^ 0x10
self.assertEqual(bus._checksum(data), expected)
def test_position_clamping(self):
"""Test position clamping to valid range."""
bus = SCSServoBus("/dev/null")
# Mock serial port
bus.serial = MagicMock()
# Test clamping below minimum
bus.set_position(1, -100, 100)
call_args = bus.serial.write.call_args_list[-1][0][0]
position_lo = call_args[6]
position_hi = call_args[7]
position = position_lo | (position_hi << 8)
self.assertEqual(position, 0)
# Test clamping above maximum
bus.serial.reset_mock()
bus.set_position(1, 5000, 100)
call_args = bus.serial.write.call_args_list[-1][0][0]
position_lo = call_args[6]
position_hi = call_args[7]
position = position_lo | (position_hi << 8)
self.assertEqual(position, 4095)
def test_speed_clamping(self):
"""Test speed clamping to valid range."""
bus = SCSServoBus("/dev/null")
# Mock serial port
bus.serial = MagicMock()
# Test clamping above maximum
bus.set_position(1, 2048, 2000)
call_args = bus.serial.write.call_args_list[-1][0][0]
speed_lo = call_args[4]
speed_hi = call_args[5]
speed = speed_lo | (speed_hi << 8)
self.assertEqual(speed, 1000)
class TestPanTiltControl(unittest.TestCase):
"""Test pan/tilt control algorithms."""
def test_pixel_to_angle_conversion(self):
"""Test pixel error to angle error conversion."""
image_width = 848
image_height = 480
fov_h = 87.0
fov_v = 58.0
# Calculate pixels per degree
pixels_per_deg_h = image_width / fov_h
pixels_per_deg_v = image_height / fov_v
# Test 10 pixels horizontal error
error_x = 10
angle_error_h = error_x / pixels_per_deg_h
self.assertAlmostEqual(angle_error_h, 10 * 87.0 / image_width, places=2)
# Test 5 pixels vertical error
error_y = 5
angle_error_v = error_y / pixels_per_deg_v
self.assertAlmostEqual(angle_error_v, 5 * 58.0 / image_height, places=2)
def test_position_error_calculation(self):
"""Test servo position error from angle error."""
# 0.24 degrees per position unit
angle_error = 2.4
pos_error = angle_error / 0.24
self.assertAlmostEqual(pos_error, 10.0, places=1)
def test_servo_position_bounds(self):
"""Test servo position stays within bounds."""
pan_min = 0
pan_max = 4095
pan_center = 2048
# Test position below minimum
pos_error = 3000
pan_pos = max(pan_min, min(pan_max, pan_center - pos_error))
self.assertEqual(pan_pos, pan_min)
# Test position above maximum
pos_error = -3000
pan_pos = max(pan_min, min(pan_max, pan_center - pos_error))
self.assertEqual(pan_pos, pan_max)
# Test position within bounds
pos_error = 100
pan_pos = max(pan_min, min(pan_max, pan_center - pos_error))
self.assertEqual(pan_pos, pan_center - pos_error)
if __name__ == "__main__":
unittest.main()