Merge pull request 'feat(firmware): Jetson binary serial protocol on USART1 (Issue #120)' (#132) from sl-firmware/issue-120-serial-protocol into main

This commit is contained in:
sl-jetson 2026-03-02 09:26:45 -05:00
commit 6da4ae885d
5 changed files with 904 additions and 4 deletions

View File

@ -120,6 +120,16 @@
#define PINIO2_PORT GPIOC
#define PINIO2_PIN GPIO_PIN_0 // pinio_config = 129 (USER2)
// --- JLink: Jetson Serial Binary Protocol (USART1, Issue #120) ---
#define JLINK_BAUD 921600 /* USART1 baud rate */
#define JLINK_HB_TIMEOUT_MS 1000 /* Jetson heartbeat timeout (ms) */
#define JLINK_TLM_HZ 50 /* STATUS telemetry TX rate (Hz) */
// --- Firmware Version ---
#define FW_MAJOR 1
#define FW_MINOR 0
#define FW_PATCH 0
// --- SaltyLab Assignments ---
// Hoverboard ESC: USART2 (PA2=TX, PA3=RX) or USART3
// ELRS Receiver: UART4 (PA0=TX, PA1=RX) — CRSF 420000 baud

127
include/jlink.h Normal file
View File

@ -0,0 +1,127 @@
#ifndef JLINK_H
#define JLINK_H
#include <stdint.h>
#include <stdbool.h>
/*
* JLink Jetson serial binary protocol over USART1 (PB6=TX, PB7=RX).
*
* Issue #120: replaces jetson_cmd ASCII-over-USB-CDC with a dedicated
* hardware UART at 921600 baud using DMA circular RX and IDLE interrupt.
*
* Frame format (both directions):
* [STX=0x02][LEN][CMD][PAYLOAD...][CRC16_hi][CRC16_lo][ETX=0x03]
*
* STX : frame start sentinel (0x02)
* LEN : count of CMD + PAYLOAD bytes (1 + payload_len)
* CMD : command/telemetry type byte
* PAYLOAD: 0..N bytes depending on CMD
* CRC16 : CRC16-XModem over CMD+PAYLOAD (poly 0x1021, init 0), big-endian
* ETX : frame end sentinel (0x03)
*
* Jetson STM32 commands:
* 0x01 HEARTBEAT no payload; refreshes heartbeat timer
* 0x02 DRIVE int16 speed (-1000..+1000), int16 steer (-1000..+1000)
* 0x03 ARM no payload; request arm (same interlock as CDC 'A')
* 0x04 DISARM no payload; disarm immediately
* 0x05 PID_SET float kp, float ki, float kd (12 bytes, IEEE-754 LE)
* 0x07 ESTOP no payload; engage emergency stop
*
* STM32 Jetson telemetry:
* 0x80 STATUS jlink_tlm_status_t (20 bytes), sent at JLINK_TLM_HZ
*
* Priority: CRSF RC always takes precedence. Jetson steer/speed only applied
* when mode_manager_active() == MODE_AUTONOMOUS (CH6 high). In RC_MANUAL and
* RC_ASSISTED modes the Jetson speed offset and steer are injected via
* mode_manager_set_auto_cmd() and blended per the existing blend ramp.
*
* Heartbeat: if no valid frame arrives within JLINK_HB_TIMEOUT_MS (1000ms),
* jlink_is_active() returns false and the main loop clears the auto command.
*/
/* ---- Frame constants ---- */
#define JLINK_STX 0x02u
#define JLINK_ETX 0x03u
/* ---- Command IDs (Jetson → STM32) ---- */
#define JLINK_CMD_HEARTBEAT 0x01u
#define JLINK_CMD_DRIVE 0x02u
#define JLINK_CMD_ARM 0x03u
#define JLINK_CMD_DISARM 0x04u
#define JLINK_CMD_PID_SET 0x05u
#define JLINK_CMD_ESTOP 0x07u
/* ---- Telemetry IDs (STM32 → Jetson) ---- */
#define JLINK_TLM_STATUS 0x80u
/* ---- Telemetry STATUS payload (20 bytes, packed) ---- */
typedef struct __attribute__((packed)) {
int16_t pitch_x10; /* pitch degrees ×10 */
int16_t roll_x10; /* roll degrees ×10 */
int16_t yaw_x10; /* yaw degrees ×10 (gyro-integrated) */
int16_t motor_cmd; /* ESC output -1000..+1000 */
uint16_t vbat_mv; /* battery millivolts */
int8_t rssi_dbm; /* CRSF RSSI (dBm, negative) */
uint8_t link_quality; /* CRSF LQ 0-100 */
uint8_t balance_state; /* 0=DISARMED, 1=ARMED, 2=TILT_FAULT */
uint8_t rc_armed; /* crsf_state.armed (1=armed) */
uint8_t mode; /* robot_mode_t: 0=RC_MANUAL,1=ASSISTED,2=AUTONOMOUS */
uint8_t estop; /* EstopSource value */
uint8_t soc_pct; /* state-of-charge 0-100, 255=unknown */
uint8_t fw_major;
uint8_t fw_minor;
uint8_t fw_patch;
} jlink_tlm_status_t; /* 20 bytes */
/* ---- Volatile state (read from main loop) ---- */
typedef struct {
/* Drive command — updated on JLINK_CMD_DRIVE */
volatile int16_t speed; /* -1000..+1000 */
volatile int16_t steer; /* -1000..+1000 */
/* Heartbeat timer — updated on any valid frame */
volatile uint32_t last_rx_ms; /* HAL_GetTick() of last valid frame; 0=none */
/* One-shot request flags — set by parser, cleared by main loop */
volatile uint8_t arm_req;
volatile uint8_t disarm_req;
volatile uint8_t estop_req;
/* PID update — set by parser, cleared by main loop */
volatile uint8_t pid_updated;
volatile float pid_kp;
volatile float pid_ki;
volatile float pid_kd;
} JLinkState;
extern volatile JLinkState jlink_state;
/* ---- API ---- */
/*
* jlink_init() configure USART1 (PB6=TX, PB7=RX) at 921600 baud with
* DMA2_Stream2_Channel4 circular RX (128-byte buffer) and IDLE interrupt.
* Call once before safety_init().
*/
void jlink_init(void);
/*
* jlink_is_active(now_ms) returns true if a valid frame arrived within
* JLINK_HB_TIMEOUT_MS. Returns false if no frame ever received.
*/
bool jlink_is_active(uint32_t now_ms);
/*
* jlink_send_telemetry(status) build and transmit a JLINK_TLM_STATUS frame
* over USART1 TX (blocking, ~0.2ms at 921600). Call at JLINK_TLM_HZ.
*/
void jlink_send_telemetry(const jlink_tlm_status_t *status);
/*
* jlink_process() drain DMA circular buffer and parse frames.
* Call from main loop every iteration (not ISR). Lightweight: O(bytes_pending).
*/
void jlink_process(void);
#endif /* JLINK_H */

279
src/jlink.c Normal file
View File

@ -0,0 +1,279 @@
#include "jlink.h"
#include "config.h"
#include "stm32f7xx_hal.h"
#include <string.h>
/* ---- DMA circular RX buffer ---- */
#define JLINK_RX_BUF_LEN 128u /* must be power-of-2 */
static uint8_t s_rx_buf[JLINK_RX_BUF_LEN];
static uint32_t s_rx_tail = 0; /* consumer index (byte already processed) */
/* ---- HAL handles ---- */
static UART_HandleTypeDef s_uart;
static DMA_HandleTypeDef s_dma_rx;
/* ---- Volatile state ---- */
volatile JLinkState jlink_state;
/* ---- CRC16-XModem (poly 0x1021, init 0x0000) ---- */
static uint16_t crc16_xmodem(const uint8_t *data, uint16_t len)
{
uint16_t crc = 0x0000u;
for (uint16_t i = 0; i < len; i++) {
crc ^= (uint16_t)data[i] << 8;
for (uint8_t b = 0; b < 8; b++) {
if (crc & 0x8000u)
crc = (crc << 1) ^ 0x1021u;
else
crc <<= 1;
}
}
return crc;
}
/* ---- jlink_init() ---- */
void jlink_init(void)
{
/* GPIO: PB6=TX AF7 (USART1_TX), PB7=RX AF7 (USART1_RX) */
__HAL_RCC_GPIOB_CLK_ENABLE();
GPIO_InitTypeDef gpio = {0};
gpio.Pin = GPIO_PIN_6 | GPIO_PIN_7;
gpio.Mode = GPIO_MODE_AF_PP;
gpio.Pull = GPIO_PULLUP;
gpio.Speed = GPIO_SPEED_FREQ_HIGH;
gpio.Alternate = GPIO_AF7_USART1;
HAL_GPIO_Init(GPIOB, &gpio);
/* DMA2 Stream2 Channel4 — USART1_RX circular */
__HAL_RCC_DMA2_CLK_ENABLE();
s_dma_rx.Instance = DMA2_Stream2;
s_dma_rx.Init.Channel = DMA_CHANNEL_4;
s_dma_rx.Init.Direction = DMA_PERIPH_TO_MEMORY;
s_dma_rx.Init.PeriphInc = DMA_PINC_DISABLE;
s_dma_rx.Init.MemInc = DMA_MINC_ENABLE;
s_dma_rx.Init.PeriphDataAlignment = DMA_PDATAALIGN_BYTE;
s_dma_rx.Init.MemDataAlignment = DMA_MDATAALIGN_BYTE;
s_dma_rx.Init.Mode = DMA_CIRCULAR;
s_dma_rx.Init.Priority = DMA_PRIORITY_MEDIUM;
s_dma_rx.Init.FIFOMode = DMA_FIFOMODE_DISABLE;
HAL_DMA_Init(&s_dma_rx);
__HAL_LINKDMA(&s_uart, hdmarx, s_dma_rx);
/* USART1 at JLINK_BAUD (921600) */
__HAL_RCC_USART1_CLK_ENABLE();
s_uart.Instance = USART1;
s_uart.Init.BaudRate = JLINK_BAUD;
s_uart.Init.WordLength = UART_WORDLENGTH_8B;
s_uart.Init.StopBits = UART_STOPBITS_1;
s_uart.Init.Parity = UART_PARITY_NONE;
s_uart.Init.Mode = UART_MODE_TX_RX;
s_uart.Init.HwFlowCtl = UART_HWCONTROL_NONE;
s_uart.Init.OverSampling = UART_OVERSAMPLING_16;
HAL_UART_Init(&s_uart);
/* Enable USART1 IDLE interrupt for circular buffer draining */
__HAL_UART_ENABLE_IT(&s_uart, UART_IT_IDLE);
HAL_NVIC_SetPriority(USART1_IRQn, 6, 0);
HAL_NVIC_EnableIRQ(USART1_IRQn);
/* DMA2_Stream2 IRQ (for error handling) */
HAL_NVIC_SetPriority(DMA2_Stream2_IRQn, 7, 0);
HAL_NVIC_EnableIRQ(DMA2_Stream2_IRQn);
/* Start circular DMA RX — never stops */
HAL_UART_Receive_DMA(&s_uart, s_rx_buf, JLINK_RX_BUF_LEN);
memset((void *)&jlink_state, 0, sizeof(jlink_state));
s_rx_tail = 0;
}
/* ---- IRQ handlers ---- */
void USART1_IRQHandler(void)
{
/* Clear IDLE flag by reading SR then DR */
if (__HAL_UART_GET_FLAG(&s_uart, UART_FLAG_IDLE)) {
__HAL_UART_CLEAR_IDLEFLAG(&s_uart);
/* jlink_process() drains the buffer from main loop — no work here */
}
HAL_UART_IRQHandler(&s_uart);
}
void DMA2_Stream2_IRQHandler(void)
{
HAL_DMA_IRQHandler(&s_dma_rx);
}
/* ---- jlink_is_active() ---- */
bool jlink_is_active(uint32_t now_ms)
{
if (jlink_state.last_rx_ms == 0u) return false;
return (now_ms - jlink_state.last_rx_ms) < JLINK_HB_TIMEOUT_MS;
}
/* ---- Frame dispatch ---- */
static void dispatch(const uint8_t *payload, uint8_t cmd, uint8_t plen)
{
/* Update heartbeat timestamp on every valid frame */
jlink_state.last_rx_ms = HAL_GetTick();
switch (cmd) {
case JLINK_CMD_HEARTBEAT:
/* Heartbeat only — no payload action needed */
break;
case JLINK_CMD_DRIVE:
if (plen == 4u) {
int16_t spd, str;
memcpy(&spd, payload, 2);
memcpy(&str, payload + 2, 2);
/* Clamp to ±1000 */
if (spd > 1000) spd = 1000;
if (spd < -1000) spd = -1000;
if (str > 1000) str = 1000;
if (str < -1000) str = -1000;
jlink_state.speed = spd;
jlink_state.steer = str;
}
break;
case JLINK_CMD_ARM:
jlink_state.arm_req = 1u;
break;
case JLINK_CMD_DISARM:
jlink_state.disarm_req = 1u;
break;
case JLINK_CMD_PID_SET:
if (plen == 12u) {
float kp, ki, kd;
memcpy(&kp, payload, 4);
memcpy(&ki, payload + 4, 4);
memcpy(&kd, payload + 8, 4);
/* Sanity bounds — same as USB CDC PID handler in main.c */
if (kp >= 0.0f && kp <= 500.0f) jlink_state.pid_kp = kp;
if (ki >= 0.0f && ki <= 50.0f) jlink_state.pid_ki = ki;
if (kd >= 0.0f && kd <= 50.0f) jlink_state.pid_kd = kd;
jlink_state.pid_updated = 1u;
}
break;
case JLINK_CMD_ESTOP:
jlink_state.estop_req = 1u;
break;
default:
break;
}
}
/* ---- jlink_process() — call from main loop every tick ---- */
/*
* Parser state machine.
* Frame: [STX][LEN][CMD][PAYLOAD 0..LEN-1][CRC_hi][CRC_lo][ETX]
* LEN = count of CMD + PAYLOAD bytes (1..253).
* CRC16-XModem over CMD+PAYLOAD.
* Maximum payload = 253 - 1 = 252 bytes (LEN field is 1 byte, max 0xFF=255,
* but we cap at 64 for safety).
*/
#define JLINK_MAX_PAYLOAD 64u
typedef enum {
PS_WAIT_STX = 0,
PS_WAIT_LEN,
PS_WAIT_DATA, /* receiving CMD + PAYLOAD (len bytes total) */
PS_WAIT_CRC_HI,
PS_WAIT_CRC_LO,
PS_WAIT_ETX,
} ParseState;
void jlink_process(void)
{
static ParseState s_state = PS_WAIT_STX;
static uint8_t s_len = 0; /* expected CMD+PAYLOAD length */
static uint8_t s_count = 0; /* bytes received so far in PS_WAIT_DATA */
static uint8_t s_frame[JLINK_MAX_PAYLOAD + 1u]; /* [0]=CMD, [1..]=PAYLOAD */
static uint8_t s_crc_hi = 0;
/* Compute how many bytes the DMA has written since last drain */
uint32_t head = JLINK_RX_BUF_LEN - __HAL_DMA_GET_COUNTER(&s_dma_rx);
uint32_t bytes = (head - s_rx_tail) & (JLINK_RX_BUF_LEN - 1u);
for (uint32_t i = 0; i < bytes; i++) {
uint8_t b = s_rx_buf[s_rx_tail];
s_rx_tail = (s_rx_tail + 1u) & (JLINK_RX_BUF_LEN - 1u);
switch (s_state) {
case PS_WAIT_STX:
if (b == JLINK_STX) s_state = PS_WAIT_LEN;
break;
case PS_WAIT_LEN:
if (b == 0u || b > JLINK_MAX_PAYLOAD + 1u) {
/* Invalid length — resync */
s_state = PS_WAIT_STX;
} else {
s_len = b;
s_count = 0;
s_state = PS_WAIT_DATA;
}
break;
case PS_WAIT_DATA:
s_frame[s_count++] = b;
if (s_count == s_len) s_state = PS_WAIT_CRC_HI;
break;
case PS_WAIT_CRC_HI:
s_crc_hi = b;
s_state = PS_WAIT_CRC_LO;
break;
case PS_WAIT_CRC_LO: {
uint16_t rx_crc = ((uint16_t)s_crc_hi << 8) | b;
uint16_t calc_crc = crc16_xmodem(s_frame, s_len);
if (rx_crc == calc_crc)
s_state = PS_WAIT_ETX;
else
s_state = PS_WAIT_STX; /* CRC mismatch — drop */
break;
}
case PS_WAIT_ETX:
if (b == JLINK_ETX) {
/* Valid frame: s_frame[0]=CMD, s_frame[1..s_len-1]=PAYLOAD */
dispatch(s_frame + 1, s_frame[0], s_len - 1u);
}
/* Either way, go back to idle (resync on bad ETX) */
s_state = PS_WAIT_STX;
break;
}
}
}
/* ---- jlink_send_telemetry() ---- */
void jlink_send_telemetry(const jlink_tlm_status_t *status)
{
/*
* Frame: [STX][LEN][0x80][20 bytes STATUS][CRC_hi][CRC_lo][ETX]
* LEN = 1 (CMD) + 20 (payload) = 21
* Total frame length = 1+1+1+20+2+1 = 26 bytes
* At 921600 baud (10 bits/byte): 26×10/921600 0.28ms safe to block.
*/
static uint8_t frame[26];
const uint8_t plen = (uint8_t)sizeof(jlink_tlm_status_t); /* 20 */
const uint8_t len = 1u + plen; /* 21 */
frame[0] = JLINK_STX;
frame[1] = len;
frame[2] = JLINK_TLM_STATUS;
memcpy(&frame[3], status, plen);
uint16_t crc = crc16_xmodem(&frame[2], len); /* over CMD + PAYLOAD */
frame[3 + plen] = (uint8_t)(crc >> 8);
frame[3 + plen + 1] = (uint8_t)(crc & 0xFFu);
frame[3 + plen + 2] = JLINK_ETX;
HAL_UART_Transmit(&s_uart, frame, sizeof(frame), 5u);
}

View File

@ -16,6 +16,7 @@
#include "bmp280.h"
#include "mag.h"
#include "jetson_cmd.h"
#include "jlink.h"
#include "battery.h"
#include <math.h>
#include <string.h>
@ -139,6 +140,9 @@ int main(void) {
/* Init CRSF/ELRS receiver on UART4 (PA0/PA1) with DMA */
crsf_init();
/* Init Jetson serial binary protocol on USART1 (PB6/PB7) at 921600 baud */
jlink_init();
/* Init mode manager (RC/autonomous blend; CH6 mode switch) */
mode_manager_t mode;
mode_manager_init(&mode);
@ -172,6 +176,7 @@ int main(void) {
uint32_t balance_tick = 0;
uint32_t esc_tick = 0;
uint32_t crsf_telem_tick = 0; /* CRSF uplink telemetry TX timer */
uint32_t jlink_tlm_tick = 0; /* Jetson binary telemetry TX timer */
const float dt = 1.0f / PID_LOOP_HZ; /* 1ms at 1kHz */
while (1) {
@ -199,6 +204,37 @@ int main(void) {
safety_remote_estop_clear();
}
/* Drain JLink DMA circular buffer and parse frames */
jlink_process();
/* Handle JLink one-shot flags from Jetson binary protocol */
if (jlink_state.estop_req) {
jlink_state.estop_req = 0u;
safety_remote_estop(ESTOP_REMOTE);
safety_arm_cancel();
balance_disarm(&bal);
motor_driver_estop(&motors);
}
if (jlink_state.arm_req) {
jlink_state.arm_req = 0u;
if (!safety_remote_estop_active() &&
mpu6000_is_calibrated() &&
bal.state == BALANCE_DISARMED && fabsf(bal.pitch_deg) < 10.0f) {
safety_arm_start(now);
}
}
if (jlink_state.disarm_req) {
jlink_state.disarm_req = 0u;
safety_arm_cancel();
balance_disarm(&bal);
}
if (jlink_state.pid_updated) {
jlink_state.pid_updated = 0u;
bal.kp = jlink_state.pid_kp;
bal.ki = jlink_state.pid_ki;
bal.kd = jlink_state.pid_kd;
}
/* RC CH5 kill switch: disarm immediately if RC is alive and CH5 off.
* Applies regardless of active mode (CH5 always has kill authority). */
if (mode.rc_alive && !crsf_state.armed && bal.state == BALANCE_ARMED) {
@ -279,11 +315,15 @@ int main(void) {
jetson_cmd_process();
}
/* Balance PID at 1kHz — apply Jetson speed offset when active */
/* Balance PID at 1kHz — apply Jetson speed offset when active.
* jlink takes priority; fall back to legacy CDC jetson_cmd if alive. */
if (imu_ret == 0 && now - balance_tick >= 1) {
balance_tick = now;
float base_sp = bal.setpoint;
if (jetson_cmd_is_active(now)) bal.setpoint += jetson_cmd_sp_offset();
if (jlink_is_active(now))
bal.setpoint += ((float)jlink_state.speed / 1000.0f) * JETSON_SPEED_MAX_DEG;
else if (jetson_cmd_is_active(now))
bal.setpoint += jetson_cmd_sp_offset();
balance_update(&bal, &imu, dt);
bal.setpoint = base_sp;
}
@ -297,8 +337,11 @@ int main(void) {
}
/* Feed autonomous steer from Jetson into mode manager.
* jlink takes priority over legacy CDC jetson_cmd.
* mode_manager_get_steer() blends it with RC steer per active mode. */
if (jetson_cmd_is_active(now))
if (jlink_is_active(now))
mode_manager_set_auto_cmd(&mode, jlink_state.steer, 0);
else if (jetson_cmd_is_active(now))
mode_manager_set_auto_cmd(&mode, jetson_cmd_steer(), 0);
/* Send to hoverboard ESC at 50Hz (every 20ms) */
@ -329,6 +372,30 @@ int main(void) {
crsf_send_flight_mode(bal.state == BALANCE_ARMED);
}
/* JLink STATUS telemetry at JLINK_TLM_HZ (50Hz) back to Jetson */
if (now - jlink_tlm_tick >= (1000u / JLINK_TLM_HZ)) {
jlink_tlm_tick = now;
jlink_tlm_status_t tlm;
tlm.pitch_x10 = (int16_t)(bal.pitch_deg * 10.0f);
tlm.roll_x10 = (int16_t)(imu.roll * 10.0f);
tlm.yaw_x10 = (int16_t)(imu.yaw * 10.0f);
tlm.motor_cmd = bal.motor_cmd;
uint32_t vbat = battery_read_mv();
tlm.vbat_mv = (vbat > 65535u) ? 65535u : (uint16_t)vbat;
tlm.rssi_dbm = crsf_state.rssi_dbm;
tlm.link_quality = crsf_state.link_quality;
tlm.balance_state = (uint8_t)bal.state;
tlm.rc_armed = crsf_state.armed ? 1u : 0u;
tlm.mode = (uint8_t)mode_manager_active(&mode);
EstopSource _es = safety_get_estop();
tlm.estop = (uint8_t)_es;
tlm.soc_pct = battery_estimate_pct(vbat);
tlm.fw_major = FW_MAJOR;
tlm.fw_minor = FW_MINOR;
tlm.fw_patch = FW_PATCH;
jlink_send_telemetry(&tlm);
}
/* USB telemetry at 50Hz (only when streaming enabled and calibration done) */
if (cdc_streaming && mpu6000_is_calibrated() && now - send_tick >= 20) {
send_tick = now;
@ -391,8 +458,9 @@ int main(void) {
if (_rs >= ESTOP_REMOTE) es = (int)_rs;
else if (bal.state == BALANCE_TILT_FAULT) es = ESTOP_TILT;
else es = ESTOP_CLEAR;
n = snprintf(p, rem, ",\"ja\":%d,\"txc\":%u,\"rxc\":%u,\"es\":%d}\n",
n = snprintf(p, rem, ",\"ja\":%d,\"jl\":%d,\"txc\":%u,\"rxc\":%u,\"es\":%d}\n",
jetson_cmd_is_active(now) ? 1 : 0,
jlink_is_active(now) ? 1 : 0,
(unsigned)tx_count,
(unsigned)cdc_rx_count, es);
len = (int)(p + n - buf);

416
test/test_jlink_frames.py Normal file
View File

@ -0,0 +1,416 @@
"""
test_jlink_frames.py Issue #120: JLink binary protocol unit tests.
Tests CRC16-XModem, frame building, frame parsing (state machine),
command payload encoding, and telemetry frame layout.
No HAL or STM32 hardware required pure Python logic.
"""
import struct
import pytest
# ---------------------------------------------------------------------------
# CRC16-XModem (poly 0x1021, init 0x0000) — mirrors crc16_xmodem() in jlink.c
# ---------------------------------------------------------------------------
def crc16_xmodem(data: bytes) -> int:
crc = 0x0000
for byte in data:
crc ^= byte << 8
for _ in range(8):
if crc & 0x8000:
crc = (crc << 1) ^ 0x1021
else:
crc <<= 1
crc &= 0xFFFF
return crc
# ---------------------------------------------------------------------------
# Frame builder — mirrors jlink_send_telemetry() structure
# ---------------------------------------------------------------------------
JLINK_STX = 0x02
JLINK_ETX = 0x03
JLINK_CMD_HEARTBEAT = 0x01
JLINK_CMD_DRIVE = 0x02
JLINK_CMD_ARM = 0x03
JLINK_CMD_DISARM = 0x04
JLINK_CMD_PID_SET = 0x05
JLINK_CMD_ESTOP = 0x07
JLINK_TLM_STATUS = 0x80
def build_frame(cmd: int, payload: bytes = b"") -> bytes:
"""Build a JLink frame: [STX][LEN][CMD][PAYLOAD][CRC_hi][CRC_lo][ETX]"""
body = bytes([cmd]) + payload
length = len(body)
crc = crc16_xmodem(body)
return bytes([JLINK_STX, length, cmd]) + payload + bytes([crc >> 8, crc & 0xFF, JLINK_ETX])
def parse_frames(data: bytes) -> list:
"""
Parse all valid JLink frames from a byte stream.
Returns list of (cmd, payload) tuples for each valid frame.
"""
WAIT_STX, WAIT_LEN, WAIT_DATA, WAIT_CRC_HI, WAIT_CRC_LO, WAIT_ETX = range(6)
state = WAIT_STX
results = []
length = 0
buf = []
crc_hi = 0
for b in data:
if state == WAIT_STX:
if b == JLINK_STX:
state = WAIT_LEN
elif state == WAIT_LEN:
if b == 0 or b > 65:
state = WAIT_STX
else:
length = b
buf = []
state = WAIT_DATA
elif state == WAIT_DATA:
buf.append(b)
if len(buf) == length:
state = WAIT_CRC_HI
elif state == WAIT_CRC_HI:
crc_hi = b
state = WAIT_CRC_LO
elif state == WAIT_CRC_LO:
rx_crc = (crc_hi << 8) | b
calc_crc = crc16_xmodem(bytes(buf))
if rx_crc == calc_crc:
state = WAIT_ETX
else:
state = WAIT_STX
elif state == WAIT_ETX:
if b == JLINK_ETX:
cmd = buf[0]
payload = bytes(buf[1:])
results.append((cmd, payload))
state = WAIT_STX
return results
# ---------------------------------------------------------------------------
# CRC16 Tests
# ---------------------------------------------------------------------------
class TestCRC16XModem:
def test_empty(self):
assert crc16_xmodem(b"") == 0x0000
def test_known_vector_1(self):
# CRC16/XMODEM of b"123456789" = 0x31C3
assert crc16_xmodem(b"123456789") == 0x31C3
def test_single_zero(self):
assert crc16_xmodem(b"\x00") == 0x0000
def test_single_ff(self):
assert crc16_xmodem(b"\xFF") == 0x1EF0
def test_heartbeat_cmd_byte(self):
# CRC of just CMD=0x01
crc = crc16_xmodem(bytes([JLINK_CMD_HEARTBEAT]))
assert isinstance(crc, int)
assert 0 <= crc <= 0xFFFF
def test_drive_payload(self):
payload = struct.pack("<hh", 500, -300) # speed=500, steer=-300
body = bytes([JLINK_CMD_DRIVE]) + payload
crc = crc16_xmodem(body)
# Verify round-trip: same data same CRC
assert crc == crc16_xmodem(body)
def test_different_payloads_different_crc(self):
a = crc16_xmodem(b"\x02\x01\x02")
b = crc16_xmodem(b"\x02\x01\x03")
assert a != b
def test_zero_bytes(self):
assert crc16_xmodem(b"\x00\x00\x00\x00") != crc16_xmodem(b"\x00\x00\x00\x01")
# ---------------------------------------------------------------------------
# Frame Building Tests
# ---------------------------------------------------------------------------
class TestFrameBuilding:
def test_heartbeat_frame_length(self):
frame = build_frame(JLINK_CMD_HEARTBEAT)
# [STX][LEN=1][CMD=0x01][CRC_hi][CRC_lo][ETX] = 6 bytes
assert len(frame) == 6
def test_heartbeat_frame_markers(self):
frame = build_frame(JLINK_CMD_HEARTBEAT)
assert frame[0] == JLINK_STX
assert frame[-1] == JLINK_ETX
def test_heartbeat_frame_len_field(self):
frame = build_frame(JLINK_CMD_HEARTBEAT)
assert frame[1] == 1 # LEN = 1 (CMD only, no payload)
def test_heartbeat_frame_cmd(self):
frame = build_frame(JLINK_CMD_HEARTBEAT)
assert frame[2] == JLINK_CMD_HEARTBEAT
def test_drive_frame_length(self):
payload = struct.pack("<hh", 0, 0)
frame = build_frame(JLINK_CMD_DRIVE, payload)
# [STX][LEN=5][CMD][s16][s16][CRC_hi][CRC_lo][ETX] = 1+1+1+4+2+1 = 10 bytes
assert len(frame) == 10
def test_drive_frame_len_field(self):
payload = struct.pack("<hh", 0, 0)
frame = build_frame(JLINK_CMD_DRIVE, payload)
assert frame[1] == 5 # 1 (CMD) + 4 (payload)
def test_arm_frame(self):
frame = build_frame(JLINK_CMD_ARM)
assert len(frame) == 6
assert frame[2] == JLINK_CMD_ARM
def test_disarm_frame(self):
frame = build_frame(JLINK_CMD_DISARM)
assert len(frame) == 6
assert frame[2] == JLINK_CMD_DISARM
def test_estop_frame(self):
frame = build_frame(JLINK_CMD_ESTOP)
assert len(frame) == 6
assert frame[2] == JLINK_CMD_ESTOP
def test_pid_set_frame_length(self):
payload = struct.pack("<fff", 35.0, 1.0, 1.0)
frame = build_frame(JLINK_CMD_PID_SET, payload)
# [STX][LEN=13][CMD][12 bytes floats][CRC_hi][CRC_lo][ETX] = 18 bytes
assert len(frame) == 18
def test_pid_set_frame_len_field(self):
payload = struct.pack("<fff", 35.0, 1.0, 1.0)
frame = build_frame(JLINK_CMD_PID_SET, payload)
assert frame[1] == 13 # 1 (CMD) + 12 (three floats)
def test_crc_coverage(self):
"""CRC must differ if payload differs by one bit."""
payload_a = struct.pack("<hh", 500, 100)
payload_b = struct.pack("<hh", 500, 101)
frame_a = build_frame(JLINK_CMD_DRIVE, payload_a)
frame_b = build_frame(JLINK_CMD_DRIVE, payload_b)
# Extract CRC bytes (second to last two bytes before ETX)
crc_a = (frame_a[-3] << 8) | frame_a[-2]
crc_b = (frame_b[-3] << 8) | frame_b[-2]
assert crc_a != crc_b
# ---------------------------------------------------------------------------
# Frame Parsing Tests
# ---------------------------------------------------------------------------
class TestFrameParsing:
def test_parse_single_heartbeat(self):
frame = build_frame(JLINK_CMD_HEARTBEAT)
results = parse_frames(frame)
assert len(results) == 1
cmd, payload = results[0]
assert cmd == JLINK_CMD_HEARTBEAT
assert payload == b""
def test_parse_drive_cmd(self):
payload = struct.pack("<hh", 750, -200)
frame = build_frame(JLINK_CMD_DRIVE, payload)
results = parse_frames(frame)
assert len(results) == 1
cmd, data = results[0]
assert cmd == JLINK_CMD_DRIVE
speed, steer = struct.unpack("<hh", data)
assert speed == 750
assert steer == -200
def test_parse_multiple_frames(self):
stream = (
build_frame(JLINK_CMD_HEARTBEAT) +
build_frame(JLINK_CMD_ARM) +
build_frame(JLINK_CMD_DRIVE, struct.pack("<hh", 100, 50))
)
results = parse_frames(stream)
assert len(results) == 3
assert results[0][0] == JLINK_CMD_HEARTBEAT
assert results[1][0] == JLINK_CMD_ARM
assert results[2][0] == JLINK_CMD_DRIVE
def test_bad_crc_dropped(self):
frame = bytearray(build_frame(JLINK_CMD_HEARTBEAT))
frame[-2] ^= 0xFF # Corrupt CRC low byte
results = parse_frames(bytes(frame))
assert len(results) == 0
def test_bad_etx_dropped(self):
frame = bytearray(build_frame(JLINK_CMD_HEARTBEAT))
frame[-1] = 0xFF # Wrong ETX
results = parse_frames(bytes(frame))
assert len(results) == 0
def test_noise_before_frame(self):
noise = bytes([0xAA, 0xBB, 0xCC, 0xDD])
frame = build_frame(JLINK_CMD_ARM)
results = parse_frames(noise + frame)
assert len(results) == 1
assert results[0][0] == JLINK_CMD_ARM
def test_two_frames_concatenated(self):
f1 = build_frame(JLINK_CMD_DISARM)
f2 = build_frame(JLINK_CMD_HEARTBEAT)
results = parse_frames(f1 + f2)
assert len(results) == 2
def test_pid_set_parsed_correctly(self):
kp, ki, kd = 42.5, 0.5, 2.0
payload = struct.pack("<fff", kp, ki, kd)
frame = build_frame(JLINK_CMD_PID_SET, payload)
results = parse_frames(frame)
assert len(results) == 1
cmd, data = results[0]
assert cmd == JLINK_CMD_PID_SET
got_kp, got_ki, got_kd = struct.unpack("<fff", data)
assert abs(got_kp - kp) < 1e-5
assert abs(got_ki - ki) < 1e-5
assert abs(got_kd - kd) < 1e-5
def test_drive_boundary_values(self):
for spd, str_ in [(1000, 1000), (-1000, -1000), (0, 0)]:
frame = build_frame(JLINK_CMD_DRIVE, struct.pack("<hh", spd, str_))
results = parse_frames(frame)
assert len(results) == 1
speed, steer = struct.unpack("<hh", results[0][1])
assert speed == spd
assert steer == str_
# ---------------------------------------------------------------------------
# Telemetry STATUS Frame Tests
# ---------------------------------------------------------------------------
# STATUS payload struct: <hhhhH b B B B B B B B B B
# pitch_x10, roll_x10, yaw_x10, motor_cmd, vbat_mv,
# rssi_dbm, link_quality, balance_state, rc_armed,
# mode, estop, soc_pct, fw_major, fw_minor, fw_patch
STATUS_FMT = "<hhhhHbBBBBBBBBB" # 20 bytes
STATUS_SIZE = struct.calcsize(STATUS_FMT)
def make_status(pitch=0, roll=0, yaw=0, motor_cmd=0, vbat_mv=12600,
rssi_dbm=-85, link_quality=95, balance_state=1,
rc_armed=1, mode=2, estop=0, soc_pct=80,
fw_major=1, fw_minor=0, fw_patch=0) -> bytes:
return struct.pack(STATUS_FMT,
pitch, roll, yaw, motor_cmd, vbat_mv,
rssi_dbm, link_quality, balance_state, rc_armed,
mode, estop, soc_pct, fw_major, fw_minor, fw_patch)
class TestStatusTelemetry:
def test_status_payload_size(self):
assert STATUS_SIZE == 20
def test_status_frame_total_length(self):
payload = make_status()
frame = build_frame(JLINK_TLM_STATUS, payload)
# [STX][LEN=21][0x80][20 bytes][CRC_hi][CRC_lo][ETX] = 26 bytes
assert len(frame) == 26
def test_status_len_field(self):
payload = make_status()
frame = build_frame(JLINK_TLM_STATUS, payload)
assert frame[1] == 21 # 1 (CMD) + 20 (STATUS payload)
def test_status_cmd_byte(self):
payload = make_status()
frame = build_frame(JLINK_TLM_STATUS, payload)
assert frame[2] == JLINK_TLM_STATUS
def test_status_parseable(self):
payload = make_status(pitch=123, roll=-45, yaw=678, motor_cmd=500,
vbat_mv=11800, rssi_dbm=-90, link_quality=80,
balance_state=1, rc_armed=1, mode=2, estop=0,
soc_pct=60, fw_major=1, fw_minor=0, fw_patch=0)
frame = build_frame(JLINK_TLM_STATUS, payload)
results = parse_frames(frame)
assert len(results) == 1
cmd, data = results[0]
assert cmd == JLINK_TLM_STATUS
vals = struct.unpack(STATUS_FMT, data)
pitch, roll, yaw, motor_cmd, vbat_mv, rssi_dbm, lq, bs, ra, mode, es, soc, fwma, fwmi, fwpa = vals
assert pitch == 123
assert roll == -45
assert yaw == 678
assert motor_cmd == 500
assert vbat_mv == 11800
assert rssi_dbm == -90
assert lq == 80
assert bs == 1
assert ra == 1
assert mode == 2
assert es == 0
assert soc == 60
assert fwma == 1
assert fwmi == 0
assert fwpa == 0
def test_pitch_encoding(self):
"""pitch_deg * 10 fits in int16 for ±3276 degrees."""
for deg in [-20.0, 0.0, 5.5, 15.3, -10.1]:
encoded = int(deg * 10)
payload = make_status(pitch=encoded)
frame = build_frame(JLINK_TLM_STATUS, payload)
results = parse_frames(frame)
assert len(results) == 1
vals = struct.unpack(STATUS_FMT, results[0][1])
assert vals[0] == encoded
def test_vbat_range_3s(self):
"""3S battery 990012600 mV fits in uint16."""
for mv in [9900, 11100, 12600]:
payload = make_status(vbat_mv=mv)
frame = build_frame(JLINK_TLM_STATUS, payload)
results = parse_frames(frame)
vals = struct.unpack(STATUS_FMT, results[0][1])
assert vals[4] == mv
def test_vbat_range_4s(self):
"""4S battery 1320016800 mV fits in uint16."""
for mv in [13200, 14800, 16800]:
payload = make_status(vbat_mv=mv)
frame = build_frame(JLINK_TLM_STATUS, payload)
results = parse_frames(frame)
vals = struct.unpack(STATUS_FMT, results[0][1])
assert vals[4] == mv
def test_fw_version_fields(self):
payload = make_status(fw_major=2, fw_minor=3, fw_patch=7)
frame = build_frame(JLINK_TLM_STATUS, payload)
results = parse_frames(frame)
vals = struct.unpack(STATUS_FMT, results[0][1])
assert vals[-3] == 2
assert vals[-2] == 3
assert vals[-1] == 7
def test_soc_pct_unknown(self):
"""soc_pct=255 encodes as 0xFF (unknown)."""
payload = make_status(soc_pct=255)
frame = build_frame(JLINK_TLM_STATUS, payload)
results = parse_frames(frame)
vals = struct.unpack(STATUS_FMT, results[0][1])
assert vals[11] == 255