#!/usr/bin/env python3 """ motor_test_joystick.py — Terminal-based joystick for motor testing (Issue #513) Provides a curses-based interactive joystick UI for bench testing SaltyBot motors. Sends Twist commands to /cmd_vel via ROS2 (or WebSocket fallback). Controls: W/A/S/D or Arrow Keys — Steer/throttle (left/down/right/up) Spacebar — E-stop (hold = motors disabled) Q — Quit Features: - Conservative velocity defaults: 0.1 m/s linear, 0.3 rad/s angular - Real-time velocity feedback display (current, max, min) - 500ms timeout safety: stops motors if no command received - Graceful fallback if ROS2 unavailable (WebSocket to Jetson) - Terminal UI: velocity bars, input prompt, status line """ import curses import threading import time import argparse import sys from dataclasses import dataclass from typing import Optional from enum import Enum # Try to import ROS2; fall back to WebSocket if unavailable try: import rclpy from rclpy.node import Node from geometry_msgs.msg import Twist ROS2_AVAILABLE = True except ImportError: ROS2_AVAILABLE = False import json import socket # === Constants === DEFAULT_LINEAR_VEL = 0.1 # m/s DEFAULT_ANGULAR_VEL = 0.3 # rad/s TIMEOUT_MS = 500 # ms before sending zero velocity POLL_RATE_HZ = 20 # UI update rate # === Data Classes === @dataclass class VelocityState: """Current velocity state""" linear_x: float = 0.0 # m/s angular_z: float = 0.0 # rad/s max_linear: float = DEFAULT_LINEAR_VEL max_angular: float = DEFAULT_ANGULAR_VEL estop_active: bool = False last_command_time: float = 0.0 class ControllerBackend(Enum): """ROS2 or WebSocket backend""" ROS2 = "ros2" WEBSOCKET = "websocket" # === ROS2 Backend === class MotorTestNode(Node): """ROS2 node for publishing Twist commands""" def __init__(self): super().__init__('motor_test_joystick') self.pub = self.create_publisher(Twist, '/cmd_vel', 10) def send_twist(self, linear_x: float, angular_z: float): """Publish Twist command""" twist = Twist() twist.linear.x = linear_x twist.angular.z = angular_z self.pub.publish(twist) # === WebSocket Backend === class WebSocketController: """WebSocket client for communicating with Jetson""" def __init__(self, host: str = "127.0.0.1", port: int = 9090): self.host = host self.port = port self.sock = None self.connected = False self._connect() def _connect(self): """Establish WebSocket connection""" try: self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.connect((self.host, self.port)) self.connected = True except Exception as e: print(f"WebSocket connection failed: {e}") self.connected = False def send_twist(self, linear_x: float, angular_z: float): """Send Twist via JSON over WebSocket""" if not self.connected: return try: msg = { "type": "twist", "linear_x": float(linear_x), "angular_z": float(angular_z), "timestamp": time.time() } self.sock.sendall((json.dumps(msg) + "\n").encode()) except Exception as e: print(f"WebSocket send error: {e}") self.connected = False def close(self): """Close connection""" if self.sock: self.sock.close() # === Main Controller === class MotorTestController: """Main controller for joystick UI and motor commands""" def __init__(self, backend: ControllerBackend = ControllerBackend.ROS2): self.backend = backend self.state = VelocityState() self.running = True self.lock = threading.Lock() # Initialize backend if backend == ControllerBackend.ROS2 and ROS2_AVAILABLE: rclpy.init() self.node = MotorTestNode() self.backend_obj = self.node else: self.backend_obj = WebSocketController() # Start ROS2 spin thread if needed if isinstance(self.backend_obj, MotorTestNode): self.spin_thread = threading.Thread( target=lambda: rclpy.spin(self.node), daemon=True ) self.spin_thread.start() def update_velocity(self, linear_x: float, angular_z: float): """Update and send velocity command""" with self.lock: # Clamp to max velocities linear_x = max(-self.state.max_linear, min(self.state.max_linear, linear_x)) angular_z = max(-self.state.max_angular, min(self.state.max_angular, angular_z)) # Zero out if estop active if self.state.estop_active: linear_x = 0.0 angular_z = 0.0 # Check timeout (500ms) if time.time() - self.state.last_command_time > TIMEOUT_MS / 1000.0: linear_x = 0.0 angular_z = 0.0 self.state.linear_x = linear_x self.state.angular_z = angular_z self.state.last_command_time = time.time() # Send to backend if self.backend_obj: self.backend_obj.send_twist(linear_x, angular_z) def set_estop(self, active: bool): """Set e-stop state""" with self.lock: self.state.estop_active = active if active: self.state.linear_x = 0.0 self.state.angular_z = 0.0 if self.backend_obj: self.backend_obj.send_twist(0.0, 0.0) def shutdown(self): """Clean shutdown""" self.running = False # Send zero velocity self.update_velocity(0.0, 0.0) time.sleep(0.1) # Cleanup backend if isinstance(self.backend_obj, MotorTestNode): self.node.destroy_node() rclpy.shutdown() elif isinstance(self.backend_obj, WebSocketController): self.backend_obj.close() # === Curses UI === def run_joystick_ui(stdscr, controller: MotorTestController): """Main curses event loop""" curses.curs_set(0) # Hide cursor stdscr.nodelay(1) # Non-blocking getch() stdscr.timeout(int(1000 / POLL_RATE_HZ)) # Refresh rate # Color pairs curses.init_pair(1, curses.COLOR_GREEN, curses.COLOR_BLACK) curses.init_pair(2, curses.COLOR_RED, curses.COLOR_BLACK) curses.init_pair(3, curses.COLOR_YELLOW, curses.COLOR_BLACK) linear_input = 0.0 angular_input = 0.0 status_msg = "Ready for motor test. Press W/A/S/D to control, SPACE to e-stop, Q to quit." try: while controller.running: # Get input try: key = stdscr.getch() except: key = -1 # Process input if key == ord('q') or key == ord('Q'): break elif key == ord(' '): # Spacebar controller.set_estop(not controller.state.estop_active) status_msg = f"E-STOP {'ACTIVE' if controller.state.estop_active else 'INACTIVE'}" elif key in [ord('w'), ord('W'), curses.KEY_UP]: linear_input = min(linear_input + 0.02, DEFAULT_LINEAR_VEL) status_msg = f"Forward: {linear_input:.2f} m/s" elif key in [ord('s'), ord('S'), curses.KEY_DOWN]: linear_input = max(linear_input - 0.02, -DEFAULT_LINEAR_VEL) status_msg = f"Reverse: {linear_input:.2f} m/s" elif key in [ord('d'), ord('D'), curses.KEY_RIGHT]: angular_input = max(angular_input - 0.02, -DEFAULT_ANGULAR_VEL) status_msg = f"Right: {-angular_input:.2f} rad/s" elif key in [ord('a'), ord('A'), curses.KEY_LEFT]: angular_input = min(angular_input + 0.02, DEFAULT_ANGULAR_VEL) status_msg = f"Left: {angular_input:.2f} rad/s" elif key == ord('r') or key == ord('R'): # Reset velocities linear_input = 0.0 angular_input = 0.0 status_msg = "Velocities reset to zero" # Apply exponential decay to input (if no new input, ramp down) if key == -1: # No input linear_input *= 0.95 angular_input *= 0.95 if abs(linear_input) < 0.01: linear_input = 0.0 if abs(angular_input) < 0.01: angular_input = 0.0 # Send updated velocity controller.update_velocity(linear_input, angular_input) # Render UI stdscr.clear() height, width = stdscr.getmaxyx() # Title title = "SaltyBot Motor Test Joystick (Issue #513)" stdscr.addstr(0, (width - len(title)) // 2, title, curses.color_pair(1) | curses.A_BOLD) # Status line y = 2 estop_color = curses.color_pair(2) if controller.state.estop_active else curses.color_pair(1) estop_text = f"E-STOP: {'ACTIVE' if controller.state.estop_active else 'Inactive'}" stdscr.addstr(y, 2, estop_text, estop_color) y += 2 # Velocity displays stdscr.addstr(y, 2, f"Linear X: {controller.state.linear_x:+7.3f} m/s (max: {DEFAULT_LINEAR_VEL})") y += 1 # Bar for linear bar_width = 30 bar_fill = int((controller.state.linear_x / DEFAULT_LINEAR_VEL) * (bar_width / 2)) bar_fill = max(-bar_width // 2, min(bar_width // 2, bar_fill)) bar = "[" + " " * (bar_width // 2 + bar_fill) + "●" + " " * (bar_width // 2 - bar_fill) + "]" stdscr.addstr(y, 2, bar, curses.color_pair(3)) y += 2 stdscr.addstr(y, 2, f"Angular Z: {controller.state.angular_z:+7.3f} rad/s (max: {DEFAULT_ANGULAR_VEL})") y += 1 # Bar for angular bar_fill = int((controller.state.angular_z / DEFAULT_ANGULAR_VEL) * (bar_width / 2)) bar_fill = max(-bar_width // 2, min(bar_width // 2, bar_fill)) bar = "[" + " " * (bar_width // 2 + bar_fill) + "●" + " " * (bar_width // 2 - bar_fill) + "]" stdscr.addstr(y, 2, bar, curses.color_pair(3)) y += 2 # Command input display stdscr.addstr(y, 2, f"Input: Linear={linear_input:+.3f} Angular={angular_input:+.3f}") y += 2 # Controls legend stdscr.addstr(y, 2, "Controls:") y += 1 stdscr.addstr(y, 2, " W/↑ = Forward S/↓ = Reverse A/← = Left D/→ = Right") y += 1 stdscr.addstr(y, 2, " SPACE = E-stop (toggle) R = Reset Q = Quit") y += 2 # Status message stdscr.addstr(y, 2, f"Status: {status_msg[:width-10]}", curses.color_pair(1)) y += 2 # Timeout warning time_since_cmd = time.time() - controller.state.last_command_time if time_since_cmd > (TIMEOUT_MS / 1000.0): warning = f"TIMEOUT: Motors disabled ({time_since_cmd:.1f}s since last command)" stdscr.addstr(y, 2, warning, curses.color_pair(2)) stdscr.refresh() finally: controller.shutdown() # === Main Entry Point === def main(): parser = argparse.ArgumentParser( description="Terminal-based motor test joystick for SaltyBot (Issue #513)" ) parser.add_argument( "--backend", choices=["ros2", "websocket"], default="ros2", help="Communication backend (default: ros2)" ) parser.add_argument( "--host", default="127.0.0.1", help="Jetson hostname/IP (for WebSocket backend)" ) parser.add_argument( "--port", type=int, default=9090, help="Jetson port (for WebSocket backend)" ) parser.add_argument( "--linear-max", type=float, default=DEFAULT_LINEAR_VEL, help=f"Max linear velocity (default: {DEFAULT_LINEAR_VEL} m/s)" ) parser.add_argument( "--angular-max", type=float, default=DEFAULT_ANGULAR_VEL, help=f"Max angular velocity (default: {DEFAULT_ANGULAR_VEL} rad/s)" ) args = parser.parse_args() # Select backend backend = ControllerBackend.WEBSOCKET if args.backend == "websocket" else ControllerBackend.ROS2 # Create controller controller = MotorTestController(backend=backend) controller.state.max_linear = args.linear_max controller.state.max_angular = args.angular_max # Run UI try: curses.wrapper(run_joystick_ui, controller) except KeyboardInterrupt: controller.shutdown() except Exception as e: print(f"Error: {e}") controller.shutdown() sys.exit(1) print("Motor test joystick closed. Velocities sent to zero.") if __name__ == "__main__": main()