feat: Add pan/tilt camera head servo control (Issue #384)
Implement SCS/STS serial protocol driver for Waveshare ST3215 servos at 1Mbps daisy-chain configuration. Pan and tilt servos on single UART. Features: - SCSServoBus class: Low-level protocol handler with packet construction - Position write commands with configurable speed (0-1000) - Position and temperature readback from servos - PanTiltNode: ROS2 node with target tracking control loop - Subscribes to /saltybot/target_track for centroid position - Proportional control to keep target centered in D435i FOV - Publishes /pan_tilt/state with angles and temperatures - Publishes /pan_tilt/command for servo position monitoring - 30 Hz control loop, 1 Hz telemetry loop - Configurable servo limits and speeds Servos: 0.24° resolution, 0-4095 position range Camera: 87° × 58° field of view Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
ce1a5e5fee
commit
7cfb98aec1
@ -0,0 +1,22 @@
|
|||||||
|
pan_tilt:
|
||||||
|
# Serial communication
|
||||||
|
port: /dev/ttyUSB0 # UART port for SCS servo bus
|
||||||
|
baudrate: 1000000 # 1Mbps for SCS/STS protocol
|
||||||
|
|
||||||
|
# Camera parameters (Intel RealSense D435i)
|
||||||
|
image_width: 848 # D435i resolution
|
||||||
|
image_height: 480 # D435i resolution
|
||||||
|
fov_h_deg: 87.0 # Horizontal field of view
|
||||||
|
fov_v_deg: 58.0 # Vertical field of view
|
||||||
|
|
||||||
|
# Motor control
|
||||||
|
max_pan_speed: 100 # Pan servo max speed (0-1000, 0=max)
|
||||||
|
max_tilt_speed: 100 # Tilt servo max speed (0-1000, 0=max)
|
||||||
|
|
||||||
|
# Servo positions
|
||||||
|
pan_center: 2048 # Center position for pan servo
|
||||||
|
tilt_center: 2048 # Center position for tilt servo
|
||||||
|
pan_min: 0 # Minimum pan position
|
||||||
|
pan_max: 4095 # Maximum pan position
|
||||||
|
tilt_min: 1024 # Minimum tilt position (avoid upside down)
|
||||||
|
tilt_max: 3072 # Maximum tilt position (avoid looking too far down)
|
||||||
@ -0,0 +1,51 @@
|
|||||||
|
"""Launch file for pan/tilt servo control node."""
|
||||||
|
|
||||||
|
from launch import LaunchDescription
|
||||||
|
from launch_ros.actions import Node
|
||||||
|
from launch.substitutions import LaunchConfiguration
|
||||||
|
from launch.actions import DeclareLaunchArgument
|
||||||
|
|
||||||
|
|
||||||
|
def generate_launch_description():
|
||||||
|
"""Generate launch description."""
|
||||||
|
# Declare arguments
|
||||||
|
port_arg = DeclareLaunchArgument(
|
||||||
|
"port",
|
||||||
|
default_value="/dev/ttyUSB0",
|
||||||
|
description="Serial port for servo bus"
|
||||||
|
)
|
||||||
|
baudrate_arg = DeclareLaunchArgument(
|
||||||
|
"baudrate",
|
||||||
|
default_value="1000000",
|
||||||
|
description="Serial baudrate (1Mbps for SCS protocol)"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create node
|
||||||
|
pan_tilt_node = Node(
|
||||||
|
package="saltybot_pan_tilt",
|
||||||
|
executable="pan_tilt_node",
|
||||||
|
name="pan_tilt",
|
||||||
|
parameters=[
|
||||||
|
{"port": LaunchConfiguration("port")},
|
||||||
|
{"baudrate": LaunchConfiguration("baudrate")},
|
||||||
|
{"image_width": 848},
|
||||||
|
{"image_height": 480},
|
||||||
|
{"fov_h_deg": 87.0},
|
||||||
|
{"fov_v_deg": 58.0},
|
||||||
|
{"max_pan_speed": 100},
|
||||||
|
{"max_tilt_speed": 100},
|
||||||
|
{"pan_center": 2048},
|
||||||
|
{"tilt_center": 2048},
|
||||||
|
{"pan_min": 0},
|
||||||
|
{"pan_max": 4095},
|
||||||
|
{"tilt_min": 1024},
|
||||||
|
{"tilt_max": 3072},
|
||||||
|
],
|
||||||
|
output="screen",
|
||||||
|
)
|
||||||
|
|
||||||
|
return LaunchDescription([
|
||||||
|
port_arg,
|
||||||
|
baudrate_arg,
|
||||||
|
pan_tilt_node,
|
||||||
|
])
|
||||||
28
jetson/ros2_ws/src/saltybot_pan_tilt/package.xml
Normal file
28
jetson/ros2_ws/src/saltybot_pan_tilt/package.xml
Normal file
@ -0,0 +1,28 @@
|
|||||||
|
<?xml version="1.0"?>
|
||||||
|
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||||
|
<package format="3">
|
||||||
|
<name>saltybot_pan_tilt</name>
|
||||||
|
<version>0.1.0</version>
|
||||||
|
<description>
|
||||||
|
Pan/tilt camera head servo control for SaltyBot using Waveshare ST3215 servos.
|
||||||
|
SCS/STS serial bus protocol with daisy-chain servo communication.
|
||||||
|
</description>
|
||||||
|
<maintainer email="sl-controls@saltylab.local">sl-controls</maintainer>
|
||||||
|
<license>MIT</license>
|
||||||
|
|
||||||
|
<depend>rclpy</depend>
|
||||||
|
<depend>geometry_msgs</depend>
|
||||||
|
<depend>std_msgs</depend>
|
||||||
|
<depend>sensor_msgs</depend>
|
||||||
|
|
||||||
|
<buildtool_depend>ament_python</buildtool_depend>
|
||||||
|
|
||||||
|
<test_depend>ament_copyright</test_depend>
|
||||||
|
<test_depend>ament_flake8</test_depend>
|
||||||
|
<test_depend>ament_pep257</test_depend>
|
||||||
|
<test_depend>python3-pytest</test_depend>
|
||||||
|
|
||||||
|
<export>
|
||||||
|
<build_type>ament_python</build_type>
|
||||||
|
</export>
|
||||||
|
</package>
|
||||||
@ -0,0 +1 @@
|
|||||||
|
"""SaltyBot pan/tilt servo control package."""
|
||||||
@ -0,0 +1,354 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""Pan/tilt camera head servo control node.
|
||||||
|
|
||||||
|
Controls Waveshare ST3215 servos via SCS/STS serial bus protocol.
|
||||||
|
Daisy-chain 2 servos (pan + tilt) on single UART at 1Mbps.
|
||||||
|
Tracks target centroid from detection to keep object centered in D435i FOV.
|
||||||
|
|
||||||
|
Subscribed topics:
|
||||||
|
/saltybot/target_track (Point) - Target centroid in image coords (x, y, confidence)
|
||||||
|
|
||||||
|
Published topics:
|
||||||
|
/pan_tilt/state (PanTiltState) - Servo angles, load, temperature
|
||||||
|
/pan_tilt/command (PanTiltCommand) - Commanded angles (for monitoring)
|
||||||
|
"""
|
||||||
|
|
||||||
|
import math
|
||||||
|
import struct
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
import rclpy
|
||||||
|
from rclpy.node import Node
|
||||||
|
from geometry_msgs.msg import Point
|
||||||
|
from std_msgs.msg import Float32MultiArray, String
|
||||||
|
import serial
|
||||||
|
|
||||||
|
|
||||||
|
class SCSServoBus:
|
||||||
|
"""SCS/STS serial bus protocol handler (1Mbps daisy-chain)."""
|
||||||
|
|
||||||
|
# Servo IDs for daisy-chain
|
||||||
|
PAN_SERVO_ID = 1
|
||||||
|
TILT_SERVO_ID = 2
|
||||||
|
|
||||||
|
# SCS packet structure
|
||||||
|
PACKET_HEADER = 0xFFFD # 2-byte header
|
||||||
|
CMD_WRITE = 0x03
|
||||||
|
CMD_READ = 0x02
|
||||||
|
CMD_PING = 0x01
|
||||||
|
|
||||||
|
# Common register addresses
|
||||||
|
REG_ANGLE_LIMIT_MIN = 6
|
||||||
|
REG_ANGLE_LIMIT_MAX = 8
|
||||||
|
REG_POSITION_CURRENT = 56 # 2 bytes, 0.24° resolution
|
||||||
|
REG_LOAD_CURRENT = 62 # 1 byte, -100 to +100
|
||||||
|
REG_TEMPERATURE = 63 # 1 byte, °C
|
||||||
|
|
||||||
|
def __init__(self, port: str, baudrate: int = 1000000):
|
||||||
|
"""Initialize SCS servo bus.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
port: Serial port (e.g., /dev/ttyUSB0)
|
||||||
|
baudrate: 1000000 bps for SCS protocol
|
||||||
|
"""
|
||||||
|
self.port = port
|
||||||
|
self.baudrate = baudrate
|
||||||
|
self.serial = None
|
||||||
|
self.lock = threading.Lock()
|
||||||
|
|
||||||
|
def connect(self) -> bool:
|
||||||
|
"""Connect to servo bus."""
|
||||||
|
try:
|
||||||
|
self.serial = serial.Serial(
|
||||||
|
port=self.port,
|
||||||
|
baudrate=self.baudrate,
|
||||||
|
timeout=0.1,
|
||||||
|
bytesize=8,
|
||||||
|
parity=serial.PARITY_NONE,
|
||||||
|
stopbits=1,
|
||||||
|
)
|
||||||
|
return True
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Failed to connect to {self.port}: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
def disconnect(self):
|
||||||
|
"""Disconnect from servo bus."""
|
||||||
|
if self.serial:
|
||||||
|
self.serial.close()
|
||||||
|
self.serial = None
|
||||||
|
|
||||||
|
def _checksum(self, data: bytes) -> int:
|
||||||
|
"""Calculate XOR checksum."""
|
||||||
|
checksum = 0
|
||||||
|
for byte in data:
|
||||||
|
checksum ^= byte
|
||||||
|
return checksum
|
||||||
|
|
||||||
|
def set_position(self, servo_id: int, position: int, speed: int = 100) -> bool:
|
||||||
|
"""Set servo position (0-4095, 0.24° resolution).
|
||||||
|
|
||||||
|
Args:
|
||||||
|
servo_id: Servo ID (1=pan, 2=tilt)
|
||||||
|
position: Target position (0-4095)
|
||||||
|
speed: Movement speed (0-1000, 0=max)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if successful
|
||||||
|
"""
|
||||||
|
if not self.serial:
|
||||||
|
return False
|
||||||
|
|
||||||
|
position = max(0, min(4095, position))
|
||||||
|
speed = max(0, min(1000, speed))
|
||||||
|
|
||||||
|
# Build write packet: servo_id, cmd_len, cmd_write, addr, speed_lo, speed_hi, pos_lo, pos_hi
|
||||||
|
addr = 46 # Position write address
|
||||||
|
packet = bytearray([
|
||||||
|
servo_id,
|
||||||
|
5, # length
|
||||||
|
self.CMD_WRITE,
|
||||||
|
addr,
|
||||||
|
speed & 0xFF,
|
||||||
|
(speed >> 8) & 0xFF,
|
||||||
|
position & 0xFF,
|
||||||
|
(position >> 8) & 0xFF,
|
||||||
|
])
|
||||||
|
packet.append(self._checksum(packet))
|
||||||
|
|
||||||
|
try:
|
||||||
|
with self.lock:
|
||||||
|
self.serial.write(bytes([0xFF, 0xFD])) # Header
|
||||||
|
self.serial.write(packet)
|
||||||
|
return True
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error setting position: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
def read_position(self, servo_id: int) -> Optional[int]:
|
||||||
|
"""Read current servo position.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Position (0-4095) or None if error
|
||||||
|
"""
|
||||||
|
if not self.serial:
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Build read packet
|
||||||
|
addr = 56
|
||||||
|
packet = bytearray([
|
||||||
|
servo_id,
|
||||||
|
3, # length
|
||||||
|
self.CMD_READ,
|
||||||
|
addr,
|
||||||
|
2, # read 2 bytes
|
||||||
|
])
|
||||||
|
packet.append(self._checksum(packet))
|
||||||
|
|
||||||
|
try:
|
||||||
|
with self.lock:
|
||||||
|
self.serial.write(bytes([0xFF, 0xFD]))
|
||||||
|
self.serial.write(packet)
|
||||||
|
time.sleep(0.01) # Wait for response
|
||||||
|
|
||||||
|
# Read response
|
||||||
|
response = self.serial.read(10)
|
||||||
|
if len(response) >= 8:
|
||||||
|
# Response format: FF FD servo_id len error_code data[2] checksum
|
||||||
|
pos_lo = response[6]
|
||||||
|
pos_hi = response[7]
|
||||||
|
position = pos_lo | (pos_hi << 8)
|
||||||
|
return position
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error reading position: {e}")
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
def read_temp(self, servo_id: int) -> Optional[int]:
|
||||||
|
"""Read servo temperature.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Temperature (°C) or None if error
|
||||||
|
"""
|
||||||
|
if not self.serial:
|
||||||
|
return None
|
||||||
|
|
||||||
|
addr = 63
|
||||||
|
packet = bytearray([
|
||||||
|
servo_id,
|
||||||
|
3,
|
||||||
|
self.CMD_READ,
|
||||||
|
addr,
|
||||||
|
1,
|
||||||
|
])
|
||||||
|
packet.append(self._checksum(packet))
|
||||||
|
|
||||||
|
try:
|
||||||
|
with self.lock:
|
||||||
|
self.serial.write(bytes([0xFF, 0xFD]))
|
||||||
|
self.serial.write(packet)
|
||||||
|
time.sleep(0.01)
|
||||||
|
|
||||||
|
response = self.serial.read(10)
|
||||||
|
if len(response) >= 7:
|
||||||
|
temp = response[6]
|
||||||
|
return temp
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error reading temperature: {e}")
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
class PanTiltNode(Node):
|
||||||
|
"""Pan/tilt servo control with target tracking."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__("pan_tilt")
|
||||||
|
|
||||||
|
# Parameters
|
||||||
|
self.declare_parameter("port", "/dev/ttyUSB0")
|
||||||
|
self.declare_parameter("baudrate", 1000000)
|
||||||
|
self.declare_parameter("image_width", 848) # D435i resolution
|
||||||
|
self.declare_parameter("image_height", 480)
|
||||||
|
self.declare_parameter("fov_h_deg", 87.0) # Horizontal FOV
|
||||||
|
self.declare_parameter("fov_v_deg", 58.0) # Vertical FOV
|
||||||
|
self.declare_parameter("max_pan_speed", 100) # 0-1000
|
||||||
|
self.declare_parameter("max_tilt_speed", 100)
|
||||||
|
self.declare_parameter("pan_center", 2048) # Center position
|
||||||
|
self.declare_parameter("tilt_center", 2048)
|
||||||
|
self.declare_parameter("pan_min", 0) # Min/max angles
|
||||||
|
self.declare_parameter("pan_max", 4095)
|
||||||
|
self.declare_parameter("tilt_min", 1024)
|
||||||
|
self.declare_parameter("tilt_max", 3072)
|
||||||
|
|
||||||
|
self.port = self.get_parameter("port").value
|
||||||
|
self.baudrate = self.get_parameter("baudrate").value
|
||||||
|
self.image_width = self.get_parameter("image_width").value
|
||||||
|
self.image_height = self.get_parameter("image_height").value
|
||||||
|
self.fov_h = self.get_parameter("fov_h_deg").value
|
||||||
|
self.fov_v = self.get_parameter("fov_v_deg").value
|
||||||
|
self.max_pan_speed = self.get_parameter("max_pan_speed").value
|
||||||
|
self.max_tilt_speed = self.get_parameter("max_tilt_speed").value
|
||||||
|
self.pan_center = self.get_parameter("pan_center").value
|
||||||
|
self.tilt_center = self.get_parameter("tilt_center").value
|
||||||
|
self.pan_min = self.get_parameter("pan_min").value
|
||||||
|
self.pan_max = self.get_parameter("pan_max").value
|
||||||
|
self.tilt_min = self.get_parameter("tilt_min").value
|
||||||
|
self.tilt_max = self.get_parameter("tilt_max").value
|
||||||
|
|
||||||
|
# Initialize servo bus
|
||||||
|
self.servo_bus = SCSServoBus(self.port, self.baudrate)
|
||||||
|
if not self.servo_bus.connect():
|
||||||
|
self.get_logger().error(f"Failed to connect to servo bus on {self.port}")
|
||||||
|
|
||||||
|
# State
|
||||||
|
self.target_x = self.image_width / 2
|
||||||
|
self.target_y = self.image_height / 2
|
||||||
|
self.pan_pos = self.pan_center
|
||||||
|
self.tilt_pos = self.tilt_center
|
||||||
|
self.pan_load = 0
|
||||||
|
self.tilt_load = 0
|
||||||
|
self.pan_temp = 0
|
||||||
|
self.tilt_temp = 0
|
||||||
|
|
||||||
|
# Subscriptions
|
||||||
|
self.create_subscription(Point, "/saltybot/target_track", self._on_target, 10)
|
||||||
|
|
||||||
|
# Publishers
|
||||||
|
self.pub_state = self.create_publisher(String, "/pan_tilt/state", 10)
|
||||||
|
self.pub_command = self.create_publisher(Float32MultiArray, "/pan_tilt/command", 10)
|
||||||
|
|
||||||
|
# Timer for control loop (30 Hz)
|
||||||
|
self.create_timer(0.033, self._control_loop)
|
||||||
|
|
||||||
|
# Timer for telemetry (1 Hz)
|
||||||
|
self.create_timer(1.0, self._publish_state)
|
||||||
|
|
||||||
|
self.get_logger().info(
|
||||||
|
f"Pan/tilt initialized: port={self.port}, "
|
||||||
|
f"FOV={self.fov_h:.0f}° × {self.fov_v:.0f}°"
|
||||||
|
)
|
||||||
|
|
||||||
|
def _on_target(self, msg: Point) -> None:
|
||||||
|
"""Handle target centroid update."""
|
||||||
|
# Point.x = pixel X, Point.y = pixel Y, Point.z = confidence
|
||||||
|
self.target_x = msg.x
|
||||||
|
self.target_y = msg.y
|
||||||
|
|
||||||
|
def _control_loop(self) -> None:
|
||||||
|
"""Update servo positions to track target."""
|
||||||
|
# Calculate error from image center
|
||||||
|
center_x = self.image_width / 2
|
||||||
|
center_y = self.image_height / 2
|
||||||
|
|
||||||
|
error_x = self.target_x - center_x # pixels
|
||||||
|
error_y = self.target_y - center_y
|
||||||
|
|
||||||
|
# Convert pixel error to angle error
|
||||||
|
pixels_per_deg_h = self.image_width / self.fov_h
|
||||||
|
pixels_per_deg_v = self.image_height / self.fov_v
|
||||||
|
|
||||||
|
angle_error_h = error_x / pixels_per_deg_h
|
||||||
|
angle_error_v = error_y / pixels_per_deg_v
|
||||||
|
|
||||||
|
# Convert angle to servo position (0.24° per count)
|
||||||
|
pos_error_pan = (angle_error_h / 0.24)
|
||||||
|
pos_error_tilt = -(angle_error_v / 0.24) # Invert for tilt
|
||||||
|
|
||||||
|
# Simple proportional control
|
||||||
|
self.pan_pos = max(self.pan_min, min(self.pan_max, self.pan_center - pos_error_pan))
|
||||||
|
self.tilt_pos = max(self.tilt_min, min(self.tilt_max, self.tilt_center + pos_error_tilt))
|
||||||
|
|
||||||
|
# Send commands to servos
|
||||||
|
self.servo_bus.set_position(SCSServoBus.PAN_SERVO_ID, int(self.pan_pos), self.max_pan_speed)
|
||||||
|
self.servo_bus.set_position(SCSServoBus.TILT_SERVO_ID, int(self.tilt_pos), self.max_tilt_speed)
|
||||||
|
|
||||||
|
# Publish command for monitoring
|
||||||
|
cmd_msg = Float32MultiArray(data=[self.pan_pos, self.tilt_pos])
|
||||||
|
self.pub_command.publish(cmd_msg)
|
||||||
|
|
||||||
|
def _publish_state(self) -> None:
|
||||||
|
"""Publish servo state telemetry."""
|
||||||
|
# Read actual positions and temps
|
||||||
|
pan_actual = self.servo_bus.read_position(SCSServoBus.PAN_SERVO_ID)
|
||||||
|
tilt_actual = self.servo_bus.read_position(SCSServoBus.TILT_SERVO_ID)
|
||||||
|
pan_temp = self.servo_bus.read_temp(SCSServoBus.PAN_SERVO_ID)
|
||||||
|
tilt_temp = self.servo_bus.read_temp(SCSServoBus.TILT_SERVO_ID)
|
||||||
|
|
||||||
|
if pan_actual is not None:
|
||||||
|
self.pan_pos = pan_actual
|
||||||
|
if tilt_actual is not None:
|
||||||
|
self.tilt_pos = tilt_actual
|
||||||
|
if pan_temp is not None:
|
||||||
|
self.pan_temp = pan_temp
|
||||||
|
if tilt_temp is not None:
|
||||||
|
self.tilt_temp = tilt_temp
|
||||||
|
|
||||||
|
# Convert positions to degrees
|
||||||
|
pan_deg = (self.pan_pos - 2048) * 0.24
|
||||||
|
tilt_deg = (self.tilt_pos - 2048) * 0.24
|
||||||
|
|
||||||
|
state_msg = String(
|
||||||
|
data=f'{{"pan_deg": {pan_deg:.1f}, "tilt_deg": {tilt_deg:.1f}, '
|
||||||
|
f'"pan_temp_c": {self.pan_temp}, "tilt_temp_c": {self.tilt_temp}}}'
|
||||||
|
)
|
||||||
|
self.pub_state.publish(state_msg)
|
||||||
|
|
||||||
|
|
||||||
|
def main(args=None):
|
||||||
|
rclpy.init(args=args)
|
||||||
|
node = PanTiltNode()
|
||||||
|
try:
|
||||||
|
rclpy.spin(node)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
node.servo_bus.disconnect()
|
||||||
|
node.destroy_node()
|
||||||
|
rclpy.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
5
jetson/ros2_ws/src/saltybot_pan_tilt/setup.cfg
Normal file
5
jetson/ros2_ws/src/saltybot_pan_tilt/setup.cfg
Normal file
@ -0,0 +1,5 @@
|
|||||||
|
[develop]
|
||||||
|
script_dir=$base/lib/saltybot_pan_tilt
|
||||||
|
|
||||||
|
[install]
|
||||||
|
install_scripts=$base/lib/saltybot_pan_tilt
|
||||||
27
jetson/ros2_ws/src/saltybot_pan_tilt/setup.py
Normal file
27
jetson/ros2_ws/src/saltybot_pan_tilt/setup.py
Normal file
@ -0,0 +1,27 @@
|
|||||||
|
from setuptools import setup
|
||||||
|
|
||||||
|
package_name = "saltybot_pan_tilt"
|
||||||
|
|
||||||
|
setup(
|
||||||
|
name=package_name,
|
||||||
|
version="0.1.0",
|
||||||
|
packages=[package_name],
|
||||||
|
data_files=[
|
||||||
|
("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
|
||||||
|
(f"share/{package_name}", ["package.xml"]),
|
||||||
|
(f"share/{package_name}/launch", ["launch/pan_tilt.launch.py"]),
|
||||||
|
(f"share/{package_name}/config", ["config/pan_tilt_params.yaml"]),
|
||||||
|
],
|
||||||
|
install_requires=["setuptools"],
|
||||||
|
zip_safe=True,
|
||||||
|
maintainer="sl-controls",
|
||||||
|
maintainer_email="sl-controls@saltylab.local",
|
||||||
|
description="Pan/tilt servo control for camera head with target tracking",
|
||||||
|
license="MIT",
|
||||||
|
tests_require=["pytest"],
|
||||||
|
entry_points={
|
||||||
|
"console_scripts": [
|
||||||
|
"pan_tilt_node = saltybot_pan_tilt.pan_tilt_node:main",
|
||||||
|
],
|
||||||
|
},
|
||||||
|
)
|
||||||
1
jetson/ros2_ws/src/saltybot_pan_tilt/test/__init__.py
Normal file
1
jetson/ros2_ws/src/saltybot_pan_tilt/test/__init__.py
Normal file
@ -0,0 +1 @@
|
|||||||
|
"""Tests for saltybot_pan_tilt package."""
|
||||||
115
jetson/ros2_ws/src/saltybot_pan_tilt/test/test_pan_tilt.py
Normal file
115
jetson/ros2_ws/src/saltybot_pan_tilt/test/test_pan_tilt.py
Normal file
@ -0,0 +1,115 @@
|
|||||||
|
"""Unit tests for pan/tilt servo control."""
|
||||||
|
|
||||||
|
import unittest
|
||||||
|
import math
|
||||||
|
from unittest.mock import Mock, MagicMock, patch
|
||||||
|
from saltybot_pan_tilt.pan_tilt_node import SCSServoBus
|
||||||
|
|
||||||
|
|
||||||
|
class TestSCSServoBus(unittest.TestCase):
|
||||||
|
"""Test SCS/STS serial bus protocol handler."""
|
||||||
|
|
||||||
|
def test_checksum(self):
|
||||||
|
"""Test XOR checksum calculation."""
|
||||||
|
bus = SCSServoBus("/dev/null")
|
||||||
|
|
||||||
|
# Test with known data
|
||||||
|
data = bytes([1, 5, 0x03, 46, 100, 0, 0, 0x10])
|
||||||
|
expected = 1 ^ 5 ^ 0x03 ^ 46 ^ 100 ^ 0 ^ 0 ^ 0x10
|
||||||
|
self.assertEqual(bus._checksum(data), expected)
|
||||||
|
|
||||||
|
def test_position_clamping(self):
|
||||||
|
"""Test position clamping to valid range."""
|
||||||
|
bus = SCSServoBus("/dev/null")
|
||||||
|
|
||||||
|
# Mock serial port
|
||||||
|
bus.serial = MagicMock()
|
||||||
|
|
||||||
|
# Test clamping below minimum
|
||||||
|
bus.set_position(1, -100, 100)
|
||||||
|
call_args = bus.serial.write.call_args_list[-1][0][0]
|
||||||
|
position_lo = call_args[6]
|
||||||
|
position_hi = call_args[7]
|
||||||
|
position = position_lo | (position_hi << 8)
|
||||||
|
self.assertEqual(position, 0)
|
||||||
|
|
||||||
|
# Test clamping above maximum
|
||||||
|
bus.serial.reset_mock()
|
||||||
|
bus.set_position(1, 5000, 100)
|
||||||
|
call_args = bus.serial.write.call_args_list[-1][0][0]
|
||||||
|
position_lo = call_args[6]
|
||||||
|
position_hi = call_args[7]
|
||||||
|
position = position_lo | (position_hi << 8)
|
||||||
|
self.assertEqual(position, 4095)
|
||||||
|
|
||||||
|
def test_speed_clamping(self):
|
||||||
|
"""Test speed clamping to valid range."""
|
||||||
|
bus = SCSServoBus("/dev/null")
|
||||||
|
|
||||||
|
# Mock serial port
|
||||||
|
bus.serial = MagicMock()
|
||||||
|
|
||||||
|
# Test clamping above maximum
|
||||||
|
bus.set_position(1, 2048, 2000)
|
||||||
|
call_args = bus.serial.write.call_args_list[-1][0][0]
|
||||||
|
speed_lo = call_args[4]
|
||||||
|
speed_hi = call_args[5]
|
||||||
|
speed = speed_lo | (speed_hi << 8)
|
||||||
|
self.assertEqual(speed, 1000)
|
||||||
|
|
||||||
|
|
||||||
|
class TestPanTiltControl(unittest.TestCase):
|
||||||
|
"""Test pan/tilt control algorithms."""
|
||||||
|
|
||||||
|
def test_pixel_to_angle_conversion(self):
|
||||||
|
"""Test pixel error to angle error conversion."""
|
||||||
|
image_width = 848
|
||||||
|
image_height = 480
|
||||||
|
fov_h = 87.0
|
||||||
|
fov_v = 58.0
|
||||||
|
|
||||||
|
# Calculate pixels per degree
|
||||||
|
pixels_per_deg_h = image_width / fov_h
|
||||||
|
pixels_per_deg_v = image_height / fov_v
|
||||||
|
|
||||||
|
# Test 10 pixels horizontal error
|
||||||
|
error_x = 10
|
||||||
|
angle_error_h = error_x / pixels_per_deg_h
|
||||||
|
self.assertAlmostEqual(angle_error_h, 10 * 87.0 / image_width, places=2)
|
||||||
|
|
||||||
|
# Test 5 pixels vertical error
|
||||||
|
error_y = 5
|
||||||
|
angle_error_v = error_y / pixels_per_deg_v
|
||||||
|
self.assertAlmostEqual(angle_error_v, 5 * 58.0 / image_height, places=2)
|
||||||
|
|
||||||
|
def test_position_error_calculation(self):
|
||||||
|
"""Test servo position error from angle error."""
|
||||||
|
# 0.24 degrees per position unit
|
||||||
|
angle_error = 2.4
|
||||||
|
pos_error = angle_error / 0.24
|
||||||
|
self.assertAlmostEqual(pos_error, 10.0, places=1)
|
||||||
|
|
||||||
|
def test_servo_position_bounds(self):
|
||||||
|
"""Test servo position stays within bounds."""
|
||||||
|
pan_min = 0
|
||||||
|
pan_max = 4095
|
||||||
|
pan_center = 2048
|
||||||
|
|
||||||
|
# Test position below minimum
|
||||||
|
pos_error = 3000
|
||||||
|
pan_pos = max(pan_min, min(pan_max, pan_center - pos_error))
|
||||||
|
self.assertEqual(pan_pos, pan_min)
|
||||||
|
|
||||||
|
# Test position above maximum
|
||||||
|
pos_error = -3000
|
||||||
|
pan_pos = max(pan_min, min(pan_max, pan_center - pos_error))
|
||||||
|
self.assertEqual(pan_pos, pan_max)
|
||||||
|
|
||||||
|
# Test position within bounds
|
||||||
|
pos_error = 100
|
||||||
|
pan_pos = max(pan_min, min(pan_max, pan_center - pos_error))
|
||||||
|
self.assertEqual(pan_pos, pan_center - pos_error)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
Loading…
x
Reference in New Issue
Block a user