saltylab-firmware/test/test_jlink_frames.py
sl-webui f71dad5344 feat(arch): migrate all STM32/Mamba/BlackPill refs to ESP32 BALANCE/IO + fix roslib@1.4.0
Architecture change (2026-04-03): Mamba F722S (STM32F722) and BlackPill
replaced by ESP32 BALANCE (PID loop) and ESP32 IO (motors/sensors/comms).

- Update CLAUDE.md, docs, chassis BOM/ASSEMBLY, pinout, power-budget,
  wiring-diagram, TEAM.md, AUTONOMOUS_ARMING.md, docker-compose
- Update all ROS2 package comments, config labels, launch args
  (stm32_port→esp32_port, /dev/stm32-bridge→/dev/esp32-bridge)
- Update WebUI: stm32Mode→esp32Mode, stm32Version→esp32Version,
  "STM32 State/Mode" labels → "ESP32 State/Mode" (ControlMode, SettingsPanel)
- Add TODO(esp32-migration) markers on stm32_protocol.py and mamba_protocol.py
  binary frame layouts — pending ESP32 protocol spec from max
- Fix roslib CDN 1.3.0→1.4.0 in all 11 HTML panels (fixes ROS2 Humble
  rosbridge "Received a message without an op" incompatibility)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-04 08:25:24 -04:00

417 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
test_jlink_frames.py — Issue #120: JLink binary protocol unit tests.
Tests CRC16-XModem, frame building, frame parsing (state machine),
command payload encoding, and telemetry frame layout.
No HAL or STM32/ESP32 hardware required — pure Python logic.
"""
import struct
import pytest
# ---------------------------------------------------------------------------
# CRC16-XModem (poly 0x1021, init 0x0000) — mirrors crc16_xmodem() in jlink.c
# ---------------------------------------------------------------------------
def crc16_xmodem(data: bytes) -> int:
crc = 0x0000
for byte in data:
crc ^= byte << 8
for _ in range(8):
if crc & 0x8000:
crc = (crc << 1) ^ 0x1021
else:
crc <<= 1
crc &= 0xFFFF
return crc
# ---------------------------------------------------------------------------
# Frame builder — mirrors jlink_send_telemetry() structure
# ---------------------------------------------------------------------------
JLINK_STX = 0x02
JLINK_ETX = 0x03
JLINK_CMD_HEARTBEAT = 0x01
JLINK_CMD_DRIVE = 0x02
JLINK_CMD_ARM = 0x03
JLINK_CMD_DISARM = 0x04
JLINK_CMD_PID_SET = 0x05
JLINK_CMD_ESTOP = 0x07
JLINK_TLM_STATUS = 0x80
def build_frame(cmd: int, payload: bytes = b"") -> bytes:
"""Build a JLink frame: [STX][LEN][CMD][PAYLOAD][CRC_hi][CRC_lo][ETX]"""
body = bytes([cmd]) + payload
length = len(body)
crc = crc16_xmodem(body)
return bytes([JLINK_STX, length, cmd]) + payload + bytes([crc >> 8, crc & 0xFF, JLINK_ETX])
def parse_frames(data: bytes) -> list:
"""
Parse all valid JLink frames from a byte stream.
Returns list of (cmd, payload) tuples for each valid frame.
"""
WAIT_STX, WAIT_LEN, WAIT_DATA, WAIT_CRC_HI, WAIT_CRC_LO, WAIT_ETX = range(6)
state = WAIT_STX
results = []
length = 0
buf = []
crc_hi = 0
for b in data:
if state == WAIT_STX:
if b == JLINK_STX:
state = WAIT_LEN
elif state == WAIT_LEN:
if b == 0 or b > 65:
state = WAIT_STX
else:
length = b
buf = []
state = WAIT_DATA
elif state == WAIT_DATA:
buf.append(b)
if len(buf) == length:
state = WAIT_CRC_HI
elif state == WAIT_CRC_HI:
crc_hi = b
state = WAIT_CRC_LO
elif state == WAIT_CRC_LO:
rx_crc = (crc_hi << 8) | b
calc_crc = crc16_xmodem(bytes(buf))
if rx_crc == calc_crc:
state = WAIT_ETX
else:
state = WAIT_STX
elif state == WAIT_ETX:
if b == JLINK_ETX:
cmd = buf[0]
payload = bytes(buf[1:])
results.append((cmd, payload))
state = WAIT_STX
return results
# ---------------------------------------------------------------------------
# CRC16 Tests
# ---------------------------------------------------------------------------
class TestCRC16XModem:
def test_empty(self):
assert crc16_xmodem(b"") == 0x0000
def test_known_vector_1(self):
# CRC16/XMODEM of b"123456789" = 0x31C3
assert crc16_xmodem(b"123456789") == 0x31C3
def test_single_zero(self):
assert crc16_xmodem(b"\x00") == 0x0000
def test_single_ff(self):
assert crc16_xmodem(b"\xFF") == 0x1EF0
def test_heartbeat_cmd_byte(self):
# CRC of just CMD=0x01
crc = crc16_xmodem(bytes([JLINK_CMD_HEARTBEAT]))
assert isinstance(crc, int)
assert 0 <= crc <= 0xFFFF
def test_drive_payload(self):
payload = struct.pack("<hh", 500, -300) # speed=500, steer=-300
body = bytes([JLINK_CMD_DRIVE]) + payload
crc = crc16_xmodem(body)
# Verify round-trip: same data same CRC
assert crc == crc16_xmodem(body)
def test_different_payloads_different_crc(self):
a = crc16_xmodem(b"\x02\x01\x02")
b = crc16_xmodem(b"\x02\x01\x03")
assert a != b
def test_zero_bytes(self):
assert crc16_xmodem(b"\x00\x00\x00\x00") != crc16_xmodem(b"\x00\x00\x00\x01")
# ---------------------------------------------------------------------------
# Frame Building Tests
# ---------------------------------------------------------------------------
class TestFrameBuilding:
def test_heartbeat_frame_length(self):
frame = build_frame(JLINK_CMD_HEARTBEAT)
# [STX][LEN=1][CMD=0x01][CRC_hi][CRC_lo][ETX] = 6 bytes
assert len(frame) == 6
def test_heartbeat_frame_markers(self):
frame = build_frame(JLINK_CMD_HEARTBEAT)
assert frame[0] == JLINK_STX
assert frame[-1] == JLINK_ETX
def test_heartbeat_frame_len_field(self):
frame = build_frame(JLINK_CMD_HEARTBEAT)
assert frame[1] == 1 # LEN = 1 (CMD only, no payload)
def test_heartbeat_frame_cmd(self):
frame = build_frame(JLINK_CMD_HEARTBEAT)
assert frame[2] == JLINK_CMD_HEARTBEAT
def test_drive_frame_length(self):
payload = struct.pack("<hh", 0, 0)
frame = build_frame(JLINK_CMD_DRIVE, payload)
# [STX][LEN=5][CMD][s16][s16][CRC_hi][CRC_lo][ETX] = 1+1+1+4+2+1 = 10 bytes
assert len(frame) == 10
def test_drive_frame_len_field(self):
payload = struct.pack("<hh", 0, 0)
frame = build_frame(JLINK_CMD_DRIVE, payload)
assert frame[1] == 5 # 1 (CMD) + 4 (payload)
def test_arm_frame(self):
frame = build_frame(JLINK_CMD_ARM)
assert len(frame) == 6
assert frame[2] == JLINK_CMD_ARM
def test_disarm_frame(self):
frame = build_frame(JLINK_CMD_DISARM)
assert len(frame) == 6
assert frame[2] == JLINK_CMD_DISARM
def test_estop_frame(self):
frame = build_frame(JLINK_CMD_ESTOP)
assert len(frame) == 6
assert frame[2] == JLINK_CMD_ESTOP
def test_pid_set_frame_length(self):
payload = struct.pack("<fff", 35.0, 1.0, 1.0)
frame = build_frame(JLINK_CMD_PID_SET, payload)
# [STX][LEN=13][CMD][12 bytes floats][CRC_hi][CRC_lo][ETX] = 18 bytes
assert len(frame) == 18
def test_pid_set_frame_len_field(self):
payload = struct.pack("<fff", 35.0, 1.0, 1.0)
frame = build_frame(JLINK_CMD_PID_SET, payload)
assert frame[1] == 13 # 1 (CMD) + 12 (three floats)
def test_crc_coverage(self):
"""CRC must differ if payload differs by one bit."""
payload_a = struct.pack("<hh", 500, 100)
payload_b = struct.pack("<hh", 500, 101)
frame_a = build_frame(JLINK_CMD_DRIVE, payload_a)
frame_b = build_frame(JLINK_CMD_DRIVE, payload_b)
# Extract CRC bytes (second to last two bytes before ETX)
crc_a = (frame_a[-3] << 8) | frame_a[-2]
crc_b = (frame_b[-3] << 8) | frame_b[-2]
assert crc_a != crc_b
# ---------------------------------------------------------------------------
# Frame Parsing Tests
# ---------------------------------------------------------------------------
class TestFrameParsing:
def test_parse_single_heartbeat(self):
frame = build_frame(JLINK_CMD_HEARTBEAT)
results = parse_frames(frame)
assert len(results) == 1
cmd, payload = results[0]
assert cmd == JLINK_CMD_HEARTBEAT
assert payload == b""
def test_parse_drive_cmd(self):
payload = struct.pack("<hh", 750, -200)
frame = build_frame(JLINK_CMD_DRIVE, payload)
results = parse_frames(frame)
assert len(results) == 1
cmd, data = results[0]
assert cmd == JLINK_CMD_DRIVE
speed, steer = struct.unpack("<hh", data)
assert speed == 750
assert steer == -200
def test_parse_multiple_frames(self):
stream = (
build_frame(JLINK_CMD_HEARTBEAT) +
build_frame(JLINK_CMD_ARM) +
build_frame(JLINK_CMD_DRIVE, struct.pack("<hh", 100, 50))
)
results = parse_frames(stream)
assert len(results) == 3
assert results[0][0] == JLINK_CMD_HEARTBEAT
assert results[1][0] == JLINK_CMD_ARM
assert results[2][0] == JLINK_CMD_DRIVE
def test_bad_crc_dropped(self):
frame = bytearray(build_frame(JLINK_CMD_HEARTBEAT))
frame[-2] ^= 0xFF # Corrupt CRC low byte
results = parse_frames(bytes(frame))
assert len(results) == 0
def test_bad_etx_dropped(self):
frame = bytearray(build_frame(JLINK_CMD_HEARTBEAT))
frame[-1] = 0xFF # Wrong ETX
results = parse_frames(bytes(frame))
assert len(results) == 0
def test_noise_before_frame(self):
noise = bytes([0xAA, 0xBB, 0xCC, 0xDD])
frame = build_frame(JLINK_CMD_ARM)
results = parse_frames(noise + frame)
assert len(results) == 1
assert results[0][0] == JLINK_CMD_ARM
def test_two_frames_concatenated(self):
f1 = build_frame(JLINK_CMD_DISARM)
f2 = build_frame(JLINK_CMD_HEARTBEAT)
results = parse_frames(f1 + f2)
assert len(results) == 2
def test_pid_set_parsed_correctly(self):
kp, ki, kd = 42.5, 0.5, 2.0
payload = struct.pack("<fff", kp, ki, kd)
frame = build_frame(JLINK_CMD_PID_SET, payload)
results = parse_frames(frame)
assert len(results) == 1
cmd, data = results[0]
assert cmd == JLINK_CMD_PID_SET
got_kp, got_ki, got_kd = struct.unpack("<fff", data)
assert abs(got_kp - kp) < 1e-5
assert abs(got_ki - ki) < 1e-5
assert abs(got_kd - kd) < 1e-5
def test_drive_boundary_values(self):
for spd, str_ in [(1000, 1000), (-1000, -1000), (0, 0)]:
frame = build_frame(JLINK_CMD_DRIVE, struct.pack("<hh", spd, str_))
results = parse_frames(frame)
assert len(results) == 1
speed, steer = struct.unpack("<hh", results[0][1])
assert speed == spd
assert steer == str_
# ---------------------------------------------------------------------------
# Telemetry STATUS Frame Tests
# ---------------------------------------------------------------------------
# STATUS payload struct: <hhhhH b B B B B B B B B B
# pitch_x10, roll_x10, yaw_x10, motor_cmd, vbat_mv,
# rssi_dbm, link_quality, balance_state, rc_armed,
# mode, estop, soc_pct, fw_major, fw_minor, fw_patch
STATUS_FMT = "<hhhhHbBBBBBBBBB" # 20 bytes
STATUS_SIZE = struct.calcsize(STATUS_FMT)
def make_status(pitch=0, roll=0, yaw=0, motor_cmd=0, vbat_mv=12600,
rssi_dbm=-85, link_quality=95, balance_state=1,
rc_armed=1, mode=2, estop=0, soc_pct=80,
fw_major=1, fw_minor=0, fw_patch=0) -> bytes:
return struct.pack(STATUS_FMT,
pitch, roll, yaw, motor_cmd, vbat_mv,
rssi_dbm, link_quality, balance_state, rc_armed,
mode, estop, soc_pct, fw_major, fw_minor, fw_patch)
class TestStatusTelemetry:
def test_status_payload_size(self):
assert STATUS_SIZE == 20
def test_status_frame_total_length(self):
payload = make_status()
frame = build_frame(JLINK_TLM_STATUS, payload)
# [STX][LEN=21][0x80][20 bytes][CRC_hi][CRC_lo][ETX] = 26 bytes
assert len(frame) == 26
def test_status_len_field(self):
payload = make_status()
frame = build_frame(JLINK_TLM_STATUS, payload)
assert frame[1] == 21 # 1 (CMD) + 20 (STATUS payload)
def test_status_cmd_byte(self):
payload = make_status()
frame = build_frame(JLINK_TLM_STATUS, payload)
assert frame[2] == JLINK_TLM_STATUS
def test_status_parseable(self):
payload = make_status(pitch=123, roll=-45, yaw=678, motor_cmd=500,
vbat_mv=11800, rssi_dbm=-90, link_quality=80,
balance_state=1, rc_armed=1, mode=2, estop=0,
soc_pct=60, fw_major=1, fw_minor=0, fw_patch=0)
frame = build_frame(JLINK_TLM_STATUS, payload)
results = parse_frames(frame)
assert len(results) == 1
cmd, data = results[0]
assert cmd == JLINK_TLM_STATUS
vals = struct.unpack(STATUS_FMT, data)
pitch, roll, yaw, motor_cmd, vbat_mv, rssi_dbm, lq, bs, ra, mode, es, soc, fwma, fwmi, fwpa = vals
assert pitch == 123
assert roll == -45
assert yaw == 678
assert motor_cmd == 500
assert vbat_mv == 11800
assert rssi_dbm == -90
assert lq == 80
assert bs == 1
assert ra == 1
assert mode == 2
assert es == 0
assert soc == 60
assert fwma == 1
assert fwmi == 0
assert fwpa == 0
def test_pitch_encoding(self):
"""pitch_deg * 10 fits in int16 for ±3276 degrees."""
for deg in [-20.0, 0.0, 5.5, 15.3, -10.1]:
encoded = int(deg * 10)
payload = make_status(pitch=encoded)
frame = build_frame(JLINK_TLM_STATUS, payload)
results = parse_frames(frame)
assert len(results) == 1
vals = struct.unpack(STATUS_FMT, results[0][1])
assert vals[0] == encoded
def test_vbat_range_3s(self):
"""3S battery 990012600 mV fits in uint16."""
for mv in [9900, 11100, 12600]:
payload = make_status(vbat_mv=mv)
frame = build_frame(JLINK_TLM_STATUS, payload)
results = parse_frames(frame)
vals = struct.unpack(STATUS_FMT, results[0][1])
assert vals[4] == mv
def test_vbat_range_4s(self):
"""4S battery 1320016800 mV fits in uint16."""
for mv in [13200, 14800, 16800]:
payload = make_status(vbat_mv=mv)
frame = build_frame(JLINK_TLM_STATUS, payload)
results = parse_frames(frame)
vals = struct.unpack(STATUS_FMT, results[0][1])
assert vals[4] == mv
def test_fw_version_fields(self):
payload = make_status(fw_major=2, fw_minor=3, fw_patch=7)
frame = build_frame(JLINK_TLM_STATUS, payload)
results = parse_frames(frame)
vals = struct.unpack(STATUS_FMT, results[0][1])
assert vals[-3] == 2
assert vals[-2] == 3
assert vals[-1] == 7
def test_soc_pct_unknown(self):
"""soc_pct=255 encodes as 0xFF (unknown)."""
payload = make_status(soc_pct=255)
frame = build_frame(JLINK_TLM_STATUS, payload)
results = parse_frames(frame)
vals = struct.unpack(STATUS_FMT, results[0][1])
assert vals[11] == 255