From 08bc23f6dfaf9f2ffc0b1afe483fdf12f33b0104 Mon Sep 17 00:00:00 2001 From: sl-android Date: Sat, 14 Mar 2026 12:19:18 -0400 Subject: [PATCH] feat: Phone video streaming bridge (Issue #585) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phone side — phone/video_bridge.py: - MJPEG streaming server for Android/Termux phone camera - Dual camera backends: OpenCV VideoCapture (V4L2) with automatic fallback to termux-camera-photo for unmodified Android - WebSocket server (ws://0.0.0.0:8765) — binary JPEG frames + JSON info/error control messages; supports multiple concurrent clients - HTTP server (http://0.0.0.0:8766): /stream — multipart/x-mixed-replace MJPEG /snapshot — single JPEG /health — JSON stats (frame count, dropped, resolution, fps) - Thread-safe single-slot FrameBuffer; CaptureThread rate-limited with wall-clock accounting for capture latency - Flags: --ws-port, --http-port, --width, --height, --fps, --quality, --device, --camera-id, --no-http, --debug Jetson side — saltybot_phone/phone_camera_node.py: - ROS2 node: receives JPEG frames, publishes: /saltybot/phone/camera sensor_msgs/Image (bgr8) /saltybot/phone/camera/compressed sensor_msgs/CompressedImage /saltybot/phone/camera/info std_msgs/String (stream metadata) - WebSocket client (primary); HTTP MJPEG polling fallback on WS failure - Auto-reconnect loop (default 3 s) for both transports - Latency warning when frame age > latency_warn_ms (default 200 ms) - 10 s diagnostics log: received/published counts + last frame age - Registered as phone_camera_node console script in setup.py - Added to phone_bringup.py launch with phone_host / phone_cam_enabled args Co-Authored-By: Claude Sonnet 4.6 --- .../saltybot_phone/launch/phone_bringup.py | 25 +- .../saltybot_phone/phone_camera_node.py | 318 ++++++++++++ jetson/ros2_ws/src/saltybot_phone/setup.py | 1 + phone/video_bridge.py | 477 ++++++++++++++++++ 4 files changed, 820 insertions(+), 1 deletion(-) create mode 100644 
jetson/ros2_ws/src/saltybot_phone/saltybot_phone/phone_camera_node.py create mode 100644 phone/video_bridge.py diff --git a/jetson/ros2_ws/src/saltybot_phone/launch/phone_bringup.py b/jetson/ros2_ws/src/saltybot_phone/launch/phone_bringup.py index c43fb0d..ac5c6a6 100755 --- a/jetson/ros2_ws/src/saltybot_phone/launch/phone_bringup.py +++ b/jetson/ros2_ws/src/saltybot_phone/launch/phone_bringup.py @@ -6,6 +6,8 @@ from launch.substitutions import LaunchConfiguration def generate_launch_description(): camera_device_arg = DeclareLaunchArgument('camera_device', default_value='/dev/video0', description='Camera device path') + phone_host_arg = DeclareLaunchArgument('phone_host', default_value='192.168.1.200', description='Phone IP for video bridge') + phone_cam_enabled_arg = DeclareLaunchArgument('phone_cam_enabled', default_value='true', description='Enable phone camera node') gps_enabled_arg = DeclareLaunchArgument('gps_enabled', default_value='true', description='Enable GPS node') imu_enabled_arg = DeclareLaunchArgument('imu_enabled', default_value='true', description='Enable IMU node') chat_enabled_arg = DeclareLaunchArgument('chat_enabled', default_value='true', description='Enable OpenClaw chat node') @@ -51,4 +53,25 @@ def generate_launch_description(): output='screen', ) - return LaunchDescription([camera_device_arg, gps_enabled_arg, imu_enabled_arg, chat_enabled_arg, bridge_url_arg, camera_node, gps_node, imu_node, chat_node, bridge_node]) + phone_camera_node = Node( + package='saltybot_phone', + executable='phone_camera_node', + name='phone_camera_node', + parameters=[{ + 'phone_host': LaunchConfiguration('phone_host'), + 'ws_port': 8765, + 'http_port': 8766, + 'frame_id': 'phone_camera', + 'publish_raw': True, + 'publish_compressed': True, + 'reconnect_s': 3.0, + 'latency_warn_ms': 200.0, + }], + output='screen', + ) + + return LaunchDescription([ + camera_device_arg, gps_enabled_arg, imu_enabled_arg, chat_enabled_arg, + bridge_url_arg, phone_host_arg, 
phone_cam_enabled_arg, + camera_node, gps_node, imu_node, chat_node, bridge_node, phone_camera_node, + ]) diff --git a/jetson/ros2_ws/src/saltybot_phone/saltybot_phone/phone_camera_node.py b/jetson/ros2_ws/src/saltybot_phone/saltybot_phone/phone_camera_node.py new file mode 100644 index 0000000..c05b858 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_phone/saltybot_phone/phone_camera_node.py @@ -0,0 +1,318 @@ +#!/usr/bin/env python3 +""" +phone_camera_node.py — Jetson ROS2 receiver for phone video stream (Issue #585) + +Connects to the phone's video_bridge.py WebSocket server, receives JPEG frames, +decodes them, and publishes: + /saltybot/phone/camera sensor_msgs/Image (bgr8) + /saltybot/phone/camera/compressed sensor_msgs/CompressedImage (jpeg) + +Falls back to HTTP MJPEG polling if the WebSocket connection fails. + +Parameters +────────── + phone_host str Phone IP or hostname (default: 192.168.1.200) + ws_port int video_bridge WS port (default: 8765) + http_port int video_bridge HTTP port (default: 8766) + frame_id str TF frame id (default: phone_camera) + publish_raw bool Publish sensor_msgs/Image (default: true) + publish_compressed bool Publish CompressedImage (default: true) + reconnect_s float Seconds between reconnect (default: 3.0) + latency_warn_ms float Warn if frame age > this (default: 200.0) + +Publishes +───────── + /saltybot/phone/camera sensor_msgs/Image + /saltybot/phone/camera/compressed sensor_msgs/CompressedImage + /saltybot/phone/camera/info std_msgs/String (JSON stream metadata) +""" + +import asyncio +import json +import threading +import time +from typing import Optional + +import rclpy +from rclpy.node import Node +from sensor_msgs.msg import Image, CompressedImage +from std_msgs.msg import String, Header + +try: + import cv2 + import numpy as np + CV2_AVAILABLE = True +except ImportError: + CV2_AVAILABLE = False + +try: + import websockets + WS_AVAILABLE = True +except ImportError: + WS_AVAILABLE = False + +try: + import urllib.request + 
MJPEG_BOUNDARY = b"--mjpeg-boundary"


class PhoneCameraNode(Node):
    """Receive JPEG frames from phone/video_bridge.py and publish ROS2 topics.

    Connection strategy:
      1. WebSocket (ws://host:ws_port) — lowest latency.
      2. HTTP MJPEG polling fallback when the WebSocket fails.
      3. Automatic reconnect on disconnect (``reconnect_s`` seconds).

    Publishes:
      /saltybot/phone/camera             sensor_msgs/Image (bgr8)
      /saltybot/phone/camera/compressed  sensor_msgs/CompressedImage (jpeg)
      /saltybot/phone/camera/info        std_msgs/String (JSON stream metadata)

    NOTE: rclpy loggers accept a single message string only — printf-style
    positional args (``logger.info("x=%s", x)``) raise TypeError, so all log
    calls here use f-strings.
    """

    def __init__(self):
        super().__init__("phone_camera_node")

        # ── Parameters ───────────────────────────────────────────────────
        self.declare_parameter("phone_host", "192.168.1.200")
        self.declare_parameter("ws_port", 8765)
        self.declare_parameter("http_port", 8766)
        self.declare_parameter("frame_id", "phone_camera")
        self.declare_parameter("publish_raw", True)
        self.declare_parameter("publish_compressed", True)
        self.declare_parameter("reconnect_s", 3.0)
        self.declare_parameter("latency_warn_ms", 200.0)

        self._host = self.get_parameter("phone_host").value
        self._ws_port = self.get_parameter("ws_port").value
        self._http_port = self.get_parameter("http_port").value
        self._frame_id = self.get_parameter("frame_id").value
        self._pub_raw = self.get_parameter("publish_raw").value
        self._pub_comp = self.get_parameter("publish_compressed").value
        self._reconnect_s = self.get_parameter("reconnect_s").value
        self._latency_warn = self.get_parameter("latency_warn_ms").value

        if not CV2_AVAILABLE:
            self.get_logger().error(
                "opencv-python not installed — cannot decode JPEG frames."
            )

        # ── Publishers ───────────────────────────────────────────────────
        self._raw_pub = self.create_publisher(Image, "/saltybot/phone/camera", 10)
        self._comp_pub = self.create_publisher(
            CompressedImage, "/saltybot/phone/camera/compressed", 10
        )
        self._info_pub = self.create_publisher(
            String, "/saltybot/phone/camera/info", 10
        )

        # ── Stats ────────────────────────────────────────────────────────
        self._frames_received = 0
        self._frames_published = 0
        self._last_frame_ts = 0.0
        self._stream_info: dict = {}

        # Diagnostics timer (every 10 s)
        self.create_timer(10.0, self._diag_cb)

        # Receiver runs in a daemon thread so rclpy.spin() stays free.
        self._stop_event = threading.Event()
        self._recv_thread = threading.Thread(
            target=self._recv_loop, name="phone_cam_recv", daemon=True
        )
        self._recv_thread.start()

        self.get_logger().info(
            f"PhoneCameraNode ready — connecting to {self._host} "
            f"(ws:{self._ws_port} / http:{self._http_port})"
        )

    # ── Publish ──────────────────────────────────────────────────────────

    def _publish_jpeg(self, jpeg_bytes: bytes, recv_ts: float) -> None:
        """Publish *jpeg_bytes* as CompressedImage and/or decoded Image.

        ``recv_ts`` is the local wall-clock time the frame was received, so
        the age check measures local queuing/decode delay — the bridge does
        not embed a capture timestamp in binary frames (TODO: confirm whether
        phone-side timestamps should be added for true end-to-end latency).
        """
        self._frames_received += 1
        age_ms = (time.time() - recv_ts) * 1000.0
        if age_ms > self._latency_warn:
            self.get_logger().warning(
                f"Phone camera frame age {age_ms:.0f} ms "
                f"(target < {self._latency_warn:.0f} ms)"
            )

        now_msg = self.get_clock().now().to_msg()
        published = False

        # CompressedImage — pass the JPEG through without decoding.
        if self._pub_comp:
            comp = CompressedImage()
            comp.header = Header()
            comp.header.stamp = now_msg
            comp.header.frame_id = self._frame_id
            comp.format = "jpeg"
            # Assign bytes directly; list(jpeg_bytes) copied byte-by-byte.
            comp.data = bytes(jpeg_bytes)
            self._comp_pub.publish(comp)
            published = True

        # Image — decode via OpenCV.
        if self._pub_raw and CV2_AVAILABLE:
            try:
                buf = np.frombuffer(jpeg_bytes, dtype=np.uint8)
                frame = cv2.imdecode(buf, cv2.IMREAD_COLOR)
                if frame is not None:
                    h, w = frame.shape[:2]
                    img = Image()
                    img.header = Header()
                    img.header.stamp = now_msg
                    img.header.frame_id = self._frame_id
                    img.height = h
                    img.width = w
                    img.encoding = "bgr8"
                    img.is_bigendian = 0
                    img.step = w * 3
                    img.data = frame.tobytes()
                    self._raw_pub.publish(img)
                    published = True
            except Exception as e:
                self.get_logger().debug(f"JPEG decode error: {e}")

        # Count the frame if at least one topic carried it.  (Previously a
        # decode failure early-returned even after the compressed message
        # had already gone out, skipping the stats update.)
        if published:
            self._frames_published += 1
            self._last_frame_ts = time.time()

    # ── WebSocket receiver ───────────────────────────────────────────────

    async def _ws_recv(self) -> bool:
        """Connect via WebSocket and receive frames until disconnect.

        Returns True if at least one frame arrived (connection was working).
        """
        uri = f"ws://{self._host}:{self._ws_port}"
        try:
            async with websockets.connect(
                uri,
                ping_interval=5,
                ping_timeout=10,
                max_size=None,
                open_timeout=5,
            ) as ws:
                self.get_logger().info(f"WebSocket connected to {uri}")
                got_frame = False
                async for message in ws:
                    if self._stop_event.is_set():
                        return got_frame
                    recv_ts = time.time()
                    if isinstance(message, bytes):
                        # Binary → JPEG frame
                        self._publish_jpeg(message, recv_ts)
                        got_frame = True
                    elif isinstance(message, str):
                        # Text → control JSON ({"type": "info"|"error", ...})
                        try:
                            obj = json.loads(message)
                            if obj.get("type") == "info":
                                self._stream_info = obj
                                info_msg = String()
                                info_msg.data = message
                                self._info_pub.publish(info_msg)
                                self.get_logger().info(
                                    f"Stream info: {obj.get('width', 0)}x"
                                    f"{obj.get('height', 0)} @ "
                                    f"{obj.get('fps', 0):.0f} fps"
                                )
                        except json.JSONDecodeError:
                            pass
                return got_frame
        except (websockets.exceptions.WebSocketException, OSError,
                asyncio.TimeoutError) as e:
            self.get_logger().debug(f"WS error: {e}")
            return False

    # ── HTTP MJPEG fallback ──────────────────────────────────────────────

    def _http_mjpeg_recv(self) -> None:
        """Receive the MJPEG stream over HTTP and publish each frame."""
        url = f"http://{self._host}:{self._http_port}/stream"
        self.get_logger().info(f"Falling back to HTTP MJPEG: {url}")
        buf = b""
        try:
            req = urllib.request.urlopen(url, timeout=10)
            while not self._stop_event.is_set():
                chunk = req.read(4096)
                if not chunk:
                    break
                buf += chunk
                # Extract JPEG frames by scanning for SOI/EOI markers.
                while True:
                    start = buf.find(b"\xff\xd8")  # JPEG SOI
                    end = buf.find(b"\xff\xd9")    # JPEG EOI
                    if start == -1:
                        # No frame start at all — keep only the final byte in
                        # case an SOI marker is split across chunk reads.
                        # (Previously the buffer grew without bound here.)
                        buf = buf[-1:]
                        break
                    if end == -1 or end < start:
                        # Partial frame — keep from SOI onward.
                        if start > 0:
                            buf = buf[start:]
                        break
                    jpeg = buf[start:end + 2]
                    buf = buf[end + 2:]
                    self._publish_jpeg(jpeg, time.time())
        except Exception as e:
            self.get_logger().debug(f"HTTP MJPEG error: {e}")

    # ── Receiver loop ────────────────────────────────────────────────────

    def _recv_loop(self) -> None:
        """Main receiver loop (daemon thread): WS first, HTTP fallback, retry."""
        while not self._stop_event.is_set():
            if WS_AVAILABLE:
                try:
                    # asyncio.run() creates and tears down a private event
                    # loop — replaces the manual new_event_loop/close dance.
                    got = asyncio.run(self._ws_recv())
                    if got and not self._stop_event.is_set():
                        # WS was working but disconnected — retry immediately.
                        continue
                except Exception as e:
                    self.get_logger().debug(f"WS loop error: {e}")

            if self._stop_event.is_set():
                break

            # WS failed — try HTTP MJPEG fallback.
            if HTTP_AVAILABLE:
                self._http_mjpeg_recv()

            if not self._stop_event.is_set():
                self.get_logger().info(
                    f"Phone camera stream lost — retrying in "
                    f"{self._reconnect_s:.0f} s"
                )
                self._stop_event.wait(self._reconnect_s)

    # ── Diagnostics ──────────────────────────────────────────────────────

    def _diag_cb(self) -> None:
        """10 s timer callback: log receive/publish counters and frame age."""
        age = time.time() - self._last_frame_ts if self._last_frame_ts > 0 else -1.0
        self.get_logger().info(
            f"Phone camera — received={self._frames_received} "
            f"published={self._frames_published} last_frame_age={age:.1f}s"
        )

    # ── Lifecycle ────────────────────────────────────────────────────────

    def destroy_node(self) -> None:
        """Signal the receiver thread to stop, then destroy the node."""
        self._stop_event.set()
        super().destroy_node()


def main(args=None) -> None:
    """Entry point: spin the node until Ctrl-C, then shut down cleanly."""
    rclpy.init(args=args)
    node = PhoneCameraNode()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.try_shutdown()


if __name__ == "__main__":
    main()
+ Text message : JSON control frame: + {"type":"info","width":640,"height":480,"fps":15,"ts":1234567890.0} + {"type":"error","msg":"..."} + +Usage +───── + python3 phone/video_bridge.py [OPTIONS] + + --ws-port PORT WebSocket port (default: 8765) + --http-port PORT HTTP MJPEG port (default: 8766) + --width INT Frame width px (default: 640) + --height INT Frame height px (default: 480) + --fps FLOAT Target capture FPS (default: 15.0) + --quality INT JPEG quality 1-100 (default: 75) + --device PATH V4L2 device or camera id (default: /dev/video0) + --camera-id INT termux-camera-photo id (default: 0 = rear) + --no-http Disable HTTP server + --debug Verbose logging + +Dependencies (Termux) +────────────────────── + pkg install termux-api python opencv-python + pip install websockets +""" + +import argparse +import asyncio +import io +import json +import logging +import os +import subprocess +import sys +import threading +import time +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer +from typing import Optional + +try: + import cv2 + CV2_AVAILABLE = True +except ImportError: + CV2_AVAILABLE = False + +try: + import websockets + WS_AVAILABLE = True +except ImportError: + WS_AVAILABLE = False + +# ── Frame buffer (shared between capture, WS, HTTP) ────────────────────────── + +class FrameBuffer: + """Thread-safe single-slot frame buffer — always holds the latest JPEG.""" + + def __init__(self): + self._lock = threading.Lock() + self._frame: Optional[bytes] = None + self._event = threading.Event() + self.count = 0 + self.dropped = 0 + + def put(self, jpeg_bytes: bytes) -> None: + with self._lock: + if self._frame is not None: + self.dropped += 1 + self._frame = jpeg_bytes + self.count += 1 + self._event.set() + + def get(self, timeout: float = 1.0) -> Optional[bytes]: + """Block until a new frame is available or timeout.""" + if self._event.wait(timeout): + self._event.clear() + with self._lock: + return self._frame + return None + + def latest(self) -> 
Optional[bytes]: + """Return latest frame without blocking (may return None).""" + with self._lock: + return self._frame + + +# ── Camera backends ─────────────────────────────────────────────────────────── + +class OpenCVCapture: + """Capture frames via OpenCV VideoCapture (V4L2 on Android/Linux).""" + + def __init__(self, device: str, width: int, height: int, fps: float, quality: int): + self._device = device + self._width = width + self._height = height + self._fps = fps + self._quality = quality + self._cap = None + self._encode_params = [cv2.IMWRITE_JPEG_QUALITY, quality] + + def open(self) -> bool: + try: + idx = int(self._device) if self._device.isdigit() else self._device + cap = cv2.VideoCapture(idx) + if not cap.isOpened(): + return False + cap.set(cv2.CAP_PROP_FRAME_WIDTH, self._width) + cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self._height) + cap.set(cv2.CAP_PROP_FPS, self._fps) + self._cap = cap + return True + except Exception as e: + logging.debug("OpenCV open failed: %s", e) + return False + + def read_jpeg(self) -> Optional[bytes]: + if self._cap is None: + return None + ret, frame = self._cap.read() + if not ret or frame is None: + return None + ok, buf = cv2.imencode(".jpg", frame, self._encode_params) + if not ok: + return None + return bytes(buf) + + def close(self) -> None: + if self._cap is not None: + self._cap.release() + self._cap = None + + +class TermuxCapture: + """Capture frames via termux-camera-photo (works on unmodified Android).""" + + def __init__(self, camera_id: int, width: int, height: int, quality: int): + self._camera_id = camera_id + self._quality = quality + self._tmpfile = f"/data/data/com.termux/files/home/.cache/vcam_{os.getpid()}.jpg" + # Resize params for cv2 (termux-camera-photo outputs native resolution) + self._target_w = width + self._target_h = height + os.makedirs(os.path.dirname(self._tmpfile), exist_ok=True) + + def open(self) -> bool: + # Test one capture + return self._capture_raw() is not None + + def 
read_jpeg(self) -> Optional[bytes]: + raw = self._capture_raw() + if raw is None: + return None + if CV2_AVAILABLE: + return self._resize_jpeg(raw) + return raw + + def _capture_raw(self) -> Optional[bytes]: + try: + r = subprocess.run( + ["termux-camera-photo", "-c", str(self._camera_id), self._tmpfile], + capture_output=True, timeout=3.0, + ) + if r.returncode != 0 or not os.path.exists(self._tmpfile): + return None + with open(self._tmpfile, "rb") as f: + data = f.read() + os.unlink(self._tmpfile) + return data + except Exception as e: + logging.debug("termux-camera-photo error: %s", e) + return None + + def _resize_jpeg(self, raw: bytes) -> Optional[bytes]: + try: + import numpy as np + buf = np.frombuffer(raw, dtype=np.uint8) + img = cv2.imdecode(buf, cv2.IMREAD_COLOR) + if img is None: + return raw + resized = cv2.resize(img, (self._target_w, self._target_h)) + ok, enc = cv2.imencode(".jpg", resized, + [cv2.IMWRITE_JPEG_QUALITY, self._quality]) + return bytes(enc) if ok else raw + except Exception: + return raw + + def close(self) -> None: + if os.path.exists(self._tmpfile): + try: + os.unlink(self._tmpfile) + except OSError: + pass + + +# ── Capture thread ──────────────────────────────────────────────────────────── + +class CaptureThread(threading.Thread): + """Drives a camera backend at the target FPS, pushing JPEG into FrameBuffer.""" + + def __init__(self, backend, fps: float, buf: FrameBuffer): + super().__init__(name="capture", daemon=True) + self._backend = backend + self._interval = 1.0 / fps + self._buf = buf + self._running = False + + def run(self) -> None: + self._running = True + logging.info("Capture thread started (%.1f Hz)", 1.0 / self._interval) + while self._running: + t0 = time.monotonic() + try: + jpeg = self._backend.read_jpeg() + if jpeg: + self._buf.put(jpeg) + except Exception as e: + logging.warning("Capture error: %s", e) + elapsed = time.monotonic() - t0 + time.sleep(max(0.0, self._interval - elapsed)) + + def stop(self) -> None: + 
self._running = False + + +# ── HTTP MJPEG server ───────────────────────────────────────────────────────── + +MJPEG_BOUNDARY = b"--mjpeg-boundary" + +def _make_http_handler(buf: FrameBuffer, width: int, height: int, fps: float): + + class Handler(BaseHTTPRequestHandler): + def log_message(self, fmt, *args): + logging.debug("HTTP %s", fmt % args) + + def do_GET(self): + if self.path == "/stream": + self._stream() + elif self.path == "/snapshot": + self._snapshot() + elif self.path == "/health": + self._health() + else: + self.send_error(404) + + def _stream(self): + self.send_response(200) + self.send_header("Content-Type", + f"multipart/x-mixed-replace; boundary={MJPEG_BOUNDARY.decode()}") + self.send_header("Cache-Control", "no-cache") + self.send_header("Connection", "close") + self.end_headers() + try: + while True: + jpeg = buf.get(timeout=2.0) + if jpeg is None: + continue + ts = time.time() + header = ( + f"\r\n{MJPEG_BOUNDARY.decode()}\r\n" + f"Content-Type: image/jpeg\r\n" + f"Content-Length: {len(jpeg)}\r\n" + f"X-Timestamp: {ts:.3f}\r\n\r\n" + ).encode() + self.wfile.write(header + jpeg) + self.wfile.flush() + except (BrokenPipeError, ConnectionResetError): + pass + + def _snapshot(self): + jpeg = buf.latest() + if jpeg is None: + self.send_error(503, "No frame available") + return + self.send_response(200) + self.send_header("Content-Type", "image/jpeg") + self.send_header("Content-Length", str(len(jpeg))) + self.send_header("X-Timestamp", str(time.time())) + self.end_headers() + self.wfile.write(jpeg) + + def _health(self): + body = json.dumps({ + "status": "ok", + "frames": buf.count, + "dropped": buf.dropped, + "width": width, + "height": height, + "fps": fps, + }).encode() + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + + return Handler + + +# ── WebSocket server 
────────────────────────────────────────────────────────── + +async def ws_handler(websocket, buf: FrameBuffer, width: int, height: int, fps: float): + """Handle one WebSocket client connection — streams JPEG frames as binary messages.""" + remote = websocket.remote_address + logging.info("WS client connected: %s", remote) + + # Send info frame + info = json.dumps({ + "type": "info", + "width": width, + "height": height, + "fps": fps, + "ts": time.time(), + }) + try: + await websocket.send(info) + except Exception: + return + + loop = asyncio.get_event_loop() + try: + while True: + # Offload blocking buf.get() to thread pool to keep event loop free + jpeg = await loop.run_in_executor(None, lambda: buf.get(timeout=1.0)) + if jpeg is None: + continue + await websocket.send(jpeg) + except websockets.exceptions.ConnectionClosedOK: + logging.info("WS client disconnected (clean): %s", remote) + except websockets.exceptions.ConnectionClosedError as e: + logging.info("WS client disconnected (error): %s — %s", remote, e) + except Exception as e: + logging.warning("WS handler error: %s", e) + + +# ── Statistics logger ───────────────────────────────────────────────────────── + +def _stats_logger(buf: FrameBuffer, stop_evt: threading.Event) -> None: + prev = 0 + while not stop_evt.wait(10.0): + delta = buf.count - prev + prev = buf.count + logging.info("Capture stats — frames=%d (+%d/10s) dropped=%d", + buf.count, delta, buf.dropped) + + +# ── Entrypoint ──────────────────────────────────────────────────────────────── + +def main() -> None: + parser = argparse.ArgumentParser( + description="SaltyBot phone video bridge (Issue #585)" + ) + parser.add_argument("--ws-port", type=int, default=8765, + help="WebSocket server port (default: 8765)") + parser.add_argument("--http-port", type=int, default=8766, + help="HTTP MJPEG port (default: 8766)") + parser.add_argument("--width", type=int, default=640, + help="Frame width (default: 640)") + parser.add_argument("--height", type=int, 
default=480, + help="Frame height (default: 480)") + parser.add_argument("--fps", type=float, default=15.0, + help="Target FPS (default: 15)") + parser.add_argument("--quality", type=int, default=75, + help="JPEG quality 1-100 (default: 75)") + parser.add_argument("--device", default="/dev/video0", + help="V4L2 device or index (default: /dev/video0)") + parser.add_argument("--camera-id", type=int, default=0, + help="termux-camera-photo camera id (default: 0 = rear)") + parser.add_argument("--no-http", action="store_true", + help="Disable HTTP server") + parser.add_argument("--debug", action="store_true", + help="Verbose logging") + args = parser.parse_args() + + logging.basicConfig( + level=logging.DEBUG if args.debug else logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s", + ) + + if not WS_AVAILABLE: + logging.error("websockets not installed. Run: pip install websockets") + sys.exit(1) + + # ── Select and open camera backend ─────────────────────────────────────── + backend = None + if CV2_AVAILABLE: + cv_backend = OpenCVCapture(args.device, args.width, args.height, + args.fps, args.quality) + if cv_backend.open(): + logging.info("Using OpenCV backend (%s)", args.device) + backend = cv_backend + else: + logging.info("OpenCV backend unavailable, falling back to termux-camera-photo") + + if backend is None: + tx_backend = TermuxCapture(args.camera_id, args.width, args.height, args.quality) + if tx_backend.open(): + logging.info("Using termux-camera-photo backend (camera %d)", args.camera_id) + backend = tx_backend + else: + logging.error("No camera backend available. 
" + "Install opencv-python or termux-api.") + sys.exit(1) + + # ── Frame buffer ───────────────────────────────────────────────────────── + buf = FrameBuffer() + + # ── Capture thread ──────────────────────────────────────────────────────── + capture = CaptureThread(backend, args.fps, buf) + capture.start() + + # ── HTTP server thread ──────────────────────────────────────────────────── + stop_evt = threading.Event() + if not args.no_http: + handler_cls = _make_http_handler(buf, args.width, args.height, args.fps) + http_server = ThreadingHTTPServer(("0.0.0.0", args.http_port), handler_cls) + http_thread = threading.Thread( + target=http_server.serve_forever, name="http", daemon=True + ) + http_thread.start() + logging.info("HTTP MJPEG server: http://0.0.0.0:%d/stream", args.http_port) + logging.info("HTTP snapshot: http://0.0.0.0:%d/snapshot", args.http_port) + logging.info("HTTP health: http://0.0.0.0:%d/health", args.http_port) + + # ── Stats logger ────────────────────────────────────────────────────────── + stats_thread = threading.Thread( + target=_stats_logger, args=(buf, stop_evt), name="stats", daemon=True + ) + stats_thread.start() + + # ── WebSocket server (runs the event loop) ──────────────────────────────── + logging.info("WebSocket server: ws://0.0.0.0:%d", args.ws_port) + + async def _run_ws(): + async with websockets.serve( + lambda ws: ws_handler(ws, buf, args.width, args.height, args.fps), + "0.0.0.0", + args.ws_port, + max_size=None, # no message size limit + ping_interval=5, + ping_timeout=10, + ): + logging.info("Ready — streaming %dx%d @ %.0f fps", + args.width, args.height, args.fps) + await asyncio.Future() # run forever + + try: + asyncio.run(_run_ws()) + except KeyboardInterrupt: + logging.info("Shutting down...") + finally: + stop_evt.set() + capture.stop() + backend.close() + if not args.no_http: + http_server.shutdown() + + +if __name__ == "__main__": + main() -- 2.47.2