#!/usr/bin/env python3 """orin_can_drive_test.py — Emergency UART drive test for ESP32-S3 Balance board. Sends HEARTBEAT → ARM → DRIVE(speed=200, steer=0) over /dev/ttyACM0 at 460800 baud. Reads telemetry and reports via MQTT to 'max'. Frame format: [0xAA][LEN][TYPE][PAYLOAD][CRC8-SMBUS] CRC8 poly=0x07 init=0x00 covers LEN+TYPE+PAYLOAD. VESC IDs post-PR#735: 61=left, 79=right. Usage: python3 orin_can_drive_test.py [--port /dev/ttyACM0] [--duration 10] """ import argparse import json import struct import sys import threading import time try: import serial except ImportError: print("ERROR: pyserial not installed — run: pip3 install pyserial", file=sys.stderr) sys.exit(1) try: import paho.mqtt.client as mqtt MQTT_AVAILABLE = True except ImportError: print("WARNING: paho-mqtt not installed — MQTT reporting disabled", file=sys.stderr) MQTT_AVAILABLE = False # ── Protocol constants ──────────────────────────────────────────────────────── SYNC = 0xAA MAX_PAYLOAD = 62 CMD_HEARTBEAT = 0x01 CMD_DRIVE = 0x02 CMD_ESTOP = 0x03 CMD_ARM = 0x04 TELEM_STATUS = 0x80 TELEM_VESC_LEFT = 0x81 # VESC ID 61 (left) TELEM_VESC_RIGHT = 0x82 # VESC ID 79 (right) RESP_ACK = 0xA0 RESP_NACK = 0xA1 NACK_CODES = { 0x01: "ERR_BAD_CRC", 0x02: "ERR_BAD_LEN", 0x03: "ERR_ESTOP_ACTIVE", 0x04: "ERR_DISARMED", 0x05: "ERR_OTA_BUSY", 0x06: "ERR_OTA_NO_UPDATE", } VESC_ID_MAP = {TELEM_VESC_LEFT: 56, TELEM_VESC_RIGHT: 68} # ── CRC8-SMBUS (poly=0x07, init=0x00) ──────────────────────────────────────── def crc8(data: bytes) -> int: crc = 0 for b in data: crc ^= b for _ in range(8): crc = ((crc << 1) ^ 0x07) if (crc & 0x80) else (crc << 1) crc &= 0xFF return crc # ── Frame encoder ───────────────────────────────────────────────────────────── def build_frame(cmd_type: int, payload: bytes) -> bytes: ln = len(payload) crc = crc8(bytes([ln, cmd_type]) + payload) return bytes([SYNC, ln, cmd_type]) + payload + bytes([crc]) def frame_heartbeat() -> bytes: return build_frame(CMD_HEARTBEAT, b"") def frame_arm(arm: bool) -> bytes: return build_frame(CMD_ARM, bytes([0x01 if arm else 0x00])) def frame_drive(speed: int, steer: int) -> bytes: speed = max(-1000, min(1000, speed)) steer = max(-1000, min(1000, steer)) return build_frame(CMD_DRIVE, struct.pack(">hh", speed, steer)) # ── Streaming frame parser ──────────────────────────────────────────────────── class FrameParser: WAIT_SYNC, WAIT_LEN, WAIT_TYPE, WAIT_PAYLOAD, WAIT_CRC = range(5) def __init__(self): self.frames_ok = 0 self.frames_err = 0 self._reset() def _reset(self): self._state = self.WAIT_SYNC self._len = 0 self._type = 0 self._payload = bytearray() def feed(self, byte: int): s = self._state if s == self.WAIT_SYNC: if byte == SYNC: self._state = self.WAIT_LEN elif s == self.WAIT_LEN: if byte > MAX_PAYLOAD: self.frames_err += 1 self._reset() else: self._len = byte self._state = self.WAIT_TYPE elif s == self.WAIT_TYPE: self._type = byte self._payload = bytearray() self._state = self.WAIT_CRC if self._len == 0 else self.WAIT_PAYLOAD elif s == self.WAIT_PAYLOAD: self._payload.append(byte) if len(self._payload) == self._len: self._state = self.WAIT_CRC elif s == self.WAIT_CRC: expected = crc8(bytes([self._len, self._type]) + self._payload) if byte != expected: self.frames_err += 1 self._reset() return None self.frames_ok += 1 result = self._decode(self._type, bytes(self._payload)) self._reset() return result return None def _decode(self, t: int, p: bytes): try: if t == RESP_ACK: cmd = p[0] if p else 0 return {"type": "ACK", "cmd": hex(cmd)} if t == RESP_NACK: cmd, err = (p[0], p[1]) if len(p) >= 2 else (0, 0) return {"type": "NACK", "cmd": hex(cmd), "err": NACK_CODES.get(err, hex(err))} if t == TELEM_STATUS and len(p) >= 8: pitch_x10, motor_cmd, vbat_mv, state, flags = struct.unpack_from(">hhHBB", p) twai_state = (flags >> 2) & 0x3 return { "type": "STATUS", "pitch_deg": pitch_x10 / 10.0, "motor_cmd": motor_cmd, "vbat_mv": vbat_mv, "state": ["DISARMED","ARMED","TILT_FAULT","ESTOP"][state] if 0 <= state <= 3 else str(state), "flags": flags, "twai_state": ["STOPPED","RUNNING","BUS_OFF","RECOVERING"][twai_state], "can_bus_active": bool(flags & 0x10), "vesc_a_alive": bool(flags & 0x20), "vesc_b_alive": bool(flags & 0x40), } if t in (TELEM_VESC_LEFT, TELEM_VESC_RIGHT) and len(p) >= 10: erpm, voltage_mv, current_ma, temp_x10 = struct.unpack_from(">iHhH", p) return { "type": "VESC", "vesc_id": VESC_ID_MAP[t], "side": "LEFT" if t == TELEM_VESC_LEFT else "RIGHT", "erpm": erpm, "voltage_mv": voltage_mv, "current_ma": current_ma, "temp_c": temp_x10 / 10.0, } except Exception as e: return {"type": "PARSE_ERROR", "raw": p.hex(), "err": str(e)} return {"type": "UNKNOWN", "code": hex(t), "payload": p.hex()} # ── MQTT reporter ───────────────────────────────────────────────────────────── def mqtt_send(host, port, user, password, from_agent, to_agent, body): if not MQTT_AVAILABLE: print(f"[MQTT] Would send to {to_agent}: {body}", flush=True) return try: client = mqtt.Client() client.username_pw_set(user, password) client.connect(host, int(port), keepalive=5) msg = json.dumps({"from": from_agent, "to": to_agent, "body": body, "ts": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())}) client.publish(f"agents/{to_agent}/inbox", msg, qos=1) client.loop(timeout=2.0) client.disconnect() print(f"[MQTT→{to_agent}] {body[:120]}", flush=True) except Exception as e: print(f"[MQTT ERROR] {e}", flush=True) # ── Main test routine ───────────────────────────────────────────────────────── def run_test(port, baud, duration, mqtt_host, mqtt_port, mqtt_user, mqtt_pass): print(f"\n=== CAN Drive Test via UART: {port} @ {baud} baud ===", flush=True) print(f"Speed=200 Steer=0 | Duration={duration}s | VESC 56=LEFT 68=RIGHT\n", flush=True) frames_received = [] errors = [] motors_spinning = False lock = threading.Lock() try: ser = serial.Serial(port, baud, timeout=0.05) except serial.SerialException as e: msg = f"FAILED to open {port}: {e}" print(f"ERROR: {msg}", flush=True) mqtt_send(mqtt_host, mqtt_port, mqtt_user, mqtt_pass, "sl-firmware", "max", f"[ORIN UART TEST] {msg}") return parser = FrameParser() # Reader thread def reader(): while not stop_event.is_set(): try: data = ser.read(64) for b in data: frame = parser.feed(b) if frame: ts = time.strftime("%H:%M:%S") print(f" [{ts}] RX: {frame}", flush=True) with lock: frames_received.append(frame) except Exception as e: with lock: errors.append(str(e)) stop_event = threading.Event() t = threading.Thread(target=reader, daemon=True) t.start() def send(frame_bytes, label): ser.write(frame_bytes) print(f" TX: {label} raw={frame_bytes.hex()}", flush=True) # 1. HEARTBEAT send(frame_heartbeat(), "CMD_HEARTBEAT") time.sleep(0.15) # 2. ARM send(frame_arm(True), "CMD_ARM(1)") time.sleep(0.15) # Check ARM response with lock: arm_acks = [f for f in frames_received if f.get("type") == "ACK" and f.get("cmd") == hex(CMD_ARM)] arm_nacks = [f for f in frames_received if f.get("type") == "NACK" and f.get("cmd") == hex(CMD_ARM)] if arm_nacks: msg = f"ARM REJECTED: {arm_nacks[0]}" print(f"\n!!! {msg}\n", flush=True) mqtt_send(mqtt_host, mqtt_port, mqtt_user, mqtt_pass, "sl-firmware", "max", f"[ORIN UART TEST] {msg}") # 3. Send DRIVE + keep heartbeat alive print(f"\n--- Sending DRIVE speed=200 steer=0 for {duration}s ---\n", flush=True) t_end = time.time() + duration hb_interval = 0.2 drive_interval = 0.1 last_hb = 0.0 while time.time() < t_end: now = time.time() if now - last_hb >= hb_interval: send(frame_heartbeat(), "CMD_HEARTBEAT") last_hb = now send(frame_drive(200, 0), "CMD_DRIVE(speed=200,steer=0)") time.sleep(drive_interval) stop_event.set() ser.close() # ── Analyse results ── with lock: all_frames = list(frames_received) print("\n=== RESULTS ===", flush=True) print(f"Frames received: {len(all_frames)}", flush=True) print(f"Parser OK/ERR: {parser.frames_ok}/{parser.frames_err}", flush=True) acks = [f for f in all_frames if f.get("type") == "ACK"] nacks = [f for f in all_frames if f.get("type") == "NACK"] statuses = [f for f in all_frames if f.get("type") == "STATUS"] vescs = [f for f in all_frames if f.get("type") == "VESC"] print(f"ACKs: {len(acks)} NACKs: {len(nacks)}", flush=True) print(f"STATUS frames: {len(statuses)}", flush=True) print(f"VESC frames: {len(vescs)}", flush=True) if statuses: last_st = statuses[-1] can_diag = (f" twai={last_st.get('twai_state')} " f"can_bus_active={last_st.get('can_bus_active')} " f"vesc_a_alive={last_st.get('vesc_a_alive')} " f"vesc_b_alive={last_st.get('vesc_b_alive')}") print(f"\nLast STATUS: state={last_st.get('state')} vbat={last_st.get('vbat_mv')}mV", flush=True) print(f"CAN diag:{can_diag}", flush=True) left_vescs = [f for f in vescs if f.get("vesc_id") == 56] right_vescs = [f for f in vescs if f.get("vesc_id") == 68] if left_vescs: lv = left_vescs[-1] print(f"LEFT VESC(56): erpm={lv['erpm']} {lv['voltage_mv']}mV {lv['current_ma']}mA {lv['temp_c']}°C", flush=True) if right_vescs: rv = right_vescs[-1] print(f"RIGHT VESC(68): erpm={rv['erpm']} {rv['voltage_mv']}mV {rv['current_ma']}mA {rv['temp_c']}°C", flush=True) left_erpm = left_vescs[-1]["erpm"] if left_vescs else None right_erpm = right_vescs[-1]["erpm"] if right_vescs else None motors_spinning = bool(left_erpm and abs(left_erpm) > 50 or right_erpm and abs(right_erpm) > 50) print(f"\nMotors spinning? {'YES ✓' if motors_spinning else 'NO — check CAN/VESC'}", flush=True) if not vescs and statuses: st = statuses[-1] if not st.get("can_bus_active"): print(" → can_bus_active=False: no CAN frames received at all. Check wiring.", flush=True) elif not st.get("vesc_a_alive") and not st.get("vesc_b_alive"): print(" → CAN bus active but no PONG from either VESC. VESCs not broadcasting STATUS.", flush=True) print(" Fix: enable CAN Status Message Mode (mode ≥ 1) in VESC Tool for both VESCs.", flush=True) if nacks: print(f"NACKs received: {nacks}", flush=True) # ── MQTT report to max ── can_bus = statuses[-1].get("can_bus_active", "?") if statuses else "?" va = statuses[-1].get("vesc_a_alive", "?") if statuses else "?" vb = statuses[-1].get("vesc_b_alive", "?") if statuses else "?" summary_lines = [ f"[ORIN UART TEST] port={port} baud={baud} speed=200 steer=0 dur={duration}s", f"Frames: {len(all_frames)} ok | parser {parser.frames_ok}/{parser.frames_err}", f"ACKs={len(acks)} NACKs={len(nacks)} STATUS={len(statuses)} VESC={len(vescs)}", f"CAN: bus_active={can_bus} vesc_a={va} vesc_b={vb}", ] if statuses: st = statuses[-1] summary_lines.append( f"Balance state={st.get('state')} vbat={st.get('vbat_mv')}mV twai={st.get('twai_state')}" ) if left_vescs: summary_lines.append(f"LEFT(56): erpm={left_erpm} {left_vescs[-1]['voltage_mv']}mV {left_vescs[-1]['current_ma']}mA") if right_vescs: summary_lines.append(f"RIGHT(68): erpm={right_erpm} {right_vescs[-1]['voltage_mv']}mV {right_vescs[-1]['current_ma']}mA") if nacks: summary_lines.append(f"NACK details: {nacks[:3]}") summary_lines.append(f"MOTORS SPINNING: {'YES' if motors_spinning else 'NO'}") report = " | ".join(summary_lines) mqtt_send(mqtt_host, mqtt_port, mqtt_user, mqtt_pass, "sl-firmware", "max", report) return motors_spinning if __name__ == "__main__": ap = argparse.ArgumentParser() ap.add_argument("--port", default="/dev/ttyACM0") ap.add_argument("--baud", type=int, default=460800) ap.add_argument("--duration", type=int, default=10, help="seconds to run drive loop") ap.add_argument("--mqtt-host", default="192.168.87.29") ap.add_argument("--mqtt-port", type=int, default=1883) ap.add_argument("--mqtt-user", default="mqtt_seb") ap.add_argument("--mqtt-pass", default="mqtt_pass") args = ap.parse_args() run_test( port=args.port, baud=args.baud, duration=args.duration, mqtt_host=args.mqtt_host, mqtt_port=args.mqtt_port, mqtt_user=args.mqtt_user, mqtt_pass=args.mqtt_pass, )