saltylab-firmware/phone/video_bridge.py
sl-android 08bc23f6df feat: Phone video streaming bridge (Issue #585)
Phone side — phone/video_bridge.py:
- MJPEG streaming server for Android/Termux phone camera
- Dual camera backends: OpenCV VideoCapture (V4L2) with automatic
  fallback to termux-camera-photo for unmodified Android
- WebSocket server (ws://0.0.0.0:8765) — binary JPEG frames + JSON
  info/error control messages; supports multiple concurrent clients
- HTTP server (http://0.0.0.0:8766):
    /stream    — multipart/x-mixed-replace MJPEG
    /snapshot  — single JPEG
    /health    — JSON stats (frame count, dropped, resolution, fps)
- Thread-safe single-slot FrameBuffer; CaptureThread rate-limited with
  wall-clock accounting for capture latency
- Flags: --ws-port, --http-port, --width, --height, --fps, --quality,
  --device, --camera-id, --no-http, --debug

Jetson side — saltybot_phone/phone_camera_node.py:
- ROS2 node: receives JPEG frames, publishes:
    /saltybot/phone/camera            sensor_msgs/Image (bgr8)
    /saltybot/phone/camera/compressed sensor_msgs/CompressedImage
    /saltybot/phone/camera/info       std_msgs/String (stream metadata)
- WebSocket client (primary); HTTP MJPEG polling fallback on WS failure
- Auto-reconnect loop (default 3 s) for both transports
- Latency warning when frame age > latency_warn_ms (default 200 ms)
- 10 s diagnostics log: received/published counts + last frame age
- Registered as phone_camera_node console script in setup.py
- Added to phone_bringup.py launch with phone_host / phone_cam_enabled args

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-14 12:20:28 -04:00

478 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
video_bridge.py — Phone camera MJPEG streaming server for SaltyBot (Issue #585)
Captures frames from the Android camera and serves them over:
1. WebSocket (binary JPEG frames) on ws://0.0.0.0:<ws-port>
2. HTTP MJPEG stream (fallback) on http://0.0.0.0:<http-port>/stream
3. HTTP JPEG snapshot on http://0.0.0.0:<http-port>/snapshot
Target: 640×480 @ 15 fps, < 200 ms phone→Jetson latency.
Camera backends (tried in order)
─────────────────────────────────
1. OpenCV VideoCapture (/dev/video0 or --device) — fastest, needs V4L2
2. termux-camera-photo capture loop — universal on Termux
Uses the front or rear camera via termux-api.
WebSocket frame format
───────────────────────
Binary message: raw JPEG bytes.
Text message : JSON control frame:
{"type":"info","width":640,"height":480,"fps":15,"ts":1234567890.0}
{"type":"error","msg":"..."}
Usage
─────
python3 phone/video_bridge.py [OPTIONS]
--ws-port PORT WebSocket port (default: 8765)
--http-port PORT HTTP MJPEG port (default: 8766)
--width INT Frame width px (default: 640)
--height INT Frame height px (default: 480)
--fps FLOAT Target capture FPS (default: 15.0)
--quality INT JPEG quality 1-100 (default: 75)
--device PATH V4L2 device or camera id (default: /dev/video0)
--camera-id INT termux-camera-photo id (default: 0 = rear)
--no-http Disable HTTP server
--debug Verbose logging
Dependencies (Termux)
──────────────────────
pkg install termux-api python opencv-python
pip install websockets
"""
import argparse
import asyncio
import io
import json
import logging
import os
import subprocess
import sys
import threading
import time
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from typing import Optional
try:
import cv2
CV2_AVAILABLE = True
except ImportError:
CV2_AVAILABLE = False
try:
import websockets
WS_AVAILABLE = True
except ImportError:
WS_AVAILABLE = False
# ── Frame buffer (shared between capture, WS, HTTP) ──────────────────────────
class FrameBuffer:
    """Thread-safe single-slot frame buffer — always holds the latest JPEG.

    One producer calls :meth:`put`; any number of consumers call :meth:`get`
    (blocking) or :meth:`latest` (non-blocking).  Every consumer that is
    waiting in :meth:`get` when a frame arrives receives that frame, so
    several WebSocket/HTTP clients can stream concurrently without stealing
    frames from one another.

    Attributes:
        count:   total number of frames stored via :meth:`put`.
        dropped: number of frames that were overwritten before any
                 :meth:`get` call fetched them.
    """

    def __init__(self):
        # A Condition (rather than an Event) lets put() wake *all* waiters.
        self._cond = threading.Condition()
        self._frame: Optional[bytes] = None
        self._seq = 0        # bumped on every put()
        self._taken_seq = 0  # _seq value of the newest frame handed out by get()
        self.count = 0
        self.dropped = 0

    def put(self, jpeg_bytes: bytes) -> None:
        """Store *jpeg_bytes* as the latest frame and wake all waiting consumers."""
        with self._cond:
            # Only a frame nobody fetched counts as dropped.
            if self._frame is not None and self._taken_seq != self._seq:
                self.dropped += 1
            self._frame = jpeg_bytes
            self._seq += 1
            self.count += 1
            self._cond.notify_all()

    def get(self, timeout: float = 1.0) -> Optional[bytes]:
        """Block until a new frame is available or timeout.

        Returns immediately when the current frame has not yet been handed
        out by any ``get()`` call; otherwise waits up to *timeout* seconds for
        the next ``put()``.  Returns ``None`` on timeout.
        """
        with self._cond:
            if self._taken_seq == self._seq:
                seen = self._seq
                if not self._cond.wait_for(lambda: self._seq != seen, timeout):
                    return None
            self._taken_seq = self._seq
            return self._frame

    def latest(self) -> Optional[bytes]:
        """Return latest frame without blocking (may return None).

        Does not mark the frame as fetched, so it never affects ``get()`` or
        the ``dropped`` counter.
        """
        with self._cond:
            return self._frame
# ── Camera backends ───────────────────────────────────────────────────────────
class OpenCVCapture:
    """Camera backend that grabs frames with ``cv2.VideoCapture`` and JPEG-encodes them."""

    def __init__(self, device: str, width: int, height: int, fps: float, quality: int):
        self._cap = None  # set by open(), cleared by close()
        self._device = device
        self._width = width
        self._height = height
        self._fps = fps
        self._quality = quality
        self._encode_params = [cv2.IMWRITE_JPEG_QUALITY, quality]

    def open(self) -> bool:
        """Open the device and request the configured geometry/FPS.

        A purely numeric device string is passed to OpenCV as an integer
        index; anything else is passed through as given.  Returns ``True``
        on success, ``False`` if the device cannot be opened or OpenCV raises.
        """
        try:
            source = self._device
            if source.isdigit():
                source = int(source)
            handle = cv2.VideoCapture(source)
            if handle.isOpened():
                handle.set(cv2.CAP_PROP_FRAME_WIDTH, self._width)
                handle.set(cv2.CAP_PROP_FRAME_HEIGHT, self._height)
                handle.set(cv2.CAP_PROP_FPS, self._fps)
                self._cap = handle
                return True
            return False
        except Exception as exc:
            logging.debug("OpenCV open failed: %s", exc)
            return False

    def read_jpeg(self) -> Optional[bytes]:
        """Grab one frame and return it JPEG-encoded, or ``None`` on any failure."""
        handle = self._cap
        if handle is None:
            return None
        grabbed, image = handle.read()
        if grabbed and image is not None:
            encoded, payload = cv2.imencode(".jpg", image, self._encode_params)
            if encoded:
                return bytes(payload)
        return None

    def close(self) -> None:
        """Release the capture handle if one is open; safe to call repeatedly."""
        handle = self._cap
        if handle is None:
            return
        handle.release()
        self._cap = None
class TermuxCapture:
    """Capture frames via termux-camera-photo (works on unmodified Android).

    Each frame is produced by running the ``termux-camera-photo`` command,
    which writes a JPEG to a scratch file that is then read and deleted.
    When OpenCV is importable the image is resized to the requested
    dimensions and re-encoded at the requested quality.
    """

    # Preferred scratch location: the cache directory inside the Termux home.
    _TERMUX_CACHE_DIR = "/data/data/com.termux/files/home/.cache"

    def __init__(self, camera_id: int, width: int, height: int, quality: int):
        self._camera_id = camera_id
        self._quality = quality
        # Resize params for cv2 (termux-camera-photo outputs native resolution)
        self._target_w = width
        self._target_h = height
        self._tmpfile = os.path.join(self._scratch_dir(), f"vcam_{os.getpid()}.jpg")

    @classmethod
    def _scratch_dir(cls) -> str:
        """Return a directory for the scratch JPEG.

        Uses the Termux cache directory when it can be created; otherwise
        falls back to the system temp dir so that constructing the backend on
        a non-Termux host does not raise (``open()`` then simply reports
        failure).
        """
        try:
            os.makedirs(cls._TERMUX_CACHE_DIR, exist_ok=True)
            return cls._TERMUX_CACHE_DIR
        except OSError:
            import tempfile
            return tempfile.gettempdir()

    def open(self) -> bool:
        """Probe the backend with one test capture; ``True`` if it produced data."""
        return self._capture_raw() is not None

    def read_jpeg(self) -> Optional[bytes]:
        """Capture one frame; resized/re-encoded when OpenCV is available."""
        raw = self._capture_raw()
        if raw is None:
            return None
        if CV2_AVAILABLE:
            return self._resize_jpeg(raw)
        return raw

    def _capture_raw(self) -> Optional[bytes]:
        """Run termux-camera-photo once and return the file contents.

        Returns ``None`` when the command is missing, times out, exits
        non-zero, writes no file, or writes an empty file.
        """
        try:
            result = subprocess.run(
                ["termux-camera-photo", "-c", str(self._camera_id), self._tmpfile],
                capture_output=True, timeout=3.0,
            )
            if result.returncode != 0:
                return None
            with open(self._tmpfile, "rb") as f:
                data = f.read()
        except (OSError, subprocess.SubprocessError) as e:
            # OSError: command or output file missing; SubprocessError: timeout.
            logging.debug("termux-camera-photo error: %s", e)
            return None
        finally:
            # Never leave a stale or partial image behind for the next capture.
            self.close()
        if not data:
            logging.debug("termux-camera-photo produced an empty file")
            return None
        return data

    def _resize_jpeg(self, raw: bytes) -> Optional[bytes]:
        """Resize *raw* to the target size; fall back to *raw* on any failure."""
        try:
            import numpy as np
            pixels = np.frombuffer(raw, dtype=np.uint8)
            img = cv2.imdecode(pixels, cv2.IMREAD_COLOR)
            if img is None:
                return raw
            resized = cv2.resize(img, (self._target_w, self._target_h))
            ok, enc = cv2.imencode(".jpg", resized,
                                   [cv2.IMWRITE_JPEG_QUALITY, self._quality])
            return bytes(enc) if ok else raw
        except Exception as e:
            # Best effort: an unresized frame is better than no frame.
            logging.debug("JPEG resize failed, sending native frame: %s", e)
            return raw

    def close(self) -> None:
        """Remove the scratch file if present; safe to call repeatedly."""
        try:
            os.unlink(self._tmpfile)
        except OSError:
            pass
# ── Capture thread ────────────────────────────────────────────────────────────
class CaptureThread(threading.Thread):
    """Drives a camera backend at the target FPS, pushing JPEG into FrameBuffer.

    Args:
        backend: object exposing ``read_jpeg() -> Optional[bytes]``.
        fps:     target capture rate in frames per second; must be > 0.
        buf:     sink exposing ``put(bytes)``.

    Raises:
        ValueError: if *fps* is not positive.
    """

    def __init__(self, backend, fps: float, buf: "FrameBuffer"):
        if fps <= 0:
            raise ValueError(f"fps must be positive, got {fps!r}")
        super().__init__(name="capture", daemon=True)
        self._backend = backend
        self._interval = 1.0 / fps
        self._buf = buf
        # An Event (not a bool set inside run()) so that stop() is honoured
        # even when it is called before the thread has begun executing.
        self._stop_requested = threading.Event()

    def run(self) -> None:
        """Capture loop: read a frame, publish it, sleep out the rest of the interval."""
        logging.info("Capture thread started (%.1f Hz)", 1.0 / self._interval)
        while not self._stop_requested.is_set():
            t0 = time.monotonic()
            try:
                jpeg = self._backend.read_jpeg()
                if jpeg:
                    self._buf.put(jpeg)
            except Exception as e:
                # Keep the loop alive across transient backend failures.
                logging.warning("Capture error: %s", e)
            elapsed = time.monotonic() - t0
            # Subtract capture time so the period stays close to 1/fps;
            # waiting on the event makes stop() take effect immediately.
            self._stop_requested.wait(max(0.0, self._interval - elapsed))

    def stop(self) -> None:
        """Ask the capture loop to exit; returns without waiting for it."""
        self._stop_requested.set()
# ── HTTP MJPEG server ─────────────────────────────────────────────────────────
MJPEG_BOUNDARY = b"--mjpeg-boundary"
def _make_http_handler(buf: FrameBuffer, width: int, height: int, fps: float):
class Handler(BaseHTTPRequestHandler):
def log_message(self, fmt, *args):
logging.debug("HTTP %s", fmt % args)
def do_GET(self):
if self.path == "/stream":
self._stream()
elif self.path == "/snapshot":
self._snapshot()
elif self.path == "/health":
self._health()
else:
self.send_error(404)
def _stream(self):
self.send_response(200)
self.send_header("Content-Type",
f"multipart/x-mixed-replace; boundary={MJPEG_BOUNDARY.decode()}")
self.send_header("Cache-Control", "no-cache")
self.send_header("Connection", "close")
self.end_headers()
try:
while True:
jpeg = buf.get(timeout=2.0)
if jpeg is None:
continue
ts = time.time()
header = (
f"\r\n{MJPEG_BOUNDARY.decode()}\r\n"
f"Content-Type: image/jpeg\r\n"
f"Content-Length: {len(jpeg)}\r\n"
f"X-Timestamp: {ts:.3f}\r\n\r\n"
).encode()
self.wfile.write(header + jpeg)
self.wfile.flush()
except (BrokenPipeError, ConnectionResetError):
pass
def _snapshot(self):
jpeg = buf.latest()
if jpeg is None:
self.send_error(503, "No frame available")
return
self.send_response(200)
self.send_header("Content-Type", "image/jpeg")
self.send_header("Content-Length", str(len(jpeg)))
self.send_header("X-Timestamp", str(time.time()))
self.end_headers()
self.wfile.write(jpeg)
def _health(self):
body = json.dumps({
"status": "ok",
"frames": buf.count,
"dropped": buf.dropped,
"width": width,
"height": height,
"fps": fps,
}).encode()
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
return Handler
# ── WebSocket server ──────────────────────────────────────────────────────────
async def ws_handler(websocket, buf: "FrameBuffer", width: int, height: int, fps: float):
    """Handle one WebSocket client connection — streams JPEG frames as binary messages.

    Sends a single JSON ``info`` text message describing the stream, then
    forwards every frame obtained from ``buf.get()`` as a binary message
    until the connection closes.

    Args:
        websocket: connection object providing ``remote_address`` and ``send``.
        buf:       frame source providing a blocking ``get(timeout=...)``.
        width, height, fps: stream parameters reported in the info message.
    """
    remote = websocket.remote_address
    logging.info("WS client connected: %s", remote)
    # Send info frame
    info = json.dumps({
        "type": "info",
        "width": width,
        "height": height,
        "fps": fps,
        "ts": time.time(),
    })
    try:
        await websocket.send(info)
    except Exception as e:
        # Peer vanished before streaming started; nothing more to do.
        logging.debug("WS client %s dropped before info frame: %s", remote, e)
        return
    # We are inside a running coroutine, so ask for the running loop directly.
    loop = asyncio.get_running_loop()
    try:
        while True:
            # Offload blocking buf.get() to thread pool to keep event loop free
            jpeg = await loop.run_in_executor(None, lambda: buf.get(timeout=1.0))
            if jpeg is None:
                continue
            await websocket.send(jpeg)
    except websockets.exceptions.ConnectionClosedOK:
        logging.info("WS client disconnected (clean): %s", remote)
    except websockets.exceptions.ConnectionClosedError as e:
        logging.info("WS client disconnected (error): %s: %s", remote, e)
    except Exception as e:
        logging.warning("WS handler error: %s", e)
# ── Statistics logger ─────────────────────────────────────────────────────────
def _stats_logger(buf: FrameBuffer, stop_evt: threading.Event) -> None:
prev = 0
while not stop_evt.wait(10.0):
delta = buf.count - prev
prev = buf.count
logging.info("Capture stats — frames=%d (+%d/10s) dropped=%d",
buf.count, delta, buf.dropped)
# ── Entrypoint ────────────────────────────────────────────────────────────────
def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the video bridge."""
    parser = argparse.ArgumentParser(
        description="SaltyBot phone video bridge (Issue #585)"
    )
    parser.add_argument("--ws-port", type=int, default=8765,
                        help="WebSocket server port (default: 8765)")
    parser.add_argument("--http-port", type=int, default=8766,
                        help="HTTP MJPEG port (default: 8766)")
    parser.add_argument("--width", type=int, default=640,
                        help="Frame width (default: 640)")
    parser.add_argument("--height", type=int, default=480,
                        help="Frame height (default: 480)")
    parser.add_argument("--fps", type=float, default=15.0,
                        help="Target FPS (default: 15)")
    parser.add_argument("--quality", type=int, default=75,
                        help="JPEG quality 1-100 (default: 75)")
    parser.add_argument("--device", default="/dev/video0",
                        help="V4L2 device or index (default: /dev/video0)")
    parser.add_argument("--camera-id", type=int, default=0,
                        help="termux-camera-photo camera id (default: 0 = rear)")
    parser.add_argument("--no-http", action="store_true",
                        help="Disable HTTP server")
    parser.add_argument("--debug", action="store_true",
                        help="Verbose logging")
    return parser


def _open_backend(args):
    """Return the first camera backend that opens, or ``None`` if none does.

    OpenCV is tried first (when importable), then termux-camera-photo.
    """
    if CV2_AVAILABLE:
        cv_backend = OpenCVCapture(args.device, args.width, args.height,
                                   args.fps, args.quality)
        if cv_backend.open():
            logging.info("Using OpenCV backend (%s)", args.device)
            return cv_backend
        logging.info("OpenCV backend unavailable, falling back to termux-camera-photo")
    tx_backend = TermuxCapture(args.camera_id, args.width, args.height, args.quality)
    if tx_backend.open():
        logging.info("Using termux-camera-photo backend (camera %d)", args.camera_id)
        return tx_backend
    return None


def main() -> None:
    """Parse arguments, start capture/HTTP/stats threads and run the WebSocket server.

    Blocks until interrupted (Ctrl-C).  Exits with status 1 when the
    ``websockets`` package or a working camera backend is missing, and with
    an argparse usage error when ``--fps`` or ``--quality`` is out of range.
    """
    parser = _build_arg_parser()
    args = parser.parse_args()
    # Reject values that would otherwise fail later with an obscure error.
    if args.fps <= 0:
        parser.error("--fps must be greater than 0")
    if not 1 <= args.quality <= 100:
        parser.error("--quality must be in the range 1-100")
    logging.basicConfig(
        level=logging.DEBUG if args.debug else logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
    )
    if not WS_AVAILABLE:
        logging.error("websockets not installed. Run: pip install websockets")
        sys.exit(1)

    # ── Select and open camera backend ───────────────────────────────────────
    backend = _open_backend(args)
    if backend is None:
        logging.error("No camera backend available. "
                      "Install opencv-python or termux-api.")
        sys.exit(1)

    buf = FrameBuffer()
    capture = CaptureThread(backend, args.fps, buf)
    stop_evt = threading.Event()
    http_server = None

    async def _run_ws():
        async with websockets.serve(
            # *_ absorbs the extra ``path`` argument that older websockets
            # releases pass to connection handlers.
            lambda ws, *_: ws_handler(ws, buf, args.width, args.height, args.fps),
            "0.0.0.0",
            args.ws_port,
            max_size=None,  # no message size limit
            ping_interval=5,
            ping_timeout=10,
        ):
            logging.info("Ready — streaming %dx%d @ %.0f fps",
                         args.width, args.height, args.fps)
            await asyncio.Future()  # run forever

    # Everything that acquires a resource runs inside the try so that a late
    # failure (e.g. a port already in use) still releases the camera.
    try:
        # ── Capture thread ───────────────────────────────────────────────────
        capture.start()
        # ── HTTP server thread ───────────────────────────────────────────────
        if not args.no_http:
            handler_cls = _make_http_handler(buf, args.width, args.height, args.fps)
            http_server = ThreadingHTTPServer(("0.0.0.0", args.http_port), handler_cls)
            threading.Thread(
                target=http_server.serve_forever, name="http", daemon=True
            ).start()
            logging.info("HTTP MJPEG server: http://0.0.0.0:%d/stream", args.http_port)
            logging.info("HTTP snapshot: http://0.0.0.0:%d/snapshot", args.http_port)
            logging.info("HTTP health: http://0.0.0.0:%d/health", args.http_port)
        # ── Stats logger ─────────────────────────────────────────────────────
        threading.Thread(
            target=_stats_logger, args=(buf, stop_evt), name="stats", daemon=True
        ).start()
        # ── WebSocket server (runs the event loop) ───────────────────────────
        logging.info("WebSocket server: ws://0.0.0.0:%d", args.ws_port)
        asyncio.run(_run_ws())
    except KeyboardInterrupt:
        logging.info("Shutting down...")
    finally:
        stop_evt.set()
        capture.stop()
        # Wait for the capture loop to leave read_jpeg() before the backend is
        # closed underneath it; bounded so a hung camera cannot block exit.
        if capture.is_alive():
            capture.join(timeout=5.0)
        backend.close()
        if http_server is not None:
            http_server.shutdown()
            http_server.server_close()  # release the listening socket


if __name__ == "__main__":
    main()