#!/usr/bin/env python3 """ BLE Capture Proxy — capture only, no forwarding. Advertises as fake LED panel, logs all writes from phone app. No bleak (client) used — avoids adapter conflicts. Usage: sudo python3 ble-capture-only.py """ import asyncio import json import signal from datetime import datetime from bless import BlessServer, BlessGATTCharacteristic, GATTCharacteristicProperties, GATTAttributePermissions SERVICE_UUID = "49535343-fe7d-4ae5-8fa9-9fafd205e455" CHAR_WRITE_WR = "49535343-8841-43f4-a8d4-ecbe34729bb3" CHAR_WRITE2 = "49535343-6daa-4d02-abf6-19569aca69fe" CHAR_NOTIFY = "49535343-1e4d-4bd9-ba61-23c647249616" CHAR_WR_NOTIFY = "49535343-aca3-481c-91ec-d85e28a60318" LOG_FILE = "/home/seb/ble-capture-log.jsonl" msg_counter = 0 log_fh = None def log_msg(char_uuid, data): global msg_counter msg_counter += 1 short = char_uuid[-4:] hex_str = data.hex(' ') if data else "" entry = {"n": msg_counter, "ts": datetime.now().isoformat(), "char": short, "uuid": char_uuid, "hex": hex_str, "len": len(data) if data else 0} # Try to decode as ASCII if it looks like text try: text = data.decode('ascii') if text.isprintable(): entry["ascii"] = text except: pass line = json.dumps(entry) print(f"[{msg_counter:04d}] ...{short} len={entry['len']:4d} | {hex_str[:120]}") if entry.get("ascii"): print(f" ASCII: {entry['ascii']}") if log_fh: log_fh.write(line + "\n") log_fh.flush() def on_read(char: BlessGATTCharacteristic, **kwargs) -> bytearray: print(f" READ request on ...{str(char.uuid)[-4:]}") return char.value or bytearray() def on_write(char: BlessGATTCharacteristic, value: bytearray, **kwargs): uuid = str(char.uuid) log_msg(uuid, bytearray(value)) async def main(): global log_fh log_fh = open(LOG_FILE, "w") print(f"Logging to {LOG_FILE}") server = BlessServer(name="YS623B101069L") server.read_request_func = on_read server.write_request_func = on_write await server.add_new_service(SERVICE_UUID) await server.add_new_characteristic( SERVICE_UUID, CHAR_WRITE_WR, GATTCharacteristicProperties.write_without_response | GATTCharacteristicProperties.write, bytearray(b'\x00'), GATTAttributePermissions.writeable) await server.add_new_characteristic( SERVICE_UUID, CHAR_WRITE2, GATTCharacteristicProperties.write, bytearray(b'\x00'), GATTAttributePermissions.writeable) await server.add_new_characteristic( SERVICE_UUID, CHAR_NOTIFY, GATTCharacteristicProperties.notify, bytearray(b'\x00'), GATTAttributePermissions.readable) await server.add_new_characteristic( SERVICE_UUID, CHAR_WR_NOTIFY, GATTCharacteristicProperties.write | GATTCharacteristicProperties.notify, bytearray(b'\x00'), GATTAttributePermissions.writeable | GATTAttributePermissions.readable) await server.start() print("\n" + "=" * 60) print("CAPTURE PROXY RUNNING") print("=" * 60) print("Advertising as: YS623B101069L") print() print("1. Make sure real panel is OFF or out of range") print("2. Open Amusing LED app on phone") print("3. Connect to 'YS623B101069L'") print("4. Switch animations, upload images, etc.") print("5. All writes logged here + " + LOG_FILE) print("6. Ctrl+C to stop") print("=" * 60 + "\n") print("Waiting for connections...\n") stop = asyncio.Event() loop = asyncio.get_event_loop() loop.add_signal_handler(signal.SIGINT, stop.set) loop.add_signal_handler(signal.SIGTERM, stop.set) await stop.wait() print("\nShutting down...") await server.stop() if log_fh: log_fh.close() print(f"Captured {msg_counter} messages. Log: {LOG_FILE}") if __name__ == "__main__": asyncio.run(main())