#!/usr/bin/env python3
"""
video_bridge.py — Phone camera MJPEG streaming server for SaltyBot (Issue #585)

Captures frames from the Android camera and serves them over:
  1. WebSocket (binary JPEG frames) on ws://0.0.0.0:<ws-port>
  2. HTTP MJPEG stream (fallback)    on http://0.0.0.0:<http-port>/stream
  3. HTTP JPEG snapshot              on http://0.0.0.0:<http-port>/snapshot
  4. HTTP JSON health/stats          on http://0.0.0.0:<http-port>/health

Target: 640×480 @ 15 fps, < 200 ms phone→Jetson latency.

Camera backends (tried in order)
─────────────────────────────────
  1. OpenCV VideoCapture (/dev/video0 or --device) — fastest, needs V4L2
  2. termux-camera-photo capture loop — universal on Termux
     Uses the front or rear camera via termux-api.

WebSocket frame format
───────────────────────
  Binary message: raw JPEG bytes.
  Text message  : JSON control frame, sent once right after connecting:
    {"type":"info","width":640,"height":480,"fps":15,"ts":1234567890.0}
  ({"type":"error","msg":"..."} is reserved; this server does not
  currently send it.)

Usage
─────
  python3 phone/video_bridge.py [OPTIONS]

  --ws-port PORT     WebSocket port (default: 8765)
  --http-port PORT   HTTP MJPEG port (default: 8766)
  --width INT        Frame width px (default: 640)
  --height INT       Frame height px (default: 480)
  --fps FLOAT        Target capture FPS (default: 15.0)
  --quality INT      JPEG quality 1-100 (default: 75)
  --device PATH      V4L2 device or camera id (default: /dev/video0)
  --camera-id INT    termux-camera-photo id (default: 0 = rear)
  --no-http          Disable HTTP server
  --debug            Verbose logging

Dependencies (Termux)
──────────────────────
  pkg install termux-api python opencv-python
  pip install websockets
"""

import argparse
import asyncio
import io
import json
import logging
import os
import subprocess
import sys
import threading
import time
import urllib.parse
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from typing import Optional, Tuple

try:
    import cv2
    CV2_AVAILABLE = True
except ImportError:
    CV2_AVAILABLE = False

try:
    import websockets
    WS_AVAILABLE = True
except ImportError:
    WS_AVAILABLE = False


# ── Frame buffer (shared between capture, WS, HTTP) ──────────────────────────

class FrameBuffer:
    """Thread-safe single-slot frame buffer — always holds the latest JPEG.

    Every ``put()`` increments ``count``, which doubles as a frame sequence
    number.  Each streaming consumer (one per HTTP stream / WebSocket client)
    calls ``wait_newer()`` with the sequence number it last received, so a
    frame read by one client is never hidden from another client.

    Attributes:
        count:   total number of frames stored so far (the sequence number).
        dropped: frames that were overwritten before any reader took them.
    """

    def __init__(self):
        self._cond = threading.Condition()
        self._frame: Optional[bytes] = None
        # True once the current frame has been handed to at least one reader.
        # Starts True so the very first put() is not counted as a drop.
        self._consumed = True
        # Sequence number last returned through the shared get() API.
        self._get_seq = 0
        self.count = 0
        self.dropped = 0

    def put(self, jpeg_bytes: bytes) -> None:
        """Store *jpeg_bytes* as the latest frame and wake every waiting reader."""
        with self._cond:
            if not self._consumed:
                self.dropped += 1
            self._frame = jpeg_bytes
            self._consumed = False
            self.count += 1
            self._cond.notify_all()

    def _wait_newer_locked(self, last_seq: int,
                           timeout: Optional[float]) -> Tuple[int, Optional[bytes]]:
        """Wait (lock already held) for count != last_seq; see wait_newer()."""
        if not self._cond.wait_for(lambda: self.count != last_seq, timeout):
            return last_seq, None
        self._consumed = True
        return self.count, self._frame

    def wait_newer(self, last_seq: int,
                   timeout: Optional[float] = 1.0) -> Tuple[int, Optional[bytes]]:
        """Block until a frame newer than sequence *last_seq* exists.

        Args:
            last_seq: sequence number of the last frame this caller received
                      (0 if it has received none yet).
            timeout:  seconds to wait; None waits indefinitely.

        Returns:
            ``(seq, frame)`` for the latest frame, or ``(last_seq, None)`` if
            nothing newer arrived before the timeout.
        """
        with self._cond:
            return self._wait_newer_locked(last_seq, timeout)

    def get(self, timeout: float = 1.0) -> Optional[bytes]:
        """Block until a new frame is available or timeout (returns None).

        "New" is relative to the frame returned by the previous get() call
        from *any* caller; independent consumers should use wait_newer().
        """
        with self._cond:
            seq, frame = self._wait_newer_locked(self._get_seq, timeout)
            if frame is not None:
                self._get_seq = seq
            return frame

    def latest(self) -> Optional[bytes]:
        """Return latest frame without blocking (may return None)."""
        with self._cond:
            if self._frame is not None:
                self._consumed = True
            return self._frame


# ── Camera backends ───────────────────────────────────────────────────────────

class OpenCVCapture:
    """Capture frames via OpenCV VideoCapture (V4L2 on Android/Linux)."""

    def __init__(self, device: str, width: int, height: int, fps: float, quality: int):
        self._device = device
        self._width = width
        self._height = height
        self._fps = fps
        self._quality = quality
        self._cap = None
        self._encode_params = [cv2.IMWRITE_JPEG_QUALITY, quality]

    def open(self) -> bool:
        """Open the device, apply size/FPS hints and verify it delivers a frame.

        Returns False (with nothing left open) when the device cannot be
        opened or does not return a frame, so the caller can fall back to
        another backend.
        """
        cap = None
        try:
            idx = int(self._device) if self._device.isdigit() else self._device
            cap = cv2.VideoCapture(idx)
            if not cap.isOpened():
                cap.release()
                return False
            cap.set(cv2.CAP_PROP_FRAME_WIDTH, self._width)
            cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self._height)
            cap.set(cv2.CAP_PROP_FPS, self._fps)
            # A device node can open successfully yet never yield frames;
            # probe one read so we fall back instead of streaming nothing.
            ok, _ = cap.read()
            if not ok:
                logging.debug("OpenCV device %s opened but returned no frame", self._device)
                cap.release()
                return False
            self._cap = cap
            return True
        except Exception as e:
            logging.debug("OpenCV open failed: %s", e)
            if cap is not None:
                cap.release()
            return False

    def read_jpeg(self) -> Optional[bytes]:
        """Grab one frame and return it JPEG-encoded, or None on failure."""
        if self._cap is None:
            return None
        ret, frame = self._cap.read()
        if not ret or frame is None:
            return None
        ok, buf = cv2.imencode(".jpg", frame, self._encode_params)
        if not ok:
            return None
        return bytes(buf)

    def close(self) -> None:
        """Release the capture device (safe to call more than once)."""
        if self._cap is not None:
            self._cap.release()
            self._cap = None


class TermuxCapture:
    """Capture frames via termux-camera-photo (works on unmodified Android)."""

    def __init__(self, camera_id: int, width: int, height: int, quality: int):
        self._camera_id = camera_id
        self._quality = quality
        # Termux's $HOME is normally /data/data/com.termux/files/home, so this
        # resolves to the same cache path there, without failing elsewhere.
        cache_dir = os.path.join(os.path.expanduser("~"), ".cache")
        self._tmpfile = os.path.join(cache_dir, f"vcam_{os.getpid()}.jpg")
        # Resize params for cv2 (termux-camera-photo outputs native resolution)
        self._target_w = width
        self._target_h = height
        os.makedirs(cache_dir, exist_ok=True)

    def open(self) -> bool:
        """Return True if one test capture succeeds."""
        return self._capture_raw() is not None

    def read_jpeg(self) -> Optional[bytes]:
        """Capture one photo; resize/re-encode it when OpenCV is available."""
        raw = self._capture_raw()
        if raw is None:
            return None
        if CV2_AVAILABLE:
            return self._resize_jpeg(raw)
        return raw

    def _remove_tmpfile(self) -> None:
        """Delete the temporary capture file if it exists."""
        try:
            os.unlink(self._tmpfile)
        except OSError:
            pass

    def _capture_raw(self) -> Optional[bytes]:
        """Run termux-camera-photo once; return the JPEG bytes or None."""
        # Start from a clean slate so a leftover file from an earlier,
        # partially failed iteration is never served as a fresh frame.
        self._remove_tmpfile()
        try:
            r = subprocess.run(
                ["termux-camera-photo", "-c", str(self._camera_id), self._tmpfile],
                capture_output=True,
                timeout=3.0,
            )
            if r.returncode != 0 or not os.path.exists(self._tmpfile):
                return None
            with open(self._tmpfile, "rb") as f:
                data = f.read()
        except Exception as e:
            logging.debug("termux-camera-photo error: %s", e)
            return None
        finally:
            self._remove_tmpfile()
        # An empty file is not a usable frame.
        return data or None

    def _resize_jpeg(self, raw: bytes) -> Optional[bytes]:
        """Decode *raw*, resize to the target size and re-encode; *raw* on failure."""
        try:
            import numpy as np
            buf = np.frombuffer(raw, dtype=np.uint8)
            img = cv2.imdecode(buf, cv2.IMREAD_COLOR)
            if img is None:
                return raw
            resized = cv2.resize(img, (self._target_w, self._target_h))
            ok, enc = cv2.imencode(".jpg", resized,
                                   [cv2.IMWRITE_JPEG_QUALITY, self._quality])
            return bytes(enc) if ok else raw
        except Exception:
            return raw

    def close(self) -> None:
        """Remove the temporary capture file."""
        self._remove_tmpfile()


# ── Capture thread ────────────────────────────────────────────────────────────

class CaptureThread(threading.Thread):
    """Drives a camera backend at the target FPS, pushing JPEG into FrameBuffer.

    Raises:
        ValueError: if *fps* is not a positive number.
    """

    def __init__(self, backend, fps: float, buf: FrameBuffer):
        super().__init__(name="capture", daemon=True)
        if not fps > 0:  # also rejects NaN
            raise ValueError(f"fps must be positive, got {fps!r}")
        self._backend = backend
        self._interval = 1.0 / fps
        self._buf = buf
        # An Event (set only by stop()) cannot be un-done by run() starting
        # late, and lets stop() interrupt the inter-frame sleep immediately.
        self._stop_evt = threading.Event()

    def run(self) -> None:
        logging.info("Capture thread started (%.1f Hz)", 1.0 / self._interval)
        while not self._stop_evt.is_set():
            t0 = time.monotonic()
            try:
                jpeg = self._backend.read_jpeg()
                if jpeg:
                    self._buf.put(jpeg)
            except Exception as e:
                logging.warning("Capture error: %s", e)
            elapsed = time.monotonic() - t0
            self._stop_evt.wait(max(0.0, self._interval - elapsed))

    def stop(self) -> None:
        """Ask the loop to exit; call join() to wait for the current read to finish."""
        self._stop_evt.set()


# ── HTTP MJPEG server ─────────────────────────────────────────────────────────

MJPEG_BOUNDARY = b"--mjpeg-boundary"
# RFC 2046 §5.1.1: a delimiter line is "--" followed by the Content-Type
# boundary parameter, so the parameter itself must not carry the "--".
MJPEG_BOUNDARY_PARAM = MJPEG_BOUNDARY.decode()[2:]


def _make_http_handler(buf: FrameBuffer, width: int, height: int, fps: float):
    """Build a request-handler class serving /stream, /snapshot and /health from *buf*."""

    class Handler(BaseHTTPRequestHandler):
        def log_message(self, fmt, *args):
            logging.debug("HTTP %s", fmt % args)

        def do_GET(self):
            # Route on the path only, so e.g. "/stream?t=123" still works.
            route = urllib.parse.urlsplit(self.path).path
            if route == "/stream":
                self._stream()
            elif route == "/snapshot":
                self._snapshot()
            elif route == "/health":
                self._health()
            else:
                self.send_error(404)

        def _stream(self):
            self.send_response(200)
            self.send_header("Content-Type",
                             f"multipart/x-mixed-replace; boundary={MJPEG_BOUNDARY_PARAM}")
            self.send_header("Cache-Control", "no-cache")
            self.send_header("Connection", "close")
            self.end_headers()
            last_seq = 0  # per-client cursor: never misses frames to other clients
            try:
                while True:
                    seq, jpeg = buf.wait_newer(last_seq, timeout=2.0)
                    if jpeg is None:
                        continue
                    last_seq = seq
                    ts = time.time()
                    header = (
                        f"\r\n{MJPEG_BOUNDARY.decode()}\r\n"
                        f"Content-Type: image/jpeg\r\n"
                        f"Content-Length: {len(jpeg)}\r\n"
                        f"X-Timestamp: {ts:.3f}\r\n\r\n"
                    ).encode()
                    self.wfile.write(header + jpeg)
                    self.wfile.flush()
            except ConnectionError:
                # Client went away (broken pipe / reset / aborted).
                pass

        def _snapshot(self):
            jpeg = buf.latest()
            if jpeg is None:
                self.send_error(503, "No frame available")
                return
            self.send_response(200)
            self.send_header("Content-Type", "image/jpeg")
            self.send_header("Content-Length", str(len(jpeg)))
            self.send_header("Cache-Control", "no-cache")
            self.send_header("X-Timestamp", str(time.time()))
            self.end_headers()
            self.wfile.write(jpeg)

        def _health(self):
            body = json.dumps({
                "status": "ok",
                "frames": buf.count,
                "dropped": buf.dropped,
                "width": width,
                "height": height,
                "fps": fps,
            }).encode()
            self.send_response(200)
            self.send_header("Content-Type", "application/json")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)

    return Handler


# ── WebSocket server ──────────────────────────────────────────────────────────

async def ws_handler(websocket, buf: FrameBuffer, width: int, height: int, fps: float):
    """Handle one WebSocket client connection — streams JPEG frames as binary messages.

    Sends one JSON "info" text frame first, then every new frame from *buf*
    as a binary message until the client disconnects.
    """
    remote = websocket.remote_address
    logging.info("WS client connected: %s", remote)

    # Send info frame
    info = json.dumps({
        "type": "info",
        "width": width,
        "height": height,
        "fps": fps,
        "ts": time.time(),
    })
    try:
        await websocket.send(info)
    except Exception as e:
        logging.debug("WS info send failed for %s: %s", remote, e)
        return

    loop = asyncio.get_running_loop()
    last_seq = 0  # per-client cursor into the shared FrameBuffer
    try:
        while True:
            # Offload the blocking wait to the thread pool to keep the event loop free
            seq, jpeg = await loop.run_in_executor(None, buf.wait_newer, last_seq, 1.0)
            if jpeg is None:
                continue
            last_seq = seq
            await websocket.send(jpeg)
    except websockets.exceptions.ConnectionClosedOK:
        logging.info("WS client disconnected (clean): %s", remote)
    except websockets.exceptions.ConnectionClosedError as e:
        logging.info("WS client disconnected (error): %s — %s", remote, e)
    except Exception as e:
        logging.warning("WS handler error: %s", e)


# ── Statistics logger ─────────────────────────────────────────────────────────

def _stats_logger(buf: FrameBuffer, stop_evt: threading.Event) -> None:
    """Log capture counters every 10 s until *stop_evt* is set."""
    prev = 0
    while not stop_evt.wait(10.0):
        total = buf.count
        delta = total - prev
        prev = total
        logging.info("Capture stats — frames=%d (+%d/10s) dropped=%d",
                     total, delta, buf.dropped)


# ── Entrypoint ────────────────────────────────────────────────────────────────

def main() -> None:
    """Parse CLI options, start capture + HTTP + WebSocket servers, run until Ctrl-C."""
    parser = argparse.ArgumentParser(
        description="SaltyBot phone video bridge (Issue #585)"
    )
    parser.add_argument("--ws-port", type=int, default=8765,
                        help="WebSocket server port (default: 8765)")
    parser.add_argument("--http-port", type=int, default=8766,
                        help="HTTP MJPEG port (default: 8766)")
    parser.add_argument("--width", type=int, default=640,
                        help="Frame width (default: 640)")
    parser.add_argument("--height", type=int, default=480,
                        help="Frame height (default: 480)")
    parser.add_argument("--fps", type=float, default=15.0,
                        help="Target FPS (default: 15)")
    parser.add_argument("--quality", type=int, default=75,
                        help="JPEG quality 1-100 (default: 75)")
    parser.add_argument("--device", default="/dev/video0",
                        help="V4L2 device or index (default: /dev/video0)")
    parser.add_argument("--camera-id", type=int, default=0,
                        help="termux-camera-photo camera id (default: 0 = rear)")
    parser.add_argument("--no-http", action="store_true",
                        help="Disable HTTP server")
    parser.add_argument("--debug", action="store_true",
                        help="Verbose logging")
    args = parser.parse_args()

    if not args.fps > 0:
        parser.error("--fps must be > 0")
    if not 1 <= args.quality <= 100:
        parser.error("--quality must be between 1 and 100")
    if args.width <= 0 or args.height <= 0:
        parser.error("--width and --height must be positive")

    logging.basicConfig(
        level=logging.DEBUG if args.debug else logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
    )

    if not WS_AVAILABLE:
        logging.error("websockets not installed. Run: pip install websockets")
        sys.exit(1)

    # ── Select and open camera backend ───────────────────────────────────────
    backend = None

    if CV2_AVAILABLE:
        cv_backend = OpenCVCapture(args.device, args.width, args.height,
                                   args.fps, args.quality)
        if cv_backend.open():
            logging.info("Using OpenCV backend (%s)", args.device)
            backend = cv_backend
        else:
            logging.info("OpenCV backend unavailable, falling back to termux-camera-photo")

    if backend is None:
        try:
            tx_backend = TermuxCapture(args.camera_id, args.width, args.height, args.quality)
        except OSError as e:
            logging.debug("termux-camera-photo backend setup failed: %s", e)
            tx_backend = None
        if tx_backend is not None and tx_backend.open():
            logging.info("Using termux-camera-photo backend (camera %d)", args.camera_id)
            backend = tx_backend
        else:
            logging.error("No camera backend available. "
                          "Install opencv-python or termux-api.")
            sys.exit(1)

    # ── Frame buffer + capture thread ────────────────────────────────────────
    buf = FrameBuffer()
    capture = CaptureThread(backend, args.fps, buf)
    stop_evt = threading.Event()
    http_server = None
    capture.start()

    try:
        # ── HTTP server thread ───────────────────────────────────────────────
        if not args.no_http:
            handler_cls = _make_http_handler(buf, args.width, args.height, args.fps)
            try:
                server = ThreadingHTTPServer(("0.0.0.0", args.http_port), handler_cls)
            except OSError as e:
                logging.error("Cannot start HTTP server on port %d: %s", args.http_port, e)
                raise SystemExit(1)
            threading.Thread(
                target=server.serve_forever, name="http", daemon=True
            ).start()
            # Only record the server once serve_forever() is scheduled:
            # shutdown() blocks forever on a server whose loop never ran.
            http_server = server
            logging.info("HTTP MJPEG server: http://0.0.0.0:%d/stream", args.http_port)
            logging.info("HTTP snapshot:     http://0.0.0.0:%d/snapshot", args.http_port)
            logging.info("HTTP health:       http://0.0.0.0:%d/health", args.http_port)

        # ── Stats logger ─────────────────────────────────────────────────────
        threading.Thread(
            target=_stats_logger, args=(buf, stop_evt), name="stats", daemon=True
        ).start()

        # ── WebSocket server (runs the event loop) ───────────────────────────
        logging.info("WebSocket server: ws://0.0.0.0:%d", args.ws_port)

        async def _run_ws():
            # "*_" absorbs the legacy `path` argument passed by websockets < 10.1.
            async with websockets.serve(
                lambda ws, *_: ws_handler(ws, buf, args.width, args.height, args.fps),
                "0.0.0.0",
                args.ws_port,
                max_size=None,       # no message size limit
                ping_interval=5,
                ping_timeout=10,
            ):
                logging.info("Ready — streaming %dx%d @ %.0f fps",
                             args.width, args.height, args.fps)
                await asyncio.Future()   # run forever

        asyncio.run(_run_ws())
    except KeyboardInterrupt:
        logging.info("Shutting down...")
    finally:
        stop_evt.set()
        capture.stop()
        # Let an in-flight read finish before the device is released.
        capture.join(timeout=5.0)
        backend.close()
        if http_server is not None:
            http_server.shutdown()
            http_server.server_close()


if __name__ == "__main__":
    main()