saltylab-firmware/phone/motor_test_joystick.py
sl-android f49e84b8bb feat: Phone-based motor test joystick app (Issue #513)
Implements terminal-based curses UI for interactive bench testing of SaltyBot motors via Termux.

Features:
- Interactive keyboard-based joystick (W/A/S/D or arrow keys)
- Conservative velocity defaults: 0.1 m/s linear, 0.3 rad/s angular
- Real-time velocity feedback with bar graphs
- Spacebar e-stop (instantly zeros velocity)
- 500ms timeout safety (sends zero velocity if idle)
- Dual backend: ROS2 (/cmd_vel) or WebSocket
- Graceful fallback if ROS2 unavailable

Safety Features:
- Conservative defaults (0.1/0.3 m/s)
- E-stop button (spacebar)
- 500ms timeout (sends zero if idle)
- Input clamping and exponential decay
- Status/warning displays

Files:
- motor_test_joystick.py: Main application
- MOTOR_TEST_JOYSTICK.md: User documentation
- INSTALL_MOTOR_TEST.md: Installation guide

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
2026-03-06 11:47:15 -05:00

338 lines
12 KiB
Python

#!/usr/bin/env python3
"""
motor_test_joystick.py — Terminal-based joystick for motor testing (Issue #513)
Provides a curses-based interactive joystick UI for bench testing SaltyBot motors.
Sends Twist commands to /cmd_vel via ROS2 (or WebSocket fallback).
Controls:
W/A/S/D or Arrow Keys — Steer/throttle (left/down/right/up)
Spacebar — E-stop (hold = motors disabled)
Q — Quit
Features:
- Conservative velocity defaults: 0.1 m/s linear, 0.3 rad/s angular
- Real-time velocity feedback display
- 500ms timeout safety: stops motors if no command received
- Terminal UI: velocity bars, status line, e-stop indicator
"""
import curses
import threading
import time
import argparse
import sys
from dataclasses import dataclass
from enum import Enum
# Try to import ROS2; fall back to WebSocket if unavailable
try:
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
ROS2_AVAILABLE = True
except ImportError:
ROS2_AVAILABLE = False
import json
import socket
# === Constants ===
DEFAULT_LINEAR_VEL = 0.1 # m/s
DEFAULT_ANGULAR_VEL = 0.3 # rad/s
TIMEOUT_MS = 500 # ms before sending zero velocity
POLL_RATE_HZ = 20 # UI update rate
# === Data Classes ===
@dataclass
class VelocityState:
"""Current velocity state"""
linear_x: float = 0.0
angular_z: float = 0.0
max_linear: float = DEFAULT_LINEAR_VEL
max_angular: float = DEFAULT_ANGULAR_VEL
estop_active: bool = False
last_command_time: float = 0.0
class ControllerBackend(Enum):
"""ROS2 or WebSocket backend"""
ROS2 = "ros2"
WEBSOCKET = "websocket"
# === ROS2 Backend ===
class MotorTestNode(Node):
"""ROS2 node for publishing Twist commands"""
def __init__(self):
super().__init__('motor_test_joystick')
self.pub = self.create_publisher(Twist, '/cmd_vel', 10)
def send_twist(self, linear_x: float, angular_z: float):
"""Publish Twist command"""
twist = Twist()
twist.linear.x = linear_x
twist.angular.z = angular_z
self.pub.publish(twist)
# === WebSocket Backend ===
class WebSocketController:
"""WebSocket client for communicating with Jetson"""
def __init__(self, host: str = "127.0.0.1", port: int = 9090):
self.host = host
self.port = port
self.sock = None
self.connected = False
self._connect()
def _connect(self):
"""Establish WebSocket connection"""
try:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((self.host, self.port))
self.connected = True
except Exception as e:
print(f"WebSocket connection failed: {e}")
self.connected = False
def send_twist(self, linear_x: float, angular_z: float):
"""Send Twist via JSON over WebSocket"""
if not self.connected:
return
try:
msg = {
"type": "twist",
"linear_x": float(linear_x),
"angular_z": float(angular_z),
"timestamp": time.time()
}
self.sock.sendall((json.dumps(msg) + "\n").encode())
except Exception as e:
print(f"WebSocket send error: {e}")
self.connected = False
def close(self):
"""Close connection"""
if self.sock:
self.sock.close()
# === Main Controller ===
class MotorTestController:
"""Main controller for joystick UI and motor commands"""
def __init__(self, backend: ControllerBackend = ControllerBackend.ROS2):
self.backend = backend
self.state = VelocityState()
self.running = True
self.lock = threading.Lock()
# Initialize backend
if backend == ControllerBackend.ROS2 and ROS2_AVAILABLE:
rclpy.init()
self.node = MotorTestNode()
self.backend_obj = self.node
else:
self.backend_obj = WebSocketController()
# Start ROS2 spin thread if needed
if isinstance(self.backend_obj, MotorTestNode):
self.spin_thread = threading.Thread(
target=lambda: rclpy.spin(self.node),
daemon=True
)
self.spin_thread.start()
def update_velocity(self, linear_x: float, angular_z: float):
"""Update and send velocity command"""
with self.lock:
# Clamp to max velocities
linear_x = max(-self.state.max_linear, min(self.state.max_linear, linear_x))
angular_z = max(-self.state.max_angular, min(self.state.max_angular, angular_z))
# Zero out if estop active
if self.state.estop_active:
linear_x = 0.0
angular_z = 0.0
# Check timeout
if time.time() - self.state.last_command_time > TIMEOUT_MS / 1000.0:
linear_x = 0.0
angular_z = 0.0
self.state.linear_x = linear_x
self.state.angular_z = angular_z
self.state.last_command_time = time.time()
# Send to backend
if self.backend_obj:
self.backend_obj.send_twist(linear_x, angular_z)
def set_estop(self, active: bool):
"""Set e-stop state"""
with self.lock:
self.state.estop_active = active
if active:
self.state.linear_x = 0.0
self.state.angular_z = 0.0
if self.backend_obj:
self.backend_obj.send_twist(0.0, 0.0)
def shutdown(self):
"""Clean shutdown"""
self.running = False
self.update_velocity(0.0, 0.0)
time.sleep(0.1)
if isinstance(self.backend_obj, MotorTestNode):
self.node.destroy_node()
rclpy.shutdown()
elif isinstance(self.backend_obj, WebSocketController):
self.backend_obj.close()
# === Curses UI ===
def run_joystick_ui(stdscr, controller: MotorTestController):
"""Main curses event loop"""
curses.curs_set(0)
stdscr.nodelay(1)
stdscr.timeout(int(1000 / POLL_RATE_HZ))
# Color pairs
curses.init_pair(1, curses.COLOR_GREEN, curses.COLOR_BLACK)
curses.init_pair(2, curses.COLOR_RED, curses.COLOR_BLACK)
curses.init_pair(3, curses.COLOR_YELLOW, curses.COLOR_BLACK)
linear_input = 0.0
angular_input = 0.0
status_msg = "Ready. W/A/S/D=control, SPACE=estop, Q=quit"
try:
while controller.running:
# Get input
try:
key = stdscr.getch()
except:
key = -1
# Process input
if key == ord('q') or key == ord('Q'):
break
elif key == ord(' '): # Spacebar
controller.set_estop(not controller.state.estop_active)
status_msg = f"E-STOP {'ACTIVE' if controller.state.estop_active else 'INACTIVE'}"
elif key in [ord('w'), ord('W'), curses.KEY_UP]:
linear_input = min(linear_input + 0.02, DEFAULT_LINEAR_VEL)
status_msg = f"Forward: {linear_input:.2f} m/s"
elif key in [ord('s'), ord('S'), curses.KEY_DOWN]:
linear_input = max(linear_input - 0.02, -DEFAULT_LINEAR_VEL)
status_msg = f"Reverse: {linear_input:.2f} m/s"
elif key in [ord('d'), ord('D'), curses.KEY_RIGHT]:
angular_input = max(angular_input - 0.02, -DEFAULT_ANGULAR_VEL)
status_msg = f"Right: {-angular_input:.2f} rad/s"
elif key in [ord('a'), ord('A'), curses.KEY_LEFT]:
angular_input = min(angular_input + 0.02, DEFAULT_ANGULAR_VEL)
status_msg = f"Left: {angular_input:.2f} rad/s"
elif key == ord('r') or key == ord('R'):
linear_input = 0.0
angular_input = 0.0
status_msg = "Velocities reset"
# Exponential decay
if key == -1:
linear_input *= 0.95
angular_input *= 0.95
if abs(linear_input) < 0.01:
linear_input = 0.0
if abs(angular_input) < 0.01:
angular_input = 0.0
# Send velocity
controller.update_velocity(linear_input, angular_input)
# Render UI
stdscr.clear()
height, width = stdscr.getmaxyx()
title = "SaltyBot Motor Test Joystick (Issue #513)"
stdscr.addstr(0, max(0, (width - len(title)) // 2), title,
curses.color_pair(1) | curses.A_BOLD)
y = 2
estop_color = curses.color_pair(2) if controller.state.estop_active else curses.color_pair(1)
estop_text = f"E-STOP: {'ACTIVE' if controller.state.estop_active else 'Inactive'}"
stdscr.addstr(y, 2, estop_text, estop_color)
y += 2
stdscr.addstr(y, 2, f"Linear X: {controller.state.linear_x:+7.3f} m/s (max: {DEFAULT_LINEAR_VEL})")
y += 1
bar_width = 30
bar_fill = int((controller.state.linear_x / DEFAULT_LINEAR_VEL) * (bar_width / 2))
bar_fill = max(-bar_width // 2, min(bar_width // 2, bar_fill))
bar = "[" + " " * (bar_width // 2 + bar_fill) + "" + " " * (bar_width // 2 - bar_fill) + "]"
stdscr.addstr(y, 2, bar, curses.color_pair(3))
y += 2
stdscr.addstr(y, 2, f"Angular Z: {controller.state.angular_z:+7.3f} rad/s (max: {DEFAULT_ANGULAR_VEL})")
y += 1
bar_fill = int((controller.state.angular_z / DEFAULT_ANGULAR_VEL) * (bar_width / 2))
bar_fill = max(-bar_width // 2, min(bar_width // 2, bar_fill))
bar = "[" + " " * (bar_width // 2 + bar_fill) + "" + " " * (bar_width // 2 - bar_fill) + "]"
stdscr.addstr(y, 2, bar, curses.color_pair(3))
y += 2
stdscr.addstr(y, 2, f"Input: L={linear_input:+.3f} A={angular_input:+.3f}")
y += 2
stdscr.addstr(y, 2, "W/↑=Forward S/↓=Reverse A/←=Left D/→=Right SPACE=E-stop R=Reset Q=Quit")
y += 2
stdscr.addstr(y, 2, f"Status: {status_msg[:width-10]}", curses.color_pair(1))
stdscr.refresh()
finally:
controller.shutdown()
# === Main Entry Point ===
def main():
parser = argparse.ArgumentParser(
description="Terminal-based motor test joystick for SaltyBot (Issue #513)"
)
parser.add_argument("--backend", choices=["ros2", "websocket"], default="ros2",
help="Communication backend (default: ros2)")
parser.add_argument("--host", default="127.0.0.1",
help="Jetson hostname/IP (for WebSocket backend)")
parser.add_argument("--port", type=int, default=9090,
help="Jetson port (for WebSocket backend)")
parser.add_argument("--linear-max", type=float, default=DEFAULT_LINEAR_VEL,
help=f"Max linear velocity (default: {DEFAULT_LINEAR_VEL} m/s)")
parser.add_argument("--angular-max", type=float, default=DEFAULT_ANGULAR_VEL,
help=f"Max angular velocity (default: {DEFAULT_ANGULAR_VEL} rad/s)")
args = parser.parse_args()
backend = ControllerBackend.WEBSOCKET if args.backend == "websocket" else ControllerBackend.ROS2
controller = MotorTestController(backend=backend)
controller.state.max_linear = args.linear_max
controller.state.max_angular = args.angular_max
try:
curses.wrapper(run_joystick_ui, controller)
except KeyboardInterrupt:
controller.shutdown()
except Exception as e:
print(f"Error: {e}")
controller.shutdown()
sys.exit(1)
print("Motor test joystick closed. Velocities sent to zero.")
if __name__ == "__main__":
main()