Archive STM32 firmware to legacy/stm32/: - src/, include/, lib/USB_CDC/, platformio.ini, test stubs, flash_firmware.py - test/test_battery_adc.c, test_hw_button.c, test_pid_schedule.c, test_vesc_can.c, test_can_watchdog.c - USB_CDC_BUG.md Rename: stm32_protocol → esp32_protocol, mamba_protocol → balance_protocol, stm32_cmd_node → esp32_cmd_node, stm32_cmd_params → esp32_cmd_params, stm32_cmd.launch.py → esp32_cmd.launch.py, test_stm32_protocol → test_esp32_protocol, test_stm32_cmd_node → test_esp32_cmd_node Content cleanup across all files: - Mamba F722S → ESP32-S3 BALANCE - BlackPill → ESP32-S3 IO - STM32F722/F7xx → ESP32-S3 - stm32Mode/Version/Port → esp32Mode/Version/Port - STM32 State/Mode labels → ESP32 State/Mode - Jetson Nano → Jetson Orin Nano Super - /dev/stm32 → /dev/esp32 - stm32_bridge → esp32_bridge - STM32 HAL → ESP-IDF docs/SALTYLAB.md: - Update "Drone FC Details" to describe ESP32-S3 BALANCE board (Waveshare ESP32-S3 Touch LCD 1.28) - Replace verbose "Self-Balancing Control" STM32 section with brief note pointing to SAUL-TEE-SYSTEM-REFERENCE.md TEAM.md: Update Embedded Firmware Engineer role to ESP32-S3 / ESP-IDF No new functionality — cleanup only. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
417 lines
14 KiB
Python
417 lines
14 KiB
Python
"""
|
||
test_jlink_frames.py — Issue #120: JLink binary protocol unit tests.
|
||
|
||
Tests CRC16-XModem, frame building, frame parsing (state machine),
|
||
command payload encoding, and telemetry frame layout.
|
||
|
||
No HAL or ESP32-S3 hardware required — pure Python logic.
|
||
"""
|
||
|
||
import struct
|
||
import pytest
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# CRC16-XModem (poly 0x1021, init 0x0000) — mirrors crc16_xmodem() in jlink.c
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def crc16_xmodem(data: bytes) -> int:
|
||
crc = 0x0000
|
||
for byte in data:
|
||
crc ^= byte << 8
|
||
for _ in range(8):
|
||
if crc & 0x8000:
|
||
crc = (crc << 1) ^ 0x1021
|
||
else:
|
||
crc <<= 1
|
||
crc &= 0xFFFF
|
||
return crc
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Frame builder — mirrors jlink_send_telemetry() structure
|
||
# ---------------------------------------------------------------------------
|
||
|
||
JLINK_STX = 0x02
|
||
JLINK_ETX = 0x03
|
||
|
||
JLINK_CMD_HEARTBEAT = 0x01
|
||
JLINK_CMD_DRIVE = 0x02
|
||
JLINK_CMD_ARM = 0x03
|
||
JLINK_CMD_DISARM = 0x04
|
||
JLINK_CMD_PID_SET = 0x05
|
||
JLINK_CMD_ESTOP = 0x07
|
||
JLINK_TLM_STATUS = 0x80
|
||
|
||
|
||
def build_frame(cmd: int, payload: bytes = b"") -> bytes:
|
||
"""Build a JLink frame: [STX][LEN][CMD][PAYLOAD][CRC_hi][CRC_lo][ETX]"""
|
||
body = bytes([cmd]) + payload
|
||
length = len(body)
|
||
crc = crc16_xmodem(body)
|
||
return bytes([JLINK_STX, length, cmd]) + payload + bytes([crc >> 8, crc & 0xFF, JLINK_ETX])
|
||
|
||
|
||
def parse_frames(data: bytes) -> list:
|
||
"""
|
||
Parse all valid JLink frames from a byte stream.
|
||
Returns list of (cmd, payload) tuples for each valid frame.
|
||
"""
|
||
WAIT_STX, WAIT_LEN, WAIT_DATA, WAIT_CRC_HI, WAIT_CRC_LO, WAIT_ETX = range(6)
|
||
state = WAIT_STX
|
||
results = []
|
||
length = 0
|
||
buf = []
|
||
crc_hi = 0
|
||
|
||
for b in data:
|
||
if state == WAIT_STX:
|
||
if b == JLINK_STX:
|
||
state = WAIT_LEN
|
||
elif state == WAIT_LEN:
|
||
if b == 0 or b > 65:
|
||
state = WAIT_STX
|
||
else:
|
||
length = b
|
||
buf = []
|
||
state = WAIT_DATA
|
||
elif state == WAIT_DATA:
|
||
buf.append(b)
|
||
if len(buf) == length:
|
||
state = WAIT_CRC_HI
|
||
elif state == WAIT_CRC_HI:
|
||
crc_hi = b
|
||
state = WAIT_CRC_LO
|
||
elif state == WAIT_CRC_LO:
|
||
rx_crc = (crc_hi << 8) | b
|
||
calc_crc = crc16_xmodem(bytes(buf))
|
||
if rx_crc == calc_crc:
|
||
state = WAIT_ETX
|
||
else:
|
||
state = WAIT_STX
|
||
elif state == WAIT_ETX:
|
||
if b == JLINK_ETX:
|
||
cmd = buf[0]
|
||
payload = bytes(buf[1:])
|
||
results.append((cmd, payload))
|
||
state = WAIT_STX
|
||
|
||
return results
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# CRC16 Tests
|
||
# ---------------------------------------------------------------------------
|
||
|
||
class TestCRC16XModem:
|
||
|
||
def test_empty(self):
|
||
assert crc16_xmodem(b"") == 0x0000
|
||
|
||
def test_known_vector_1(self):
|
||
# CRC16/XMODEM of b"123456789" = 0x31C3
|
||
assert crc16_xmodem(b"123456789") == 0x31C3
|
||
|
||
def test_single_zero(self):
|
||
assert crc16_xmodem(b"\x00") == 0x0000
|
||
|
||
def test_single_ff(self):
|
||
assert crc16_xmodem(b"\xFF") == 0x1EF0
|
||
|
||
def test_heartbeat_cmd_byte(self):
|
||
# CRC of just CMD=0x01
|
||
crc = crc16_xmodem(bytes([JLINK_CMD_HEARTBEAT]))
|
||
assert isinstance(crc, int)
|
||
assert 0 <= crc <= 0xFFFF
|
||
|
||
def test_drive_payload(self):
|
||
payload = struct.pack("<hh", 500, -300) # speed=500, steer=-300
|
||
body = bytes([JLINK_CMD_DRIVE]) + payload
|
||
crc = crc16_xmodem(body)
|
||
# Verify round-trip: same data same CRC
|
||
assert crc == crc16_xmodem(body)
|
||
|
||
def test_different_payloads_different_crc(self):
|
||
a = crc16_xmodem(b"\x02\x01\x02")
|
||
b = crc16_xmodem(b"\x02\x01\x03")
|
||
assert a != b
|
||
|
||
def test_zero_bytes(self):
|
||
assert crc16_xmodem(b"\x00\x00\x00\x00") != crc16_xmodem(b"\x00\x00\x00\x01")
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Frame Building Tests
|
||
# ---------------------------------------------------------------------------
|
||
|
||
class TestFrameBuilding:
|
||
|
||
def test_heartbeat_frame_length(self):
|
||
frame = build_frame(JLINK_CMD_HEARTBEAT)
|
||
# [STX][LEN=1][CMD=0x01][CRC_hi][CRC_lo][ETX] = 6 bytes
|
||
assert len(frame) == 6
|
||
|
||
def test_heartbeat_frame_markers(self):
|
||
frame = build_frame(JLINK_CMD_HEARTBEAT)
|
||
assert frame[0] == JLINK_STX
|
||
assert frame[-1] == JLINK_ETX
|
||
|
||
def test_heartbeat_frame_len_field(self):
|
||
frame = build_frame(JLINK_CMD_HEARTBEAT)
|
||
assert frame[1] == 1 # LEN = 1 (CMD only, no payload)
|
||
|
||
def test_heartbeat_frame_cmd(self):
|
||
frame = build_frame(JLINK_CMD_HEARTBEAT)
|
||
assert frame[2] == JLINK_CMD_HEARTBEAT
|
||
|
||
def test_drive_frame_length(self):
|
||
payload = struct.pack("<hh", 0, 0)
|
||
frame = build_frame(JLINK_CMD_DRIVE, payload)
|
||
# [STX][LEN=5][CMD][s16][s16][CRC_hi][CRC_lo][ETX] = 1+1+1+4+2+1 = 10 bytes
|
||
assert len(frame) == 10
|
||
|
||
def test_drive_frame_len_field(self):
|
||
payload = struct.pack("<hh", 0, 0)
|
||
frame = build_frame(JLINK_CMD_DRIVE, payload)
|
||
assert frame[1] == 5 # 1 (CMD) + 4 (payload)
|
||
|
||
def test_arm_frame(self):
|
||
frame = build_frame(JLINK_CMD_ARM)
|
||
assert len(frame) == 6
|
||
assert frame[2] == JLINK_CMD_ARM
|
||
|
||
def test_disarm_frame(self):
|
||
frame = build_frame(JLINK_CMD_DISARM)
|
||
assert len(frame) == 6
|
||
assert frame[2] == JLINK_CMD_DISARM
|
||
|
||
def test_estop_frame(self):
|
||
frame = build_frame(JLINK_CMD_ESTOP)
|
||
assert len(frame) == 6
|
||
assert frame[2] == JLINK_CMD_ESTOP
|
||
|
||
def test_pid_set_frame_length(self):
|
||
payload = struct.pack("<fff", 35.0, 1.0, 1.0)
|
||
frame = build_frame(JLINK_CMD_PID_SET, payload)
|
||
# [STX][LEN=13][CMD][12 bytes floats][CRC_hi][CRC_lo][ETX] = 18 bytes
|
||
assert len(frame) == 18
|
||
|
||
def test_pid_set_frame_len_field(self):
|
||
payload = struct.pack("<fff", 35.0, 1.0, 1.0)
|
||
frame = build_frame(JLINK_CMD_PID_SET, payload)
|
||
assert frame[1] == 13 # 1 (CMD) + 12 (three floats)
|
||
|
||
def test_crc_coverage(self):
|
||
"""CRC must differ if payload differs by one bit."""
|
||
payload_a = struct.pack("<hh", 500, 100)
|
||
payload_b = struct.pack("<hh", 500, 101)
|
||
frame_a = build_frame(JLINK_CMD_DRIVE, payload_a)
|
||
frame_b = build_frame(JLINK_CMD_DRIVE, payload_b)
|
||
# Extract CRC bytes (second to last two bytes before ETX)
|
||
crc_a = (frame_a[-3] << 8) | frame_a[-2]
|
||
crc_b = (frame_b[-3] << 8) | frame_b[-2]
|
||
assert crc_a != crc_b
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Frame Parsing Tests
|
||
# ---------------------------------------------------------------------------
|
||
|
||
class TestFrameParsing:
|
||
|
||
def test_parse_single_heartbeat(self):
|
||
frame = build_frame(JLINK_CMD_HEARTBEAT)
|
||
results = parse_frames(frame)
|
||
assert len(results) == 1
|
||
cmd, payload = results[0]
|
||
assert cmd == JLINK_CMD_HEARTBEAT
|
||
assert payload == b""
|
||
|
||
def test_parse_drive_cmd(self):
|
||
payload = struct.pack("<hh", 750, -200)
|
||
frame = build_frame(JLINK_CMD_DRIVE, payload)
|
||
results = parse_frames(frame)
|
||
assert len(results) == 1
|
||
cmd, data = results[0]
|
||
assert cmd == JLINK_CMD_DRIVE
|
||
speed, steer = struct.unpack("<hh", data)
|
||
assert speed == 750
|
||
assert steer == -200
|
||
|
||
def test_parse_multiple_frames(self):
|
||
stream = (
|
||
build_frame(JLINK_CMD_HEARTBEAT) +
|
||
build_frame(JLINK_CMD_ARM) +
|
||
build_frame(JLINK_CMD_DRIVE, struct.pack("<hh", 100, 50))
|
||
)
|
||
results = parse_frames(stream)
|
||
assert len(results) == 3
|
||
assert results[0][0] == JLINK_CMD_HEARTBEAT
|
||
assert results[1][0] == JLINK_CMD_ARM
|
||
assert results[2][0] == JLINK_CMD_DRIVE
|
||
|
||
def test_bad_crc_dropped(self):
|
||
frame = bytearray(build_frame(JLINK_CMD_HEARTBEAT))
|
||
frame[-2] ^= 0xFF # Corrupt CRC low byte
|
||
results = parse_frames(bytes(frame))
|
||
assert len(results) == 0
|
||
|
||
def test_bad_etx_dropped(self):
|
||
frame = bytearray(build_frame(JLINK_CMD_HEARTBEAT))
|
||
frame[-1] = 0xFF # Wrong ETX
|
||
results = parse_frames(bytes(frame))
|
||
assert len(results) == 0
|
||
|
||
def test_noise_before_frame(self):
|
||
noise = bytes([0xAA, 0xBB, 0xCC, 0xDD])
|
||
frame = build_frame(JLINK_CMD_ARM)
|
||
results = parse_frames(noise + frame)
|
||
assert len(results) == 1
|
||
assert results[0][0] == JLINK_CMD_ARM
|
||
|
||
def test_two_frames_concatenated(self):
|
||
f1 = build_frame(JLINK_CMD_DISARM)
|
||
f2 = build_frame(JLINK_CMD_HEARTBEAT)
|
||
results = parse_frames(f1 + f2)
|
||
assert len(results) == 2
|
||
|
||
def test_pid_set_parsed_correctly(self):
|
||
kp, ki, kd = 42.5, 0.5, 2.0
|
||
payload = struct.pack("<fff", kp, ki, kd)
|
||
frame = build_frame(JLINK_CMD_PID_SET, payload)
|
||
results = parse_frames(frame)
|
||
assert len(results) == 1
|
||
cmd, data = results[0]
|
||
assert cmd == JLINK_CMD_PID_SET
|
||
got_kp, got_ki, got_kd = struct.unpack("<fff", data)
|
||
assert abs(got_kp - kp) < 1e-5
|
||
assert abs(got_ki - ki) < 1e-5
|
||
assert abs(got_kd - kd) < 1e-5
|
||
|
||
def test_drive_boundary_values(self):
|
||
for spd, str_ in [(1000, 1000), (-1000, -1000), (0, 0)]:
|
||
frame = build_frame(JLINK_CMD_DRIVE, struct.pack("<hh", spd, str_))
|
||
results = parse_frames(frame)
|
||
assert len(results) == 1
|
||
speed, steer = struct.unpack("<hh", results[0][1])
|
||
assert speed == spd
|
||
assert steer == str_
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Telemetry STATUS Frame Tests
|
||
# ---------------------------------------------------------------------------
|
||
|
||
# STATUS payload struct: <hhhhH b B B B B B B B B B
|
||
# pitch_x10, roll_x10, yaw_x10, motor_cmd, vbat_mv,
|
||
# rssi_dbm, link_quality, balance_state, rc_armed,
|
||
# mode, estop, soc_pct, fw_major, fw_minor, fw_patch
|
||
STATUS_FMT = "<hhhhHbBBBBBBBBB" # 20 bytes
|
||
STATUS_SIZE = struct.calcsize(STATUS_FMT)
|
||
|
||
|
||
def make_status(pitch=0, roll=0, yaw=0, motor_cmd=0, vbat_mv=12600,
|
||
rssi_dbm=-85, link_quality=95, balance_state=1,
|
||
rc_armed=1, mode=2, estop=0, soc_pct=80,
|
||
fw_major=1, fw_minor=0, fw_patch=0) -> bytes:
|
||
return struct.pack(STATUS_FMT,
|
||
pitch, roll, yaw, motor_cmd, vbat_mv,
|
||
rssi_dbm, link_quality, balance_state, rc_armed,
|
||
mode, estop, soc_pct, fw_major, fw_minor, fw_patch)
|
||
|
||
|
||
class TestStatusTelemetry:
|
||
|
||
def test_status_payload_size(self):
|
||
assert STATUS_SIZE == 20
|
||
|
||
def test_status_frame_total_length(self):
|
||
payload = make_status()
|
||
frame = build_frame(JLINK_TLM_STATUS, payload)
|
||
# [STX][LEN=21][0x80][20 bytes][CRC_hi][CRC_lo][ETX] = 26 bytes
|
||
assert len(frame) == 26
|
||
|
||
def test_status_len_field(self):
|
||
payload = make_status()
|
||
frame = build_frame(JLINK_TLM_STATUS, payload)
|
||
assert frame[1] == 21 # 1 (CMD) + 20 (STATUS payload)
|
||
|
||
def test_status_cmd_byte(self):
|
||
payload = make_status()
|
||
frame = build_frame(JLINK_TLM_STATUS, payload)
|
||
assert frame[2] == JLINK_TLM_STATUS
|
||
|
||
def test_status_parseable(self):
|
||
payload = make_status(pitch=123, roll=-45, yaw=678, motor_cmd=500,
|
||
vbat_mv=11800, rssi_dbm=-90, link_quality=80,
|
||
balance_state=1, rc_armed=1, mode=2, estop=0,
|
||
soc_pct=60, fw_major=1, fw_minor=0, fw_patch=0)
|
||
frame = build_frame(JLINK_TLM_STATUS, payload)
|
||
results = parse_frames(frame)
|
||
assert len(results) == 1
|
||
cmd, data = results[0]
|
||
assert cmd == JLINK_TLM_STATUS
|
||
vals = struct.unpack(STATUS_FMT, data)
|
||
pitch, roll, yaw, motor_cmd, vbat_mv, rssi_dbm, lq, bs, ra, mode, es, soc, fwma, fwmi, fwpa = vals
|
||
assert pitch == 123
|
||
assert roll == -45
|
||
assert yaw == 678
|
||
assert motor_cmd == 500
|
||
assert vbat_mv == 11800
|
||
assert rssi_dbm == -90
|
||
assert lq == 80
|
||
assert bs == 1
|
||
assert ra == 1
|
||
assert mode == 2
|
||
assert es == 0
|
||
assert soc == 60
|
||
assert fwma == 1
|
||
assert fwmi == 0
|
||
assert fwpa == 0
|
||
|
||
def test_pitch_encoding(self):
|
||
"""pitch_deg * 10 fits in int16 for ±3276 degrees."""
|
||
for deg in [-20.0, 0.0, 5.5, 15.3, -10.1]:
|
||
encoded = int(deg * 10)
|
||
payload = make_status(pitch=encoded)
|
||
frame = build_frame(JLINK_TLM_STATUS, payload)
|
||
results = parse_frames(frame)
|
||
assert len(results) == 1
|
||
vals = struct.unpack(STATUS_FMT, results[0][1])
|
||
assert vals[0] == encoded
|
||
|
||
def test_vbat_range_3s(self):
|
||
"""3S battery 9900–12600 mV fits in uint16."""
|
||
for mv in [9900, 11100, 12600]:
|
||
payload = make_status(vbat_mv=mv)
|
||
frame = build_frame(JLINK_TLM_STATUS, payload)
|
||
results = parse_frames(frame)
|
||
vals = struct.unpack(STATUS_FMT, results[0][1])
|
||
assert vals[4] == mv
|
||
|
||
def test_vbat_range_4s(self):
|
||
"""4S battery 13200–16800 mV fits in uint16."""
|
||
for mv in [13200, 14800, 16800]:
|
||
payload = make_status(vbat_mv=mv)
|
||
frame = build_frame(JLINK_TLM_STATUS, payload)
|
||
results = parse_frames(frame)
|
||
vals = struct.unpack(STATUS_FMT, results[0][1])
|
||
assert vals[4] == mv
|
||
|
||
def test_fw_version_fields(self):
|
||
payload = make_status(fw_major=2, fw_minor=3, fw_patch=7)
|
||
frame = build_frame(JLINK_TLM_STATUS, payload)
|
||
results = parse_frames(frame)
|
||
vals = struct.unpack(STATUS_FMT, results[0][1])
|
||
assert vals[-3] == 2
|
||
assert vals[-2] == 3
|
||
assert vals[-1] == 7
|
||
|
||
def test_soc_pct_unknown(self):
|
||
"""soc_pct=255 encodes as 0xFF (unknown)."""
|
||
payload = make_status(soc_pct=255)
|
||
frame = build_frame(JLINK_TLM_STATUS, payload)
|
||
results = parse_frames(frame)
|
||
vals = struct.unpack(STATUS_FMT, results[0][1])
|
||
assert vals[11] == 255
|