Merge pull request 'feat: STM32 serial bridge — USB CDC to ROS2 topics' (#16) from sl-jetson/stm32-serial-bridge into main
This commit is contained in:
commit
f2d2df030e
7
jetson/ros2_ws/src/saltybot_bridge/.gitignore
vendored
Normal file
7
jetson/ros2_ws/src/saltybot_bridge/.gitignore
vendored
Normal file
@ -0,0 +1,7 @@
|
|||||||
|
__pycache__/
|
||||||
|
*.pyc
|
||||||
|
*.pyo
|
||||||
|
*.egg-info/
|
||||||
|
build/
|
||||||
|
install/
|
||||||
|
log/
|
||||||
@ -0,0 +1,8 @@
|
|||||||
|
stm32_serial_bridge:
|
||||||
|
ros__parameters:
|
||||||
|
# STM32 USB CDC device node
|
||||||
|
# Use /dev/stm32-bridge if udev rule from jetson/docs/pinout.md is applied
|
||||||
|
serial_port: /dev/ttyACM0
|
||||||
|
baud_rate: 921600
|
||||||
|
timeout: 0.1 # serial readline timeout (seconds)
|
||||||
|
reconnect_delay: 2.0 # seconds between reconnect attempts on disconnect
|
||||||
34
jetson/ros2_ws/src/saltybot_bridge/launch/bridge.launch.py
Normal file
34
jetson/ros2_ws/src/saltybot_bridge/launch/bridge.launch.py
Normal file
@ -0,0 +1,34 @@
|
|||||||
|
from launch import LaunchDescription
|
||||||
|
from launch.actions import DeclareLaunchArgument
|
||||||
|
from launch.substitutions import LaunchConfiguration
|
||||||
|
from launch_ros.actions import Node
|
||||||
|
|
||||||
|
|
||||||
|
def generate_launch_description():
|
||||||
|
serial_port_arg = DeclareLaunchArgument(
|
||||||
|
"serial_port",
|
||||||
|
default_value="/dev/ttyACM0",
|
||||||
|
description="STM32 USB CDC device node",
|
||||||
|
)
|
||||||
|
baud_rate_arg = DeclareLaunchArgument(
|
||||||
|
"baud_rate",
|
||||||
|
default_value="921600",
|
||||||
|
description="Serial baud rate",
|
||||||
|
)
|
||||||
|
|
||||||
|
bridge_node = Node(
|
||||||
|
package="saltybot_bridge",
|
||||||
|
executable="serial_bridge_node",
|
||||||
|
name="stm32_serial_bridge",
|
||||||
|
output="screen",
|
||||||
|
parameters=[
|
||||||
|
{
|
||||||
|
"serial_port": LaunchConfiguration("serial_port"),
|
||||||
|
"baud_rate": LaunchConfiguration("baud_rate"),
|
||||||
|
"timeout": 0.1,
|
||||||
|
"reconnect_delay": 2.0,
|
||||||
|
}
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
return LaunchDescription([serial_port_arg, baud_rate_arg, bridge_node])
|
||||||
27
jetson/ros2_ws/src/saltybot_bridge/package.xml
Normal file
27
jetson/ros2_ws/src/saltybot_bridge/package.xml
Normal file
@ -0,0 +1,27 @@
|
|||||||
|
<?xml version="1.0"?>
|
||||||
|
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||||
|
<package format="3">
|
||||||
|
<name>saltybot_bridge</name>
|
||||||
|
<version>0.1.0</version>
|
||||||
|
<description>
|
||||||
|
STM32F722 USB CDC serial bridge for saltybot.
|
||||||
|
Reads JSON telemetry from the STM32 flight controller and publishes
|
||||||
|
sensor_msgs/Imu, std_msgs/String (balance state), and diagnostic_msgs/DiagnosticArray.
|
||||||
|
</description>
|
||||||
|
<maintainer email="sl-jetson@saltylab.local">sl-jetson</maintainer>
|
||||||
|
<license>MIT</license>
|
||||||
|
|
||||||
|
<depend>rclpy</depend>
|
||||||
|
<depend>sensor_msgs</depend>
|
||||||
|
<depend>std_msgs</depend>
|
||||||
|
<depend>diagnostic_msgs</depend>
|
||||||
|
|
||||||
|
<test_depend>ament_copyright</test_depend>
|
||||||
|
<test_depend>ament_flake8</test_depend>
|
||||||
|
<test_depend>ament_pep257</test_depend>
|
||||||
|
<test_depend>python3-pytest</test_depend>
|
||||||
|
|
||||||
|
<export>
|
||||||
|
<build_type>ament_python</build_type>
|
||||||
|
</export>
|
||||||
|
</package>
|
||||||
@ -0,0 +1,304 @@
|
|||||||
|
"""
|
||||||
|
saltybot_bridge — serial_bridge_node
|
||||||
|
STM32F722 USB CDC → ROS2 topic publisher
|
||||||
|
|
||||||
|
Telemetry frame (50 Hz, newline-delimited JSON):
|
||||||
|
{"p":<pitch×10>,"r":<roll×10>,"e":<err×10>,"ig":<integral×10>,
|
||||||
|
"m":<motor_cmd -1000..1000>,"s":<state 0-2>,"y":<yaw×10>}
|
||||||
|
|
||||||
|
Error frame (IMU fault):
|
||||||
|
{"err":<imu_errno>}
|
||||||
|
|
||||||
|
State values (balance_state_t):
|
||||||
|
0 = DISARMED 1 = ARMED 2 = TILT_FAULT
|
||||||
|
|
||||||
|
Published topics:
|
||||||
|
/saltybot/imu sensor_msgs/Imu — pitch/roll/yaw as angular velocity
|
||||||
|
/saltybot/balance_state std_msgs/String (JSON) — full PID diagnostics
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import math
|
||||||
|
import rclpy
|
||||||
|
from rclpy.node import Node
|
||||||
|
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
|
||||||
|
|
||||||
|
import serial
|
||||||
|
|
||||||
|
from sensor_msgs.msg import Imu
|
||||||
|
from std_msgs.msg import String
|
||||||
|
from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus, KeyValue
|
||||||
|
|
||||||
|
# Balance state labels matching STM32 balance_state_t enum
|
||||||
|
_STATE_LABEL = {0: "DISARMED", 1: "ARMED", 2: "TILT_FAULT"}
|
||||||
|
|
||||||
|
# Sensor frame_id published in Imu header
|
||||||
|
IMU_FRAME_ID = "imu_link"
|
||||||
|
|
||||||
|
|
||||||
|
class SerialBridgeNode(Node):
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__("stm32_serial_bridge")
|
||||||
|
|
||||||
|
# ── Parameters ────────────────────────────────────────────────────────
|
||||||
|
self.declare_parameter("serial_port", "/dev/ttyACM0")
|
||||||
|
self.declare_parameter("baud_rate", 921600)
|
||||||
|
self.declare_parameter("timeout", 0.1)
|
||||||
|
self.declare_parameter("reconnect_delay", 2.0)
|
||||||
|
|
||||||
|
port = self.get_parameter("serial_port").value
|
||||||
|
baud = self.get_parameter("baud_rate").value
|
||||||
|
timeout = self.get_parameter("timeout").value
|
||||||
|
self._reconnect_delay = self.get_parameter("reconnect_delay").value
|
||||||
|
|
||||||
|
# ── QoS ───────────────────────────────────────────────────────────────
|
||||||
|
sensor_qos = QoSProfile(
|
||||||
|
reliability=ReliabilityPolicy.BEST_EFFORT,
|
||||||
|
history=HistoryPolicy.KEEP_LAST,
|
||||||
|
depth=10,
|
||||||
|
)
|
||||||
|
diag_qos = QoSProfile(
|
||||||
|
reliability=ReliabilityPolicy.RELIABLE,
|
||||||
|
history=HistoryPolicy.KEEP_LAST,
|
||||||
|
depth=10,
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── Publishers ────────────────────────────────────────────────────────
|
||||||
|
self._imu_pub = self.create_publisher(Imu, "/saltybot/imu", sensor_qos)
|
||||||
|
self._balance_pub = self.create_publisher(
|
||||||
|
String, "/saltybot/balance_state", diag_qos
|
||||||
|
)
|
||||||
|
self._diag_pub = self.create_publisher(
|
||||||
|
DiagnosticArray, "/diagnostics", diag_qos
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── State ─────────────────────────────────────────────────────────────
|
||||||
|
self._port_name = port
|
||||||
|
self._baud = baud
|
||||||
|
self._timeout = timeout
|
||||||
|
self._ser: serial.Serial | None = None
|
||||||
|
self._frame_count = 0
|
||||||
|
self._error_count = 0
|
||||||
|
self._last_state = -1
|
||||||
|
|
||||||
|
# ── Open serial and start read timer ──────────────────────────────────
|
||||||
|
self._open_serial()
|
||||||
|
# Poll at 100 Hz — STM32 sends at 50 Hz, so we never miss a frame
|
||||||
|
self._timer = self.create_timer(0.01, self._read_cb)
|
||||||
|
|
||||||
|
self.get_logger().info(
|
||||||
|
f"stm32_serial_bridge started — {port} @ {baud} baud"
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── Serial management ─────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _open_serial(self) -> bool:
|
||||||
|
try:
|
||||||
|
self._ser = serial.Serial(
|
||||||
|
port=self._port_name,
|
||||||
|
baudrate=self._baud,
|
||||||
|
timeout=self._timeout,
|
||||||
|
)
|
||||||
|
self._ser.reset_input_buffer()
|
||||||
|
self.get_logger().info(f"Serial port open: {self._port_name}")
|
||||||
|
return True
|
||||||
|
except serial.SerialException as exc:
|
||||||
|
self.get_logger().error(f"Cannot open {self._port_name}: {exc}")
|
||||||
|
self._ser = None
|
||||||
|
return False
|
||||||
|
|
||||||
|
def _close_serial(self):
|
||||||
|
if self._ser and self._ser.is_open:
|
||||||
|
try:
|
||||||
|
self._ser.close()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
self._ser = None
|
||||||
|
|
||||||
|
# ── Read callback ─────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _read_cb(self):
|
||||||
|
if self._ser is None or not self._ser.is_open:
|
||||||
|
# Attempt reconnect on next timer fire after delay
|
||||||
|
self.get_logger().warn(
|
||||||
|
f"Serial disconnected — retrying in {self._reconnect_delay}s",
|
||||||
|
throttle_duration_sec=self._reconnect_delay,
|
||||||
|
)
|
||||||
|
self._open_serial()
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Read all lines buffered since last call
|
||||||
|
while self._ser.in_waiting:
|
||||||
|
raw = self._ser.readline()
|
||||||
|
if raw:
|
||||||
|
self._parse_and_publish(raw)
|
||||||
|
except serial.SerialException as exc:
|
||||||
|
self.get_logger().error(f"Serial read error: {exc}")
|
||||||
|
self._close_serial()
|
||||||
|
|
||||||
|
# ── Parsing + publishing ──────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _parse_and_publish(self, raw: bytes):
|
||||||
|
try:
|
||||||
|
text = raw.decode("ascii", errors="ignore").strip()
|
||||||
|
if not text:
|
||||||
|
return
|
||||||
|
data = json.loads(text)
|
||||||
|
except (ValueError, UnicodeDecodeError) as exc:
|
||||||
|
self.get_logger().debug(f"Parse error ({exc}): {raw!r}")
|
||||||
|
self._error_count += 1
|
||||||
|
return
|
||||||
|
|
||||||
|
now = self.get_clock().now().to_msg()
|
||||||
|
|
||||||
|
# IMU fault frame — {"err": <errno>}
|
||||||
|
if "err" in data:
|
||||||
|
self._publish_imu_fault(data["err"], now)
|
||||||
|
return
|
||||||
|
|
||||||
|
# Normal telemetry frame — validate required keys
|
||||||
|
required = ("p", "r", "e", "ig", "m", "s", "y")
|
||||||
|
if not all(k in data for k in required):
|
||||||
|
self.get_logger().debug(f"Incomplete frame: {text}")
|
||||||
|
return
|
||||||
|
|
||||||
|
# Values are ×10 integers from firmware — convert to float
|
||||||
|
pitch_deg = data["p"] / 10.0
|
||||||
|
roll_deg = data["r"] / 10.0
|
||||||
|
yaw_deg = data["y"] / 10.0
|
||||||
|
error_deg = data["e"] / 10.0
|
||||||
|
integral = data["ig"] / 10.0
|
||||||
|
motor_cmd = float(data["m"]) # -1000..+1000 ESC units
|
||||||
|
state = int(data["s"])
|
||||||
|
|
||||||
|
self._frame_count += 1
|
||||||
|
|
||||||
|
self._publish_imu(pitch_deg, roll_deg, yaw_deg, now)
|
||||||
|
self._publish_balance_state(
|
||||||
|
pitch_deg, roll_deg, yaw_deg,
|
||||||
|
error_deg, integral, motor_cmd, state, now,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Log state transitions
|
||||||
|
if state != self._last_state:
|
||||||
|
label = _STATE_LABEL.get(state, f"UNKNOWN({state})")
|
||||||
|
self.get_logger().info(f"Balance state → {label}")
|
||||||
|
self._last_state = state
|
||||||
|
|
||||||
|
def _publish_imu(self, pitch_deg, roll_deg, yaw_deg, stamp):
|
||||||
|
"""
|
||||||
|
Publish sensor_msgs/Imu.
|
||||||
|
|
||||||
|
The STM32 IMU gives Euler angles (pitch/roll from accelerometer+gyro
|
||||||
|
fusion, yaw from gyro integration). We publish them as angular_velocity
|
||||||
|
for immediate use by slam_toolbox / robot_localization.
|
||||||
|
|
||||||
|
Note: orientation quaternion is left zeroed (covariance [-1,...]) until
|
||||||
|
a proper fusion node (e.g. robot_localization EKF) computes it.
|
||||||
|
"""
|
||||||
|
msg = Imu()
|
||||||
|
msg.header.stamp = stamp
|
||||||
|
msg.header.frame_id = IMU_FRAME_ID
|
||||||
|
|
||||||
|
# orientation unknown — signal with -1 in first covariance element
|
||||||
|
msg.orientation_covariance[0] = -1.0
|
||||||
|
|
||||||
|
# angular_velocity: Euler rates approximation (good enough at low freq)
|
||||||
|
msg.angular_velocity.x = math.radians(pitch_deg) # pitch → x
|
||||||
|
msg.angular_velocity.y = math.radians(roll_deg) # roll → y
|
||||||
|
msg.angular_velocity.z = math.radians(yaw_deg) # yaw → z
|
||||||
|
|
||||||
|
# angular_velocity covariance (diagonal, degrees converted to rad)
|
||||||
|
cov_rad = math.radians(0.5) ** 2 # ±0.5° noise estimate
|
||||||
|
msg.angular_velocity_covariance[0] = cov_rad
|
||||||
|
msg.angular_velocity_covariance[4] = cov_rad
|
||||||
|
msg.angular_velocity_covariance[8] = cov_rad
|
||||||
|
|
||||||
|
# linear_acceleration: unknown from this frame
|
||||||
|
msg.linear_acceleration_covariance[0] = -1.0
|
||||||
|
|
||||||
|
self._imu_pub.publish(msg)
|
||||||
|
|
||||||
|
def _publish_balance_state(
|
||||||
|
self, pitch, roll, yaw, error, integral, motor_cmd, state, stamp
|
||||||
|
):
|
||||||
|
"""Publish full PID diagnostics as JSON string and /diagnostics."""
|
||||||
|
state_label = _STATE_LABEL.get(state, f"UNKNOWN({state})")
|
||||||
|
|
||||||
|
payload = {
|
||||||
|
"stamp": f"{stamp.sec}.{stamp.nanosec:09d}",
|
||||||
|
"state": state_label,
|
||||||
|
"pitch_deg": round(pitch, 1),
|
||||||
|
"roll_deg": round(roll, 1),
|
||||||
|
"yaw_deg": round(yaw, 1),
|
||||||
|
"pid_error_deg": round(error, 1),
|
||||||
|
"integral": round(integral, 1),
|
||||||
|
"motor_cmd": int(motor_cmd),
|
||||||
|
"frames": self._frame_count,
|
||||||
|
"parse_errors": self._error_count,
|
||||||
|
}
|
||||||
|
|
||||||
|
str_msg = String()
|
||||||
|
str_msg.data = json.dumps(payload)
|
||||||
|
self._balance_pub.publish(str_msg)
|
||||||
|
|
||||||
|
# /diagnostics
|
||||||
|
diag = DiagnosticArray()
|
||||||
|
diag.header.stamp = stamp
|
||||||
|
status = DiagnosticStatus()
|
||||||
|
status.name = "saltybot/balance_controller"
|
||||||
|
status.hardware_id = "stm32f722"
|
||||||
|
status.message = state_label
|
||||||
|
|
||||||
|
if state == 1: # ARMED
|
||||||
|
status.level = DiagnosticStatus.OK
|
||||||
|
elif state == 0: # DISARMED
|
||||||
|
status.level = DiagnosticStatus.WARN
|
||||||
|
else: # TILT_FAULT
|
||||||
|
status.level = DiagnosticStatus.ERROR
|
||||||
|
|
||||||
|
status.values = [
|
||||||
|
KeyValue(key="pitch_deg", value=f"{pitch:.1f}"),
|
||||||
|
KeyValue(key="roll_deg", value=f"{roll:.1f}"),
|
||||||
|
KeyValue(key="yaw_deg", value=f"{yaw:.1f}"),
|
||||||
|
KeyValue(key="pid_error_deg", value=f"{error:.1f}"),
|
||||||
|
KeyValue(key="integral", value=f"{integral:.1f}"),
|
||||||
|
KeyValue(key="motor_cmd", value=f"{int(motor_cmd)}"),
|
||||||
|
KeyValue(key="state", value=state_label),
|
||||||
|
]
|
||||||
|
diag.status.append(status)
|
||||||
|
self._diag_pub.publish(diag)
|
||||||
|
|
||||||
|
def _publish_imu_fault(self, errno: int, stamp):
|
||||||
|
"""On IMU fault frame, publish an ERROR diagnostic."""
|
||||||
|
diag = DiagnosticArray()
|
||||||
|
diag.header.stamp = stamp
|
||||||
|
status = DiagnosticStatus()
|
||||||
|
status.level = DiagnosticStatus.ERROR
|
||||||
|
status.name = "saltybot/balance_controller"
|
||||||
|
status.hardware_id = "stm32f722"
|
||||||
|
status.message = f"IMU fault errno={errno}"
|
||||||
|
diag.status.append(status)
|
||||||
|
self._diag_pub.publish(diag)
|
||||||
|
self.get_logger().error(f"STM32 reported IMU fault: errno={errno}")
|
||||||
|
|
||||||
|
def destroy_node(self):
|
||||||
|
self._close_serial()
|
||||||
|
super().destroy_node()
|
||||||
|
|
||||||
|
|
||||||
|
def main(args=None):
|
||||||
|
rclpy.init(args=args)
|
||||||
|
node = SerialBridgeNode()
|
||||||
|
try:
|
||||||
|
rclpy.spin(node)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
node.destroy_node()
|
||||||
|
rclpy.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
4
jetson/ros2_ws/src/saltybot_bridge/setup.cfg
Normal file
4
jetson/ros2_ws/src/saltybot_bridge/setup.cfg
Normal file
@ -0,0 +1,4 @@
|
|||||||
|
[develop]
|
||||||
|
script_dir=$base/lib/saltybot_bridge
|
||||||
|
[install]
|
||||||
|
install_scripts=$base/lib/saltybot_bridge
|
||||||
27
jetson/ros2_ws/src/saltybot_bridge/setup.py
Normal file
27
jetson/ros2_ws/src/saltybot_bridge/setup.py
Normal file
@ -0,0 +1,27 @@
|
|||||||
|
from setuptools import setup
|
||||||
|
|
||||||
|
package_name = "saltybot_bridge"
|
||||||
|
|
||||||
|
setup(
|
||||||
|
name=package_name,
|
||||||
|
version="0.1.0",
|
||||||
|
packages=[package_name],
|
||||||
|
data_files=[
|
||||||
|
("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
|
||||||
|
(f"share/{package_name}", ["package.xml"]),
|
||||||
|
(f"share/{package_name}/launch", ["launch/bridge.launch.py"]),
|
||||||
|
(f"share/{package_name}/config", ["config/bridge_params.yaml"]),
|
||||||
|
],
|
||||||
|
install_requires=["setuptools", "pyserial"],
|
||||||
|
zip_safe=True,
|
||||||
|
maintainer="sl-jetson",
|
||||||
|
maintainer_email="sl-jetson@saltylab.local",
|
||||||
|
description="STM32 USB CDC → ROS2 serial bridge for saltybot",
|
||||||
|
license="MIT",
|
||||||
|
tests_require=["pytest"],
|
||||||
|
entry_points={
|
||||||
|
"console_scripts": [
|
||||||
|
"serial_bridge_node = saltybot_bridge.serial_bridge_node:main",
|
||||||
|
],
|
||||||
|
},
|
||||||
|
)
|
||||||
97
jetson/ros2_ws/src/saltybot_bridge/test/test_parse.py
Normal file
97
jetson/ros2_ws/src/saltybot_bridge/test/test_parse.py
Normal file
@ -0,0 +1,97 @@
|
|||||||
|
"""
|
||||||
|
Unit tests for STM32 telemetry parsing logic.
|
||||||
|
Run with: pytest jetson/ros2_ws/src/saltybot_bridge/test/test_parse.py
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
|
||||||
|
# ── Minimal stub so we can test parsing without a ROS2 runtime ───────────────
|
||||||
|
|
||||||
|
def parse_frame(raw: bytes):
|
||||||
|
"""Mirror of the parsing logic in serial_bridge_node._parse_and_publish."""
|
||||||
|
text = raw.decode("ascii", errors="ignore").strip()
|
||||||
|
if not text:
|
||||||
|
return None
|
||||||
|
data = json.loads(text)
|
||||||
|
|
||||||
|
if "err" in data:
|
||||||
|
return {"type": "fault", "errno": data["err"]}
|
||||||
|
|
||||||
|
required = ("p", "r", "e", "ig", "m", "s", "y")
|
||||||
|
if not all(k in data for k in required):
|
||||||
|
raise ValueError(f"Incomplete frame: {data}")
|
||||||
|
|
||||||
|
return {
|
||||||
|
"type": "telemetry",
|
||||||
|
"pitch_deg": data["p"] / 10.0,
|
||||||
|
"roll_deg": data["r"] / 10.0,
|
||||||
|
"yaw_deg": data["y"] / 10.0,
|
||||||
|
"pid_error_deg": data["e"] / 10.0,
|
||||||
|
"integral": data["ig"] / 10.0,
|
||||||
|
"motor_cmd": int(data["m"]),
|
||||||
|
"state": int(data["s"]),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
# ── Tests ─────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def test_normal_frame():
|
||||||
|
raw = b'{"p":125,"r":-30,"e":50,"ig":20,"m":350,"s":1,"y":0}\n'
|
||||||
|
result = parse_frame(raw)
|
||||||
|
assert result["type"] == "telemetry"
|
||||||
|
assert result["pitch_deg"] == pytest.approx(12.5)
|
||||||
|
assert result["roll_deg"] == pytest.approx(-3.0)
|
||||||
|
assert result["yaw_deg"] == pytest.approx(0.0)
|
||||||
|
assert result["pid_error_deg"] == pytest.approx(5.0)
|
||||||
|
assert result["integral"] == pytest.approx(2.0)
|
||||||
|
assert result["motor_cmd"] == 350
|
||||||
|
assert result["state"] == 1 # ARMED
|
||||||
|
|
||||||
|
|
||||||
|
def test_fault_frame():
|
||||||
|
raw = b'{"err":-1}\n'
|
||||||
|
result = parse_frame(raw)
|
||||||
|
assert result["type"] == "fault"
|
||||||
|
assert result["errno"] == -1
|
||||||
|
|
||||||
|
|
||||||
|
def test_zero_frame():
|
||||||
|
raw = b'{"p":0,"r":0,"e":0,"ig":0,"m":0,"s":0,"y":0}\n'
|
||||||
|
result = parse_frame(raw)
|
||||||
|
assert result["type"] == "telemetry"
|
||||||
|
assert result["pitch_deg"] == pytest.approx(0.0)
|
||||||
|
assert result["state"] == 0 # DISARMED
|
||||||
|
|
||||||
|
|
||||||
|
def test_tilt_fault_state():
|
||||||
|
raw = b'{"p":450,"r":0,"e":400,"ig":999,"m":1000,"s":2,"y":0}\n'
|
||||||
|
result = parse_frame(raw)
|
||||||
|
assert result["state"] == 2 # TILT_FAULT
|
||||||
|
assert result["pitch_deg"] == pytest.approx(45.0)
|
||||||
|
assert result["motor_cmd"] == 1000
|
||||||
|
|
||||||
|
|
||||||
|
def test_negative_motor_cmd():
|
||||||
|
raw = b'{"p":-100,"r":0,"e":-100,"ig":-50,"m":-750,"s":1,"y":10}\n'
|
||||||
|
result = parse_frame(raw)
|
||||||
|
assert result["motor_cmd"] == -750
|
||||||
|
assert result["pitch_deg"] == pytest.approx(-10.0)
|
||||||
|
|
||||||
|
|
||||||
|
def test_incomplete_frame_raises():
|
||||||
|
raw = b'{"p":100,"r":0}\n'
|
||||||
|
with pytest.raises(ValueError, match="Incomplete frame"):
|
||||||
|
parse_frame(raw)
|
||||||
|
|
||||||
|
|
||||||
|
def test_empty_line_returns_none():
|
||||||
|
assert parse_frame(b"\n") is None
|
||||||
|
assert parse_frame(b" \n") is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_corrupt_frame_raises_json_error():
|
||||||
|
raw = b'not_json\n'
|
||||||
|
with pytest.raises(json.JSONDecodeError):
|
||||||
|
parse_frame(raw)
|
||||||
Loading…
x
Reference in New Issue
Block a user