Merge pull request 'feat: Phone video bridge (Issue #585)' (#592) from sl-android/issue-585-video-bridge into main

This commit is contained in:
sl-jetson 2026-03-14 13:32:24 -04:00
commit 1da1d50171
4 changed files with 820 additions and 1 deletions

View File

@ -6,6 +6,8 @@ from launch.substitutions import LaunchConfiguration
def generate_launch_description():
camera_device_arg = DeclareLaunchArgument('camera_device', default_value='/dev/video0', description='Camera device path')
phone_host_arg = DeclareLaunchArgument('phone_host', default_value='192.168.1.200', description='Phone IP for video bridge')
phone_cam_enabled_arg = DeclareLaunchArgument('phone_cam_enabled', default_value='true', description='Enable phone camera node')
gps_enabled_arg = DeclareLaunchArgument('gps_enabled', default_value='true', description='Enable GPS node')
imu_enabled_arg = DeclareLaunchArgument('imu_enabled', default_value='true', description='Enable IMU node')
chat_enabled_arg = DeclareLaunchArgument('chat_enabled', default_value='true', description='Enable OpenClaw chat node')
@ -51,4 +53,25 @@ def generate_launch_description():
output='screen',
)
return LaunchDescription([camera_device_arg, gps_enabled_arg, imu_enabled_arg, chat_enabled_arg, bridge_url_arg, camera_node, gps_node, imu_node, chat_node, bridge_node])
phone_camera_node = Node(
package='saltybot_phone',
executable='phone_camera_node',
name='phone_camera_node',
parameters=[{
'phone_host': LaunchConfiguration('phone_host'),
'ws_port': 8765,
'http_port': 8766,
'frame_id': 'phone_camera',
'publish_raw': True,
'publish_compressed': True,
'reconnect_s': 3.0,
'latency_warn_ms': 200.0,
}],
output='screen',
)
return LaunchDescription([
camera_device_arg, gps_enabled_arg, imu_enabled_arg, chat_enabled_arg,
bridge_url_arg, phone_host_arg, phone_cam_enabled_arg,
camera_node, gps_node, imu_node, chat_node, bridge_node, phone_camera_node,
])

View File

@ -0,0 +1,318 @@
#!/usr/bin/env python3
"""
phone_camera_node.py — Jetson ROS2 receiver for phone video stream (Issue #585)
Connects to the phone's video_bridge.py WebSocket server, receives JPEG frames,
decodes them, and publishes:
/saltybot/phone/camera sensor_msgs/Image (bgr8)
/saltybot/phone/camera/compressed sensor_msgs/CompressedImage (jpeg)
Falls back to HTTP MJPEG polling if the WebSocket connection fails.
Parameters
phone_host str Phone IP or hostname (default: 192.168.1.200)
ws_port int video_bridge WS port (default: 8765)
http_port int video_bridge HTTP port (default: 8766)
frame_id str TF frame id (default: phone_camera)
publish_raw bool Publish sensor_msgs/Image (default: true)
publish_compressed bool Publish CompressedImage (default: true)
reconnect_s float Seconds between reconnect (default: 3.0)
latency_warn_ms float Warn if frame age > this (default: 200.0)
Publishes
/saltybot/phone/camera sensor_msgs/Image
/saltybot/phone/camera/compressed sensor_msgs/CompressedImage
/saltybot/phone/camera/info std_msgs/String (JSON stream metadata)
"""
import asyncio
import json
import threading
import time
from typing import Optional
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image, CompressedImage
from std_msgs.msg import String, Header
try:
import cv2
import numpy as np
CV2_AVAILABLE = True
except ImportError:
CV2_AVAILABLE = False
try:
import websockets
WS_AVAILABLE = True
except ImportError:
WS_AVAILABLE = False
try:
import urllib.request
HTTP_AVAILABLE = True
except ImportError:
HTTP_AVAILABLE = False
MJPEG_BOUNDARY = b"--mjpeg-boundary"
class PhoneCameraNode(Node):
    """
    Receives JPEG frames from phone/video_bridge.py and publishes ROS2 Image topics.

    Connection strategy:
      1. Try WebSocket (ws://<phone_host>:<ws_port>) — lowest latency
      2. On WS failure, fall back to HTTP MJPEG stream — guaranteed to work
      3. Reconnect automatically on disconnect

    Note: rclpy loggers accept only a single pre-formatted message string —
    unlike stdlib ``logging`` they do NOT support printf-style lazy arguments,
    so all log calls below use f-strings.
    """

    def __init__(self):
        super().__init__("phone_camera_node")
        # Parameters
        self.declare_parameter("phone_host", "192.168.1.200")
        self.declare_parameter("ws_port", 8765)
        self.declare_parameter("http_port", 8766)
        self.declare_parameter("frame_id", "phone_camera")
        self.declare_parameter("publish_raw", True)
        self.declare_parameter("publish_compressed", True)
        self.declare_parameter("reconnect_s", 3.0)
        self.declare_parameter("latency_warn_ms", 200.0)
        self._host = self.get_parameter("phone_host").value
        self._ws_port = self.get_parameter("ws_port").value
        self._http_port = self.get_parameter("http_port").value
        self._frame_id = self.get_parameter("frame_id").value
        self._pub_raw = self.get_parameter("publish_raw").value
        self._pub_comp = self.get_parameter("publish_compressed").value
        self._reconnect_s = self.get_parameter("reconnect_s").value
        self._latency_warn = self.get_parameter("latency_warn_ms").value
        if not CV2_AVAILABLE:
            self.get_logger().error(
                "opencv-python not installed — cannot decode JPEG frames."
            )
        # Publishers
        self._raw_pub = self.create_publisher(Image, "/saltybot/phone/camera", 10)
        self._comp_pub = self.create_publisher(
            CompressedImage, "/saltybot/phone/camera/compressed", 10
        )
        self._info_pub = self.create_publisher(String, "/saltybot/phone/camera/info", 10)
        # Stats
        self._frames_received = 0   # frames that arrived (WS or HTTP)
        self._frames_published = 0  # frames that made it through publish
        self._last_frame_ts = 0.0   # wall-clock time of last successful publish
        self._stream_info: dict = {}
        # Diagnostics timer (every 10 s)
        self.create_timer(10.0, self._diag_cb)
        # Start receiver thread
        self._stop_event = threading.Event()
        self._recv_thread = threading.Thread(
            target=self._recv_loop, name="phone_cam_recv", daemon=True
        )
        self._recv_thread.start()
        # BUG FIX: the original printf-style logger calls
        # (get_logger().info("%s", x)) raise TypeError under rclpy.
        self.get_logger().info(
            f"PhoneCameraNode ready — connecting to {self._host} "
            f"(ws:{self._ws_port} / http:{self._http_port})"
        )

    # ── Publish ───────────────────────────────────────────────────────────────
    def _publish_jpeg(self, jpeg_bytes: bytes, recv_ts: float) -> None:
        """Decode JPEG and publish Image and/or CompressedImage.

        Args:
            jpeg_bytes: one complete JPEG frame.
            recv_ts: local wall-clock time when the frame was received.
                NOTE(review): the phone sends no capture timestamp, so the
                "age" below only measures local receive→publish delay —
                confirm whether end-to-end latency should come from the
                stream's info frame instead.
        """
        self._frames_received += 1
        age_ms = (time.time() - recv_ts) * 1000.0
        if age_ms > self._latency_warn:
            self.get_logger().warning(
                f"Phone camera frame age {age_ms:.0f} ms "
                f"(target < {self._latency_warn:.0f} ms)"
            )
        now_msg = self.get_clock().now().to_msg()
        # CompressedImage — no decode needed
        if self._pub_comp:
            comp = CompressedImage()
            comp.header = Header()
            comp.header.stamp = now_msg
            comp.header.frame_id = self._frame_id
            comp.format = "jpeg"
            # bytes is accepted directly by the uint8[] field; the old
            # list(jpeg_bytes) built a per-byte Python list on every frame.
            comp.data = bytes(jpeg_bytes)
            self._comp_pub.publish(comp)
        # Image — decode via OpenCV
        if self._pub_raw and CV2_AVAILABLE:
            try:
                buf = np.frombuffer(jpeg_bytes, dtype=np.uint8)
                frame = cv2.imdecode(buf, cv2.IMREAD_COLOR)
                if frame is None:
                    return  # corrupt JPEG — skip stats update
                h, w = frame.shape[:2]
                img = Image()
                img.header = Header()
                img.header.stamp = now_msg
                img.header.frame_id = self._frame_id
                img.height = h
                img.width = w
                img.encoding = "bgr8"
                img.is_bigendian = 0
                img.step = w * 3  # bgr8 → 3 bytes per pixel
                img.data = frame.tobytes()
                self._raw_pub.publish(img)
            except Exception as e:
                self.get_logger().debug(f"JPEG decode error: {e}")
                return
        self._frames_published += 1
        self._last_frame_ts = time.time()

    # ── WebSocket receiver ────────────────────────────────────────────────────
    async def _ws_recv(self) -> bool:
        """
        Connect via WebSocket and receive frames until disconnect.

        Binary messages are JPEG frames; text messages are JSON control
        frames (currently only {"type": "info", ...} stream metadata).
        Returns True if we received at least one frame (connection worked).
        """
        uri = f"ws://{self._host}:{self._ws_port}"
        try:
            async with websockets.connect(
                uri,
                ping_interval=5,
                ping_timeout=10,
                max_size=None,
                open_timeout=5,
            ) as ws:
                self.get_logger().info(f"WebSocket connected to {uri}")
                got_frame = False
                async for message in ws:
                    if self._stop_event.is_set():
                        return got_frame
                    recv_ts = time.time()
                    if isinstance(message, bytes):
                        # Binary → JPEG frame
                        self._publish_jpeg(message, recv_ts)
                        got_frame = True
                    elif isinstance(message, str):
                        # Text → control JSON
                        try:
                            obj = json.loads(message)
                            if obj.get("type") == "info":
                                self._stream_info = obj
                                info_msg = String()
                                info_msg.data = message
                                self._info_pub.publish(info_msg)
                                self.get_logger().info(
                                    f"Stream info: {obj.get('width', 0)}x"
                                    f"{obj.get('height', 0)} @ "
                                    f"{obj.get('fps', 0):.0f} fps"
                                )
                        except json.JSONDecodeError:
                            pass  # ignore malformed control frames
                return got_frame
        except (websockets.exceptions.WebSocketException, OSError,
                asyncio.TimeoutError) as e:
            self.get_logger().debug(f"WS error: {e}")
            return False

    # ── HTTP MJPEG fallback ───────────────────────────────────────────────────
    def _http_mjpeg_recv(self) -> None:
        """Receive MJPEG stream over HTTP and publish frames.

        Frames are extracted by scanning for raw JPEG SOI/EOI markers rather
        than parsing the multipart boundary, which keeps the parser tolerant
        of boundary-format differences between servers.
        """
        url = f"http://{self._host}:{self._http_port}/stream"
        self.get_logger().info(f"Falling back to HTTP MJPEG: {url}")
        buf = b""
        try:
            # Context manager guarantees the HTTP response is closed
            # (the old code leaked the connection on every exit path).
            with urllib.request.urlopen(url, timeout=10) as resp:
                while not self._stop_event.is_set():
                    chunk = resp.read(4096)
                    if not chunk:
                        break
                    buf += chunk
                    # Extract complete JPEG frames (SOI..EOI) from the buffer
                    while True:
                        start = buf.find(b"\xff\xd8")  # JPEG SOI
                        if start == -1:
                            # No frame start — discard all but the final byte
                            # (a marker may straddle a chunk boundary). The
                            # old code let the buffer grow without bound here.
                            if len(buf) > 1:
                                buf = buf[-1:]
                            break
                        # Search EOI only after SOI so a stale EOI before the
                        # frame cannot truncate it.
                        end = buf.find(b"\xff\xd9", start)
                        if end == -1:
                            # Partial frame — keep from SOI onward
                            buf = buf[start:]
                            break
                        jpeg = buf[start:end + 2]
                        buf = buf[end + 2:]
                        self._publish_jpeg(jpeg, time.time())
        except Exception as e:
            self.get_logger().debug(f"HTTP MJPEG error: {e}")

    # ── Receiver loop ─────────────────────────────────────────────────────────
    def _recv_loop(self) -> None:
        """
        Main receiver loop — tries WS first, falls back to HTTP, then retries.

        Runs in a daemon thread until the stop event is set.
        """
        while not self._stop_event.is_set():
            # Try WebSocket
            if WS_AVAILABLE:
                try:
                    # asyncio.run creates and tears down a private event loop,
                    # which is safe in a non-main thread.
                    got = asyncio.run(self._ws_recv())
                    if got and not self._stop_event.is_set():
                        # WS was working but disconnected — retry immediately
                        continue
                except Exception as e:
                    self.get_logger().debug(f"WS loop error: {e}")
            if self._stop_event.is_set():
                break
            # WS failed — try HTTP MJPEG fallback
            if HTTP_AVAILABLE:
                self._http_mjpeg_recv()
            if not self._stop_event.is_set():
                self.get_logger().info(
                    f"Phone camera stream lost — retrying in {self._reconnect_s:.0f} s"
                )
                self._stop_event.wait(self._reconnect_s)

    # ── Diagnostics ───────────────────────────────────────────────────────────
    def _diag_cb(self) -> None:
        """Periodic (10 s) stats line: frame counters and age of last frame."""
        age = time.time() - self._last_frame_ts if self._last_frame_ts > 0 else -1.0
        self.get_logger().info(
            f"Phone camera — received={self._frames_received} "
            f"published={self._frames_published} last_frame_age={age:.1f}s"
        )

    # ── Lifecycle ─────────────────────────────────────────────────────────────
    def destroy_node(self) -> None:
        """Stop the receiver thread, then destroy the underlying node."""
        self._stop_event.set()
        # Best-effort join; the thread is a daemon, so a slow shutdown
        # (e.g. a blocked socket read) cannot hang process exit.
        if self._recv_thread.is_alive():
            self._recv_thread.join(timeout=2.0)
        super().destroy_node()
def main(args=None) -> None:
    """Entry point — spin a PhoneCameraNode until Ctrl-C, then shut down."""
    rclpy.init(args=args)
    receiver = PhoneCameraNode()
    try:
        rclpy.spin(receiver)
    except KeyboardInterrupt:
        pass  # Ctrl-C is the normal way to stop the node
    finally:
        receiver.destroy_node()
        rclpy.try_shutdown()
# Allow direct execution (python3 phone_camera_node.py) in addition to the
# ros2-run entry point declared in setup.py.
if __name__ == "__main__":
    main()

View File

@ -28,6 +28,7 @@ setup(
'imu_node = saltybot_phone.imu_node:main',
'openclaw_chat = saltybot_phone.openclaw_chat_node:main',
'phone_bridge = saltybot_phone.ws_bridge:main',
'phone_camera_node = saltybot_phone.phone_camera_node:main',
],
},
)

477
phone/video_bridge.py Normal file
View File

@ -0,0 +1,477 @@
#!/usr/bin/env python3
"""
video_bridge.py — Phone camera MJPEG streaming server for SaltyBot (Issue #585)
Captures frames from the Android camera and serves them over:
1. WebSocket (binary JPEG frames) on ws://0.0.0.0:<ws-port>
2. HTTP MJPEG stream (fallback) on http://0.0.0.0:<http-port>/stream
3. HTTP JPEG snapshot on http://0.0.0.0:<http-port>/snapshot
Target: 640×480 @ 15 fps, < 200 ms phone→Jetson latency.
Camera backends (tried in order)
1. OpenCV VideoCapture (/dev/video0 or --device) fastest, needs V4L2
2. termux-camera-photo capture loop universal on Termux
Uses the front or rear camera via termux-api.
WebSocket frame format
Binary message: raw JPEG bytes.
Text message : JSON control frame:
{"type":"info","width":640,"height":480,"fps":15,"ts":1234567890.0}
{"type":"error","msg":"..."}
Usage
python3 phone/video_bridge.py [OPTIONS]
--ws-port PORT WebSocket port (default: 8765)
--http-port PORT HTTP MJPEG port (default: 8766)
--width INT Frame width px (default: 640)
--height INT Frame height px (default: 480)
--fps FLOAT Target capture FPS (default: 15.0)
--quality INT JPEG quality 1-100 (default: 75)
--device PATH V4L2 device or camera id (default: /dev/video0)
--camera-id INT termux-camera-photo id (default: 0 = rear)
--no-http Disable HTTP server
--debug Verbose logging
Dependencies (Termux)
pkg install termux-api python opencv-python
pip install websockets
"""
import argparse
import asyncio
import io
import json
import logging
import os
import subprocess
import sys
import threading
import time
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from typing import Optional
try:
import cv2
CV2_AVAILABLE = True
except ImportError:
CV2_AVAILABLE = False
try:
import websockets
WS_AVAILABLE = True
except ImportError:
WS_AVAILABLE = False
# ── Frame buffer (shared between capture, WS, HTTP) ──────────────────────────
class FrameBuffer:
    """Thread-safe single-slot frame buffer — always holds the latest JPEG.

    Writers overwrite the single slot; readers wake on an internal event.
    A frame counts as *dropped* only when it is overwritten before any
    reader consumed it.

    Attributes:
        count: total frames ever written.
        dropped: frames overwritten before being consumed.
    """

    def __init__(self):
        self._lock = threading.Lock()
        self._frame: Optional[bytes] = None
        self._event = threading.Event()
        self.count = 0
        self.dropped = 0

    def put(self, jpeg_bytes: bytes) -> None:
        """Store the latest JPEG frame and wake any waiting reader."""
        with self._lock:
            # BUG FIX: the old check (`self._frame is not None`) counted
            # every frame after the first as dropped, because the slot is
            # never cleared. A drop only happens when the previous frame
            # was never consumed, i.e. the event is still set.
            if self._event.is_set():
                self.dropped += 1
            self._frame = jpeg_bytes
            self.count += 1
            self._event.set()

    def get(self, timeout: float = 1.0) -> Optional[bytes]:
        """Block until a new frame is available or timeout; None on timeout."""
        if self._event.wait(timeout):
            self._event.clear()
            with self._lock:
                return self._frame
        return None

    def latest(self) -> Optional[bytes]:
        """Return latest frame without blocking (may return None)."""
        with self._lock:
            return self._frame
# ── Camera backends ───────────────────────────────────────────────────────────
class OpenCVCapture:
    """Capture frames via OpenCV VideoCapture (V4L2 on Android/Linux).

    Lifecycle: construct → open() → read_jpeg() repeatedly → close().
    """

    def __init__(self, device: str, width: int, height: int, fps: float, quality: int):
        self._device = device
        self._width = width
        self._height = height
        self._fps = fps
        self._quality = quality
        self._cap = None
        # Pre-built imencode parameter list, reused for every frame.
        self._encode_params = [cv2.IMWRITE_JPEG_QUALITY, quality]

    def open(self) -> bool:
        """Open the capture device and apply geometry/FPS; True on success."""
        try:
            source = int(self._device) if self._device.isdigit() else self._device
            capture = cv2.VideoCapture(source)
            if not capture.isOpened():
                return False
            for prop, value in (
                (cv2.CAP_PROP_FRAME_WIDTH, self._width),
                (cv2.CAP_PROP_FRAME_HEIGHT, self._height),
                (cv2.CAP_PROP_FPS, self._fps),
            ):
                capture.set(prop, value)
            self._cap = capture
            return True
        except Exception as exc:
            logging.debug("OpenCV open failed: %s", exc)
            return False

    def read_jpeg(self) -> Optional[bytes]:
        """Grab one frame, JPEG-encode it, and return the bytes (None on failure)."""
        capture = self._cap
        if capture is None:
            return None
        grabbed, frame = capture.read()
        if not grabbed or frame is None:
            return None
        encoded_ok, encoded = cv2.imencode(".jpg", frame, self._encode_params)
        return bytes(encoded) if encoded_ok else None

    def close(self) -> None:
        """Release the device if it is open; safe to call repeatedly."""
        capture, self._cap = self._cap, None
        if capture is not None:
            capture.release()
class TermuxCapture:
    """Capture frames via termux-camera-photo (works on unmodified Android).

    Each read shells out to ``termux-camera-photo``, which writes a
    native-resolution JPEG to a per-process temp file; when OpenCV is
    present the frame is resized and re-encoded to the target geometry.
    """

    def __init__(self, camera_id: int, width: int, height: int, quality: int):
        self._camera_id = camera_id
        self._quality = quality
        self._tmpfile = f"/data/data/com.termux/files/home/.cache/vcam_{os.getpid()}.jpg"
        # Resize params for cv2 (termux-camera-photo outputs native resolution)
        self._target_w = width
        self._target_h = height
        os.makedirs(os.path.dirname(self._tmpfile), exist_ok=True)

    def open(self) -> bool:
        """Probe the camera with a single test capture."""
        return self._capture_raw() is not None

    def read_jpeg(self) -> Optional[bytes]:
        """Capture one frame; resized/re-encoded when OpenCV is available."""
        captured = self._capture_raw()
        if captured is None:
            return None
        return self._resize_jpeg(captured) if CV2_AVAILABLE else captured

    def _capture_raw(self) -> Optional[bytes]:
        """Run termux-camera-photo once and return the JPEG bytes, or None."""
        command = ["termux-camera-photo", "-c", str(self._camera_id), self._tmpfile]
        try:
            result = subprocess.run(command, capture_output=True, timeout=3.0)
            if result.returncode != 0 or not os.path.exists(self._tmpfile):
                return None
            with open(self._tmpfile, "rb") as fh:
                payload = fh.read()
            os.unlink(self._tmpfile)
            return payload
        except Exception as exc:
            logging.debug("termux-camera-photo error: %s", exc)
            return None

    def _resize_jpeg(self, raw: bytes) -> Optional[bytes]:
        """Downscale *raw* to the target geometry; fall back to *raw* on any error."""
        try:
            import numpy as np
            decoded = cv2.imdecode(np.frombuffer(raw, dtype=np.uint8), cv2.IMREAD_COLOR)
            if decoded is None:
                return raw
            shrunk = cv2.resize(decoded, (self._target_w, self._target_h))
            encoded_ok, encoded = cv2.imencode(
                ".jpg", shrunk, [cv2.IMWRITE_JPEG_QUALITY, self._quality])
            return bytes(encoded) if encoded_ok else raw
        except Exception:
            return raw

    def close(self) -> None:
        """Remove the temp file if it is still present."""
        try:
            os.unlink(self._tmpfile)
        except OSError:
            pass  # already gone, or unreadable — nothing to clean
# ── Capture thread ────────────────────────────────────────────────────────────
class CaptureThread(threading.Thread):
    """Drives a camera backend at the target FPS, pushing JPEG into FrameBuffer.

    The backend only needs a ``read_jpeg() -> Optional[bytes]`` method.
    """

    def __init__(self, backend, fps: float, buf: "FrameBuffer"):
        super().__init__(name="capture", daemon=True)
        self._backend = backend
        # BUG FIX: fps == 0 raised ZeroDivisionError and fps < 0 produced a
        # negative interval (busy loop); clamp invalid values to 1 Hz.
        self._interval = 1.0 / fps if fps > 0 else 1.0
        self._buf = buf
        self._running = False

    def run(self) -> None:
        """Capture loop: read a frame, push it, sleep out the remaining interval."""
        self._running = True
        logging.info("Capture thread started (%.1f Hz)", 1.0 / self._interval)
        while self._running:
            t0 = time.monotonic()
            try:
                jpeg = self._backend.read_jpeg()
                if jpeg:
                    self._buf.put(jpeg)
            except Exception as e:
                logging.warning("Capture error: %s", e)
            # Sleep only the remainder of the frame interval so capture time
            # does not reduce the effective FPS.
            elapsed = time.monotonic() - t0
            time.sleep(max(0.0, self._interval - elapsed))

    def stop(self) -> None:
        """Ask the loop to exit after the current iteration (non-blocking)."""
        self._running = False
# ── HTTP MJPEG server ─────────────────────────────────────────────────────────
MJPEG_BOUNDARY = b"--mjpeg-boundary"
def _make_http_handler(buf: FrameBuffer, width: int, height: int, fps: float):
class Handler(BaseHTTPRequestHandler):
def log_message(self, fmt, *args):
logging.debug("HTTP %s", fmt % args)
def do_GET(self):
if self.path == "/stream":
self._stream()
elif self.path == "/snapshot":
self._snapshot()
elif self.path == "/health":
self._health()
else:
self.send_error(404)
def _stream(self):
self.send_response(200)
self.send_header("Content-Type",
f"multipart/x-mixed-replace; boundary={MJPEG_BOUNDARY.decode()}")
self.send_header("Cache-Control", "no-cache")
self.send_header("Connection", "close")
self.end_headers()
try:
while True:
jpeg = buf.get(timeout=2.0)
if jpeg is None:
continue
ts = time.time()
header = (
f"\r\n{MJPEG_BOUNDARY.decode()}\r\n"
f"Content-Type: image/jpeg\r\n"
f"Content-Length: {len(jpeg)}\r\n"
f"X-Timestamp: {ts:.3f}\r\n\r\n"
).encode()
self.wfile.write(header + jpeg)
self.wfile.flush()
except (BrokenPipeError, ConnectionResetError):
pass
def _snapshot(self):
jpeg = buf.latest()
if jpeg is None:
self.send_error(503, "No frame available")
return
self.send_response(200)
self.send_header("Content-Type", "image/jpeg")
self.send_header("Content-Length", str(len(jpeg)))
self.send_header("X-Timestamp", str(time.time()))
self.end_headers()
self.wfile.write(jpeg)
def _health(self):
body = json.dumps({
"status": "ok",
"frames": buf.count,
"dropped": buf.dropped,
"width": width,
"height": height,
"fps": fps,
}).encode()
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
return Handler
# ── WebSocket server ──────────────────────────────────────────────────────────
async def ws_handler(websocket, buf: FrameBuffer, width: int, height: int, fps: float):
    """Handle one WebSocket client connection — streams JPEG frames as binary messages.

    Sends one JSON "info" control frame on connect, then binary JPEG frames
    until the client disconnects or an error occurs.
    """
    remote = websocket.remote_address
    logging.info("WS client connected: %s", remote)
    # Send info frame
    info = json.dumps({
        "type": "info",
        "width": width,
        "height": height,
        "fps": fps,
        "ts": time.time(),
    })
    try:
        await websocket.send(info)
    except Exception:
        return  # client vanished before the handshake completed
    loop = asyncio.get_event_loop()
    try:
        while True:
            # Offload blocking buf.get() to thread pool to keep event loop free
            jpeg = await loop.run_in_executor(None, lambda: buf.get(timeout=1.0))
            if jpeg is None:
                continue
            await websocket.send(jpeg)
    except websockets.exceptions.ConnectionClosedOK:
        logging.info("WS client disconnected (clean): %s", remote)
    except websockets.exceptions.ConnectionClosedError as e:
        # BUG FIX: the old format string "%s%s" mashed the address and the
        # error together with no separator in the log output.
        logging.info("WS client disconnected (error): %s: %s", remote, e)
    except Exception as e:
        logging.warning("WS handler error: %s", e)
# ── Statistics logger ─────────────────────────────────────────────────────────
def _stats_logger(buf: FrameBuffer, stop_evt: threading.Event) -> None:
    """Log capture throughput every 10 s until *stop_evt* is set."""
    last_total = 0
    while not stop_evt.wait(10.0):
        total = buf.count
        logging.info("Capture stats — frames=%d (+%d/10s) dropped=%d",
                     total, total - last_total, buf.dropped)
        last_total = total
# ── Entrypoint ────────────────────────────────────────────────────────────────
def main() -> None:
    """Parse CLI args, open a camera backend, and serve frames over WS + HTTP.

    Exits with status 1 on invalid numeric arguments, when the websockets
    package is missing, or when no camera backend can be opened.
    """
    parser = argparse.ArgumentParser(
        description="SaltyBot phone video bridge (Issue #585)"
    )
    parser.add_argument("--ws-port", type=int, default=8765,
                        help="WebSocket server port (default: 8765)")
    parser.add_argument("--http-port", type=int, default=8766,
                        help="HTTP MJPEG port (default: 8766)")
    parser.add_argument("--width", type=int, default=640,
                        help="Frame width (default: 640)")
    parser.add_argument("--height", type=int, default=480,
                        help="Frame height (default: 480)")
    parser.add_argument("--fps", type=float, default=15.0,
                        help="Target FPS (default: 15)")
    parser.add_argument("--quality", type=int, default=75,
                        help="JPEG quality 1-100 (default: 75)")
    parser.add_argument("--device", default="/dev/video0",
                        help="V4L2 device or index (default: /dev/video0)")
    parser.add_argument("--camera-id", type=int, default=0,
                        help="termux-camera-photo camera id (default: 0 = rear)")
    parser.add_argument("--no-http", action="store_true",
                        help="Disable HTTP server")
    parser.add_argument("--debug", action="store_true",
                        help="Verbose logging")
    args = parser.parse_args()
    logging.basicConfig(
        level=logging.DEBUG if args.debug else logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
    )
    # Validate numeric arguments early — a non-positive fps would otherwise
    # surface as a confusing crash inside the capture thread.
    if args.fps <= 0:
        logging.error("--fps must be > 0 (got %s)", args.fps)
        sys.exit(1)
    if not 1 <= args.quality <= 100:
        logging.error("--quality must be 1-100 (got %d)", args.quality)
        sys.exit(1)
    if not WS_AVAILABLE:
        logging.error("websockets not installed. Run: pip install websockets")
        sys.exit(1)
    # ── Select and open camera backend ───────────────────────────────────────
    backend = None
    if CV2_AVAILABLE:
        cv_backend = OpenCVCapture(args.device, args.width, args.height,
                                   args.fps, args.quality)
        if cv_backend.open():
            logging.info("Using OpenCV backend (%s)", args.device)
            backend = cv_backend
        else:
            logging.info("OpenCV backend unavailable, falling back to termux-camera-photo")
    if backend is None:
        tx_backend = TermuxCapture(args.camera_id, args.width, args.height, args.quality)
        if tx_backend.open():
            logging.info("Using termux-camera-photo backend (camera %d)", args.camera_id)
            backend = tx_backend
        else:
            logging.error("No camera backend available. "
                          "Install opencv-python or termux-api.")
            sys.exit(1)
    # ── Frame buffer (shared by capture / WS / HTTP) ─────────────────────────
    buf = FrameBuffer()
    # ── Capture thread ────────────────────────────────────────────────────────
    capture = CaptureThread(backend, args.fps, buf)
    capture.start()
    # ── HTTP server thread ────────────────────────────────────────────────────
    stop_evt = threading.Event()
    if not args.no_http:
        handler_cls = _make_http_handler(buf, args.width, args.height, args.fps)
        http_server = ThreadingHTTPServer(("0.0.0.0", args.http_port), handler_cls)
        http_thread = threading.Thread(
            target=http_server.serve_forever, name="http", daemon=True
        )
        http_thread.start()
        logging.info("HTTP MJPEG server: http://0.0.0.0:%d/stream", args.http_port)
        logging.info("HTTP snapshot: http://0.0.0.0:%d/snapshot", args.http_port)
        logging.info("HTTP health: http://0.0.0.0:%d/health", args.http_port)
    # ── Stats logger ──────────────────────────────────────────────────────────
    stats_thread = threading.Thread(
        target=_stats_logger, args=(buf, stop_evt), name="stats", daemon=True
    )
    stats_thread.start()
    # ── WebSocket server (runs the event loop) ────────────────────────────────
    logging.info("WebSocket server: ws://0.0.0.0:%d", args.ws_port)
    async def _run_ws():
        async with websockets.serve(
            lambda ws: ws_handler(ws, buf, args.width, args.height, args.fps),
            "0.0.0.0",
            args.ws_port,
            max_size=None,  # no message size limit
            ping_interval=5,
            ping_timeout=10,
        ):
            logging.info("Ready — streaming %dx%d @ %.0f fps",
                         args.width, args.height, args.fps)
            await asyncio.Future()  # run forever
    try:
        asyncio.run(_run_ws())
    except KeyboardInterrupt:
        logging.info("Shutting down...")
    finally:
        stop_evt.set()
        capture.stop()
        # BUG FIX: give the capture loop a moment to leave the backend before
        # closing it underneath the thread (it is a daemon, so a slow stop
        # cannot hang process exit).
        capture.join(timeout=2.0)
        backend.close()
        if not args.no_http:
            http_server.shutdown()
# Script entry point: run directly on the phone (Termux) with
# `python3 phone/video_bridge.py [OPTIONS]`.
if __name__ == "__main__":
    main()