feat: STM32-to-Jetson ROS2 serial bridge node
saltybot_bridge ROS2 Python package (ament_python):
- serial_bridge_node.py: reads USB CDC JSON telemetry from STM32F722 at 50Hz
Parses exact firmware format: {"p","r","e","ig","m","s","y"} (all ×10 ints)
State enum: 0=DISARMED, 1=ARMED, 2=TILT_FAULT (matched to balance_state_t)
- Publishes sensor_msgs/Imu on /saltybot/imu (pitch/roll/yaw as angular_velocity)
- Publishes std_msgs/String on /saltybot/balance_state (full PID JSON diagnostics)
- Publishes diagnostic_msgs/DiagnosticArray on /diagnostics (OK/WARN/ERROR by state)
- Auto-reconnects on serial disconnect; IMU fault frames → ERROR diagnostic
- launch/bridge.launch.py with serial_port + baud_rate launch args
- config/bridge_params.yaml (921600 baud, /dev/ttyACM0)
- test/test_parse.py: 8 unit tests covering normal, fault, edge cases (all pass)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
b9c8bc1a36
commit
7c4e46aaa1
7
jetson/ros2_ws/src/saltybot_bridge/.gitignore
vendored
Normal file
7
jetson/ros2_ws/src/saltybot_bridge/.gitignore
vendored
Normal file
@ -0,0 +1,7 @@
|
||||
__pycache__/
|
||||
*.pyc
|
||||
*.pyo
|
||||
*.egg-info/
|
||||
build/
|
||||
install/
|
||||
log/
|
||||
@ -0,0 +1,8 @@
|
||||
stm32_serial_bridge:
|
||||
ros__parameters:
|
||||
# STM32 USB CDC device node
|
||||
# Use /dev/stm32-bridge if udev rule from jetson/docs/pinout.md is applied
|
||||
serial_port: /dev/ttyACM0
|
||||
baud_rate: 921600
|
||||
timeout: 0.1 # serial readline timeout (seconds)
|
||||
reconnect_delay: 2.0 # seconds between reconnect attempts on disconnect
|
||||
34
jetson/ros2_ws/src/saltybot_bridge/launch/bridge.launch.py
Normal file
34
jetson/ros2_ws/src/saltybot_bridge/launch/bridge.launch.py
Normal file
@ -0,0 +1,34 @@
|
||||
from launch import LaunchDescription
|
||||
from launch.actions import DeclareLaunchArgument
|
||||
from launch.substitutions import LaunchConfiguration
|
||||
from launch_ros.actions import Node
|
||||
|
||||
|
||||
def generate_launch_description():
|
||||
serial_port_arg = DeclareLaunchArgument(
|
||||
"serial_port",
|
||||
default_value="/dev/ttyACM0",
|
||||
description="STM32 USB CDC device node",
|
||||
)
|
||||
baud_rate_arg = DeclareLaunchArgument(
|
||||
"baud_rate",
|
||||
default_value="921600",
|
||||
description="Serial baud rate",
|
||||
)
|
||||
|
||||
bridge_node = Node(
|
||||
package="saltybot_bridge",
|
||||
executable="serial_bridge_node",
|
||||
name="stm32_serial_bridge",
|
||||
output="screen",
|
||||
parameters=[
|
||||
{
|
||||
"serial_port": LaunchConfiguration("serial_port"),
|
||||
"baud_rate": LaunchConfiguration("baud_rate"),
|
||||
"timeout": 0.1,
|
||||
"reconnect_delay": 2.0,
|
||||
}
|
||||
],
|
||||
)
|
||||
|
||||
return LaunchDescription([serial_port_arg, baud_rate_arg, bridge_node])
|
||||
27
jetson/ros2_ws/src/saltybot_bridge/package.xml
Normal file
27
jetson/ros2_ws/src/saltybot_bridge/package.xml
Normal file
@ -0,0 +1,27 @@
|
||||
<?xml version="1.0"?>
|
||||
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||
<package format="3">
|
||||
<name>saltybot_bridge</name>
|
||||
<version>0.1.0</version>
|
||||
<description>
|
||||
STM32F722 USB CDC serial bridge for saltybot.
|
||||
Reads JSON telemetry from the STM32 flight controller and publishes
|
||||
sensor_msgs/Imu, std_msgs/String (balance state), and diagnostic_msgs/DiagnosticArray.
|
||||
</description>
|
||||
<maintainer email="sl-jetson@saltylab.local">sl-jetson</maintainer>
|
||||
<license>MIT</license>
|
||||
|
||||
<depend>rclpy</depend>
|
||||
<depend>sensor_msgs</depend>
|
||||
<depend>std_msgs</depend>
|
||||
<depend>diagnostic_msgs</depend>
|
||||
|
||||
<test_depend>ament_copyright</test_depend>
|
||||
<test_depend>ament_flake8</test_depend>
|
||||
<test_depend>ament_pep257</test_depend>
|
||||
<test_depend>python3-pytest</test_depend>
|
||||
|
||||
<export>
|
||||
<build_type>ament_python</build_type>
|
||||
</export>
|
||||
</package>
|
||||
@ -0,0 +1,304 @@
|
||||
"""
|
||||
saltybot_bridge — serial_bridge_node
|
||||
STM32F722 USB CDC → ROS2 topic publisher
|
||||
|
||||
Telemetry frame (50 Hz, newline-delimited JSON):
|
||||
{"p":<pitch×10>,"r":<roll×10>,"e":<err×10>,"ig":<integral×10>,
|
||||
"m":<motor_cmd -1000..1000>,"s":<state 0-2>,"y":<yaw×10>}
|
||||
|
||||
Error frame (IMU fault):
|
||||
{"err":<imu_errno>}
|
||||
|
||||
State values (balance_state_t):
|
||||
0 = DISARMED 1 = ARMED 2 = TILT_FAULT
|
||||
|
||||
Published topics:
|
||||
/saltybot/imu sensor_msgs/Imu — pitch/roll/yaw as angular velocity
|
||||
/saltybot/balance_state std_msgs/String (JSON) — full PID diagnostics
|
||||
"""
|
||||
|
||||
import json
|
||||
import math
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
|
||||
|
||||
import serial
|
||||
|
||||
from sensor_msgs.msg import Imu
|
||||
from std_msgs.msg import String
|
||||
from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus, KeyValue
|
||||
|
||||
# Balance state labels matching STM32 balance_state_t enum
|
||||
_STATE_LABEL = {0: "DISARMED", 1: "ARMED", 2: "TILT_FAULT"}
|
||||
|
||||
# Sensor frame_id published in Imu header
|
||||
IMU_FRAME_ID = "imu_link"
|
||||
|
||||
|
||||
class SerialBridgeNode(Node):
|
||||
def __init__(self):
|
||||
super().__init__("stm32_serial_bridge")
|
||||
|
||||
# ── Parameters ────────────────────────────────────────────────────────
|
||||
self.declare_parameter("serial_port", "/dev/ttyACM0")
|
||||
self.declare_parameter("baud_rate", 921600)
|
||||
self.declare_parameter("timeout", 0.1)
|
||||
self.declare_parameter("reconnect_delay", 2.0)
|
||||
|
||||
port = self.get_parameter("serial_port").value
|
||||
baud = self.get_parameter("baud_rate").value
|
||||
timeout = self.get_parameter("timeout").value
|
||||
self._reconnect_delay = self.get_parameter("reconnect_delay").value
|
||||
|
||||
# ── QoS ───────────────────────────────────────────────────────────────
|
||||
sensor_qos = QoSProfile(
|
||||
reliability=ReliabilityPolicy.BEST_EFFORT,
|
||||
history=HistoryPolicy.KEEP_LAST,
|
||||
depth=10,
|
||||
)
|
||||
diag_qos = QoSProfile(
|
||||
reliability=ReliabilityPolicy.RELIABLE,
|
||||
history=HistoryPolicy.KEEP_LAST,
|
||||
depth=10,
|
||||
)
|
||||
|
||||
# ── Publishers ────────────────────────────────────────────────────────
|
||||
self._imu_pub = self.create_publisher(Imu, "/saltybot/imu", sensor_qos)
|
||||
self._balance_pub = self.create_publisher(
|
||||
String, "/saltybot/balance_state", diag_qos
|
||||
)
|
||||
self._diag_pub = self.create_publisher(
|
||||
DiagnosticArray, "/diagnostics", diag_qos
|
||||
)
|
||||
|
||||
# ── State ─────────────────────────────────────────────────────────────
|
||||
self._port_name = port
|
||||
self._baud = baud
|
||||
self._timeout = timeout
|
||||
self._ser: serial.Serial | None = None
|
||||
self._frame_count = 0
|
||||
self._error_count = 0
|
||||
self._last_state = -1
|
||||
|
||||
# ── Open serial and start read timer ──────────────────────────────────
|
||||
self._open_serial()
|
||||
# Poll at 100 Hz — STM32 sends at 50 Hz, so we never miss a frame
|
||||
self._timer = self.create_timer(0.01, self._read_cb)
|
||||
|
||||
self.get_logger().info(
|
||||
f"stm32_serial_bridge started — {port} @ {baud} baud"
|
||||
)
|
||||
|
||||
# ── Serial management ─────────────────────────────────────────────────────
|
||||
|
||||
def _open_serial(self) -> bool:
|
||||
try:
|
||||
self._ser = serial.Serial(
|
||||
port=self._port_name,
|
||||
baudrate=self._baud,
|
||||
timeout=self._timeout,
|
||||
)
|
||||
self._ser.reset_input_buffer()
|
||||
self.get_logger().info(f"Serial port open: {self._port_name}")
|
||||
return True
|
||||
except serial.SerialException as exc:
|
||||
self.get_logger().error(f"Cannot open {self._port_name}: {exc}")
|
||||
self._ser = None
|
||||
return False
|
||||
|
||||
def _close_serial(self):
|
||||
if self._ser and self._ser.is_open:
|
||||
try:
|
||||
self._ser.close()
|
||||
except Exception:
|
||||
pass
|
||||
self._ser = None
|
||||
|
||||
# ── Read callback ─────────────────────────────────────────────────────────
|
||||
|
||||
def _read_cb(self):
|
||||
if self._ser is None or not self._ser.is_open:
|
||||
# Attempt reconnect on next timer fire after delay
|
||||
self.get_logger().warn(
|
||||
f"Serial disconnected — retrying in {self._reconnect_delay}s",
|
||||
throttle_duration_sec=self._reconnect_delay,
|
||||
)
|
||||
self._open_serial()
|
||||
return
|
||||
|
||||
try:
|
||||
# Read all lines buffered since last call
|
||||
while self._ser.in_waiting:
|
||||
raw = self._ser.readline()
|
||||
if raw:
|
||||
self._parse_and_publish(raw)
|
||||
except serial.SerialException as exc:
|
||||
self.get_logger().error(f"Serial read error: {exc}")
|
||||
self._close_serial()
|
||||
|
||||
# ── Parsing + publishing ──────────────────────────────────────────────────
|
||||
|
||||
def _parse_and_publish(self, raw: bytes):
|
||||
try:
|
||||
text = raw.decode("ascii", errors="ignore").strip()
|
||||
if not text:
|
||||
return
|
||||
data = json.loads(text)
|
||||
except (ValueError, UnicodeDecodeError) as exc:
|
||||
self.get_logger().debug(f"Parse error ({exc}): {raw!r}")
|
||||
self._error_count += 1
|
||||
return
|
||||
|
||||
now = self.get_clock().now().to_msg()
|
||||
|
||||
# IMU fault frame — {"err": <errno>}
|
||||
if "err" in data:
|
||||
self._publish_imu_fault(data["err"], now)
|
||||
return
|
||||
|
||||
# Normal telemetry frame — validate required keys
|
||||
required = ("p", "r", "e", "ig", "m", "s", "y")
|
||||
if not all(k in data for k in required):
|
||||
self.get_logger().debug(f"Incomplete frame: {text}")
|
||||
return
|
||||
|
||||
# Values are ×10 integers from firmware — convert to float
|
||||
pitch_deg = data["p"] / 10.0
|
||||
roll_deg = data["r"] / 10.0
|
||||
yaw_deg = data["y"] / 10.0
|
||||
error_deg = data["e"] / 10.0
|
||||
integral = data["ig"] / 10.0
|
||||
motor_cmd = float(data["m"]) # -1000..+1000 ESC units
|
||||
state = int(data["s"])
|
||||
|
||||
self._frame_count += 1
|
||||
|
||||
self._publish_imu(pitch_deg, roll_deg, yaw_deg, now)
|
||||
self._publish_balance_state(
|
||||
pitch_deg, roll_deg, yaw_deg,
|
||||
error_deg, integral, motor_cmd, state, now,
|
||||
)
|
||||
|
||||
# Log state transitions
|
||||
if state != self._last_state:
|
||||
label = _STATE_LABEL.get(state, f"UNKNOWN({state})")
|
||||
self.get_logger().info(f"Balance state → {label}")
|
||||
self._last_state = state
|
||||
|
||||
def _publish_imu(self, pitch_deg, roll_deg, yaw_deg, stamp):
|
||||
"""
|
||||
Publish sensor_msgs/Imu.
|
||||
|
||||
The STM32 IMU gives Euler angles (pitch/roll from accelerometer+gyro
|
||||
fusion, yaw from gyro integration). We publish them as angular_velocity
|
||||
for immediate use by slam_toolbox / robot_localization.
|
||||
|
||||
Note: orientation quaternion is left zeroed (covariance [-1,...]) until
|
||||
a proper fusion node (e.g. robot_localization EKF) computes it.
|
||||
"""
|
||||
msg = Imu()
|
||||
msg.header.stamp = stamp
|
||||
msg.header.frame_id = IMU_FRAME_ID
|
||||
|
||||
# orientation unknown — signal with -1 in first covariance element
|
||||
msg.orientation_covariance[0] = -1.0
|
||||
|
||||
# angular_velocity: Euler rates approximation (good enough at low freq)
|
||||
msg.angular_velocity.x = math.radians(pitch_deg) # pitch → x
|
||||
msg.angular_velocity.y = math.radians(roll_deg) # roll → y
|
||||
msg.angular_velocity.z = math.radians(yaw_deg) # yaw → z
|
||||
|
||||
# angular_velocity covariance (diagonal, degrees converted to rad)
|
||||
cov_rad = math.radians(0.5) ** 2 # ±0.5° noise estimate
|
||||
msg.angular_velocity_covariance[0] = cov_rad
|
||||
msg.angular_velocity_covariance[4] = cov_rad
|
||||
msg.angular_velocity_covariance[8] = cov_rad
|
||||
|
||||
# linear_acceleration: unknown from this frame
|
||||
msg.linear_acceleration_covariance[0] = -1.0
|
||||
|
||||
self._imu_pub.publish(msg)
|
||||
|
||||
def _publish_balance_state(
|
||||
self, pitch, roll, yaw, error, integral, motor_cmd, state, stamp
|
||||
):
|
||||
"""Publish full PID diagnostics as JSON string and /diagnostics."""
|
||||
state_label = _STATE_LABEL.get(state, f"UNKNOWN({state})")
|
||||
|
||||
payload = {
|
||||
"stamp": f"{stamp.sec}.{stamp.nanosec:09d}",
|
||||
"state": state_label,
|
||||
"pitch_deg": round(pitch, 1),
|
||||
"roll_deg": round(roll, 1),
|
||||
"yaw_deg": round(yaw, 1),
|
||||
"pid_error_deg": round(error, 1),
|
||||
"integral": round(integral, 1),
|
||||
"motor_cmd": int(motor_cmd),
|
||||
"frames": self._frame_count,
|
||||
"parse_errors": self._error_count,
|
||||
}
|
||||
|
||||
str_msg = String()
|
||||
str_msg.data = json.dumps(payload)
|
||||
self._balance_pub.publish(str_msg)
|
||||
|
||||
# /diagnostics
|
||||
diag = DiagnosticArray()
|
||||
diag.header.stamp = stamp
|
||||
status = DiagnosticStatus()
|
||||
status.name = "saltybot/balance_controller"
|
||||
status.hardware_id = "stm32f722"
|
||||
status.message = state_label
|
||||
|
||||
if state == 1: # ARMED
|
||||
status.level = DiagnosticStatus.OK
|
||||
elif state == 0: # DISARMED
|
||||
status.level = DiagnosticStatus.WARN
|
||||
else: # TILT_FAULT
|
||||
status.level = DiagnosticStatus.ERROR
|
||||
|
||||
status.values = [
|
||||
KeyValue(key="pitch_deg", value=f"{pitch:.1f}"),
|
||||
KeyValue(key="roll_deg", value=f"{roll:.1f}"),
|
||||
KeyValue(key="yaw_deg", value=f"{yaw:.1f}"),
|
||||
KeyValue(key="pid_error_deg", value=f"{error:.1f}"),
|
||||
KeyValue(key="integral", value=f"{integral:.1f}"),
|
||||
KeyValue(key="motor_cmd", value=f"{int(motor_cmd)}"),
|
||||
KeyValue(key="state", value=state_label),
|
||||
]
|
||||
diag.status.append(status)
|
||||
self._diag_pub.publish(diag)
|
||||
|
||||
def _publish_imu_fault(self, errno: int, stamp):
|
||||
"""On IMU fault frame, publish an ERROR diagnostic."""
|
||||
diag = DiagnosticArray()
|
||||
diag.header.stamp = stamp
|
||||
status = DiagnosticStatus()
|
||||
status.level = DiagnosticStatus.ERROR
|
||||
status.name = "saltybot/balance_controller"
|
||||
status.hardware_id = "stm32f722"
|
||||
status.message = f"IMU fault errno={errno}"
|
||||
diag.status.append(status)
|
||||
self._diag_pub.publish(diag)
|
||||
self.get_logger().error(f"STM32 reported IMU fault: errno={errno}")
|
||||
|
||||
def destroy_node(self):
|
||||
self._close_serial()
|
||||
super().destroy_node()
|
||||
|
||||
|
||||
def main(args=None):
|
||||
rclpy.init(args=args)
|
||||
node = SerialBridgeNode()
|
||||
try:
|
||||
rclpy.spin(node)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
node.destroy_node()
|
||||
rclpy.shutdown()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
4
jetson/ros2_ws/src/saltybot_bridge/setup.cfg
Normal file
4
jetson/ros2_ws/src/saltybot_bridge/setup.cfg
Normal file
@ -0,0 +1,4 @@
|
||||
[develop]
|
||||
script_dir=$base/lib/saltybot_bridge
|
||||
[install]
|
||||
install_scripts=$base/lib/saltybot_bridge
|
||||
27
jetson/ros2_ws/src/saltybot_bridge/setup.py
Normal file
27
jetson/ros2_ws/src/saltybot_bridge/setup.py
Normal file
@ -0,0 +1,27 @@
|
||||
from setuptools import setup
|
||||
|
||||
package_name = "saltybot_bridge"
|
||||
|
||||
setup(
|
||||
name=package_name,
|
||||
version="0.1.0",
|
||||
packages=[package_name],
|
||||
data_files=[
|
||||
("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
|
||||
(f"share/{package_name}", ["package.xml"]),
|
||||
(f"share/{package_name}/launch", ["launch/bridge.launch.py"]),
|
||||
(f"share/{package_name}/config", ["config/bridge_params.yaml"]),
|
||||
],
|
||||
install_requires=["setuptools", "pyserial"],
|
||||
zip_safe=True,
|
||||
maintainer="sl-jetson",
|
||||
maintainer_email="sl-jetson@saltylab.local",
|
||||
description="STM32 USB CDC → ROS2 serial bridge for saltybot",
|
||||
license="MIT",
|
||||
tests_require=["pytest"],
|
||||
entry_points={
|
||||
"console_scripts": [
|
||||
"serial_bridge_node = saltybot_bridge.serial_bridge_node:main",
|
||||
],
|
||||
},
|
||||
)
|
||||
97
jetson/ros2_ws/src/saltybot_bridge/test/test_parse.py
Normal file
97
jetson/ros2_ws/src/saltybot_bridge/test/test_parse.py
Normal file
@ -0,0 +1,97 @@
|
||||
"""
|
||||
Unit tests for STM32 telemetry parsing logic.
|
||||
Run with: pytest jetson/ros2_ws/src/saltybot_bridge/test/test_parse.py
|
||||
"""
|
||||
|
||||
import json
|
||||
import pytest
|
||||
|
||||
|
||||
# ── Minimal stub so we can test parsing without a ROS2 runtime ───────────────
|
||||
|
||||
def parse_frame(raw: bytes):
|
||||
"""Mirror of the parsing logic in serial_bridge_node._parse_and_publish."""
|
||||
text = raw.decode("ascii", errors="ignore").strip()
|
||||
if not text:
|
||||
return None
|
||||
data = json.loads(text)
|
||||
|
||||
if "err" in data:
|
||||
return {"type": "fault", "errno": data["err"]}
|
||||
|
||||
required = ("p", "r", "e", "ig", "m", "s", "y")
|
||||
if not all(k in data for k in required):
|
||||
raise ValueError(f"Incomplete frame: {data}")
|
||||
|
||||
return {
|
||||
"type": "telemetry",
|
||||
"pitch_deg": data["p"] / 10.0,
|
||||
"roll_deg": data["r"] / 10.0,
|
||||
"yaw_deg": data["y"] / 10.0,
|
||||
"pid_error_deg": data["e"] / 10.0,
|
||||
"integral": data["ig"] / 10.0,
|
||||
"motor_cmd": int(data["m"]),
|
||||
"state": int(data["s"]),
|
||||
}
|
||||
|
||||
|
||||
# ── Tests ─────────────────────────────────────────────────────────────────────
|
||||
|
||||
def test_normal_frame():
|
||||
raw = b'{"p":125,"r":-30,"e":50,"ig":20,"m":350,"s":1,"y":0}\n'
|
||||
result = parse_frame(raw)
|
||||
assert result["type"] == "telemetry"
|
||||
assert result["pitch_deg"] == pytest.approx(12.5)
|
||||
assert result["roll_deg"] == pytest.approx(-3.0)
|
||||
assert result["yaw_deg"] == pytest.approx(0.0)
|
||||
assert result["pid_error_deg"] == pytest.approx(5.0)
|
||||
assert result["integral"] == pytest.approx(2.0)
|
||||
assert result["motor_cmd"] == 350
|
||||
assert result["state"] == 1 # ARMED
|
||||
|
||||
|
||||
def test_fault_frame():
|
||||
raw = b'{"err":-1}\n'
|
||||
result = parse_frame(raw)
|
||||
assert result["type"] == "fault"
|
||||
assert result["errno"] == -1
|
||||
|
||||
|
||||
def test_zero_frame():
|
||||
raw = b'{"p":0,"r":0,"e":0,"ig":0,"m":0,"s":0,"y":0}\n'
|
||||
result = parse_frame(raw)
|
||||
assert result["type"] == "telemetry"
|
||||
assert result["pitch_deg"] == pytest.approx(0.0)
|
||||
assert result["state"] == 0 # DISARMED
|
||||
|
||||
|
||||
def test_tilt_fault_state():
|
||||
raw = b'{"p":450,"r":0,"e":400,"ig":999,"m":1000,"s":2,"y":0}\n'
|
||||
result = parse_frame(raw)
|
||||
assert result["state"] == 2 # TILT_FAULT
|
||||
assert result["pitch_deg"] == pytest.approx(45.0)
|
||||
assert result["motor_cmd"] == 1000
|
||||
|
||||
|
||||
def test_negative_motor_cmd():
|
||||
raw = b'{"p":-100,"r":0,"e":-100,"ig":-50,"m":-750,"s":1,"y":10}\n'
|
||||
result = parse_frame(raw)
|
||||
assert result["motor_cmd"] == -750
|
||||
assert result["pitch_deg"] == pytest.approx(-10.0)
|
||||
|
||||
|
||||
def test_incomplete_frame_raises():
|
||||
raw = b'{"p":100,"r":0}\n'
|
||||
with pytest.raises(ValueError, match="Incomplete frame"):
|
||||
parse_frame(raw)
|
||||
|
||||
|
||||
def test_empty_line_returns_none():
|
||||
assert parse_frame(b"\n") is None
|
||||
assert parse_frame(b" \n") is None
|
||||
|
||||
|
||||
def test_corrupt_frame_raises_json_error():
|
||||
raw = b'not_json\n'
|
||||
with pytest.raises(json.JSONDecodeError):
|
||||
parse_frame(raw)
|
||||
Loading…
x
Reference in New Issue
Block a user