Compare commits
2 Commits
8ff85601c3
...
41040f8bbd
| Author | SHA1 | Date | |
|---|---|---|---|
| 41040f8bbd | |||
| a4371f2a1d |
@ -53,6 +53,21 @@
|
||||
#define LED_STRIP_NUM_LEDS 8u // 8-LED ring
|
||||
#define LED_STRIP_FREQ_HZ 800000u // 800 kHz PWM for NeoPixel (1.25 µs per bit)
|
||||
|
||||
// --- Servo Pan-Tilt (Issue #206) ---
|
||||
// TIM4_CH1 (PB6) for pan servo, TIM4_CH2 (PB7) for tilt servo
|
||||
#define SERVO_TIM TIM4
|
||||
#define SERVO_PAN_PORT GPIOB
|
||||
#define SERVO_PAN_PIN GPIO_PIN_6 // TIM4_CH1
|
||||
#define SERVO_PAN_CHANNEL TIM_CHANNEL_1
|
||||
#define SERVO_TILT_PORT GPIOB
|
||||
#define SERVO_TILT_PIN GPIO_PIN_7 // TIM4_CH2
|
||||
#define SERVO_TILT_CHANNEL TIM_CHANNEL_2
|
||||
#define SERVO_AF GPIO_AF2_TIM4 // Alternate function
|
||||
#define SERVO_FREQ_HZ 50u // 50 Hz (20ms period, standard servo)
|
||||
#define SERVO_MIN_US 500u // 500µs = 0°
|
||||
#define SERVO_MAX_US 2500u // 2500µs = 180°
|
||||
#define SERVO_CENTER_US 1500u // 1500µs = 90°
|
||||
|
||||
// --- OSD: MAX7456 (SPI2) ---
|
||||
#define OSD_SPI SPI2
|
||||
#define OSD_CS_PORT GPIOB
|
||||
|
||||
110
include/servo.h
Normal file
110
include/servo.h
Normal file
@ -0,0 +1,110 @@
|
||||
#ifndef SERVO_H
|
||||
#define SERVO_H
|
||||
|
||||
#include <stdint.h>
|
||||
#include <stdbool.h>
|
||||
|
||||
/*
|
||||
* servo.h — Pan-tilt servo driver for camera head (Issue #206)
|
||||
*
|
||||
* Hardware: TIM4 PWM at 50 Hz (20 ms period)
|
||||
* - CH1 (PB6): Pan servo (0-180°)
|
||||
* - CH2 (PB7): Tilt servo (0-180°)
|
||||
*
|
||||
* Servo pulse mapping:
|
||||
* - 500 µs → 0° (full left/down)
|
||||
* - 1500 µs → 90° (center)
|
||||
* - 2500 µs → 180° (full right/up)
|
||||
*
|
||||
* Smooth sweeping via servo_sweep() for camera motion.
|
||||
*/
|
||||
|
||||
/* Servo channels */
|
||||
typedef enum {
|
||||
SERVO_PAN = 0, /* CH1 (PB6) */
|
||||
SERVO_TILT = 1, /* CH2 (PB7) */
|
||||
SERVO_COUNT
|
||||
} ServoChannel;
|
||||
|
||||
/* Servo state */
|
||||
typedef struct {
|
||||
uint16_t current_angle_deg[SERVO_COUNT]; /* Current angle in degrees (0-180) */
|
||||
uint16_t target_angle_deg[SERVO_COUNT]; /* Target angle in degrees */
|
||||
uint16_t pulse_us[SERVO_COUNT]; /* Pulse width in microseconds (500-2500) */
|
||||
uint32_t sweep_start_ms;
|
||||
uint32_t sweep_duration_ms;
|
||||
bool is_sweeping;
|
||||
} ServoState;
|
||||
|
||||
/*
|
||||
* servo_init()
|
||||
*
|
||||
* Initialize TIM4 PWM on PB6 (CH1, pan) and PB7 (CH2, tilt) at 50 Hz.
|
||||
* Centers both servos at 90° (1500 µs). Call once at startup.
|
||||
*/
|
||||
void servo_init(void);
|
||||
|
||||
/*
|
||||
* servo_set_angle(channel, degrees)
|
||||
*
|
||||
* Set target angle for a servo (0-180°).
|
||||
* Immediately updates PWM without motion ramping.
|
||||
* Valid channels: SERVO_PAN, SERVO_TILT
|
||||
*
|
||||
* Examples:
|
||||
* servo_set_angle(SERVO_PAN, 0); // Pan left
|
||||
* servo_set_angle(SERVO_PAN, 90); // Pan center
|
||||
* servo_set_angle(SERVO_TILT, 180); // Tilt up
|
||||
*/
|
||||
void servo_set_angle(ServoChannel channel, uint16_t degrees);
|
||||
|
||||
/*
|
||||
* servo_get_angle(channel)
|
||||
*
|
||||
* Return current servo angle in degrees (0-180).
|
||||
*/
|
||||
uint16_t servo_get_angle(ServoChannel channel);
|
||||
|
||||
/*
|
||||
* servo_set_pulse_us(channel, pulse_us)
|
||||
*
|
||||
* Set servo pulse width directly in microseconds (500-2500).
|
||||
* Used for fine-tuning or direct control.
|
||||
*/
|
||||
void servo_set_pulse_us(ServoChannel channel, uint16_t pulse_us);
|
||||
|
||||
/*
|
||||
* servo_sweep(channel, start_deg, end_deg, duration_ms)
|
||||
*
|
||||
* Smooth linear sweep from start to end angle over duration_ms.
|
||||
* Non-blocking: must call servo_tick() every ~10 ms to update PWM.
|
||||
*
|
||||
* Examples:
|
||||
* servo_sweep(SERVO_PAN, 0, 180, 2000); // Pan left-to-right in 2 seconds
|
||||
* servo_sweep(SERVO_TILT, 45, 135, 1000); // Tilt up-down in 1 second
|
||||
*/
|
||||
void servo_sweep(ServoChannel channel, uint16_t start_deg, uint16_t end_deg, uint32_t duration_ms);
|
||||
|
||||
/*
|
||||
* servo_tick(now_ms)
|
||||
*
|
||||
* Update servo sweep animation (if active). Call every ~10 ms from main loop.
|
||||
* No-op if not currently sweeping.
|
||||
*/
|
||||
void servo_tick(uint32_t now_ms);
|
||||
|
||||
/*
|
||||
* servo_is_sweeping()
|
||||
*
|
||||
* Returns true if any servo is currently sweeping.
|
||||
*/
|
||||
bool servo_is_sweeping(void);
|
||||
|
||||
/*
|
||||
* servo_stop_sweep(channel)
|
||||
*
|
||||
* Stop sweep immediately, hold current position.
|
||||
*/
|
||||
void servo_stop_sweep(ServoChannel channel);
|
||||
|
||||
#endif /* SERVO_H */
|
||||
@ -0,0 +1,14 @@
|
||||
# Node watchdog configuration
|
||||
|
||||
node_watchdog:
|
||||
ros__parameters:
|
||||
# Publishing frequency in Hz
|
||||
frequency: 20 # 20 Hz = 50ms cycle
|
||||
|
||||
# General heartbeat timeout (seconds)
|
||||
# Alert if any heartbeat lost for this duration
|
||||
heartbeat_timeout: 1.0
|
||||
|
||||
# Motor driver critical timeout (seconds)
|
||||
# Trigger safety fallback (zero cmd_vel) if motor driver down this long
|
||||
motor_driver_critical_timeout: 2.0
|
||||
@ -0,0 +1,36 @@
|
||||
"""Launch file for node_watchdog_node."""
|
||||
|
||||
from launch import LaunchDescription
|
||||
from launch_ros.actions import Node
|
||||
from launch.substitutions import LaunchConfiguration
|
||||
from launch.actions import DeclareLaunchArgument
|
||||
import os
|
||||
from ament_index_python.packages import get_package_share_directory
|
||||
|
||||
|
||||
def generate_launch_description():
|
||||
"""Generate launch description for node watchdog."""
|
||||
# Package directory
|
||||
pkg_dir = get_package_share_directory("saltybot_node_watchdog")
|
||||
|
||||
# Parameters
|
||||
config_file = os.path.join(pkg_dir, "config", "watchdog_config.yaml")
|
||||
|
||||
# Declare launch arguments
|
||||
return LaunchDescription(
|
||||
[
|
||||
DeclareLaunchArgument(
|
||||
"config_file",
|
||||
default_value=config_file,
|
||||
description="Path to configuration YAML file",
|
||||
),
|
||||
# Node watchdog node
|
||||
Node(
|
||||
package="saltybot_node_watchdog",
|
||||
executable="node_watchdog_node",
|
||||
name="node_watchdog",
|
||||
output="screen",
|
||||
parameters=[LaunchConfiguration("config_file")],
|
||||
),
|
||||
]
|
||||
)
|
||||
28
jetson/ros2_ws/src/saltybot_node_watchdog/package.xml
Normal file
28
jetson/ros2_ws/src/saltybot_node_watchdog/package.xml
Normal file
@ -0,0 +1,28 @@
|
||||
<?xml version="1.0"?>
|
||||
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||
<package format="3">
|
||||
<name>saltybot_node_watchdog</name>
|
||||
<version>0.1.0</version>
|
||||
<description>
|
||||
Node watchdog monitor for SaltyBot critical systems. Monitors heartbeats from balance,
|
||||
motor driver, emergency, and docking nodes. Publishes alerts on heartbeat loss >1s.
|
||||
Implements safety fallback: zeros cmd_vel if motor driver lost >2s. Runs at 20Hz.
|
||||
</description>
|
||||
<maintainer email="sl-controls@saltylab.local">sl-controls</maintainer>
|
||||
<license>MIT</license>
|
||||
|
||||
<depend>rclpy</depend>
|
||||
<depend>geometry_msgs</depend>
|
||||
<depend>std_msgs</depend>
|
||||
|
||||
<buildtool_depend>ament_python</buildtool_depend>
|
||||
|
||||
<test_depend>ament_copyright</test_depend>
|
||||
<test_depend>ament_flake8</test_depend>
|
||||
<test_depend>ament_pep257</test_depend>
|
||||
<test_depend>python3-pytest</test_depend>
|
||||
|
||||
<export>
|
||||
<build_type>ament_python</build_type>
|
||||
</export>
|
||||
</package>
|
||||
Binary file not shown.
@ -0,0 +1,235 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Node watchdog monitor for SaltyBot critical systems.
|
||||
|
||||
Monitors heartbeats from balance, motor driver, emergency, and docking nodes.
|
||||
Publishes alerts on heartbeat loss >1s. Implements safety fallback: zeros cmd_vel
|
||||
if motor driver lost >2s.
|
||||
|
||||
Published topics:
|
||||
/saltybot/node_watchdog (std_msgs/String) - JSON watchdog status
|
||||
/saltybot/cmd_vel_safe (geometry_msgs/Twist) - cmd_vel with motor driver safety check
|
||||
|
||||
Subscribed topics:
|
||||
/saltybot/balance_heartbeat (std_msgs/UInt32) - Balance node heartbeat
|
||||
/saltybot/motor_driver_heartbeat (std_msgs/UInt32) - Motor driver heartbeat
|
||||
/saltybot/emergency_heartbeat (std_msgs/UInt32) - Emergency system heartbeat
|
||||
/saltybot/docking_heartbeat (std_msgs/UInt32) - Docking node heartbeat
|
||||
/cmd_vel (geometry_msgs/Twist) - Velocity command input
|
||||
"""
|
||||
|
||||
import json
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from rclpy.timer import Timer
|
||||
from geometry_msgs.msg import Twist
|
||||
from std_msgs.msg import UInt32, String
|
||||
|
||||
|
||||
class NodeWatchdogNode(Node):
|
||||
"""ROS2 watchdog monitor for critical system nodes."""
|
||||
|
||||
def __init__(self):
|
||||
super().__init__("node_watchdog")
|
||||
|
||||
# Parameters
|
||||
self.declare_parameter("frequency", 20) # Hz
|
||||
self.declare_parameter("heartbeat_timeout", 1.0) # seconds, general timeout
|
||||
self.declare_parameter("motor_driver_critical_timeout", 2.0) # seconds
|
||||
|
||||
frequency = self.get_parameter("frequency").value
|
||||
self.heartbeat_timeout = self.get_parameter("heartbeat_timeout").value
|
||||
self.motor_driver_critical_timeout = self.get_parameter(
|
||||
"motor_driver_critical_timeout"
|
||||
).value
|
||||
|
||||
# Heartbeat tracking
|
||||
self.critical_nodes = {
|
||||
"balance": None,
|
||||
"motor_driver": None,
|
||||
"emergency": None,
|
||||
"docking": None,
|
||||
}
|
||||
self.last_heartbeat_time = {
|
||||
"balance": None,
|
||||
"motor_driver": None,
|
||||
"emergency": None,
|
||||
"docking": None,
|
||||
}
|
||||
self.last_cmd_vel = None
|
||||
self.motor_driver_down = False
|
||||
|
||||
# Subscriptions for heartbeats
|
||||
self.create_subscription(
|
||||
UInt32, "/saltybot/balance_heartbeat", self._on_balance_heartbeat, 10
|
||||
)
|
||||
self.create_subscription(
|
||||
UInt32,
|
||||
"/saltybot/motor_driver_heartbeat",
|
||||
self._on_motor_driver_heartbeat,
|
||||
10,
|
||||
)
|
||||
self.create_subscription(
|
||||
UInt32, "/saltybot/emergency_heartbeat", self._on_emergency_heartbeat, 10
|
||||
)
|
||||
self.create_subscription(
|
||||
UInt32, "/saltybot/docking_heartbeat", self._on_docking_heartbeat, 10
|
||||
)
|
||||
|
||||
# cmd_vel subscription and safe republishing
|
||||
self.create_subscription(Twist, "/cmd_vel", self._on_cmd_vel, 10)
|
||||
|
||||
# Publications
|
||||
self.pub_watchdog = self.create_publisher(String, "/saltybot/node_watchdog", 10)
|
||||
self.pub_cmd_vel_safe = self.create_publisher(
|
||||
Twist, "/saltybot/cmd_vel_safe", 10
|
||||
)
|
||||
|
||||
# Timer for periodic monitoring at 20Hz
|
||||
period = 1.0 / frequency
|
||||
self.timer: Timer = self.create_timer(period, self._timer_callback)
|
||||
|
||||
self.get_logger().info(
|
||||
f"Node watchdog initialized at {frequency}Hz. "
|
||||
f"Heartbeat timeout: {self.heartbeat_timeout}s, "
|
||||
f"Motor driver critical: {self.motor_driver_critical_timeout}s"
|
||||
)
|
||||
|
||||
def _on_balance_heartbeat(self, msg: UInt32) -> None:
|
||||
"""Update balance node heartbeat timestamp."""
|
||||
self.last_heartbeat_time["balance"] = self.get_clock().now()
|
||||
|
||||
def _on_motor_driver_heartbeat(self, msg: UInt32) -> None:
|
||||
"""Update motor driver heartbeat timestamp."""
|
||||
self.last_heartbeat_time["motor_driver"] = self.get_clock().now()
|
||||
self.motor_driver_down = False
|
||||
|
||||
def _on_emergency_heartbeat(self, msg: UInt32) -> None:
|
||||
"""Update emergency system heartbeat timestamp."""
|
||||
self.last_heartbeat_time["emergency"] = self.get_clock().now()
|
||||
|
||||
def _on_docking_heartbeat(self, msg: UInt32) -> None:
|
||||
"""Update docking node heartbeat timestamp."""
|
||||
self.last_heartbeat_time["docking"] = self.get_clock().now()
|
||||
|
||||
def _on_cmd_vel(self, msg: Twist) -> None:
|
||||
"""Cache the last received cmd_vel."""
|
||||
self.last_cmd_vel = msg
|
||||
|
||||
def _check_node_health(self) -> dict:
|
||||
"""Check health of all monitored nodes.
|
||||
|
||||
Returns:
|
||||
dict: Health status of each node with timeout and elapsed time.
|
||||
"""
|
||||
now = self.get_clock().now()
|
||||
health = {}
|
||||
|
||||
for node_name in self.critical_nodes:
|
||||
last_time = self.last_heartbeat_time[node_name]
|
||||
|
||||
if last_time is None:
|
||||
# No heartbeat received yet
|
||||
health[node_name] = {
|
||||
"status": "unknown",
|
||||
"elapsed_s": None,
|
||||
"timeout_s": self.heartbeat_timeout,
|
||||
}
|
||||
else:
|
||||
# Calculate elapsed time since last heartbeat
|
||||
elapsed = (now - last_time).nanoseconds / 1e9
|
||||
is_timeout = elapsed > self.heartbeat_timeout
|
||||
|
||||
# Special case: motor driver has longer critical timeout
|
||||
if node_name == "motor_driver":
|
||||
is_critical = elapsed > self.motor_driver_critical_timeout
|
||||
else:
|
||||
is_critical = False
|
||||
|
||||
health[node_name] = {
|
||||
"status": "down" if is_timeout else "up",
|
||||
"elapsed_s": elapsed,
|
||||
"timeout_s": (
|
||||
self.motor_driver_critical_timeout
|
||||
if node_name == "motor_driver"
|
||||
else self.heartbeat_timeout
|
||||
),
|
||||
"critical": is_critical,
|
||||
}
|
||||
|
||||
return health
|
||||
|
||||
def _timer_callback(self) -> None:
|
||||
"""Monitor node health and publish alerts at 20Hz."""
|
||||
health = self._check_node_health()
|
||||
|
||||
# Detect if motor driver is in critical state (down for >2s)
|
||||
motor_driver_health = health.get("motor_driver", {})
|
||||
if motor_driver_health.get("critical", False):
|
||||
self.motor_driver_down = True
|
||||
self.get_logger().warn(
|
||||
f"MOTOR DRIVER DOWN >2s ({motor_driver_health['elapsed_s']:.1f}s). "
|
||||
"Applying safety fallback: zeroing cmd_vel."
|
||||
)
|
||||
|
||||
# Determine any nodes down for >1s
|
||||
nodes_with_timeout = {
|
||||
name: status
|
||||
for name, status in health.items()
|
||||
if status.get("status") == "down"
|
||||
}
|
||||
|
||||
# Publish watchdog status
|
||||
watchdog_status = {
|
||||
"timestamp": self.get_clock().now().nanoseconds / 1e9,
|
||||
"all_healthy": len(nodes_with_timeout) == 0
|
||||
and not self.motor_driver_down,
|
||||
"health": health,
|
||||
"motor_driver_critical": self.motor_driver_down,
|
||||
}
|
||||
watchdog_msg = String(data=json.dumps(watchdog_status))
|
||||
self.pub_watchdog.publish(watchdog_msg)
|
||||
|
||||
# Publish cmd_vel with safety checks
|
||||
if self.last_cmd_vel is not None:
|
||||
cmd_vel_safe = self._apply_safety_checks(self.last_cmd_vel)
|
||||
self.pub_cmd_vel_safe.publish(cmd_vel_safe)
|
||||
|
||||
def _apply_safety_checks(self, cmd_vel: Twist) -> Twist:
|
||||
"""Apply safety checks to cmd_vel based on system state.
|
||||
|
||||
Args:
|
||||
cmd_vel: Original velocity command
|
||||
|
||||
Returns:
|
||||
Twist: Potentially modified velocity command for safe operation.
|
||||
"""
|
||||
safe_cmd = Twist()
|
||||
|
||||
# If motor driver is critically down, zero all velocities
|
||||
if self.motor_driver_down:
|
||||
return safe_cmd
|
||||
|
||||
# Otherwise, pass through unchanged
|
||||
safe_cmd.linear.x = cmd_vel.linear.x
|
||||
safe_cmd.linear.y = cmd_vel.linear.y
|
||||
safe_cmd.linear.z = cmd_vel.linear.z
|
||||
safe_cmd.angular.x = cmd_vel.angular.x
|
||||
safe_cmd.angular.y = cmd_vel.angular.y
|
||||
safe_cmd.angular.z = cmd_vel.angular.z
|
||||
return safe_cmd
|
||||
|
||||
|
||||
def main(args=None):
|
||||
rclpy.init(args=args)
|
||||
node = NodeWatchdogNode()
|
||||
try:
|
||||
rclpy.spin(node)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
node.destroy_node()
|
||||
rclpy.shutdown()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
4
jetson/ros2_ws/src/saltybot_node_watchdog/setup.cfg
Normal file
4
jetson/ros2_ws/src/saltybot_node_watchdog/setup.cfg
Normal file
@ -0,0 +1,4 @@
|
||||
[develop]
|
||||
script_dir=$base/lib/saltybot_node_watchdog
|
||||
[egg_info]
|
||||
tag_date = 0
|
||||
29
jetson/ros2_ws/src/saltybot_node_watchdog/setup.py
Normal file
29
jetson/ros2_ws/src/saltybot_node_watchdog/setup.py
Normal file
@ -0,0 +1,29 @@
|
||||
from setuptools import setup
|
||||
|
||||
package_name = "saltybot_node_watchdog"
|
||||
|
||||
setup(
|
||||
name=package_name,
|
||||
version="0.1.0",
|
||||
packages=[package_name],
|
||||
data_files=[
|
||||
("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
|
||||
(f"share/{package_name}", ["package.xml"]),
|
||||
(f"share/{package_name}/launch", ["launch/node_watchdog.launch.py"]),
|
||||
(f"share/{package_name}/config", ["config/watchdog_config.yaml"]),
|
||||
],
|
||||
install_requires=["setuptools"],
|
||||
zip_safe=True,
|
||||
maintainer="sl-controls",
|
||||
maintainer_email="sl-controls@saltylab.local",
|
||||
description=(
|
||||
"Node watchdog: heartbeat monitoring with safety fallback for critical systems"
|
||||
),
|
||||
license="MIT",
|
||||
tests_require=["pytest"],
|
||||
entry_points={
|
||||
"console_scripts": [
|
||||
"node_watchdog_node = saltybot_node_watchdog.node_watchdog_node:main",
|
||||
],
|
||||
},
|
||||
)
|
||||
@ -0,0 +1,329 @@
|
||||
"""Unit tests for node_watchdog_node."""
|
||||
|
||||
import pytest
|
||||
import json
|
||||
import time
|
||||
from geometry_msgs.msg import Twist
|
||||
from std_msgs.msg import UInt32, String
|
||||
|
||||
import rclpy
|
||||
from rclpy.time import Time
|
||||
|
||||
# Import the node under test
|
||||
from saltybot_node_watchdog.node_watchdog_node import NodeWatchdogNode
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def rclpy_fixture():
|
||||
"""Initialize and cleanup rclpy."""
|
||||
rclpy.init()
|
||||
yield
|
||||
rclpy.shutdown()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def node(rclpy_fixture):
|
||||
"""Create a watchdog node instance."""
|
||||
node = NodeWatchdogNode()
|
||||
yield node
|
||||
node.destroy_node()
|
||||
|
||||
|
||||
class TestNodeWatchdogNode:
|
||||
"""Test suite for NodeWatchdogNode."""
|
||||
|
||||
def test_node_initialization(self, node):
|
||||
"""Test that node initializes with correct defaults."""
|
||||
assert node.heartbeat_timeout == 1.0
|
||||
assert node.motor_driver_critical_timeout == 2.0
|
||||
assert node.last_cmd_vel is None
|
||||
assert node.motor_driver_down is False
|
||||
|
||||
# All heartbeat times should be None initially
|
||||
for node_name in node.critical_nodes:
|
||||
assert node.last_heartbeat_time[node_name] is None
|
||||
|
||||
def test_balance_heartbeat_received(self, node):
|
||||
"""Test balance node heartbeat recording."""
|
||||
msg = UInt32(data=1)
|
||||
node._on_balance_heartbeat(msg)
|
||||
assert node.last_heartbeat_time["balance"] is not None
|
||||
|
||||
def test_motor_driver_heartbeat_received(self, node):
|
||||
"""Test motor driver heartbeat recording."""
|
||||
msg = UInt32(data=1)
|
||||
node._on_motor_driver_heartbeat(msg)
|
||||
assert node.last_heartbeat_time["motor_driver"] is not None
|
||||
# Motor driver heartbeat should clear the down flag
|
||||
node.motor_driver_down = True
|
||||
node._on_motor_driver_heartbeat(msg)
|
||||
assert node.motor_driver_down is False
|
||||
|
||||
def test_emergency_heartbeat_received(self, node):
|
||||
"""Test emergency system heartbeat recording."""
|
||||
msg = UInt32(data=1)
|
||||
node._on_emergency_heartbeat(msg)
|
||||
assert node.last_heartbeat_time["emergency"] is not None
|
||||
|
||||
def test_docking_heartbeat_received(self, node):
|
||||
"""Test docking node heartbeat recording."""
|
||||
msg = UInt32(data=1)
|
||||
node._on_docking_heartbeat(msg)
|
||||
assert node.last_heartbeat_time["docking"] is not None
|
||||
|
||||
def test_cmd_vel_caching(self, node):
|
||||
"""Test that cmd_vel messages are cached."""
|
||||
msg = Twist()
|
||||
msg.linear.x = 1.0
|
||||
node._on_cmd_vel(msg)
|
||||
assert node.last_cmd_vel is not None
|
||||
assert node.last_cmd_vel.linear.x == 1.0
|
||||
|
||||
def test_health_check_all_unknown(self, node):
|
||||
"""Test health check when no heartbeats received."""
|
||||
health = node._check_node_health()
|
||||
|
||||
assert len(health) == 4
|
||||
for node_name in node.critical_nodes:
|
||||
assert health[node_name]["status"] == "unknown"
|
||||
assert health[node_name]["elapsed_s"] is None
|
||||
assert health[node_name]["timeout_s"] == 1.0
|
||||
|
||||
def test_health_check_just_received(self, node):
|
||||
"""Test health check just after heartbeat received."""
|
||||
# Record a heartbeat for balance node
|
||||
node.last_heartbeat_time["balance"] = node.get_clock().now()
|
||||
|
||||
health = node._check_node_health()
|
||||
|
||||
# Balance should be up, others unknown
|
||||
assert health["balance"]["status"] == "up"
|
||||
assert health["balance"]["elapsed_s"] < 0.1
|
||||
assert health["emergency"]["status"] == "unknown"
|
||||
|
||||
def test_health_check_timeout_general(self, node):
|
||||
"""Test that heartbeat timeout is detected (>1s)."""
|
||||
# Simulate a heartbeat that arrived >1s ago
|
||||
now = node.get_clock().now()
|
||||
old_time = Time(
|
||||
nanoseconds=now.nanoseconds - int(1.5 * 1e9)
|
||||
) # 1.5 seconds ago
|
||||
node.last_heartbeat_time["balance"] = old_time
|
||||
|
||||
health = node._check_node_health()
|
||||
|
||||
assert health["balance"]["status"] == "down"
|
||||
assert health["balance"]["elapsed_s"] > 1.4
|
||||
assert health["balance"]["elapsed_s"] < 2.0
|
||||
|
||||
def test_health_check_motor_driver_critical(self, node):
|
||||
"""Test motor driver critical timeout (>2s)."""
|
||||
# Simulate motor driver heartbeat >2s ago
|
||||
now = node.get_clock().now()
|
||||
old_time = Time(nanoseconds=now.nanoseconds - int(2.5 * 1e9)) # 2.5 seconds
|
||||
node.last_heartbeat_time["motor_driver"] = old_time
|
||||
|
||||
health = node._check_node_health()
|
||||
|
||||
motor_health = health["motor_driver"]
|
||||
assert motor_health["status"] == "down"
|
||||
assert motor_health.get("critical", False) is True
|
||||
assert motor_health["elapsed_s"] > 2.4
|
||||
|
||||
def test_safety_check_normal_operation(self, node):
|
||||
"""Test safety check passes through cmd_vel normally."""
|
||||
node.motor_driver_down = False
|
||||
cmd = Twist()
|
||||
cmd.linear.x = 1.5
|
||||
cmd.angular.z = 0.3
|
||||
|
||||
safe_cmd = node._apply_safety_checks(cmd)
|
||||
|
||||
assert abs(safe_cmd.linear.x - 1.5) < 1e-6
|
||||
assert abs(safe_cmd.angular.z - 0.3) < 1e-6
|
||||
|
||||
def test_safety_check_motor_driver_down(self, node):
|
||||
"""Test safety check zeros cmd_vel when motor driver is down."""
|
||||
node.motor_driver_down = True
|
||||
cmd = Twist()
|
||||
cmd.linear.x = 1.5
|
||||
cmd.linear.y = 0.2
|
||||
cmd.angular.z = 0.3
|
||||
|
||||
safe_cmd = node._apply_safety_checks(cmd)
|
||||
|
||||
# All velocities should be zero
|
||||
assert safe_cmd.linear.x == 0.0
|
||||
assert safe_cmd.linear.y == 0.0
|
||||
assert safe_cmd.linear.z == 0.0
|
||||
assert safe_cmd.angular.x == 0.0
|
||||
assert safe_cmd.angular.y == 0.0
|
||||
assert safe_cmd.angular.z == 0.0
|
||||
|
||||
def test_timer_callback_publishes(self, node):
|
||||
"""Test that timer callback publishes watchdog status."""
|
||||
# Record a heartbeat
|
||||
node.last_heartbeat_time["balance"] = node.get_clock().now()
|
||||
node.last_cmd_vel = Twist()
|
||||
node.last_cmd_vel.linear.x = 1.0
|
||||
|
||||
# Call timer callback
|
||||
node._timer_callback()
|
||||
# Just verify it doesn't crash; actual publishing is tested via integration
|
||||
|
||||
def test_watchdog_status_json_all_healthy(self, node):
|
||||
"""Test watchdog status JSON when all nodes healthy."""
|
||||
# Record all heartbeats
|
||||
now = node.get_clock().now()
|
||||
for node_name in node.critical_nodes:
|
||||
node.last_heartbeat_time[node_name] = now
|
||||
|
||||
health = node._check_node_health()
|
||||
watchdog_status = {
|
||||
"timestamp": now.nanoseconds / 1e9,
|
||||
"all_healthy": all(
|
||||
s["status"] == "up" for s in health.values()
|
||||
),
|
||||
"health": health,
|
||||
"motor_driver_critical": False,
|
||||
}
|
||||
|
||||
# Verify it's valid JSON
|
||||
json_str = json.dumps(watchdog_status)
|
||||
parsed = json.loads(json_str)
|
||||
|
||||
assert parsed["all_healthy"] is True
|
||||
assert parsed["motor_driver_critical"] is False
|
||||
|
||||
def test_watchdog_status_json_with_timeout(self, node):
|
||||
"""Test watchdog status JSON when node has timed out."""
|
||||
# Balance heartbeat >1s ago
|
||||
now = node.get_clock().now()
|
||||
old_time = Time(nanoseconds=now.nanoseconds - int(1.5 * 1e9))
|
||||
node.last_heartbeat_time["balance"] = old_time
|
||||
|
||||
# Others are current
|
||||
for name in ["motor_driver", "emergency", "docking"]:
|
||||
node.last_heartbeat_time[name] = now
|
||||
|
||||
health = node._check_node_health()
|
||||
watchdog_status = {
|
||||
"timestamp": now.nanoseconds / 1e9,
|
||||
"all_healthy": all(s["status"] == "up" for s in health.values()),
|
||||
"health": health,
|
||||
"motor_driver_critical": False,
|
||||
}
|
||||
|
||||
json_str = json.dumps(watchdog_status)
|
||||
parsed = json.loads(json_str)
|
||||
|
||||
assert parsed["all_healthy"] is False
|
||||
assert parsed["health"]["balance"]["status"] == "down"
|
||||
|
||||
|
||||
class TestNodeWatchdogScenarios:
|
||||
"""Integration-style tests for realistic scenarios."""
|
||||
|
||||
def test_scenario_all_nodes_healthy(self, node):
|
||||
"""Scenario: all critical nodes sending heartbeats."""
|
||||
now = node.get_clock().now()
|
||||
|
||||
# All nodes sending heartbeats
|
||||
for name in node.critical_nodes:
|
||||
node.last_heartbeat_time[name] = now
|
||||
|
||||
health = node._check_node_health()
|
||||
|
||||
all_up = all(h["status"] == "up" for h in health.values())
|
||||
assert all_up is True
|
||||
|
||||
def test_scenario_motor_driver_loss_below_critical(self, node):
|
||||
"""Scenario: motor driver offline 1.5s (below 2s critical)."""
|
||||
now = node.get_clock().now()
|
||||
old_time = Time(nanoseconds=now.nanoseconds - int(1.5 * 1e9))
|
||||
|
||||
# Motor driver down 1.5s, others healthy
|
||||
node.last_heartbeat_time["motor_driver"] = old_time
|
||||
for name in ["balance", "emergency", "docking"]:
|
||||
node.last_heartbeat_time[name] = now
|
||||
|
||||
health = node._check_node_health()
|
||||
motor = health["motor_driver"]
|
||||
|
||||
assert motor["status"] == "down"
|
||||
assert motor.get("critical", False) is False
|
||||
# Safety fallback should NOT be triggered yet
|
||||
assert node.motor_driver_down is False
|
||||
|
||||
def test_scenario_motor_driver_critical_loss(self, node):
|
||||
"""Scenario: motor driver offline >2s (triggers critical)."""
|
||||
now = node.get_clock().now()
|
||||
old_time = Time(nanoseconds=now.nanoseconds - int(2.5 * 1e9))
|
||||
|
||||
node.last_heartbeat_time["motor_driver"] = old_time
|
||||
node.last_heartbeat_time["balance"] = now
|
||||
node.last_heartbeat_time["emergency"] = now
|
||||
node.last_heartbeat_time["docking"] = now
|
||||
|
||||
health = node._check_node_health()
|
||||
motor = health["motor_driver"]
|
||||
|
||||
assert motor["status"] == "down"
|
||||
assert motor.get("critical", False) is True
|
||||
|
||||
def test_scenario_cascading_node_failures(self, node):
|
||||
"""Scenario: multiple nodes failing in sequence."""
|
||||
now = node.get_clock().now()
|
||||
old1s = Time(nanoseconds=now.nanoseconds - int(1.2 * 1e9))
|
||||
old2s = Time(nanoseconds=now.nanoseconds - int(2.5 * 1e9))
|
||||
|
||||
# Balance down 1.2s, motor driver down 2.5s, others healthy
|
||||
node.last_heartbeat_time["balance"] = old1s
|
||||
node.last_heartbeat_time["motor_driver"] = old2s
|
||||
node.last_heartbeat_time["emergency"] = now
|
||||
node.last_heartbeat_time["docking"] = now
|
||||
|
||||
health = node._check_node_health()
|
||||
|
||||
assert health["balance"]["status"] == "down"
|
||||
assert health["balance"].get("critical", False) is False
|
||||
assert health["motor_driver"]["status"] == "down"
|
||||
assert health["motor_driver"].get("critical", False) is True
|
||||
|
||||
def test_scenario_cmd_vel_safety_fallback(self, node):
|
||||
"""Scenario: motor driver down triggers safety zeroing of cmd_vel."""
|
||||
# Motor driver is critically down
|
||||
node.motor_driver_down = True
|
||||
|
||||
cmd = Twist()
|
||||
cmd.linear.x = 2.0
|
||||
cmd.angular.z = 0.5
|
||||
|
||||
safe_cmd = node._apply_safety_checks(cmd)
|
||||
|
||||
# All should be zeroed
|
||||
assert safe_cmd.linear.x == 0.0
|
||||
assert safe_cmd.linear.y == 0.0
|
||||
assert safe_cmd.linear.z == 0.0
|
||||
assert safe_cmd.angular.x == 0.0
|
||||
assert safe_cmd.angular.y == 0.0
|
||||
assert safe_cmd.angular.z == 0.0
|
||||
|
||||
def test_scenario_motor_driver_recovery(self, node):
|
||||
"""Scenario: motor driver comes back online after being down."""
|
||||
now = node.get_clock().now()
|
||||
|
||||
# Motor driver was down
|
||||
node.motor_driver_down = True
|
||||
|
||||
# Motor driver sends heartbeat
|
||||
node._on_motor_driver_heartbeat(UInt32(data=1))
|
||||
|
||||
# Should clear the down flag
|
||||
assert node.motor_driver_down is False
|
||||
|
||||
# cmd_vel should pass through
|
||||
cmd = Twist()
|
||||
cmd.linear.x = 1.0
|
||||
safe_cmd = node._apply_safety_checks(cmd)
|
||||
assert safe_cmd.linear.x == 1.0
|
||||
@ -21,6 +21,7 @@
|
||||
#include "audio.h"
|
||||
#include "buzzer.h"
|
||||
#include "led.h"
|
||||
#include "servo.h"
|
||||
#include "power_mgmt.h"
|
||||
#include "battery.h"
|
||||
#include <math.h>
|
||||
@ -163,6 +164,9 @@ int main(void) {
|
||||
/* Init power management — STOP-mode sleep/wake, wake EXTIs configured */
|
||||
power_mgmt_init();
|
||||
|
||||
/* Init servo pan-tilt driver for camera head (TIM4 PWM on PB6/PB7, Issue #206) */
|
||||
servo_init();
|
||||
|
||||
/* Init mode manager (RC/autonomous blend; CH6 mode switch) */
|
||||
mode_manager_t mode;
|
||||
mode_manager_init(&mode);
|
||||
@ -218,6 +222,9 @@ int main(void) {
|
||||
/* Advance LED animation sequencer (non-blocking, call every tick) */
|
||||
led_tick(now);
|
||||
|
||||
/* Servo pan-tilt animation tick — updates smooth sweeps */
|
||||
servo_tick(now);
|
||||
|
||||
/* Sleep LED: software PWM on LED1 (active-low PC15) driven by PM brightness.
|
||||
* pm_pwm_phase rolls over each ms; brightness sets duty cycle 0-255. */
|
||||
pm_pwm_phase++;
|
||||
|
||||
242
src/servo.c
Normal file
242
src/servo.c
Normal file
@ -0,0 +1,242 @@
|
||||
#include "servo.h"
|
||||
#include "config.h"
|
||||
#include "stm32f7xx_hal.h"
|
||||
#include <string.h>
|
||||
|
||||
/* ================================================================
|
||||
* Servo PWM Protocol
|
||||
* ================================================================
|
||||
* TIM4 at 50 Hz (20 ms period)
|
||||
* APB1 clock: 54 MHz
|
||||
* Prescaler: 53 (54 MHz / 54 = 1 MHz)
|
||||
* ARR: 19999 (1 MHz / 20000 = 50 Hz)
|
||||
* CCR: 500-2500 (0.5-2.5 ms out of 20 ms)
|
||||
*
|
||||
* Servo pulse mapping:
|
||||
* 500 µs → 0° (full left/down)
|
||||
* 1500 µs → 90° (center)
|
||||
* 2500 µs → 180° (full right/up)
|
||||
*/
|
||||
|
||||
#define SERVO_PWM_FREQ 50u /* 50 Hz */
|
||||
#define SERVO_PERIOD_MS 20u /* 20 ms = 1/50 Hz */
|
||||
#define SERVO_CLOCK_HZ 1000000u /* 1 MHz timer clock */
|
||||
#define SERVO_PRESCALER 53u /* APB1 54 MHz / 54 = 1 MHz */
|
||||
#define SERVO_ARR 19999u /* 1 MHz / 20000 = 50 Hz */
|
||||
|
||||
typedef struct {
|
||||
uint16_t current_angle_deg[SERVO_COUNT];
|
||||
uint16_t target_angle_deg[SERVO_COUNT];
|
||||
uint16_t pulse_us[SERVO_COUNT];
|
||||
|
||||
/* Sweep state */
|
||||
uint32_t sweep_start_ms[SERVO_COUNT];
|
||||
uint32_t sweep_duration_ms[SERVO_COUNT];
|
||||
uint16_t sweep_start_deg[SERVO_COUNT];
|
||||
uint16_t sweep_end_deg[SERVO_COUNT];
|
||||
bool is_sweeping[SERVO_COUNT];
|
||||
} ServoState;
|
||||
|
||||
static ServoState s_servo = {0};
|
||||
static TIM_HandleTypeDef s_tim_handle = {0};
|
||||
|
||||
/* ================================================================
|
||||
* Helper functions
|
||||
* ================================================================
|
||||
*/
|
||||
|
||||
static uint16_t angle_to_pulse_us(uint16_t degrees)
|
||||
{
|
||||
/* Linear interpolation: 0° → 500µs, 180° → 2500µs */
|
||||
if (degrees > 180) degrees = 180;
|
||||
|
||||
uint32_t pulse = SERVO_MIN_US + (uint32_t)degrees * (SERVO_MAX_US - SERVO_MIN_US) / 180;
|
||||
return (uint16_t)pulse;
|
||||
}
|
||||
|
||||
static uint16_t pulse_us_to_angle(uint16_t pulse_us)
|
||||
{
|
||||
/* Inverse mapping: 500µs → 0°, 2500µs → 180° */
|
||||
if (pulse_us < SERVO_MIN_US) pulse_us = SERVO_MIN_US;
|
||||
if (pulse_us > SERVO_MAX_US) pulse_us = SERVO_MAX_US;
|
||||
|
||||
uint32_t angle = (uint32_t)(pulse_us - SERVO_MIN_US) * 180 / (SERVO_MAX_US - SERVO_MIN_US);
|
||||
return (uint16_t)angle;
|
||||
}
|
||||
|
||||
static void update_pwm(ServoChannel channel)
|
||||
{
|
||||
/* Convert pulse width (500-2500 µs) to CCR value */
|
||||
/* At 1 MHz timer clock: 1 µs = 1 count */
|
||||
uint32_t ccr_value = s_servo.pulse_us[channel];
|
||||
|
||||
if (channel == SERVO_PAN) {
|
||||
__HAL_TIM_SET_COMPARE(&s_tim_handle, SERVO_PAN_CHANNEL, ccr_value);
|
||||
} else {
|
||||
__HAL_TIM_SET_COMPARE(&s_tim_handle, SERVO_TILT_CHANNEL, ccr_value);
|
||||
}
|
||||
}
|
||||
|
||||
/* ================================================================
|
||||
* Public API
|
||||
* ================================================================
|
||||
*/
|
||||
|
||||
void servo_init(void)
|
||||
{
|
||||
/* Initialize state */
|
||||
memset(&s_servo, 0, sizeof(s_servo));
|
||||
|
||||
/* Center both servos at 90° */
|
||||
for (int i = 0; i < SERVO_COUNT; i++) {
|
||||
s_servo.current_angle_deg[i] = 90;
|
||||
s_servo.target_angle_deg[i] = 90;
|
||||
s_servo.pulse_us[i] = SERVO_CENTER_US;
|
||||
}
|
||||
|
||||
/* Configure GPIO PB6 (CH1) and PB7 (CH2) as TIM4 PWM */
|
||||
__HAL_RCC_GPIOB_CLK_ENABLE();
|
||||
|
||||
GPIO_InitTypeDef gpio_init = {0};
|
||||
gpio_init.Mode = GPIO_MODE_AF_PP;
|
||||
gpio_init.Pull = GPIO_NOPULL;
|
||||
gpio_init.Speed = GPIO_SPEED_FREQ_HIGH;
|
||||
gpio_init.Alternate = SERVO_AF;
|
||||
|
||||
/* Configure PB6 (pan) */
|
||||
gpio_init.Pin = SERVO_PAN_PIN;
|
||||
HAL_GPIO_Init(SERVO_PAN_PORT, &gpio_init);
|
||||
|
||||
/* Configure PB7 (tilt) */
|
||||
gpio_init.Pin = SERVO_TILT_PIN;
|
||||
HAL_GPIO_Init(SERVO_TILT_PORT, &gpio_init);
|
||||
|
||||
/* Configure TIM4: 50 Hz PWM */
|
||||
__HAL_RCC_TIM4_CLK_ENABLE();
|
||||
|
||||
s_tim_handle.Instance = SERVO_TIM;
|
||||
s_tim_handle.Init.Prescaler = SERVO_PRESCALER;
|
||||
s_tim_handle.Init.CounterMode = TIM_COUNTERMODE_UP;
|
||||
s_tim_handle.Init.Period = SERVO_ARR;
|
||||
s_tim_handle.Init.ClockDivision = TIM_CLOCKDIVISION_DIV1;
|
||||
s_tim_handle.Init.RepetitionCounter = 0;
|
||||
|
||||
HAL_TIM_PWM_Init(&s_tim_handle);
|
||||
|
||||
/* Configure TIM4_CH1 (pan) for PWM */
|
||||
TIM_OC_InitTypeDef oc_init = {0};
|
||||
oc_init.OCMode = TIM_OCMODE_PWM1;
|
||||
oc_init.Pulse = SERVO_CENTER_US;
|
||||
oc_init.OCPolarity = TIM_OCPOLARITY_HIGH;
|
||||
oc_init.OCFastMode = TIM_OCFAST_DISABLE;
|
||||
|
||||
HAL_TIM_PWM_ConfigChannel(&s_tim_handle, &oc_init, SERVO_PAN_CHANNEL);
|
||||
HAL_TIM_PWM_Start(&s_tim_handle, SERVO_PAN_CHANNEL);
|
||||
|
||||
/* Configure TIM4_CH2 (tilt) for PWM */
|
||||
oc_init.Pulse = SERVO_CENTER_US;
|
||||
HAL_TIM_PWM_ConfigChannel(&s_tim_handle, &oc_init, SERVO_TILT_CHANNEL);
|
||||
HAL_TIM_PWM_Start(&s_tim_handle, SERVO_TILT_CHANNEL);
|
||||
}
|
||||
|
||||
void servo_set_angle(ServoChannel channel, uint16_t degrees)
|
||||
{
|
||||
if (channel >= SERVO_COUNT) return;
|
||||
if (degrees > 180) degrees = 180;
|
||||
|
||||
s_servo.current_angle_deg[channel] = degrees;
|
||||
s_servo.target_angle_deg[channel] = degrees;
|
||||
s_servo.pulse_us[channel] = angle_to_pulse_us(degrees);
|
||||
|
||||
/* Stop any sweep in progress */
|
||||
s_servo.is_sweeping[channel] = false;
|
||||
|
||||
/* Update PWM immediately */
|
||||
update_pwm(channel);
|
||||
}
|
||||
|
||||
uint16_t servo_get_angle(ServoChannel channel)
|
||||
{
|
||||
if (channel >= SERVO_COUNT) return 0;
|
||||
return s_servo.current_angle_deg[channel];
|
||||
}
|
||||
|
||||
void servo_set_pulse_us(ServoChannel channel, uint16_t pulse_us)
|
||||
{
|
||||
if (channel >= SERVO_COUNT) return;
|
||||
if (pulse_us < SERVO_MIN_US) pulse_us = SERVO_MIN_US;
|
||||
if (pulse_us > SERVO_MAX_US) pulse_us = SERVO_MAX_US;
|
||||
|
||||
s_servo.pulse_us[channel] = pulse_us;
|
||||
s_servo.current_angle_deg[channel] = pulse_us_to_angle(pulse_us);
|
||||
s_servo.target_angle_deg[channel] = s_servo.current_angle_deg[channel];
|
||||
|
||||
/* Stop any sweep in progress */
|
||||
s_servo.is_sweeping[channel] = false;
|
||||
|
||||
/* Update PWM immediately */
|
||||
update_pwm(channel);
|
||||
}
|
||||
|
||||
void servo_sweep(ServoChannel channel, uint16_t start_deg, uint16_t end_deg, uint32_t duration_ms)
|
||||
{
|
||||
if (channel >= SERVO_COUNT) return;
|
||||
if (duration_ms == 0) return;
|
||||
if (start_deg > 180) start_deg = 180;
|
||||
if (end_deg > 180) end_deg = 180;
|
||||
|
||||
s_servo.sweep_start_deg[channel] = start_deg;
|
||||
s_servo.sweep_end_deg[channel] = end_deg;
|
||||
s_servo.sweep_duration_ms[channel] = duration_ms;
|
||||
s_servo.sweep_start_ms[channel] = 0; /* Will be set on first tick */
|
||||
s_servo.is_sweeping[channel] = true;
|
||||
}
|
||||
|
||||
void servo_tick(uint32_t now_ms)
|
||||
{
|
||||
for (int ch = 0; ch < SERVO_COUNT; ch++) {
|
||||
if (!s_servo.is_sweeping[ch]) continue;
|
||||
|
||||
/* Initialize start time on first call */
|
||||
if (s_servo.sweep_start_ms[ch] == 0) {
|
||||
s_servo.sweep_start_ms[ch] = now_ms;
|
||||
}
|
||||
|
||||
uint32_t elapsed = now_ms - s_servo.sweep_start_ms[ch];
|
||||
uint32_t duration = s_servo.sweep_duration_ms[ch];
|
||||
|
||||
if (elapsed >= duration) {
|
||||
/* Sweep complete */
|
||||
s_servo.is_sweeping[ch] = false;
|
||||
s_servo.current_angle_deg[ch] = s_servo.sweep_end_deg[ch];
|
||||
s_servo.pulse_us[ch] = angle_to_pulse_us(s_servo.sweep_end_deg[ch]);
|
||||
} else {
|
||||
/* Linear interpolation */
|
||||
int16_t start = (int16_t)s_servo.sweep_start_deg[ch];
|
||||
int16_t end = (int16_t)s_servo.sweep_end_deg[ch];
|
||||
int32_t delta = end - start;
|
||||
|
||||
/* angle = start + (delta * elapsed / duration) */
|
||||
int32_t angle_i32 = start + (delta * (int32_t)elapsed / (int32_t)duration);
|
||||
s_servo.current_angle_deg[ch] = (uint16_t)angle_i32;
|
||||
s_servo.pulse_us[ch] = angle_to_pulse_us(s_servo.current_angle_deg[ch]);
|
||||
}
|
||||
|
||||
/* Update PWM */
|
||||
update_pwm((ServoChannel)ch);
|
||||
}
|
||||
}
|
||||
|
||||
bool servo_is_sweeping(void)
|
||||
{
|
||||
for (int i = 0; i < SERVO_COUNT; i++) {
|
||||
if (s_servo.is_sweeping[i]) return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
void servo_stop_sweep(ServoChannel channel)
|
||||
{
|
||||
if (channel >= SERVO_COUNT) return;
|
||||
s_servo.is_sweeping[channel] = false;
|
||||
}
|
||||
345
test/test_servo.py
Normal file
345
test/test_servo.py
Normal file
@ -0,0 +1,345 @@
|
||||
"""
|
||||
test_servo.py — Pan-tilt servo driver tests (Issue #206)
|
||||
|
||||
Verifies:
|
||||
- PWM frequency: 50 Hz (20 ms period)
|
||||
- Pulse width: 500-2500 µs for 0-180°
|
||||
- Angle conversion: linear mapping
|
||||
- Smooth sweeping: animation timing and interpolation
|
||||
- Multi-servo coordination (pan + tilt independently)
|
||||
"""
|
||||
|
||||
import pytest
|
||||
|
||||
# ── Constants ─────────────────────────────────────────────────────────────
|
||||
|
||||
SERVO_MIN_US = 500
|
||||
SERVO_MAX_US = 2500
|
||||
SERVO_CENTER_US = 1500
|
||||
|
||||
PWM_FREQ_HZ = 50
|
||||
PERIOD_MS = 20
|
||||
|
||||
NUM_SERVOS = 2
|
||||
SERVO_PAN = 0
|
||||
SERVO_TILT = 1
|
||||
|
||||
|
||||
# ── Servo Simulator ────────────────────────────────────────────────────────
|
||||
|
||||
class ServoSimulator:
|
||||
def __init__(self):
|
||||
self.current_angle_deg = [90, 90] # Both centered
|
||||
self.pulse_us = [SERVO_CENTER_US, SERVO_CENTER_US]
|
||||
self.is_sweeping = [False, False]
|
||||
self.sweep_start_deg = [0, 0]
|
||||
self.sweep_end_deg = [0, 0]
|
||||
self.sweep_duration_ms = [0, 0]
|
||||
self.sweep_start_ms = [None, None]
|
||||
|
||||
def angle_to_pulse(self, degrees):
|
||||
"""Convert angle (0-180) to pulse width (500-2500 µs)."""
|
||||
if degrees < 0:
|
||||
degrees = 0
|
||||
if degrees > 180:
|
||||
degrees = 180
|
||||
return SERVO_MIN_US + (degrees * (SERVO_MAX_US - SERVO_MIN_US)) // 180
|
||||
|
||||
def pulse_to_angle(self, pulse_us):
|
||||
"""Convert pulse width to angle."""
|
||||
if pulse_us < SERVO_MIN_US:
|
||||
pulse_us = SERVO_MIN_US
|
||||
if pulse_us > SERVO_MAX_US:
|
||||
pulse_us = SERVO_MAX_US
|
||||
return (pulse_us - SERVO_MIN_US) * 180 // (SERVO_MAX_US - SERVO_MIN_US)
|
||||
|
||||
def set_angle(self, channel, degrees):
|
||||
"""Immediately set servo angle."""
|
||||
self.current_angle_deg[channel] = min(180, max(0, degrees))
|
||||
self.pulse_us[channel] = self.angle_to_pulse(self.current_angle_deg[channel])
|
||||
self.is_sweeping[channel] = False
|
||||
|
||||
def get_angle(self, channel):
|
||||
"""Get current servo angle."""
|
||||
return self.current_angle_deg[channel]
|
||||
|
||||
def set_pulse_us(self, channel, pulse_us):
|
||||
"""Set servo by pulse width."""
|
||||
if pulse_us < SERVO_MIN_US:
|
||||
pulse_us = SERVO_MIN_US
|
||||
if pulse_us > SERVO_MAX_US:
|
||||
pulse_us = SERVO_MAX_US
|
||||
self.pulse_us[channel] = pulse_us
|
||||
self.current_angle_deg[channel] = self.pulse_to_angle(pulse_us)
|
||||
self.is_sweeping[channel] = False
|
||||
|
||||
def sweep(self, channel, start_deg, end_deg, duration_ms):
|
||||
"""Start smooth sweep."""
|
||||
self.sweep_start_deg[channel] = start_deg
|
||||
self.sweep_end_deg[channel] = end_deg
|
||||
self.sweep_duration_ms[channel] = duration_ms
|
||||
self.sweep_start_ms[channel] = None
|
||||
self.is_sweeping[channel] = True
|
||||
|
||||
def tick(self, now_ms):
|
||||
"""Update sweep animations."""
|
||||
for ch in range(NUM_SERVOS):
|
||||
if not self.is_sweeping[ch]:
|
||||
continue
|
||||
|
||||
# Initialize start time on first call
|
||||
if self.sweep_start_ms[ch] is None:
|
||||
self.sweep_start_ms[ch] = now_ms
|
||||
|
||||
elapsed = now_ms - self.sweep_start_ms[ch]
|
||||
duration = self.sweep_duration_ms[ch]
|
||||
|
||||
if elapsed >= duration:
|
||||
# Sweep complete
|
||||
self.is_sweeping[ch] = False
|
||||
self.current_angle_deg[ch] = self.sweep_end_deg[ch]
|
||||
self.pulse_us[ch] = self.angle_to_pulse(self.sweep_end_deg[ch])
|
||||
else:
|
||||
# Linear interpolation
|
||||
start = self.sweep_start_deg[ch]
|
||||
end = self.sweep_end_deg[ch]
|
||||
delta = end - start
|
||||
angle = start + (delta * elapsed) // duration
|
||||
self.current_angle_deg[ch] = angle
|
||||
self.pulse_us[ch] = self.angle_to_pulse(angle)
|
||||
|
||||
def is_sweeping_any(self):
|
||||
"""Check if any servo is sweeping."""
|
||||
return any(self.is_sweeping)
|
||||
|
||||
|
||||
# ── Tests ──────────────────────────────────────────────────────────────────
|
||||
|
||||
def test_initialization():
|
||||
"""Servos should initialize centered at 90°."""
|
||||
sim = ServoSimulator()
|
||||
assert sim.get_angle(SERVO_PAN) == 90
|
||||
assert sim.get_angle(SERVO_TILT) == 90
|
||||
assert sim.pulse_us[SERVO_PAN] == SERVO_CENTER_US
|
||||
assert sim.pulse_us[SERVO_TILT] == SERVO_CENTER_US
|
||||
|
||||
|
||||
def test_angle_to_pulse_conversion():
|
||||
"""Angle to pulse conversion should be linear."""
|
||||
sim = ServoSimulator()
|
||||
|
||||
assert sim.angle_to_pulse(0) == SERVO_MIN_US # 500 µs
|
||||
assert sim.angle_to_pulse(90) == SERVO_CENTER_US # 1500 µs
|
||||
assert sim.angle_to_pulse(180) == SERVO_MAX_US # 2500 µs
|
||||
|
||||
# Intermediate angles
|
||||
assert sim.angle_to_pulse(45) == 1000 # 0.5 way: 500 + 500 = 1000
|
||||
assert sim.angle_to_pulse(135) == 2000 # 0.75 way: 500 + 1500 = 2000
|
||||
|
||||
|
||||
def test_pulse_to_angle_conversion():
|
||||
"""Pulse to angle conversion should invert angle_to_pulse."""
|
||||
sim = ServoSimulator()
|
||||
|
||||
assert sim.pulse_to_angle(SERVO_MIN_US) == 0
|
||||
assert sim.pulse_to_angle(SERVO_CENTER_US) == 90
|
||||
assert sim.pulse_to_angle(SERVO_MAX_US) == 180
|
||||
|
||||
# Intermediate pulses
|
||||
assert sim.pulse_to_angle(1000) == 45
|
||||
assert sim.pulse_to_angle(2000) == 135
|
||||
|
||||
|
||||
def test_set_angle_pan():
|
||||
"""Pan servo should update angle immediately."""
|
||||
sim = ServoSimulator()
|
||||
|
||||
sim.set_angle(SERVO_PAN, 0)
|
||||
assert sim.get_angle(SERVO_PAN) == 0
|
||||
assert sim.pulse_us[SERVO_PAN] == SERVO_MIN_US
|
||||
|
||||
sim.set_angle(SERVO_PAN, 90)
|
||||
assert sim.get_angle(SERVO_PAN) == 90
|
||||
assert sim.pulse_us[SERVO_PAN] == SERVO_CENTER_US
|
||||
|
||||
sim.set_angle(SERVO_PAN, 180)
|
||||
assert sim.get_angle(SERVO_PAN) == 180
|
||||
assert sim.pulse_us[SERVO_PAN] == SERVO_MAX_US
|
||||
|
||||
|
||||
def test_set_angle_tilt():
|
||||
"""Tilt servo should work independently."""
|
||||
sim = ServoSimulator()
|
||||
|
||||
sim.set_angle(SERVO_TILT, 45)
|
||||
assert sim.get_angle(SERVO_TILT) == 45
|
||||
assert sim.get_angle(SERVO_PAN) == 90 # Pan unchanged
|
||||
|
||||
|
||||
def test_set_pulse_us():
|
||||
"""Pulse width setter should update angle correctly."""
|
||||
sim = ServoSimulator()
|
||||
|
||||
sim.set_pulse_us(SERVO_PAN, SERVO_MIN_US)
|
||||
assert sim.get_angle(SERVO_PAN) == 0
|
||||
|
||||
sim.set_pulse_us(SERVO_PAN, SERVO_CENTER_US)
|
||||
assert sim.get_angle(SERVO_PAN) == 90
|
||||
|
||||
sim.set_pulse_us(SERVO_PAN, SERVO_MAX_US)
|
||||
assert sim.get_angle(SERVO_PAN) == 180
|
||||
|
||||
|
||||
def test_sweep_timing():
|
||||
"""Sweep should complete in specified duration."""
|
||||
sim = ServoSimulator()
|
||||
|
||||
# Pan from 0° to 180° over 2 seconds
|
||||
sim.sweep(SERVO_PAN, 0, 180, 2000)
|
||||
|
||||
# Initial tick
|
||||
sim.tick(0)
|
||||
assert sim.get_angle(SERVO_PAN) == 0
|
||||
|
||||
# Halfway through sweep (t=1000ms)
|
||||
sim.tick(1000)
|
||||
assert sim.get_angle(SERVO_PAN) == 90 # Linear interpolation
|
||||
|
||||
# End of sweep (t=2000ms)
|
||||
sim.tick(2000)
|
||||
assert sim.get_angle(SERVO_PAN) == 180
|
||||
assert not sim.is_sweeping[SERVO_PAN]
|
||||
|
||||
|
||||
def test_sweep_interpolation():
|
||||
"""Sweep should interpolate smoothly."""
|
||||
sim = ServoSimulator()
|
||||
|
||||
# Sweep from 0° to 180° in 1000ms
|
||||
sim.sweep(SERVO_PAN, 0, 180, 1000)
|
||||
|
||||
angles = []
|
||||
for t in range(0, 1001, 100):
|
||||
sim.tick(t)
|
||||
angles.append(sim.get_angle(SERVO_PAN))
|
||||
|
||||
# Expected: [0, 18, 36, 54, 72, 90, 108, 126, 144, 162, 180]
|
||||
expected = [i * 18 for i in range(11)]
|
||||
assert angles == expected, f"Got {angles}, expected {expected}"
|
||||
|
||||
|
||||
def test_reverse_sweep():
|
||||
"""Sweep from higher angle to lower angle."""
|
||||
sim = ServoSimulator()
|
||||
|
||||
sim.sweep(SERVO_TILT, 180, 0, 1000)
|
||||
|
||||
sim.tick(0)
|
||||
assert sim.get_angle(SERVO_TILT) == 180
|
||||
|
||||
sim.tick(500)
|
||||
assert sim.get_angle(SERVO_TILT) == 90
|
||||
|
||||
sim.tick(1000)
|
||||
assert sim.get_angle(SERVO_TILT) == 0
|
||||
assert not sim.is_sweeping[SERVO_TILT]
|
||||
|
||||
|
||||
def test_sweep_stops_on_immediate_set():
|
||||
"""Setting angle immediately should stop sweep."""
|
||||
sim = ServoSimulator()
|
||||
|
||||
sim.sweep(SERVO_PAN, 0, 180, 2000)
|
||||
sim.tick(500)
|
||||
|
||||
# Stop sweep by setting angle
|
||||
sim.set_angle(SERVO_PAN, 45)
|
||||
assert not sim.is_sweeping[SERVO_PAN]
|
||||
assert sim.get_angle(SERVO_PAN) == 45
|
||||
|
||||
|
||||
def test_independent_servos():
|
||||
"""Pan and tilt servos should sweep independently."""
|
||||
sim = ServoSimulator()
|
||||
|
||||
sim.sweep(SERVO_PAN, 0, 180, 1000)
|
||||
sim.sweep(SERVO_TILT, 180, 0, 2000)
|
||||
|
||||
# Initialize sweep timing
|
||||
sim.tick(0)
|
||||
|
||||
# After 1 second
|
||||
sim.tick(1000)
|
||||
assert sim.get_angle(SERVO_PAN) == 180
|
||||
assert not sim.is_sweeping[SERVO_PAN]
|
||||
assert sim.get_angle(SERVO_TILT) == 90 # Halfway through
|
||||
assert sim.is_sweeping[SERVO_TILT]
|
||||
|
||||
# After 2 seconds
|
||||
sim.tick(2000)
|
||||
assert not sim.is_sweeping[SERVO_PAN]
|
||||
assert sim.get_angle(SERVO_TILT) == 0
|
||||
assert not sim.is_sweeping[SERVO_TILT]
|
||||
assert not sim.is_sweeping_any()
|
||||
|
||||
|
||||
def test_fast_sweep():
|
||||
"""Very short sweep should work."""
|
||||
sim = ServoSimulator()
|
||||
|
||||
sim.sweep(SERVO_PAN, 45, 135, 100) # 90° in 100ms
|
||||
|
||||
sim.tick(0)
|
||||
assert sim.get_angle(SERVO_PAN) == 45
|
||||
|
||||
sim.tick(50)
|
||||
assert sim.get_angle(SERVO_PAN) == 90
|
||||
|
||||
sim.tick(100)
|
||||
assert sim.get_angle(SERVO_PAN) == 135
|
||||
assert not sim.is_sweeping[SERVO_PAN]
|
||||
|
||||
|
||||
def test_multiple_sweeps():
|
||||
"""Multiple sequential sweeps should work."""
|
||||
sim = ServoSimulator()
|
||||
|
||||
# First sweep (0° to 90° in 500ms)
|
||||
sim.sweep(SERVO_PAN, 0, 90, 500)
|
||||
sim.tick(0)
|
||||
sim.tick(500)
|
||||
assert sim.get_angle(SERVO_PAN) == 90
|
||||
assert not sim.is_sweeping[SERVO_PAN]
|
||||
|
||||
# Second sweep (90° to 0° in 500ms, starting at t=500)
|
||||
sim.sweep(SERVO_PAN, 90, 0, 500)
|
||||
sim.tick(500) # Initialize second sweep
|
||||
sim.tick(1000) # After 500ms of second sweep
|
||||
assert sim.get_angle(SERVO_PAN) == 0
|
||||
assert not sim.is_sweeping[SERVO_PAN]
|
||||
|
||||
|
||||
def test_boundary_angles():
|
||||
"""Angles > 180° should clamp to 180°."""
|
||||
sim = ServoSimulator()
|
||||
|
||||
sim.set_angle(SERVO_PAN, 200)
|
||||
assert sim.get_angle(SERVO_PAN) == 180
|
||||
|
||||
sim.set_angle(SERVO_PAN, -10)
|
||||
assert sim.get_angle(SERVO_PAN) == 0
|
||||
|
||||
|
||||
def test_pulse_clamping():
|
||||
"""Pulse widths outside 500-2500 µs should clamp."""
|
||||
sim = ServoSimulator()
|
||||
|
||||
sim.set_pulse_us(SERVO_PAN, 100) # Too low
|
||||
assert sim.pulse_us[SERVO_PAN] == SERVO_MIN_US
|
||||
|
||||
sim.set_pulse_us(SERVO_PAN, 3000) # Too high
|
||||
assert sim.pulse_us[SERVO_PAN] == SERVO_MAX_US
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
pytest.main([__file__, '-v'])
|
||||
Loading…
x
Reference in New Issue
Block a user