saltylab-firmware/test/test_jlink_frames.py
sl-firmware 6f0ad8e92e feat(firmware): Jetson binary serial protocol on USART1 (Issue #120)
New jlink module replaces ASCII-over-USB-CDC jetson_cmd with a dedicated
hardware UART binary protocol at 921600 baud for reliable Jetson comms.

- include/jlink.h: JLinkState struct, jlink_tlm_status_t (20-byte packed),
  command/telemetry IDs (0x01-0x07 cmd, 0x80 status), API declarations
- src/jlink.c: USART1 DMA2_Stream2_Channel4 circular RX (128 bytes),
  IDLE interrupt, CRC16-XModem (poly 0x1021) frame parser state machine,
  command dispatch (HEARTBEAT/DRIVE/ARM/DISARM/PID_SET/ESTOP),
  jlink_send_telemetry() blocking TX (≈0.28 ms per frame)
- include/config.h: JLINK_BAUD=921600, JLINK_HB_TIMEOUT_MS=1000,
  JLINK_TLM_HZ=50, FW_MAJOR/MINOR/PATCH version constants
- src/main.c: jlink_init(), jlink_process() in main loop, arm/disarm/
  estop/PID flag handling, 50 Hz STATUS telemetry TX, jlink takes
  priority over legacy jetson_cmd for speed/steer injection
- test/test_jlink_frames.py: 39 pytest tests (39/39 pass) — CRC16,
  frame building, parser state machine, drive/PID/status encoding

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-02 09:22:34 -05:00

417 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
test_jlink_frames.py — Issue #120: JLink binary protocol unit tests.
Tests CRC16-XModem, frame building, frame parsing (state machine),
command payload encoding, and telemetry frame layout.
No HAL or STM32 hardware required — pure Python logic.
"""
import struct
import pytest
# ---------------------------------------------------------------------------
# CRC16-XModem (poly 0x1021, init 0x0000) — mirrors crc16_xmodem() in jlink.c
# ---------------------------------------------------------------------------
def crc16_xmodem(data: bytes) -> int:
crc = 0x0000
for byte in data:
crc ^= byte << 8
for _ in range(8):
if crc & 0x8000:
crc = (crc << 1) ^ 0x1021
else:
crc <<= 1
crc &= 0xFFFF
return crc
# ---------------------------------------------------------------------------
# Frame builder — mirrors jlink_send_telemetry() structure
# ---------------------------------------------------------------------------
JLINK_STX = 0x02
JLINK_ETX = 0x03
JLINK_CMD_HEARTBEAT = 0x01
JLINK_CMD_DRIVE = 0x02
JLINK_CMD_ARM = 0x03
JLINK_CMD_DISARM = 0x04
JLINK_CMD_PID_SET = 0x05
JLINK_CMD_ESTOP = 0x07
JLINK_TLM_STATUS = 0x80
def build_frame(cmd: int, payload: bytes = b"") -> bytes:
"""Build a JLink frame: [STX][LEN][CMD][PAYLOAD][CRC_hi][CRC_lo][ETX]"""
body = bytes([cmd]) + payload
length = len(body)
crc = crc16_xmodem(body)
return bytes([JLINK_STX, length, cmd]) + payload + bytes([crc >> 8, crc & 0xFF, JLINK_ETX])
def parse_frames(data: bytes) -> list:
"""
Parse all valid JLink frames from a byte stream.
Returns list of (cmd, payload) tuples for each valid frame.
"""
WAIT_STX, WAIT_LEN, WAIT_DATA, WAIT_CRC_HI, WAIT_CRC_LO, WAIT_ETX = range(6)
state = WAIT_STX
results = []
length = 0
buf = []
crc_hi = 0
for b in data:
if state == WAIT_STX:
if b == JLINK_STX:
state = WAIT_LEN
elif state == WAIT_LEN:
if b == 0 or b > 65:
state = WAIT_STX
else:
length = b
buf = []
state = WAIT_DATA
elif state == WAIT_DATA:
buf.append(b)
if len(buf) == length:
state = WAIT_CRC_HI
elif state == WAIT_CRC_HI:
crc_hi = b
state = WAIT_CRC_LO
elif state == WAIT_CRC_LO:
rx_crc = (crc_hi << 8) | b
calc_crc = crc16_xmodem(bytes(buf))
if rx_crc == calc_crc:
state = WAIT_ETX
else:
state = WAIT_STX
elif state == WAIT_ETX:
if b == JLINK_ETX:
cmd = buf[0]
payload = bytes(buf[1:])
results.append((cmd, payload))
state = WAIT_STX
return results
# ---------------------------------------------------------------------------
# CRC16 Tests
# ---------------------------------------------------------------------------
class TestCRC16XModem:
def test_empty(self):
assert crc16_xmodem(b"") == 0x0000
def test_known_vector_1(self):
# CRC16/XMODEM of b"123456789" = 0x31C3
assert crc16_xmodem(b"123456789") == 0x31C3
def test_single_zero(self):
assert crc16_xmodem(b"\x00") == 0x0000
def test_single_ff(self):
assert crc16_xmodem(b"\xFF") == 0x1EF0
def test_heartbeat_cmd_byte(self):
# CRC of just CMD=0x01
crc = crc16_xmodem(bytes([JLINK_CMD_HEARTBEAT]))
assert isinstance(crc, int)
assert 0 <= crc <= 0xFFFF
def test_drive_payload(self):
payload = struct.pack("<hh", 500, -300) # speed=500, steer=-300
body = bytes([JLINK_CMD_DRIVE]) + payload
crc = crc16_xmodem(body)
# Verify round-trip: same data same CRC
assert crc == crc16_xmodem(body)
def test_different_payloads_different_crc(self):
a = crc16_xmodem(b"\x02\x01\x02")
b = crc16_xmodem(b"\x02\x01\x03")
assert a != b
def test_zero_bytes(self):
assert crc16_xmodem(b"\x00\x00\x00\x00") != crc16_xmodem(b"\x00\x00\x00\x01")
# ---------------------------------------------------------------------------
# Frame Building Tests
# ---------------------------------------------------------------------------
class TestFrameBuilding:
def test_heartbeat_frame_length(self):
frame = build_frame(JLINK_CMD_HEARTBEAT)
# [STX][LEN=1][CMD=0x01][CRC_hi][CRC_lo][ETX] = 6 bytes
assert len(frame) == 6
def test_heartbeat_frame_markers(self):
frame = build_frame(JLINK_CMD_HEARTBEAT)
assert frame[0] == JLINK_STX
assert frame[-1] == JLINK_ETX
def test_heartbeat_frame_len_field(self):
frame = build_frame(JLINK_CMD_HEARTBEAT)
assert frame[1] == 1 # LEN = 1 (CMD only, no payload)
def test_heartbeat_frame_cmd(self):
frame = build_frame(JLINK_CMD_HEARTBEAT)
assert frame[2] == JLINK_CMD_HEARTBEAT
def test_drive_frame_length(self):
payload = struct.pack("<hh", 0, 0)
frame = build_frame(JLINK_CMD_DRIVE, payload)
# [STX][LEN=5][CMD][s16][s16][CRC_hi][CRC_lo][ETX] = 1+1+1+4+2+1 = 10 bytes
assert len(frame) == 10
def test_drive_frame_len_field(self):
payload = struct.pack("<hh", 0, 0)
frame = build_frame(JLINK_CMD_DRIVE, payload)
assert frame[1] == 5 # 1 (CMD) + 4 (payload)
def test_arm_frame(self):
frame = build_frame(JLINK_CMD_ARM)
assert len(frame) == 6
assert frame[2] == JLINK_CMD_ARM
def test_disarm_frame(self):
frame = build_frame(JLINK_CMD_DISARM)
assert len(frame) == 6
assert frame[2] == JLINK_CMD_DISARM
def test_estop_frame(self):
frame = build_frame(JLINK_CMD_ESTOP)
assert len(frame) == 6
assert frame[2] == JLINK_CMD_ESTOP
def test_pid_set_frame_length(self):
payload = struct.pack("<fff", 35.0, 1.0, 1.0)
frame = build_frame(JLINK_CMD_PID_SET, payload)
# [STX][LEN=13][CMD][12 bytes floats][CRC_hi][CRC_lo][ETX] = 18 bytes
assert len(frame) == 18
def test_pid_set_frame_len_field(self):
payload = struct.pack("<fff", 35.0, 1.0, 1.0)
frame = build_frame(JLINK_CMD_PID_SET, payload)
assert frame[1] == 13 # 1 (CMD) + 12 (three floats)
def test_crc_coverage(self):
"""CRC must differ if payload differs by one bit."""
payload_a = struct.pack("<hh", 500, 100)
payload_b = struct.pack("<hh", 500, 101)
frame_a = build_frame(JLINK_CMD_DRIVE, payload_a)
frame_b = build_frame(JLINK_CMD_DRIVE, payload_b)
# Extract CRC bytes (second to last two bytes before ETX)
crc_a = (frame_a[-3] << 8) | frame_a[-2]
crc_b = (frame_b[-3] << 8) | frame_b[-2]
assert crc_a != crc_b
# ---------------------------------------------------------------------------
# Frame Parsing Tests
# ---------------------------------------------------------------------------
class TestFrameParsing:
def test_parse_single_heartbeat(self):
frame = build_frame(JLINK_CMD_HEARTBEAT)
results = parse_frames(frame)
assert len(results) == 1
cmd, payload = results[0]
assert cmd == JLINK_CMD_HEARTBEAT
assert payload == b""
def test_parse_drive_cmd(self):
payload = struct.pack("<hh", 750, -200)
frame = build_frame(JLINK_CMD_DRIVE, payload)
results = parse_frames(frame)
assert len(results) == 1
cmd, data = results[0]
assert cmd == JLINK_CMD_DRIVE
speed, steer = struct.unpack("<hh", data)
assert speed == 750
assert steer == -200
def test_parse_multiple_frames(self):
stream = (
build_frame(JLINK_CMD_HEARTBEAT) +
build_frame(JLINK_CMD_ARM) +
build_frame(JLINK_CMD_DRIVE, struct.pack("<hh", 100, 50))
)
results = parse_frames(stream)
assert len(results) == 3
assert results[0][0] == JLINK_CMD_HEARTBEAT
assert results[1][0] == JLINK_CMD_ARM
assert results[2][0] == JLINK_CMD_DRIVE
def test_bad_crc_dropped(self):
frame = bytearray(build_frame(JLINK_CMD_HEARTBEAT))
frame[-2] ^= 0xFF # Corrupt CRC low byte
results = parse_frames(bytes(frame))
assert len(results) == 0
def test_bad_etx_dropped(self):
frame = bytearray(build_frame(JLINK_CMD_HEARTBEAT))
frame[-1] = 0xFF # Wrong ETX
results = parse_frames(bytes(frame))
assert len(results) == 0
def test_noise_before_frame(self):
noise = bytes([0xAA, 0xBB, 0xCC, 0xDD])
frame = build_frame(JLINK_CMD_ARM)
results = parse_frames(noise + frame)
assert len(results) == 1
assert results[0][0] == JLINK_CMD_ARM
def test_two_frames_concatenated(self):
f1 = build_frame(JLINK_CMD_DISARM)
f2 = build_frame(JLINK_CMD_HEARTBEAT)
results = parse_frames(f1 + f2)
assert len(results) == 2
def test_pid_set_parsed_correctly(self):
kp, ki, kd = 42.5, 0.5, 2.0
payload = struct.pack("<fff", kp, ki, kd)
frame = build_frame(JLINK_CMD_PID_SET, payload)
results = parse_frames(frame)
assert len(results) == 1
cmd, data = results[0]
assert cmd == JLINK_CMD_PID_SET
got_kp, got_ki, got_kd = struct.unpack("<fff", data)
assert abs(got_kp - kp) < 1e-5
assert abs(got_ki - ki) < 1e-5
assert abs(got_kd - kd) < 1e-5
def test_drive_boundary_values(self):
for spd, str_ in [(1000, 1000), (-1000, -1000), (0, 0)]:
frame = build_frame(JLINK_CMD_DRIVE, struct.pack("<hh", spd, str_))
results = parse_frames(frame)
assert len(results) == 1
speed, steer = struct.unpack("<hh", results[0][1])
assert speed == spd
assert steer == str_
# ---------------------------------------------------------------------------
# Telemetry STATUS Frame Tests
# ---------------------------------------------------------------------------
# STATUS payload struct: <hhhhH b B B B B B B B B B
# pitch_x10, roll_x10, yaw_x10, motor_cmd, vbat_mv,
# rssi_dbm, link_quality, balance_state, rc_armed,
# mode, estop, soc_pct, fw_major, fw_minor, fw_patch
STATUS_FMT = "<hhhhHbBBBBBBBBB" # 20 bytes
STATUS_SIZE = struct.calcsize(STATUS_FMT)
def make_status(pitch=0, roll=0, yaw=0, motor_cmd=0, vbat_mv=12600,
rssi_dbm=-85, link_quality=95, balance_state=1,
rc_armed=1, mode=2, estop=0, soc_pct=80,
fw_major=1, fw_minor=0, fw_patch=0) -> bytes:
return struct.pack(STATUS_FMT,
pitch, roll, yaw, motor_cmd, vbat_mv,
rssi_dbm, link_quality, balance_state, rc_armed,
mode, estop, soc_pct, fw_major, fw_minor, fw_patch)
class TestStatusTelemetry:
def test_status_payload_size(self):
assert STATUS_SIZE == 20
def test_status_frame_total_length(self):
payload = make_status()
frame = build_frame(JLINK_TLM_STATUS, payload)
# [STX][LEN=21][0x80][20 bytes][CRC_hi][CRC_lo][ETX] = 26 bytes
assert len(frame) == 26
def test_status_len_field(self):
payload = make_status()
frame = build_frame(JLINK_TLM_STATUS, payload)
assert frame[1] == 21 # 1 (CMD) + 20 (STATUS payload)
def test_status_cmd_byte(self):
payload = make_status()
frame = build_frame(JLINK_TLM_STATUS, payload)
assert frame[2] == JLINK_TLM_STATUS
def test_status_parseable(self):
payload = make_status(pitch=123, roll=-45, yaw=678, motor_cmd=500,
vbat_mv=11800, rssi_dbm=-90, link_quality=80,
balance_state=1, rc_armed=1, mode=2, estop=0,
soc_pct=60, fw_major=1, fw_minor=0, fw_patch=0)
frame = build_frame(JLINK_TLM_STATUS, payload)
results = parse_frames(frame)
assert len(results) == 1
cmd, data = results[0]
assert cmd == JLINK_TLM_STATUS
vals = struct.unpack(STATUS_FMT, data)
pitch, roll, yaw, motor_cmd, vbat_mv, rssi_dbm, lq, bs, ra, mode, es, soc, fwma, fwmi, fwpa = vals
assert pitch == 123
assert roll == -45
assert yaw == 678
assert motor_cmd == 500
assert vbat_mv == 11800
assert rssi_dbm == -90
assert lq == 80
assert bs == 1
assert ra == 1
assert mode == 2
assert es == 0
assert soc == 60
assert fwma == 1
assert fwmi == 0
assert fwpa == 0
def test_pitch_encoding(self):
"""pitch_deg * 10 fits in int16 for ±3276 degrees."""
for deg in [-20.0, 0.0, 5.5, 15.3, -10.1]:
encoded = int(deg * 10)
payload = make_status(pitch=encoded)
frame = build_frame(JLINK_TLM_STATUS, payload)
results = parse_frames(frame)
assert len(results) == 1
vals = struct.unpack(STATUS_FMT, results[0][1])
assert vals[0] == encoded
def test_vbat_range_3s(self):
"""3S battery 990012600 mV fits in uint16."""
for mv in [9900, 11100, 12600]:
payload = make_status(vbat_mv=mv)
frame = build_frame(JLINK_TLM_STATUS, payload)
results = parse_frames(frame)
vals = struct.unpack(STATUS_FMT, results[0][1])
assert vals[4] == mv
def test_vbat_range_4s(self):
"""4S battery 1320016800 mV fits in uint16."""
for mv in [13200, 14800, 16800]:
payload = make_status(vbat_mv=mv)
frame = build_frame(JLINK_TLM_STATUS, payload)
results = parse_frames(frame)
vals = struct.unpack(STATUS_FMT, results[0][1])
assert vals[4] == mv
def test_fw_version_fields(self):
payload = make_status(fw_major=2, fw_minor=3, fw_patch=7)
frame = build_frame(JLINK_TLM_STATUS, payload)
results = parse_frames(frame)
vals = struct.unpack(STATUS_FMT, results[0][1])
assert vals[-3] == 2
assert vals[-2] == 3
assert vals[-1] == 7
def test_soc_pct_unknown(self):
"""soc_pct=255 encodes as 0xFF (unknown)."""
payload = make_status(soc_pct=255)
frame = build_frame(JLINK_TLM_STATUS, payload)
results = parse_frames(frame)
vals = struct.unpack(STATUS_FMT, results[0][1])
assert vals[11] == 255