Phone side — phone/video_bridge.py:
- MJPEG streaming server for an Android/Termux phone camera
- Dual camera backends: OpenCV VideoCapture (V4L2) with automatic fallback to termux-camera-photo on unmodified Android
- WebSocket server (ws://0.0.0.0:8765) — binary JPEG frames plus JSON info/error control messages; supports multiple concurrent clients
- HTTP server (http://0.0.0.0:8766):
  /stream — multipart/x-mixed-replace MJPEG
  /snapshot — single JPEG
  /health — JSON stats (frame count, dropped, resolution, fps)
- Thread-safe single-slot FrameBuffer; CaptureThread rate-limited, subtracting each capture's duration from the inter-frame sleep
- Flags: --ws-port, --http-port, --width, --height, --fps, --quality, --device, --camera-id, --no-http, --debug

Jetson side — saltybot_phone/phone_camera_node.py:
- ROS2 node: receives JPEG frames and publishes:
  /saltybot/phone/camera sensor_msgs/Image (bgr8)
  /saltybot/phone/camera/compressed sensor_msgs/CompressedImage
  /saltybot/phone/camera/info std_msgs/String (stream metadata)
- WebSocket client (primary); HTTP MJPEG polling fallback on WS failure
- Auto-reconnect loop (default 3 s) for both transports
- Latency warning when frame age > latency_warn_ms (default 200 ms)
- 10 s diagnostics log: received/published counts + last frame age
- Registered as phone_camera_node console script in setup.py
- Added to phone_bringup.py launch with phone_host / phone_cam_enabled args

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
478 lines
18 KiB
Python
478 lines
18 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
video_bridge.py — Phone camera MJPEG streaming server for SaltyBot (Issue #585)
|
||
|
||
Captures frames from the Android camera and serves them over:
|
||
1. WebSocket (binary JPEG frames) on ws://0.0.0.0:<ws-port>
|
||
2. HTTP MJPEG stream (fallback) on http://0.0.0.0:<http-port>/stream
|
||
3. HTTP JPEG snapshot on http://0.0.0.0:<http-port>/snapshot
|
||
|
||
Target: 640×480 @ 15 fps, < 200 ms phone→Jetson latency.
|
||
|
||
Camera backends (tried in order)
|
||
─────────────────────────────────
|
||
1. OpenCV VideoCapture (/dev/video0 or --device) — fastest, needs V4L2
|
||
2. termux-camera-photo capture loop — universal on Termux
|
||
Uses the front or rear camera via termux-api.
|
||
|
||
WebSocket frame format
|
||
───────────────────────
|
||
Binary message: raw JPEG bytes.
|
||
Text message : JSON control frame:
|
||
{"type":"info","width":640,"height":480,"fps":15,"ts":1234567890.0}
|
||
{"type":"error","msg":"..."}
|
||
|
||
Usage
|
||
─────
|
||
python3 phone/video_bridge.py [OPTIONS]
|
||
|
||
--ws-port PORT WebSocket port (default: 8765)
|
||
--http-port PORT HTTP MJPEG port (default: 8766)
|
||
--width INT Frame width px (default: 640)
|
||
--height INT Frame height px (default: 480)
|
||
--fps FLOAT Target capture FPS (default: 15.0)
|
||
--quality INT JPEG quality 1-100 (default: 75)
|
||
--device PATH V4L2 device or camera id (default: /dev/video0)
|
||
--camera-id INT termux-camera-photo id (default: 0 = rear)
|
||
--no-http Disable HTTP server
|
||
--debug Verbose logging
|
||
|
||
Dependencies (Termux)
|
||
──────────────────────
|
||
pkg install termux-api python opencv-python
|
||
pip install websockets
|
||
"""
|
||
|
||
import argparse
|
||
import asyncio
|
||
import io
|
||
import json
|
||
import logging
|
||
import os
|
||
import subprocess
|
||
import sys
|
||
import threading
|
||
import time
|
||
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
|
||
from typing import Optional
|
||
|
||
try:
|
||
import cv2
|
||
CV2_AVAILABLE = True
|
||
except ImportError:
|
||
CV2_AVAILABLE = False
|
||
|
||
try:
|
||
import websockets
|
||
WS_AVAILABLE = True
|
||
except ImportError:
|
||
WS_AVAILABLE = False
|
||
|
||
# ── Frame buffer (shared between capture, WS, HTTP) ──────────────────────────
|
||
|
||
class FrameBuffer:
    """Thread-safe single-slot frame buffer — always holds the latest JPEG.

    The producer calls :meth:`put`; consumers either block for the next frame
    with :meth:`get` or read the current frame with :meth:`latest`.

    ``count`` (frames stored) and ``dropped`` (frames overwritten before any
    consumer read them through :meth:`get` or :meth:`latest`) are plain int
    attributes updated under the lock; readers may access them unlocked.

    Note: all consumers share one ``threading.Event``. A consumer that is not
    blocked in :meth:`get` at the moment a frame arrives may find the event
    already cleared by another consumer and will wait for the following frame.
    """

    def __init__(self):
        self._lock = threading.Lock()
        self._frame: Optional[bytes] = None
        # True while the stored frame has not yet been handed to any consumer.
        self._unread = False
        self._event = threading.Event()
        self.count = 0
        self.dropped = 0

    def put(self, jpeg_bytes: bytes) -> None:
        """Store *jpeg_bytes* as the latest frame and wake blocked consumers."""
        with self._lock:
            # Only a frame that nobody read counts as dropped; _frame itself is
            # never cleared (latest() needs it), so it cannot signal this.
            if self._unread:
                self.dropped += 1
            self._frame = jpeg_bytes
            self._unread = True
            self.count += 1
        self._event.set()

    def get(self, timeout: float = 1.0) -> Optional[bytes]:
        """Block until a new frame is available or *timeout* seconds elapse.

        Returns the latest frame, or ``None`` on timeout.
        """
        if not self._event.wait(timeout):
            return None
        self._event.clear()
        with self._lock:
            self._unread = False
            return self._frame

    def latest(self) -> Optional[bytes]:
        """Return the latest frame without blocking (``None`` before the first put)."""
        with self._lock:
            if self._frame is not None:
                self._unread = False
            return self._frame
|
||
|
||
|
||
# ── Camera backends ───────────────────────────────────────────────────────────
|
||
|
||
class OpenCVCapture:
    """Capture frames via OpenCV VideoCapture (V4L2 on Android/Linux).

    Args:
        device: Device path (e.g. ``/dev/video0``) or a numeric camera index
            given as a digit string (``"0"``).
        width, height: Requested capture size, passed to ``cap.set``; the
            result is not checked.
        fps: Requested capture rate, passed to ``CAP_PROP_FPS``.
        quality: JPEG quality 1-100 used when encoding frames.
    """

    def __init__(self, device: str, width: int, height: int, fps: float, quality: int):
        self._device = device
        self._width = width
        self._height = height
        self._fps = fps
        self._quality = quality
        self._cap = None
        self._encode_params = [cv2.IMWRITE_JPEG_QUALITY, quality]

    def open(self) -> bool:
        """Open the device and confirm it delivers a frame.

        Returns True on success. On any failure the VideoCapture handle is
        released, so the device is not left held while another backend is tried.
        """
        self.close()  # re-opening must not leak a previously opened handle
        cap = None
        try:
            idx = int(self._device) if self._device.isdigit() else self._device
            cap = cv2.VideoCapture(idx)
            if not cap.isOpened():
                return False
            cap.set(cv2.CAP_PROP_FRAME_WIDTH, self._width)
            cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self._height)
            cap.set(cv2.CAP_PROP_FPS, self._fps)
            # An opened handle does not prove the camera produces images; probe
            # one frame so the caller can fall back instead of streaming nothing.
            ok, frame = cap.read()
            if not ok or frame is None:
                logging.debug("OpenCV device %s opened but returned no frame",
                              self._device)
                return False
            self._cap = cap
            cap = None  # ownership moved to self; skip the release below
            return True
        except Exception as e:
            logging.debug("OpenCV open failed: %s", e)
            return False
        finally:
            if cap is not None:
                cap.release()

    def read_jpeg(self) -> Optional[bytes]:
        """Grab one frame and return it JPEG-encoded, or None on any failure."""
        if self._cap is None:
            return None
        ret, frame = self._cap.read()
        if not ret or frame is None:
            return None
        ok, buf = cv2.imencode(".jpg", frame, self._encode_params)
        if not ok:
            return None
        return bytes(buf)

    def close(self) -> None:
        """Release the capture device if open; safe to call repeatedly."""
        if self._cap is not None:
            self._cap.release()
            self._cap = None
|
||
|
||
|
||
class TermuxCapture:
    """Capture frames via termux-camera-photo (works on unmodified Android).

    Each frame is a separate ``termux-camera-photo`` invocation that writes a
    JPEG to a scratch file (the subprocess is given at most 3 s per shot), so
    throughput is bounded by that command's latency.

    Args:
        camera_id: Camera id passed to ``termux-camera-photo -c``.
        width, height: Output size; frames are resized when OpenCV is
            available, otherwise the native JPEG is returned unchanged.
        quality: JPEG quality used when re-encoding resized frames.
    """

    def __init__(self, camera_id: int, width: int, height: int, quality: int):
        self._camera_id = camera_id
        self._quality = quality
        # Under Termux $HOME is /data/data/com.termux/files/home, so this is the
        # same ~/.cache location as before, but it also works on other hosts.
        cache_dir = os.path.join(os.path.expanduser("~"), ".cache")
        self._tmpfile = os.path.join(cache_dir, f"vcam_{os.getpid()}.jpg")
        # Resize params for cv2 (termux-camera-photo outputs native resolution)
        self._target_w = width
        self._target_h = height
        os.makedirs(cache_dir, exist_ok=True)

    def open(self) -> bool:
        """Return True if a test capture succeeds (the frame is discarded)."""
        return self._capture_raw() is not None

    def read_jpeg(self) -> Optional[bytes]:
        """Take one photo; resize/re-encode it when OpenCV is available."""
        raw = self._capture_raw()
        if raw is None:
            return None
        if CV2_AVAILABLE:
            return self._resize_jpeg(raw)
        return raw

    def _capture_raw(self) -> Optional[bytes]:
        """Run termux-camera-photo once; return the JPEG bytes or None on failure."""
        try:
            # A failed shot can leave its file behind; remove it so a stale
            # image is never returned as a fresh frame.
            self._remove_tmpfile()
            r = subprocess.run(
                ["termux-camera-photo", "-c", str(self._camera_id), self._tmpfile],
                capture_output=True, timeout=3.0,
            )
            if r.returncode != 0 or not os.path.exists(self._tmpfile):
                return None
            with open(self._tmpfile, "rb") as f:
                data = f.read()
            self._remove_tmpfile()
            return data or None  # an empty file is not a frame
        except Exception as e:
            logging.debug("termux-camera-photo error: %s", e)
            return None

    def _remove_tmpfile(self) -> None:
        """Delete the scratch file if present, ignoring OS errors."""
        try:
            os.unlink(self._tmpfile)
        except OSError:
            pass

    def _resize_jpeg(self, raw: bytes) -> Optional[bytes]:
        """Decode, resize to the target size and re-encode; return *raw* on failure.

        The aspect ratio is not preserved: the image is scaled to exactly
        (target width, target height).
        """
        try:
            import numpy as np
            buf = np.frombuffer(raw, dtype=np.uint8)
            img = cv2.imdecode(buf, cv2.IMREAD_COLOR)
            if img is None:
                return raw
            resized = cv2.resize(img, (self._target_w, self._target_h))
            ok, enc = cv2.imencode(".jpg", resized,
                                   [cv2.IMWRITE_JPEG_QUALITY, self._quality])
            return bytes(enc) if ok else raw
        except Exception:
            return raw

    def close(self) -> None:
        """Remove any leftover scratch file."""
        self._remove_tmpfile()
|
||
|
||
|
||
# ── Capture thread ────────────────────────────────────────────────────────────
|
||
|
||
class CaptureThread(threading.Thread):
    """Drives a camera backend at the target FPS, pushing JPEG into a FrameBuffer.

    The wait after each capture subtracts the time spent in ``read_jpeg()``,
    so the loop approaches ``fps`` whenever a capture is faster than one interval.

    Args:
        backend: Object with a ``read_jpeg() -> Optional[bytes]`` method.
        fps: Target loop rate in Hz; must be > 0.
        buf: Destination with a ``put(bytes)`` method.

    Raises:
        ValueError: if ``fps`` is not positive.
    """

    def __init__(self, backend, fps: float, buf: "FrameBuffer"):
        super().__init__(name="capture", daemon=True)
        if fps <= 0:
            raise ValueError(f"fps must be > 0, got {fps}")
        self._backend = backend
        self._interval = 1.0 / fps
        self._buf = buf
        # An Event (not a flag reset inside run()) means a stop() issued before
        # start() is honoured, and stop() also interrupts the inter-frame wait.
        self._stop_evt = threading.Event()

    def run(self) -> None:
        logging.info("Capture thread started (%.1f Hz)", 1.0 / self._interval)
        while not self._stop_evt.is_set():
            t0 = time.monotonic()
            try:
                jpeg = self._backend.read_jpeg()
                if jpeg:
                    self._buf.put(jpeg)
            except Exception as e:
                logging.warning("Capture error: %s", e)
            elapsed = time.monotonic() - t0
            self._stop_evt.wait(max(0.0, self._interval - elapsed))

    def stop(self) -> None:
        """Ask the loop to exit after the current capture; does not join."""
        self._stop_evt.set()
|
||
|
||
|
||
# ── HTTP MJPEG server ─────────────────────────────────────────────────────────
|
||
|
||
# Delimiter line written before every part of the /stream multipart response.
# The value already includes the "--" prefix that RFC 2046 places in front of
# the boundary on delimiter lines, so the boundary= parameter of the
# Content-Type header should be this value without its leading "--".
MJPEG_BOUNDARY = b"--mjpeg-boundary"
|
||
|
||
def _make_http_handler(buf: FrameBuffer, width: int, height: int, fps: float):
|
||
|
||
class Handler(BaseHTTPRequestHandler):
|
||
def log_message(self, fmt, *args):
|
||
logging.debug("HTTP %s", fmt % args)
|
||
|
||
def do_GET(self):
|
||
if self.path == "/stream":
|
||
self._stream()
|
||
elif self.path == "/snapshot":
|
||
self._snapshot()
|
||
elif self.path == "/health":
|
||
self._health()
|
||
else:
|
||
self.send_error(404)
|
||
|
||
def _stream(self):
|
||
self.send_response(200)
|
||
self.send_header("Content-Type",
|
||
f"multipart/x-mixed-replace; boundary={MJPEG_BOUNDARY.decode()}")
|
||
self.send_header("Cache-Control", "no-cache")
|
||
self.send_header("Connection", "close")
|
||
self.end_headers()
|
||
try:
|
||
while True:
|
||
jpeg = buf.get(timeout=2.0)
|
||
if jpeg is None:
|
||
continue
|
||
ts = time.time()
|
||
header = (
|
||
f"\r\n{MJPEG_BOUNDARY.decode()}\r\n"
|
||
f"Content-Type: image/jpeg\r\n"
|
||
f"Content-Length: {len(jpeg)}\r\n"
|
||
f"X-Timestamp: {ts:.3f}\r\n\r\n"
|
||
).encode()
|
||
self.wfile.write(header + jpeg)
|
||
self.wfile.flush()
|
||
except (BrokenPipeError, ConnectionResetError):
|
||
pass
|
||
|
||
def _snapshot(self):
|
||
jpeg = buf.latest()
|
||
if jpeg is None:
|
||
self.send_error(503, "No frame available")
|
||
return
|
||
self.send_response(200)
|
||
self.send_header("Content-Type", "image/jpeg")
|
||
self.send_header("Content-Length", str(len(jpeg)))
|
||
self.send_header("X-Timestamp", str(time.time()))
|
||
self.end_headers()
|
||
self.wfile.write(jpeg)
|
||
|
||
def _health(self):
|
||
body = json.dumps({
|
||
"status": "ok",
|
||
"frames": buf.count,
|
||
"dropped": buf.dropped,
|
||
"width": width,
|
||
"height": height,
|
||
"fps": fps,
|
||
}).encode()
|
||
self.send_response(200)
|
||
self.send_header("Content-Type", "application/json")
|
||
self.send_header("Content-Length", str(len(body)))
|
||
self.end_headers()
|
||
self.wfile.write(body)
|
||
|
||
return Handler
|
||
|
||
|
||
# ── WebSocket server ──────────────────────────────────────────────────────────
|
||
|
||
async def ws_handler(websocket, buf: "FrameBuffer", width: int, height: int, fps: float):
    """Serve one WebSocket client: an info message, then JPEG frames as binary messages.

    The first message is a JSON text frame ``{"type": "info", ...}`` with the
    configured width/height/fps and a send timestamp. After that, every new
    frame obtained from ``buf.get()`` is sent as a binary message until the
    connection closes or another error occurs.
    """
    remote = websocket.remote_address
    logging.info("WS client connected: %s", remote)

    info = json.dumps({
        "type": "info",
        "width": width,
        "height": height,
        "fps": fps,
        "ts": time.time(),
    })
    try:
        await websocket.send(info)
    except Exception as e:
        # Most likely the client already disconnected; nothing to stream.
        logging.debug("WS client %s lost before info frame: %s", remote, e)
        return

    loop = asyncio.get_running_loop()
    try:
        while True:
            # buf.get() blocks, so run it in the default executor to keep the
            # event loop (pings, other clients) responsive.
            jpeg = await loop.run_in_executor(None, buf.get, 1.0)
            if jpeg is None:
                continue  # timed out waiting for a new frame
            await websocket.send(jpeg)
    except websockets.exceptions.ConnectionClosedOK:
        logging.info("WS client disconnected (clean): %s", remote)
    except websockets.exceptions.ConnectionClosedError as e:
        logging.info("WS client disconnected (error): %s — %s", remote, e)
    except Exception as e:
        logging.warning("WS handler error: %s", e)
|
||
|
||
|
||
# ── Statistics logger ─────────────────────────────────────────────────────────
|
||
|
||
def _stats_logger(buf: FrameBuffer, stop_evt: threading.Event) -> None:
|
||
prev = 0
|
||
while not stop_evt.wait(10.0):
|
||
delta = buf.count - prev
|
||
prev = buf.count
|
||
logging.info("Capture stats — frames=%d (+%d/10s) dropped=%d",
|
||
buf.count, delta, buf.dropped)
|
||
|
||
|
||
# ── Entrypoint ────────────────────────────────────────────────────────────────
|
||
|
||
def _parse_args(argv=None) -> argparse.Namespace:
    """Build the CLI parser, parse *argv* (default: sys.argv[1:]) and validate ranges."""
    parser = argparse.ArgumentParser(
        description="SaltyBot phone video bridge (Issue #585)"
    )
    parser.add_argument("--ws-port", type=int, default=8765,
                        help="WebSocket server port (default: 8765)")
    parser.add_argument("--http-port", type=int, default=8766,
                        help="HTTP MJPEG port (default: 8766)")
    parser.add_argument("--width", type=int, default=640,
                        help="Frame width (default: 640)")
    parser.add_argument("--height", type=int, default=480,
                        help="Frame height (default: 480)")
    parser.add_argument("--fps", type=float, default=15.0,
                        help="Target FPS (default: 15)")
    parser.add_argument("--quality", type=int, default=75,
                        help="JPEG quality 1-100 (default: 75)")
    parser.add_argument("--device", default="/dev/video0",
                        help="V4L2 device or index (default: /dev/video0)")
    parser.add_argument("--camera-id", type=int, default=0,
                        help="termux-camera-photo camera id (default: 0 = rear)")
    parser.add_argument("--no-http", action="store_true",
                        help="Disable HTTP server")
    parser.add_argument("--debug", action="store_true",
                        help="Verbose logging")
    args = parser.parse_args(argv)

    # Reject values that would otherwise fail later with an obscure error
    # (e.g. --fps 0 divides by zero when computing the capture interval).
    if args.fps <= 0:
        parser.error("--fps must be > 0")
    if not 1 <= args.quality <= 100:
        parser.error("--quality must be between 1 and 100")
    if args.width <= 0 or args.height <= 0:
        parser.error("--width and --height must be > 0")
    return args


def _open_backend(args: argparse.Namespace):
    """Return an opened camera backend: OpenCV first, then termux-camera-photo.

    Exits the process with status 1 when neither backend can be opened.
    """
    if CV2_AVAILABLE:
        cv_backend = OpenCVCapture(args.device, args.width, args.height,
                                   args.fps, args.quality)
        if cv_backend.open():
            logging.info("Using OpenCV backend (%s)", args.device)
            return cv_backend
        logging.info("OpenCV backend unavailable, falling back to termux-camera-photo")

    tx_backend = TermuxCapture(args.camera_id, args.width, args.height, args.quality)
    if tx_backend.open():
        logging.info("Using termux-camera-photo backend (camera %d)", args.camera_id)
        return tx_backend

    logging.error("No camera backend available. "
                  "Install opencv-python or termux-api.")
    sys.exit(1)


def main() -> None:
    """Run the phone video bridge until interrupted.

    Opens a camera backend, starts the capture thread, optionally serves the
    HTTP MJPEG/snapshot/health endpoints, and runs the WebSocket server on an
    asyncio loop in the main thread. Ctrl-C, or an error during server setup,
    leads to the cleanup in ``finally``.
    """
    args = _parse_args()

    logging.basicConfig(
        level=logging.DEBUG if args.debug else logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
    )

    if not WS_AVAILABLE:
        logging.error("websockets not installed. Run: pip install websockets")
        sys.exit(1)

    backend = _open_backend(args)

    buf = FrameBuffer()
    capture = CaptureThread(backend, args.fps, buf)
    capture.start()

    stop_evt = threading.Event()
    http_server = None
    try:
        if not args.no_http:
            handler_cls = _make_http_handler(buf, args.width, args.height, args.fps)
            server = ThreadingHTTPServer(("0.0.0.0", args.http_port), handler_cls)
            threading.Thread(
                target=server.serve_forever, name="http", daemon=True
            ).start()
            # Only publish the server once serve_forever is scheduled, since
            # shutdown() waits for serve_forever to acknowledge.
            http_server = server
            logging.info("HTTP MJPEG server: http://0.0.0.0:%d/stream", args.http_port)
            logging.info("HTTP snapshot: http://0.0.0.0:%d/snapshot", args.http_port)
            logging.info("HTTP health: http://0.0.0.0:%d/health", args.http_port)

        threading.Thread(
            target=_stats_logger, args=(buf, stop_evt), name="stats", daemon=True
        ).start()

        logging.info("WebSocket server: ws://0.0.0.0:%d", args.ws_port)

        async def _run_ws():
            async with websockets.serve(
                # `*_` absorbs the `path` argument that older websockets
                # releases pass to connection handlers.
                lambda ws, *_: ws_handler(ws, buf, args.width, args.height, args.fps),
                "0.0.0.0",
                args.ws_port,
                max_size=None,  # no limit on incoming message size
                ping_interval=5,
                ping_timeout=10,
            ):
                logging.info("Ready — streaming %dx%d @ %.0f fps",
                             args.width, args.height, args.fps)
                await asyncio.Future()  # run forever

        asyncio.run(_run_ws())
    except KeyboardInterrupt:
        logging.info("Shutting down...")
    finally:
        stop_evt.set()
        capture.stop()
        # Let an in-flight read_jpeg() finish before the backend is closed
        # underneath it (a termux shot may take up to its 3 s timeout).
        capture.join(timeout=5.0)
        backend.close()
        if http_server is not None:
            http_server.shutdown()
            http_server.server_close()
|
||
|
||
|
||
# Script entry point: running this file directly starts the bridge via main().
if __name__ == "__main__":
    main()
|