- LED panel BLE protocol (partial RE from glove firmware) - Panel MAC: E0:6E:41:94:39:70, BLE name: YS623B101069L - led-panel-ble.py: scan, slot switch, cycle tool - ble-capture-only.py: GATT server capture proxy - Glove firmware docs (ESP32 WROOM, 4-button controller) - Slot switch command doesn't work — capture needed
122 lines
3.9 KiB
Python
122 lines
3.9 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
BLE Capture Proxy — capture only, no forwarding.
|
|
Advertises as fake LED panel, logs all writes from phone app.
|
|
No bleak (client) used — avoids adapter conflicts.
|
|
|
|
Usage: sudo python3 ble-capture-only.py
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import signal
|
|
from datetime import datetime
|
|
from bless import BlessServer, BlessGATTCharacteristic, GATTCharacteristicProperties, GATTAttributePermissions
|
|
|
|
# GATT service advertised by the fake panel; every characteristic below is
# registered under it in main().
SERVICE_UUID = "49535343-fe7d-4ae5-8fa9-9fafd205e455"

# Characteristics exposed on SERVICE_UUID (properties as registered in main()):
CHAR_WRITE_WR = "49535343-8841-43f4-a8d4-ecbe34729bb3"   # write + write-without-response
CHAR_WRITE2 = "49535343-6daa-4d02-abf6-19569aca69fe"     # write
CHAR_NOTIFY = "49535343-1e4d-4bd9-ba61-23c647249616"     # notify
CHAR_WR_NOTIFY = "49535343-aca3-481c-91ec-d85e28a60318"  # write + notify

# Capture log destination: one JSON object per line (written by log_msg()).
LOG_FILE = "/home/seb/ble-capture-log.jsonl"
|
|
# Number of writes captured so far; doubles as the sequence number "n" of
# each log entry.
msg_counter = 0
# Open handle for the JSONL capture log; None until main() opens the file.
log_fh = None


def log_msg(char_uuid, data):
    """Record one captured write on stdout and, if open, in the JSONL log.

    Args:
        char_uuid: Full UUID string of the characteristic that was written.
            Its last four characters are used as a short tag in the output.
        data: Payload of the write (bytes-like), or None / empty for a
            write without payload.

    Side effects:
        Increments the module-level ``msg_counter``, prints a summary line
        (hex dump truncated to 120 characters), prints the ASCII rendering
        when the payload is non-empty printable ASCII, and appends the full
        entry as one JSON line to ``log_fh`` when a log file is open.
    """
    global msg_counter
    msg_counter += 1

    short = char_uuid[-4:]
    hex_str = data.hex(" ") if data else ""
    entry = {
        "n": msg_counter,
        "ts": datetime.now().isoformat(),
        "char": short,
        "uuid": char_uuid,
        "hex": hex_str,
        "len": len(data) if data else 0,
    }

    # Try to decode as ASCII if it looks like text. Only the two expected
    # failures are tolerated: AttributeError (data is None or has no
    # .decode) and UnicodeDecodeError (payload contains non-ASCII bytes).
    # Anything else, including KeyboardInterrupt/SystemExit, propagates.
    try:
        text = data.decode("ascii")
    except (AttributeError, UnicodeDecodeError):
        text = None
    if text is not None and text.isprintable():
        entry["ascii"] = text

    line = json.dumps(entry)
    print(f"[{msg_counter:04d}] ...{short} len={entry['len']:4d} | {hex_str[:120]}")
    if entry.get("ascii"):
        print(f" ASCII: {entry['ascii']}")

    if log_fh:
        log_fh.write(line + "\n")
        # Flush per message so the capture survives an abrupt termination.
        log_fh.flush()
|
|
|
|
def on_read(char: BlessGATTCharacteristic, **kwargs) -> bytearray:
    """Handle a read request: announce it and return the stored value.

    Prints the last four characters of the characteristic's UUID, then
    returns ``char.value``; an empty bytearray is returned instead when the
    stored value is None or empty.
    """
    uuid_tail = str(char.uuid)[-4:]
    print(f" READ request on ...{uuid_tail}")

    stored = char.value
    if stored:
        return stored
    return bytearray()
|
|
|
|
def on_write(char: BlessGATTCharacteristic, value: bytearray, **kwargs):
    """Handle a write request by logging it; nothing is forwarded.

    The characteristic's UUID (as a string) and a bytearray copy of the
    written value are handed to log_msg().
    """
    log_msg(str(char.uuid), bytearray(value))
|
|
|
|
async def main():
    """Run the capture proxy until SIGINT/SIGTERM, then shut down cleanly.

    Opens LOG_FILE for writing (truncating any previous capture), starts a
    BlessServer advertising as "YS623B101069L" with SERVICE_UUID and its
    four characteristics, prints usage instructions and waits for a stop
    signal. The GATT server is stopped and the log file is closed even when
    setup or the wait raises.
    """
    global log_fh

    log_fh = open(LOG_FILE, "w", encoding="utf-8")
    try:
        print(f"Logging to {LOG_FILE}")

        server = BlessServer(name="YS623B101069L")
        server.read_request_func = on_read
        server.write_request_func = on_write

        await server.add_new_service(SERVICE_UUID)

        props = GATTCharacteristicProperties
        perms = GATTAttributePermissions
        # (uuid, properties, permissions) for each characteristic, in
        # registration order.
        characteristics = (
            (CHAR_WRITE_WR, props.write_without_response | props.write, perms.writeable),
            (CHAR_WRITE2, props.write, perms.writeable),
            (CHAR_NOTIFY, props.notify, perms.readable),
            (CHAR_WR_NOTIFY, props.write | props.notify, perms.writeable | perms.readable),
        )
        for char_uuid, char_props, char_perms in characteristics:
            # A fresh one-byte initial value per characteristic.
            await server.add_new_characteristic(
                SERVICE_UUID, char_uuid, char_props, bytearray(b'\x00'), char_perms)

        await server.start()
        try:
            print("\n" + "=" * 60)
            print("CAPTURE PROXY RUNNING")
            print("=" * 60)
            print("Advertising as: YS623B101069L")
            print()
            print("1. Make sure real panel is OFF or out of range")
            print("2. Open Amusing LED app on phone")
            print("3. Connect to 'YS623B101069L'")
            print("4. Switch animations, upload images, etc.")
            print("5. All writes logged here + " + LOG_FILE)
            print("6. Ctrl+C to stop")
            print("=" * 60 + "\n")
            print("Waiting for connections...\n")

            stop = asyncio.Event()
            # We are inside a coroutine, so the running loop is the one to use.
            loop = asyncio.get_running_loop()
            for sig in (signal.SIGINT, signal.SIGTERM):
                loop.add_signal_handler(sig, stop.set)

            await stop.wait()

            print("\nShutting down...")
        finally:
            # Stop advertising even if the wait was interrupted by an error.
            await server.stop()
    finally:
        log_fh.close()
        # A closed file object is still truthy; reset to None so a late
        # write callback skips file logging instead of raising ValueError.
        log_fh = None

    print(f"Captured {msg_counter} messages. Log: {LOG_FILE}")
|
|
|
|
# Start the capture proxy only when run as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main())
|