Enable Jetson autonomous arming while keeping RC as optional override. Changes: - RC kill switch (CH5 OFF) now triggers emergency stop instead of disarm → Allows Jetson-armed robots to remain armed when RC disconnects → Maintains kill switch safety for emergency situations - RC disarm only triggers on explicit CH5 falling edge (RC still alive) → RC disconnect doesn't disarm Jetson-controlled missions → RC failsafe timer (500ms) handles signal loss separately - Jetson arming via CDC 'A' command works independently of RC state → Robots can operate fully autonomous without RC transmitter → Heartbeat timeout (500ms) prevents runaway if Jetson crashes Safety maintained: - Arming hold timer: 500ms (prevents accidental arm) - Tilt limit: ±10° level required - IMU calibration: Required before any arm attempt - Remote E-stop: Blocks all arming - RC failsafe: 500ms signal loss = disarm - Jetson timeout: 500ms heartbeat = zero motors Command protocol (unchanged): - Jetson: A=arm, D=disarm, E=estop, Z=clear estop - RC: CH5 switch (optional override) - Heartbeat: H command every ≤500ms - Drive: C<speed>,<steer> every ≤200ms See AUTONOMOUS_ARMING.md for complete protocol and testing checklist. Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
385 lines
13 KiB
Python
385 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
motor_test_joystick.py — Terminal-based joystick for motor testing (Issue #513)
|
|
|
|
Provides a curses-based interactive joystick UI for bench testing SaltyBot motors.
|
|
Sends Twist commands to /cmd_vel via ROS2 (or WebSocket fallback).
|
|
|
|
Controls:
|
|
W/A/S/D or Arrow Keys — Steer/throttle (left/down/right/up)
|
|
Spacebar — E-stop (hold = motors disabled)
|
|
Q — Quit
|
|
|
|
Features:
|
|
- Conservative velocity defaults: 0.1 m/s linear, 0.3 rad/s angular
|
|
- Real-time velocity feedback display (current, max, min)
|
|
- 500ms timeout safety: stops motors if no command received
|
|
- Graceful fallback if ROS2 unavailable (WebSocket to Jetson)
|
|
- Terminal UI: velocity bars, input prompt, status line
|
|
"""
|
|
|
|
import curses
|
|
import threading
|
|
import time
|
|
import argparse
|
|
import sys
|
|
from dataclasses import dataclass
|
|
from typing import Optional
|
|
from enum import Enum
|
|
|
|
# Try to import ROS2; fall back to WebSocket if unavailable
|
|
try:
|
|
import rclpy
|
|
from rclpy.node import Node
|
|
from geometry_msgs.msg import Twist
|
|
ROS2_AVAILABLE = True
|
|
except ImportError:
|
|
ROS2_AVAILABLE = False
|
|
import json
|
|
import socket
|
|
|
|
# === Constants ===
|
|
DEFAULT_LINEAR_VEL = 0.1 # m/s
|
|
DEFAULT_ANGULAR_VEL = 0.3 # rad/s
|
|
TIMEOUT_MS = 500 # ms before sending zero velocity
|
|
POLL_RATE_HZ = 20 # UI update rate
|
|
|
|
# === Data Classes ===
|
|
@dataclass
|
|
class VelocityState:
|
|
"""Current velocity state"""
|
|
linear_x: float = 0.0 # m/s
|
|
angular_z: float = 0.0 # rad/s
|
|
max_linear: float = DEFAULT_LINEAR_VEL
|
|
max_angular: float = DEFAULT_ANGULAR_VEL
|
|
estop_active: bool = False
|
|
last_command_time: float = 0.0
|
|
|
|
|
|
class ControllerBackend(Enum):
|
|
"""ROS2 or WebSocket backend"""
|
|
ROS2 = "ros2"
|
|
WEBSOCKET = "websocket"
|
|
|
|
|
|
# === ROS2 Backend ===
|
|
class MotorTestNode(Node):
|
|
"""ROS2 node for publishing Twist commands"""
|
|
|
|
def __init__(self):
|
|
super().__init__('motor_test_joystick')
|
|
self.pub = self.create_publisher(Twist, '/cmd_vel', 10)
|
|
|
|
def send_twist(self, linear_x: float, angular_z: float):
|
|
"""Publish Twist command"""
|
|
twist = Twist()
|
|
twist.linear.x = linear_x
|
|
twist.angular.z = angular_z
|
|
self.pub.publish(twist)
|
|
|
|
|
|
# === WebSocket Backend ===
|
|
class WebSocketController:
|
|
"""WebSocket client for communicating with Jetson"""
|
|
|
|
def __init__(self, host: str = "127.0.0.1", port: int = 9090):
|
|
self.host = host
|
|
self.port = port
|
|
self.sock = None
|
|
self.connected = False
|
|
self._connect()
|
|
|
|
def _connect(self):
|
|
"""Establish WebSocket connection"""
|
|
try:
|
|
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
self.sock.connect((self.host, self.port))
|
|
self.connected = True
|
|
except Exception as e:
|
|
print(f"WebSocket connection failed: {e}")
|
|
self.connected = False
|
|
|
|
def send_twist(self, linear_x: float, angular_z: float):
|
|
"""Send Twist via JSON over WebSocket"""
|
|
if not self.connected:
|
|
return
|
|
|
|
try:
|
|
msg = {
|
|
"type": "twist",
|
|
"linear_x": float(linear_x),
|
|
"angular_z": float(angular_z),
|
|
"timestamp": time.time()
|
|
}
|
|
self.sock.sendall((json.dumps(msg) + "\n").encode())
|
|
except Exception as e:
|
|
print(f"WebSocket send error: {e}")
|
|
self.connected = False
|
|
|
|
def close(self):
|
|
"""Close connection"""
|
|
if self.sock:
|
|
self.sock.close()
|
|
|
|
|
|
# === Main Controller ===
|
|
class MotorTestController:
|
|
"""Main controller for joystick UI and motor commands"""
|
|
|
|
def __init__(self, backend: ControllerBackend = ControllerBackend.ROS2):
|
|
self.backend = backend
|
|
self.state = VelocityState()
|
|
self.running = True
|
|
self.lock = threading.Lock()
|
|
|
|
# Initialize backend
|
|
if backend == ControllerBackend.ROS2 and ROS2_AVAILABLE:
|
|
rclpy.init()
|
|
self.node = MotorTestNode()
|
|
self.backend_obj = self.node
|
|
else:
|
|
self.backend_obj = WebSocketController()
|
|
|
|
# Start ROS2 spin thread if needed
|
|
if isinstance(self.backend_obj, MotorTestNode):
|
|
self.spin_thread = threading.Thread(
|
|
target=lambda: rclpy.spin(self.node),
|
|
daemon=True
|
|
)
|
|
self.spin_thread.start()
|
|
|
|
def update_velocity(self, linear_x: float, angular_z: float):
|
|
"""Update and send velocity command"""
|
|
with self.lock:
|
|
# Clamp to max velocities
|
|
linear_x = max(-self.state.max_linear, min(self.state.max_linear, linear_x))
|
|
angular_z = max(-self.state.max_angular, min(self.state.max_angular, angular_z))
|
|
|
|
# Zero out if estop active
|
|
if self.state.estop_active:
|
|
linear_x = 0.0
|
|
angular_z = 0.0
|
|
|
|
# Check timeout (500ms)
|
|
if time.time() - self.state.last_command_time > TIMEOUT_MS / 1000.0:
|
|
linear_x = 0.0
|
|
angular_z = 0.0
|
|
|
|
self.state.linear_x = linear_x
|
|
self.state.angular_z = angular_z
|
|
self.state.last_command_time = time.time()
|
|
|
|
# Send to backend
|
|
if self.backend_obj:
|
|
self.backend_obj.send_twist(linear_x, angular_z)
|
|
|
|
def set_estop(self, active: bool):
|
|
"""Set e-stop state"""
|
|
with self.lock:
|
|
self.state.estop_active = active
|
|
if active:
|
|
self.state.linear_x = 0.0
|
|
self.state.angular_z = 0.0
|
|
if self.backend_obj:
|
|
self.backend_obj.send_twist(0.0, 0.0)
|
|
|
|
def shutdown(self):
|
|
"""Clean shutdown"""
|
|
self.running = False
|
|
# Send zero velocity
|
|
self.update_velocity(0.0, 0.0)
|
|
time.sleep(0.1)
|
|
|
|
# Cleanup backend
|
|
if isinstance(self.backend_obj, MotorTestNode):
|
|
self.node.destroy_node()
|
|
rclpy.shutdown()
|
|
elif isinstance(self.backend_obj, WebSocketController):
|
|
self.backend_obj.close()
|
|
|
|
|
|
# === Curses UI ===
|
|
def run_joystick_ui(stdscr, controller: MotorTestController):
|
|
"""Main curses event loop"""
|
|
curses.curs_set(0) # Hide cursor
|
|
stdscr.nodelay(1) # Non-blocking getch()
|
|
stdscr.timeout(int(1000 / POLL_RATE_HZ)) # Refresh rate
|
|
|
|
# Color pairs
|
|
curses.init_pair(1, curses.COLOR_GREEN, curses.COLOR_BLACK)
|
|
curses.init_pair(2, curses.COLOR_RED, curses.COLOR_BLACK)
|
|
curses.init_pair(3, curses.COLOR_YELLOW, curses.COLOR_BLACK)
|
|
|
|
linear_input = 0.0
|
|
angular_input = 0.0
|
|
status_msg = "Ready for motor test. Press W/A/S/D to control, SPACE to e-stop, Q to quit."
|
|
|
|
try:
|
|
while controller.running:
|
|
# Get input
|
|
try:
|
|
key = stdscr.getch()
|
|
except:
|
|
key = -1
|
|
|
|
# Process input
|
|
if key == ord('q') or key == ord('Q'):
|
|
break
|
|
elif key == ord(' '): # Spacebar
|
|
controller.set_estop(not controller.state.estop_active)
|
|
status_msg = f"E-STOP {'ACTIVE' if controller.state.estop_active else 'INACTIVE'}"
|
|
elif key in [ord('w'), ord('W'), curses.KEY_UP]:
|
|
linear_input = min(linear_input + 0.02, DEFAULT_LINEAR_VEL)
|
|
status_msg = f"Forward: {linear_input:.2f} m/s"
|
|
elif key in [ord('s'), ord('S'), curses.KEY_DOWN]:
|
|
linear_input = max(linear_input - 0.02, -DEFAULT_LINEAR_VEL)
|
|
status_msg = f"Reverse: {linear_input:.2f} m/s"
|
|
elif key in [ord('d'), ord('D'), curses.KEY_RIGHT]:
|
|
angular_input = max(angular_input - 0.02, -DEFAULT_ANGULAR_VEL)
|
|
status_msg = f"Right: {-angular_input:.2f} rad/s"
|
|
elif key in [ord('a'), ord('A'), curses.KEY_LEFT]:
|
|
angular_input = min(angular_input + 0.02, DEFAULT_ANGULAR_VEL)
|
|
status_msg = f"Left: {angular_input:.2f} rad/s"
|
|
elif key == ord('r') or key == ord('R'):
|
|
# Reset velocities
|
|
linear_input = 0.0
|
|
angular_input = 0.0
|
|
status_msg = "Velocities reset to zero"
|
|
|
|
# Apply exponential decay to input (if no new input, ramp down)
|
|
if key == -1: # No input
|
|
linear_input *= 0.95
|
|
angular_input *= 0.95
|
|
if abs(linear_input) < 0.01:
|
|
linear_input = 0.0
|
|
if abs(angular_input) < 0.01:
|
|
angular_input = 0.0
|
|
|
|
# Send updated velocity
|
|
controller.update_velocity(linear_input, angular_input)
|
|
|
|
# Render UI
|
|
stdscr.clear()
|
|
height, width = stdscr.getmaxyx()
|
|
|
|
# Title
|
|
title = "SaltyBot Motor Test Joystick (Issue #513)"
|
|
stdscr.addstr(0, (width - len(title)) // 2, title,
|
|
curses.color_pair(1) | curses.A_BOLD)
|
|
|
|
# Status line
|
|
y = 2
|
|
estop_color = curses.color_pair(2) if controller.state.estop_active else curses.color_pair(1)
|
|
estop_text = f"E-STOP: {'🛑 ACTIVE' if controller.state.estop_active else '✓ Inactive'}"
|
|
stdscr.addstr(y, 2, estop_text, estop_color)
|
|
y += 2
|
|
|
|
# Velocity displays
|
|
stdscr.addstr(y, 2, f"Linear X: {controller.state.linear_x:+7.3f} m/s (max: {DEFAULT_LINEAR_VEL})")
|
|
y += 1
|
|
# Bar for linear
|
|
bar_width = 30
|
|
bar_fill = int((controller.state.linear_x / DEFAULT_LINEAR_VEL) * (bar_width / 2))
|
|
bar_fill = max(-bar_width // 2, min(bar_width // 2, bar_fill))
|
|
bar = "[" + " " * (bar_width // 2 + bar_fill) + "●" + " " * (bar_width // 2 - bar_fill) + "]"
|
|
stdscr.addstr(y, 2, bar, curses.color_pair(3))
|
|
y += 2
|
|
|
|
stdscr.addstr(y, 2, f"Angular Z: {controller.state.angular_z:+7.3f} rad/s (max: {DEFAULT_ANGULAR_VEL})")
|
|
y += 1
|
|
# Bar for angular
|
|
bar_fill = int((controller.state.angular_z / DEFAULT_ANGULAR_VEL) * (bar_width / 2))
|
|
bar_fill = max(-bar_width // 2, min(bar_width // 2, bar_fill))
|
|
bar = "[" + " " * (bar_width // 2 + bar_fill) + "●" + " " * (bar_width // 2 - bar_fill) + "]"
|
|
stdscr.addstr(y, 2, bar, curses.color_pair(3))
|
|
y += 2
|
|
|
|
# Command input display
|
|
stdscr.addstr(y, 2, f"Input: Linear={linear_input:+.3f} Angular={angular_input:+.3f}")
|
|
y += 2
|
|
|
|
# Controls legend
|
|
stdscr.addstr(y, 2, "Controls:")
|
|
y += 1
|
|
stdscr.addstr(y, 2, " W/↑ = Forward S/↓ = Reverse A/← = Left D/→ = Right")
|
|
y += 1
|
|
stdscr.addstr(y, 2, " SPACE = E-stop (toggle) R = Reset Q = Quit")
|
|
y += 2
|
|
|
|
# Status message
|
|
stdscr.addstr(y, 2, f"Status: {status_msg[:width-10]}", curses.color_pair(1))
|
|
y += 2
|
|
|
|
# Timeout warning
|
|
time_since_cmd = time.time() - controller.state.last_command_time
|
|
if time_since_cmd > (TIMEOUT_MS / 1000.0):
|
|
warning = f"⚠ TIMEOUT: Motors disabled ({time_since_cmd:.1f}s since last command)"
|
|
stdscr.addstr(y, 2, warning, curses.color_pair(2))
|
|
|
|
stdscr.refresh()
|
|
|
|
finally:
|
|
controller.shutdown()
|
|
|
|
|
|
# === Main Entry Point ===
|
|
def main():
|
|
parser = argparse.ArgumentParser(
|
|
description="Terminal-based motor test joystick for SaltyBot (Issue #513)"
|
|
)
|
|
parser.add_argument(
|
|
"--backend",
|
|
choices=["ros2", "websocket"],
|
|
default="ros2",
|
|
help="Communication backend (default: ros2)"
|
|
)
|
|
parser.add_argument(
|
|
"--host",
|
|
default="127.0.0.1",
|
|
help="Jetson hostname/IP (for WebSocket backend)"
|
|
)
|
|
parser.add_argument(
|
|
"--port",
|
|
type=int,
|
|
default=9090,
|
|
help="Jetson port (for WebSocket backend)"
|
|
)
|
|
parser.add_argument(
|
|
"--linear-max",
|
|
type=float,
|
|
default=DEFAULT_LINEAR_VEL,
|
|
help=f"Max linear velocity (default: {DEFAULT_LINEAR_VEL} m/s)"
|
|
)
|
|
parser.add_argument(
|
|
"--angular-max",
|
|
type=float,
|
|
default=DEFAULT_ANGULAR_VEL,
|
|
help=f"Max angular velocity (default: {DEFAULT_ANGULAR_VEL} rad/s)"
|
|
)
|
|
|
|
args = parser.parse_args()
|
|
|
|
# Select backend
|
|
backend = ControllerBackend.WEBSOCKET if args.backend == "websocket" else ControllerBackend.ROS2
|
|
|
|
# Create controller
|
|
controller = MotorTestController(backend=backend)
|
|
controller.state.max_linear = args.linear_max
|
|
controller.state.max_angular = args.angular_max
|
|
|
|
# Run UI
|
|
try:
|
|
curses.wrapper(run_joystick_ui, controller)
|
|
except KeyboardInterrupt:
|
|
controller.shutdown()
|
|
except Exception as e:
|
|
print(f"Error: {e}")
|
|
controller.shutdown()
|
|
sys.exit(1)
|
|
|
|
print("Motor test joystick closed. Velocities sent to zero.")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|