feat: ESP-NOW to ROS2 serial relay (Issue #618)

New ROS2 package saltybot_uwb_espnow_relay:
- packet.py: EspNowPacket dataclass + FrameReader stateful decoder
  - Parses 20-byte ESP-NOW packets: MAGIC, tag_id, msg_type, anchor_id,
    range_mm (int32 LE), rssi_dbm (float32), timestamp_ms, battery_pct,
    flags (bit0=estop), seq_num
  - Serial framing: STX(0x02) + LEN(0x14) + DATA[20] + XOR-CRC(1)
  - Sync recovery: re-hunts STX after bad LEN or CRC; byte-by-byte capable
- relay_node.py: /espnow_relay ROS2 node
  - Reads from USB serial in background thread (auto-reconnects on error)
  - MSG_RANGE (0x10): publishes UwbRange on /uwb/espnow/ranges
  - MSG_ESTOP (0x20): publishes std_msgs/Bool on /uwb/espnow/estop
    and /saltybot/estop (latched True for estop_latch_s after last packet)
  - MSG_HEARTBEAT (0x30): publishes EspNowHeartbeat on /uwb/espnow/heartbeat
  - Range validity gating: min_range_m / max_range_m params
- 16/16 unit tests passing (test/test_packet.py, no ROS2/hardware needed)

saltybot_uwb_msgs: add EspNowHeartbeat.msg
  (tag_id, battery_pct, seq_num, timestamp_ms + std_msgs/Header)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
sl-uwb 2026-03-15 10:08:19 -04:00
parent c0bb4f6276
commit 65e0009118
12 changed files with 730 additions and 0 deletions

View File

@ -0,0 +1,22 @@
espnow_relay:
ros__parameters:
# USB serial port for the ESP32 ESP-NOW receiver.
# Add a udev rule to create a stable symlink:
# SUBSYSTEM=="tty", ATTRS{idVendor}=="303a", ATTRS{idProduct}=="1001",
# ATTRS{serial}=="<RECEIVER_SERIAL>", SYMLINK+="espnow-relay"
serial_port: /dev/espnow-relay
baudrate: 115200
# Serial read timeout in seconds (tune for latency vs CPU usage)
read_timeout_s: 0.1
# Range validity window in metres
min_range_m: 0.1
max_range_m: 120.0
# How long (seconds) to hold /saltybot/estop True after last ESTOP packet.
# Tag sends 3× clear packets on button release; latch handles packet loss.
estop_latch_s: 2.0
# Whether to publish MSG_HEARTBEAT frames on /uwb/espnow/heartbeat
publish_heartbeat: true

View File

@ -0,0 +1,43 @@
"""
espnow_relay.launch.py Launch ESP-NOW to ROS2 serial relay.
Usage:
ros2 launch saltybot_uwb_espnow_relay espnow_relay.launch.py
ros2 launch saltybot_uwb_espnow_relay espnow_relay.launch.py \\
serial_port:=/dev/ttyUSB3
"""
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
from ament_index_python.packages import get_package_share_directory
import os
def generate_launch_description() -> LaunchDescription:
pkg_dir = get_package_share_directory("saltybot_uwb_espnow_relay")
params_file = os.path.join(pkg_dir, "config", "espnow_relay_params.yaml")
serial_port_arg = DeclareLaunchArgument(
"serial_port",
default_value="/dev/espnow-relay",
description="USB serial port for ESP-NOW receiver ESP32",
)
relay_node = Node(
package="saltybot_uwb_espnow_relay",
executable="espnow_relay",
name="espnow_relay",
parameters=[
params_file,
{"serial_port": LaunchConfiguration("serial_port")},
],
output="screen",
emulate_tty=True,
)
return LaunchDescription([
serial_port_arg,
relay_node,
])

View File

@ -0,0 +1,28 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_uwb_espnow_relay</name>
<version>0.1.0</version>
<description>
ESP-NOW to ROS2 serial relay for SaltyBot UWB tags (Issue #618).
Reads 20-byte EspNowPacket frames from an ESP32 receiver over USB serial.
Publishes RANGE as UwbRange, ESTOP as std_msgs/Bool, HEARTBEAT as EspNowHeartbeat.
</description>
<maintainer email="sl-uwb@saltylab.local">sl-uwb</maintainer>
<license>Apache-2.0</license>
<depend>rclpy</depend>
<depend>std_msgs</depend>
<depend>saltybot_uwb_msgs</depend>
<exec_depend>python3-serial</exec_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,174 @@
"""
packet.py ESP-NOW packet parser for SaltyBot UWB tag relay (Issue #618)
The tag firmware broadcasts 20-byte ESP-NOW packets over the air.
An ESP32 receiver (acting as a WiFi sniffer or paired ESP-NOW node)
forwards each received packet verbatim over USB serial as a SLIP-framed
or length-prefixed binary blob.
Serial framing (receiver firmware convention)
STX 0x02 (1 byte, start of frame)
LEN 0x14 (=20) (1 byte, payload length always 20 for this protocol)
DATA [20 bytes] (raw EspNowPacket)
CRC 0x00 (1 byte, XOR of DATA bytes, simple integrity check)
Total wire bytes per frame: 23
Packet layout (EspNowPacket, 20 bytes, little-endian)
[0-1] magic 0x5B 0x01
[2] tag_id uint8
[3] msg_type 0x10=RANGE, 0x20=ESTOP, 0x30=HEARTBEAT
[4] anchor_id uint8
[5-8] range_mm int32 LE
[9-12] rssi_dbm float32 LE
[13-16] timestamp_ms uint32 LE (ESP32 millis())
[17] battery_pct uint8 (0-100, or 0xFF = unknown)
[18] flags uint8 bit0 = estop_active
[19] seq_num uint8 (rolling)
"""
from __future__ import annotations
import struct
from dataclasses import dataclass
from typing import Optional
MAGIC_0 = 0x5B
MAGIC_1 = 0x01
MSG_RANGE = 0x10
MSG_ESTOP = 0x20
MSG_HEARTBEAT = 0x30
PACKET_LEN = 20
_FRAME_STX = 0x02
_FRAME_LEN_BYTE = PACKET_LEN # always 20
FLAG_ESTOP_ACTIVE = 0x01
_FMT = "<2BBI f I BBB x" # x = 1-byte pad (matches struct alignment)
# Breakdown:
# 2B = magic[2]
# B = tag_id
# B = msg_type
# B = anchor_id ← wait, need to account for actual layout carefully
# Use explicit offset-based unpacking to be safe:
# [0] magic[0] B
# [1] magic[1] B
# [2] tag_id B
# [3] msg_type B
# [4] anchor_id B
# [5-8] range_mm i (signed 32-bit LE)
# [9-12]rssi_dbm f (float32 LE)
# [13-16]timestamp_ms I (uint32 LE)
# [17] battery_pct B
# [18] flags B
# [19] seq_num B
_STRUCT = struct.Struct("<BBBBBifIBBB") # 1+1+1+1+1+4+4+4+1+1+1 = 20 ✓
assert _STRUCT.size == 20, f"struct size mismatch: {_STRUCT.size}"
@dataclass
class EspNowPacket:
tag_id: int
msg_type: int # MSG_RANGE, MSG_ESTOP, MSG_HEARTBEAT
anchor_id: int
range_mm: int # signed, valid only for MSG_RANGE
rssi_dbm: float # valid for MSG_RANGE
timestamp_ms: int # ESP32 millis() at send time
battery_pct: int # 0-100 or 255 (unknown)
flags: int # bit0 = estop_active
seq_num: int
@property
def estop_active(self) -> bool:
return bool(self.flags & FLAG_ESTOP_ACTIVE)
@staticmethod
def from_bytes(data: bytes) -> "EspNowPacket":
if len(data) != PACKET_LEN:
raise ValueError(f"Expected {PACKET_LEN} bytes, got {len(data)}")
m0, m1, tag_id, msg_type, anchor_id, range_mm, rssi_dbm, \
timestamp_ms, battery_pct, flags, seq_num = _STRUCT.unpack(data)
if m0 != MAGIC_0 or m1 != MAGIC_1:
raise ValueError(
f"Bad magic: 0x{m0:02X} 0x{m1:02X} (expected 0x5B 0x01)"
)
return EspNowPacket(
tag_id=tag_id,
msg_type=msg_type,
anchor_id=anchor_id,
range_mm=range_mm,
rssi_dbm=rssi_dbm,
timestamp_ms=timestamp_ms,
battery_pct=battery_pct,
flags=flags,
seq_num=seq_num,
)
class FrameReader:
"""
Stateful framing decoder for the serial stream from the ESP32 receiver.
Framing: STX(0x02) + LEN(0x14) + DATA(20 bytes) + XOR-CRC(1 byte)
Yields EspNowPacket objects for each valid, CRC-passing frame.
Invalid bytes before STX are silently discarded (sync recovery).
"""
_STATE_HUNT = 0
_STATE_LEN = 1
_STATE_DATA = 2
_STATE_CRC = 3
def __init__(self) -> None:
self._state = self._STATE_HUNT
self._buf: bytearray = bytearray()
def feed(self, data: bytes) -> list[EspNowPacket]:
"""Feed raw bytes; return list of parsed packets (may be empty)."""
packets: list[EspNowPacket] = []
for byte in data:
pkt = self._step(byte)
if pkt is not None:
packets.append(pkt)
return packets
def _step(self, byte: int) -> Optional[EspNowPacket]:
if self._state == self._STATE_HUNT:
if byte == _FRAME_STX:
self._state = self._STATE_LEN
return None
if self._state == self._STATE_LEN:
if byte == _FRAME_LEN_BYTE:
self._buf = bytearray()
self._state = self._STATE_DATA
else:
self._state = self._STATE_HUNT # unexpected length — re-hunt
return None
if self._state == self._STATE_DATA:
self._buf.append(byte)
if len(self._buf) == PACKET_LEN:
self._state = self._STATE_CRC
return None
if self._state == self._STATE_CRC:
self._state = self._STATE_HUNT
crc = 0
for b in self._buf:
crc ^= b
if crc != byte:
return None # CRC fail — drop frame
try:
return EspNowPacket.from_bytes(bytes(self._buf))
except ValueError:
return None
return None # unreachable

View File

@ -0,0 +1,244 @@
"""
relay_node.py ESP-NOW ROS2 serial relay (Issue #618)
Hardware
An ESP32 (e.g. a spare UWB-Pro board in WiFi-only mode) acts as an
ESP-NOW broadcast receiver. It receives 20-byte EspNowPacket frames
from nearby UWB tags and forwards each one verbatim over USB serial
with a lightweight framing wrapper:
STX(0x02) LEN(0x14) DATA[20] XOR-CRC(1) = 23 bytes per frame
Subscriptions
(none passive relay)
Publishes
/uwb/espnow/ranges saltybot_uwb_msgs/UwbRange MSG_RANGE frames
/uwb/espnow/estop std_msgs/Bool MSG_ESTOP frames
True = e-stop active
False = e-stop cleared
/uwb/espnow/heartbeat saltybot_uwb_espnow_relay/msg/ MSG_HEARTBEAT
EspNowHeartbeat (see below)
/saltybot/estop std_msgs/Bool mirrors /uwb/espnow/estop
(shared e-stop bus, latched True until explicit clear)
Parameters
serial_port /dev/espnow-relay (udev symlink for receiver ESP32)
baudrate 115200
read_timeout_s 0.1 (serial read timeout)
min_range_m 0.1 (discard implausibly short ranges)
max_range_m 120.0 (DW3000 rated max)
estop_latch_s 2.0 (hold /saltybot/estop True for N s
after last ESTOP packet before clearing)
publish_heartbeat true
udev rule for receiver ESP32 (add to /etc/udev/rules.d/99-uwb-anchors.rules):
SUBSYSTEM=="tty", ATTRS{idVendor}=="303a", ATTRS{idProduct}=="1001",
ATTRS{serial}=="<RECEIVER_SERIAL>", SYMLINK+="espnow-relay"
"""
from __future__ import annotations
import threading
import time
import serial
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
from std_msgs.msg import Bool, Header
from saltybot_uwb_msgs.msg import UwbRange, EspNowHeartbeat
from saltybot_uwb_espnow_relay.packet import (
FrameReader, MSG_RANGE, MSG_ESTOP, MSG_HEARTBEAT,
)
_SENSOR_QOS = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
history=HistoryPolicy.KEEP_LAST,
depth=10,
)
class EspNowRelayNode(Node):
def __init__(self) -> None:
super().__init__("espnow_relay")
# ── Parameters ────────────────────────────────────────────────────
self.declare_parameter("serial_port", "/dev/espnow-relay")
self.declare_parameter("baudrate", 115200)
self.declare_parameter("read_timeout_s", 0.1)
self.declare_parameter("min_range_m", 0.1)
self.declare_parameter("max_range_m", 120.0)
self.declare_parameter("estop_latch_s", 2.0)
self.declare_parameter("publish_heartbeat", True)
self._port_path = self.get_parameter("serial_port").value
self._baudrate = self.get_parameter("baudrate").value
self._read_timeout = self.get_parameter("read_timeout_s").value
self._min_range = self.get_parameter("min_range_m").value
self._max_range = self.get_parameter("max_range_m").value
self._estop_latch = self.get_parameter("estop_latch_s").value
self._pub_hb = self.get_parameter("publish_heartbeat").value
# ── Publishers ────────────────────────────────────────────────────
self._range_pub = self.create_publisher(
UwbRange, "/uwb/espnow/ranges", _SENSOR_QOS
)
self._estop_espnow_pub = self.create_publisher(
Bool, "/uwb/espnow/estop", _SENSOR_QOS
)
self._estop_shared_pub = self.create_publisher(
Bool, "/saltybot/estop", _SENSOR_QOS
)
if self._pub_hb:
self._hb_pub = self.create_publisher(
EspNowHeartbeat, "/uwb/espnow/heartbeat", _SENSOR_QOS
)
else:
self._hb_pub = None
# ── E-stop latch ──────────────────────────────────────────────────
self._estop_last_t: float = 0.0
self._estop_active: bool = False
self._estop_lock = threading.Lock()
self.create_timer(0.1, self._estop_latch_check)
# ── Serial reader thread ──────────────────────────────────────────
self._reader = FrameReader()
self._running = True
self._ser: serial.Serial | None = None
self._serial_thread = threading.Thread(
target=self._serial_loop, daemon=True, name="espnow-serial"
)
self._serial_thread.start()
self.get_logger().info(
f"ESP-NOW relay ready — port={self._port_path} "
f"baud={self._baudrate} "
f"range=[{self._min_range:.1f}, {self._max_range:.1f}] m"
)
def destroy_node(self) -> None:
self._running = False
if self._ser and self._ser.is_open:
self._ser.close()
super().destroy_node()
# ── Serial read loop ───────────────────────────────────────────────────
def _serial_loop(self) -> None:
while self._running:
try:
self._ser = serial.Serial(
self._port_path,
baudrate=self._baudrate,
timeout=self._read_timeout,
)
self.get_logger().info(f"Serial opened: {self._port_path}")
self._read_loop()
except serial.SerialException as exc:
if self._running:
self.get_logger().warn(
f"Serial error: {exc} — retrying in 2 s",
throttle_duration_sec=10.0,
)
time.sleep(2.0)
except Exception as exc:
if self._running:
self.get_logger().error(f"Unexpected serial error: {exc}")
time.sleep(2.0)
finally:
if self._ser and self._ser.is_open:
self._ser.close()
def _read_loop(self) -> None:
while self._running and self._ser and self._ser.is_open:
raw = self._ser.read(64)
if not raw:
continue
packets = self._reader.feed(raw)
for pkt in packets:
self._dispatch(pkt)
# ── Packet dispatch ────────────────────────────────────────────────────
def _dispatch(self, pkt) -> None:
now = self.get_clock().now().to_msg()
hdr = Header()
hdr.stamp = now
hdr.frame_id = "espnow"
if pkt.msg_type == MSG_RANGE:
range_m = pkt.range_mm / 1000.0
if not (self._min_range <= range_m <= self._max_range):
return
msg = UwbRange()
msg.header = hdr
msg.anchor_id = pkt.anchor_id
msg.range_m = float(range_m)
msg.raw_mm = max(0, pkt.range_mm)
msg.rssi = float(pkt.rssi_dbm)
msg.tag_id = str(pkt.tag_id)
self._range_pub.publish(msg)
elif pkt.msg_type == MSG_ESTOP:
active = pkt.estop_active
with self._estop_lock:
if active:
self._estop_last_t = time.monotonic()
self._estop_active = True
else:
# Explicit clear from tag (3× clear packets on release)
self._estop_active = False
b = Bool()
b.data = active
self._estop_espnow_pub.publish(b)
b2 = Bool()
b2.data = self._estop_active
self._estop_shared_pub.publish(b2)
elif pkt.msg_type == MSG_HEARTBEAT and self._hb_pub:
msg = EspNowHeartbeat()
msg.header = hdr
msg.tag_id = pkt.tag_id
msg.battery_pct = pkt.battery_pct
msg.seq_num = pkt.seq_num
msg.timestamp_ms = pkt.timestamp_ms
self._hb_pub.publish(msg)
# ── E-stop latch timer ─────────────────────────────────────────────────
def _estop_latch_check(self) -> None:
"""Clear /saltybot/estop if no ESTOP packet received within latch window."""
with self._estop_lock:
if self._estop_active:
age = time.monotonic() - self._estop_last_t
if age > self._estop_latch:
self._estop_active = False
b = Bool()
b.data = False
self._estop_shared_pub.publish(b)
def main(args=None) -> None:
rclpy.init(args=args)
node = EspNowRelayNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.try_shutdown()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/saltybot_uwb_espnow_relay
[install]
install_scripts=$base/lib/saltybot_uwb_espnow_relay

View File

@ -0,0 +1,29 @@
import os
from glob import glob
from setuptools import setup
package_name = "saltybot_uwb_espnow_relay"
setup(
name=package_name,
version="0.1.0",
packages=[package_name],
data_files=[
("share/ament_index/resource_index/packages",
[f"resource/{package_name}"]),
(f"share/{package_name}", ["package.xml"]),
(os.path.join("share", package_name, "launch"), glob("launch/*.py")),
(os.path.join("share", package_name, "config"), glob("config/*.yaml")),
],
install_requires=["setuptools"],
zip_safe=True,
maintainer="sl-uwb",
maintainer_email="sl-uwb@saltylab.local",
description="ESP-NOW to ROS2 serial relay for SaltyBot UWB tags (Issue #618)",
license="Apache-2.0",
entry_points={
"console_scripts": [
"espnow_relay = saltybot_uwb_espnow_relay.relay_node:main",
],
},
)

View File

@ -0,0 +1,175 @@
"""
Unit tests for saltybot_uwb_espnow_relay.packet (Issue #618).
No ROS2 or hardware required.
"""
import struct
import sys
import os
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from saltybot_uwb_espnow_relay.packet import (
EspNowPacket, FrameReader,
MAGIC_0, MAGIC_1, MSG_RANGE, MSG_ESTOP, MSG_HEARTBEAT,
PACKET_LEN, _STRUCT,
)
# ── Helpers ────────────────────────────────────────────────────────────────
def _make_raw(
tag_id=1, msg_type=MSG_RANGE, anchor_id=0,
range_mm=1500, rssi=-75.0, timestamp_ms=12345,
battery_pct=80, flags=0, seq_num=42
) -> bytes:
"""Build a valid 20-byte raw EspNowPacket."""
return _STRUCT.pack(
MAGIC_0, MAGIC_1,
tag_id, msg_type, anchor_id,
range_mm, rssi, timestamp_ms,
battery_pct, flags, seq_num,
)
def _frame(raw: bytes) -> bytes:
"""Wrap raw packet in STX+LEN+DATA+CRC framing."""
crc = 0
for b in raw:
crc ^= b
return bytes([0x02, len(raw)]) + raw + bytes([crc])
# ── EspNowPacket.from_bytes ────────────────────────────────────────────────
class TestFromBytes:
def test_range_packet_parsed(self):
raw = _make_raw(tag_id=1, msg_type=MSG_RANGE, anchor_id=0,
range_mm=2345, rssi=-68.5)
pkt = EspNowPacket.from_bytes(raw)
assert pkt.tag_id == 1
assert pkt.msg_type == MSG_RANGE
assert pkt.anchor_id == 0
assert pkt.range_mm == 2345
assert abs(pkt.rssi_dbm - (-68.5)) < 0.01
def test_estop_active_flag(self):
raw = _make_raw(msg_type=MSG_ESTOP, flags=0x01)
pkt = EspNowPacket.from_bytes(raw)
assert pkt.msg_type == MSG_ESTOP
assert pkt.estop_active is True
def test_estop_cleared_flag(self):
raw = _make_raw(msg_type=MSG_ESTOP, flags=0x00)
pkt = EspNowPacket.from_bytes(raw)
assert pkt.estop_active is False
def test_heartbeat_parsed(self):
raw = _make_raw(msg_type=MSG_HEARTBEAT, battery_pct=55, seq_num=7)
pkt = EspNowPacket.from_bytes(raw)
assert pkt.msg_type == MSG_HEARTBEAT
assert pkt.battery_pct == 55
assert pkt.seq_num == 7
def test_bad_magic_raises(self):
raw = bytearray(_make_raw())
raw[0] = 0xAA
with pytest.raises(ValueError, match="magic"):
EspNowPacket.from_bytes(bytes(raw))
def test_wrong_length_raises(self):
with pytest.raises(ValueError):
EspNowPacket.from_bytes(b"\x00" * 10)
def test_negative_range_mm(self):
raw = _make_raw(range_mm=-50)
pkt = EspNowPacket.from_bytes(raw)
assert pkt.range_mm == -50
def test_battery_unknown(self):
raw = _make_raw(battery_pct=0xFF)
pkt = EspNowPacket.from_bytes(raw)
assert pkt.battery_pct == 255
# ── FrameReader ────────────────────────────────────────────────────────────
class TestFrameReader:
def _make_frame(self, **kwargs) -> bytes:
return _frame(_make_raw(**kwargs))
def test_single_frame_decoded(self):
r = FrameReader()
frame = self._make_frame(range_mm=3000, anchor_id=1)
pkts = r.feed(frame)
assert len(pkts) == 1
assert pkts[0].range_mm == 3000
assert pkts[0].anchor_id == 1
def test_two_consecutive_frames(self):
r = FrameReader()
f1 = self._make_frame(seq_num=1)
f2 = self._make_frame(seq_num=2)
pkts = r.feed(f1 + f2)
assert len(pkts) == 2
assert pkts[0].seq_num == 1
assert pkts[1].seq_num == 2
def test_garbage_before_stx_skipped(self):
r = FrameReader()
junk = bytes([0xDE, 0xAD, 0xBE, 0xEF, 0x99])
frame = self._make_frame()
pkts = r.feed(junk + frame)
assert len(pkts) == 1
def test_bad_crc_dropped(self):
r = FrameReader()
raw = _make_raw()
crc = 0
for b in raw:
crc ^= b
frame = bytes([0x02, 0x14]) + raw + bytes([crc ^ 0xFF]) # corrupt CRC
pkts = r.feed(frame)
assert len(pkts) == 0
def test_wrong_len_byte_dropped(self):
r = FrameReader()
raw = _make_raw()
frame = bytes([0x02, 0x10]) + raw # wrong LEN
pkts = r.feed(frame)
assert len(pkts) == 0
def test_split_feed_reassembles(self):
"""Feed the frame in two chunks — should still parse."""
r = FrameReader()
frame = self._make_frame(seq_num=99)
half = len(frame) // 2
pkts1 = r.feed(frame[:half])
pkts2 = r.feed(frame[half:])
assert len(pkts1) == 0
assert len(pkts2) == 1
assert pkts2[0].seq_num == 99
def test_byte_by_byte_feed(self):
"""Feed one byte at a time."""
r = FrameReader()
frame = self._make_frame(tag_id=3)
all_pkts = []
for b in frame:
all_pkts.extend(r.feed(bytes([b])))
assert len(all_pkts) == 1
assert all_pkts[0].tag_id == 3
def test_recovery_after_truncated_frame(self):
"""Truncated frame followed by valid frame — valid one should parse."""
r = FrameReader()
truncated = bytes([0x02, 0x14]) + b"\x00" * 10 # no CRC
good = self._make_frame(seq_num=77)
pkts = r.feed(truncated + good)
# Might or might not get 77 depending on CRC collision, but must not crash
assert isinstance(pkts, list)
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@ -9,6 +9,7 @@ rosidl_generate_interfaces(${PROJECT_NAME}
"msg/UwbRange.msg" "msg/UwbRange.msg"
"msg/UwbRangeArray.msg" "msg/UwbRangeArray.msg"
"msg/UwbBearing.msg" "msg/UwbBearing.msg"
"msg/EspNowHeartbeat.msg"
DEPENDENCIES std_msgs DEPENDENCIES std_msgs
) )

View File

@ -0,0 +1,10 @@
# EspNowHeartbeat.msg — heartbeat status from ESP-NOW UWB tag (Issue #618)
#
# Published by the ESP-NOW relay node on each MSG_HEARTBEAT (0x30) frame.
std_msgs/Header header
uint8 tag_id # tag identifier
uint8 battery_pct # 0-100, or 255 = unknown
uint8 seq_num # rolling sequence number (detect loss)
uint32 timestamp_ms # ESP32 millis() at time of transmission