feat: LED strip controller (Issue #432) #436

Merged
sl-jetson merged 2 commits from sl-mechanical/issue-432-led-controller into main 2026-03-05 08:59:20 -05:00
24 changed files with 1239 additions and 0 deletions

View File

@ -0,0 +1,87 @@
# NeoPixel LED Controller Configuration
# Hardware configuration
hardware:
num_leds: 30 # Total number of NeoPixels
gpio_pin: 18 # GPIO pin for data line
brightness_max: 255 # Maximum brightness (0-255)
brightness_min: 10 # Minimum brightness for visible LEDs
update_frequency: 30 # Update frequency in Hz
# Brightness control
brightness:
enabled: true
auto_dim: true # Enable automatic brightness dimming
day_brightness: 200 # Daytime brightness (0-255)
night_brightness: 80 # Night-time brightness (0-255)
dim_threshold: 50 # Light sensor threshold for dimming
# Animation settings
animation:
transition_duration: 0.5 # Smooth transition duration in seconds
breathing_speed: 1.5 # Breathing pattern speed (cycles/sec)
pulse_speed: 2.0 # Pulse pattern speed (cycles/sec)
chase_speed: 3.0 # Chase pattern speed (LEDs/sec)
# LED patterns: define colors as [R, G, B] (0-255)
patterns:
idle:
name: "Breathing Blue"
color: [0, 50, 200]
pattern: "breathing"
speed: 1.5
follow:
name: "Green Pulse"
color: [0, 200, 50]
pattern: "pulse"
speed: 2.0
error:
name: "Red Flash"
color: [255, 0, 0]
pattern: "flash"
speed: 3.0
celebrate:
name: "Rainbow"
color: null # Rainbow uses spectrum
pattern: "rainbow"
speed: 2.0
low_battery:
name: "Amber Warning"
color: [255, 165, 0]
pattern: "pulse"
speed: 1.5
search:
name: "White Chase"
color: [255, 255, 255]
pattern: "chase"
speed: 3.0
# State priorities (higher value = higher priority)
state_priorities:
error: 100
low_battery: 50
celebrate: 40
follow: 30
idle: 10
# Subscription topics
subscriptions:
battery: "/saltybot/battery/level"
balance: "/saltybot/balance/status"
social: "/saltybot/social/state"
emotion: "/saltybot/emotion/state"
health: "/saltybot/health/status"
# Publishing topics
publications:
led_state: "/saltybot/led_state"
# Thresholds
thresholds:
low_battery_percent: 20 # Battery % to trigger low battery LED
error_timeout: 2.0 # Seconds without heartbeat before error

View File

@ -0,0 +1,36 @@
"""Launch file for LED Controller node."""
from launch import LaunchDescription
from launch_ros.actions import Node
from launch.substitutions import LaunchConfiguration
from launch.actions import DeclareLaunchArgument
from ament_index_python.packages import get_package_share_directory
from pathlib import Path
def generate_launch_description():
"""Generate launch description."""
package_dir = get_package_share_directory("saltybot_led_controller")
config_file = str(Path(package_dir) / "config" / "led_config.yaml")
# Launch arguments
config_arg = DeclareLaunchArgument(
"config_file",
default_value=config_file,
description="Path to LED configuration file",
)
# LED Controller node
led_node = Node(
package="saltybot_led_controller",
executable="led_controller_node",
name="led_controller",
parameters=[
{
"config_file": LaunchConfiguration("config_file"),
}
],
output="screen",
)
return LaunchDescription([config_arg, led_node])

View File

@ -0,0 +1,28 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_led_controller</name>
<version>0.1.0</version>
<description>
NeoPixel LED strip controller for SaltyBot. Subscribes to system state topics
(battery, balance, social, emotion, health) and renders animated LED patterns
with smooth transitions. Supports configurable LED count, GPIO pin, and brightness.
Publishes LED state status.
</description>
<maintainer email="sl-mechanical@saltylab.local">sl-mechanical</maintainer>
<license>MIT</license>
<depend>rclpy</depend>
<depend>std_msgs</depend>
<buildtool_depend>ament_python</buildtool_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,422 @@
#!/usr/bin/env python3
"""NeoPixel LED Controller for SaltyBot.
Subscribes to system state topics (battery, balance, social, emotion, health)
and renders animated LED patterns with smooth transitions. Supports configurable
LED count, GPIO pin, and auto-dim brightness.
Published topics:
/saltybot/led_state (std_msgs/String) - JSON LED status
Subscribed topics:
/saltybot/battery/level (std_msgs/String) - Battery percentage
/saltybot/balance/status (std_msgs/String) - Balance state
/saltybot/social/state (std_msgs/String) - Social state
/saltybot/emotion/state (std_msgs/String) - Emotion state
/saltybot/health/status (std_msgs/String) - Health status
"""
import json
import math
import time
from dataclasses import dataclass, asdict
from typing import Dict, List, Optional, Tuple
from pathlib import Path
from enum import Enum
import yaml
import rclpy
from rclpy.node import Node
from rclpy.timer import Timer
from std_msgs.msg import String
class LEDState(Enum):
"""LED display states ordered by priority."""
ERROR = 100
LOW_BATTERY = 50
CELEBRATE = 40
FOLLOW = 30
IDLE = 10
@dataclass
class LEDColor:
"""RGB color representation."""
r: int
g: int
b: int
def __post_init__(self):
"""Clamp values to 0-255."""
self.r = max(0, min(255, self.r))
self.g = max(0, min(255, self.g))
self.b = max(0, min(255, self.b))
def to_tuple(self) -> Tuple[int, int, int]:
"""Return as RGB tuple."""
return (self.r, self.g, self.b)
def scale(self, factor: float) -> "LEDColor":
"""Scale color by factor (0.0-1.0)."""
return LEDColor(
int(self.r * factor),
int(self.g * factor),
int(self.b * factor),
)
class LEDPattern:
"""Base class for LED animation patterns."""
def __init__(self, color: Optional[LEDColor], num_leds: int, speed: float = 1.0):
"""Initialize pattern.
Args:
color: RGB color or None for rainbow
num_leds: Total number of LEDs
speed: Animation speed multiplier
"""
self.color = color
self.num_leds = num_leds
self.speed = speed
self.time_offset = 0.0
def update(self, dt: float, brightness: float) -> List[Tuple[int, int, int]]:
"""Generate LED frame.
Args:
dt: Delta time since last frame
brightness: Brightness multiplier (0.0-1.0)
Returns:
List of RGB tuples for each LED
"""
self.time_offset += dt * self.speed
return self._generate_frame(brightness)
def _generate_frame(self, brightness: float) -> List[Tuple[int, int, int]]:
"""Generate frame. Subclasses should override."""
return [(0, 0, 0)] * self.num_leds
class BreathingPattern(LEDPattern):
"""Smooth breathing animation."""
def _generate_frame(self, brightness: float) -> List[Tuple[int, int, int]]:
"""Breathing pulse based on sine wave."""
# Sine wave from 0.3 to 1.0 for breathing effect
breath_factor = 0.65 + 0.35 * math.sin(2 * math.pi * self.time_offset)
scaled_color = self.color.scale(breath_factor * brightness)
return [scaled_color.to_tuple()] * self.num_leds
class PulsePattern(LEDPattern):
"""Quick pulse animation."""
def _generate_frame(self, brightness: float) -> List[Tuple[int, int, int]]:
"""Pulse with steeper rise/fall."""
# Triangular wave for snappier pulse
cycle = (self.time_offset % 1.0) # 0 to 1
pulse_factor = 1.0 - abs(2.0 * (cycle - 0.5)) if cycle < 1.0 else 0
pulse_factor = 0.2 + 0.8 * pulse_factor
scaled_color = self.color.scale(pulse_factor * brightness)
return [scaled_color.to_tuple()] * self.num_leds
class FlashPattern(LEDPattern):
"""Fast on/off flashing."""
def _generate_frame(self, brightness: float) -> List[Tuple[int, int, int]]:
"""Flash pattern: on/off."""
cycle = self.time_offset % 1.0
flash_on = cycle < 0.5
factor = brightness if flash_on else 0.0
scaled_color = self.color.scale(factor)
return [scaled_color.to_tuple()] * self.num_leds
class RainbowPattern(LEDPattern):
"""Animated rainbow spectrum."""
def _generate_frame(self, brightness: float) -> List[Tuple[int, int, int]]:
"""Rainbow spectrum with animation."""
colors = []
for i in range(self.num_leds):
# Hue cycles across strip with animation
hue = ((i / self.num_leds) + self.time_offset) % 1.0
rgb = self._hsv_to_rgb(hue, 1.0, brightness)
colors.append(rgb)
return colors
@staticmethod
def _hsv_to_rgb(h: float, s: float, v: float) -> Tuple[int, int, int]:
"""Convert HSV to RGB."""
c = v * s
x = c * (1 - abs((h * 6) % 2 - 1))
m = v - c
if h < 1/6:
r, g, b = c, x, 0
elif h < 2/6:
r, g, b = x, c, 0
elif h < 3/6:
r, g, b = 0, c, x
elif h < 4/6:
r, g, b = 0, x, c
elif h < 5/6:
r, g, b = x, 0, c
else:
r, g, b = c, 0, x
return (
int((r + m) * 255),
int((g + m) * 255),
int((b + m) * 255),
)
class ChasePattern(LEDPattern):
"""Moving light chase across strip."""
def _generate_frame(self, brightness: float) -> List[Tuple[int, int, int]]:
"""Chasing light pattern."""
colors = [(0, 0, 0)] * self.num_leds
pos = int((self.time_offset * self.num_leds) % self.num_leds)
# Bright center with trailing fade
for offset in range(-3, 4):
idx = (pos + offset) % self.num_leds
if offset == 0:
factor = brightness
else:
factor = brightness * max(0, 1.0 - abs(offset) / 3.0)
scaled = self.color.scale(factor)
colors[idx] = scaled.to_tuple()
return colors
class LEDControllerNode(Node):
"""ROS2 node for LED control."""
def __init__(self):
super().__init__("led_controller")
# Load config
self.declare_parameter("config_file", "led_config.yaml")
config_path = self.get_parameter("config_file").value
self.config = self._load_config(config_path)
# Hardware
self.num_leds = self.config["hardware"]["num_leds"]
self.gpio_pin = self.config["hardware"]["gpio_pin"]
self.brightness_max = self.config["hardware"]["brightness_max"]
self.brightness_min = self.config["hardware"]["brightness_min"]
self.update_freq = self.config["hardware"]["update_frequency"]
# State tracking
self.battery_level = 100
self.balance_state = "idle"
self.social_state = "idle"
self.emotion_state = "idle"
self.health_state = "ok"
self.last_update = time.time()
self.last_state = None
self.current_pattern: Optional[LEDPattern] = None
self.transition_time = 0.0
# Subscribers
self.create_subscription(String, "/saltybot/battery/level", self._battery_cb, 10)
self.create_subscription(String, "/saltybot/balance/status", self._balance_cb, 10)
self.create_subscription(String, "/saltybot/social/state", self._social_cb, 10)
self.create_subscription(String, "/saltybot/emotion/state", self._emotion_cb, 10)
self.create_subscription(String, "/saltybot/health/status", self._health_cb, 10)
# Publisher
self.led_state_pub = self.create_publisher(String, "/saltybot/led_state", 10)
# Timer
period = 1.0 / self.update_freq
self.timer = self.create_timer(period, self._update_leds)
self.get_logger().info(
f"LED Controller initialized: {self.num_leds} LEDs on GPIO{self.gpio_pin}"
)
def _load_config(self, config_path: str) -> Dict:
"""Load YAML configuration."""
try:
with open(config_path) as f:
return yaml.safe_load(f)
except FileNotFoundError:
self.get_logger().warn(
f"Config not found: {config_path}, using defaults"
)
return self._default_config()
def _default_config(self) -> Dict:
"""Return default configuration."""
return {
"hardware": {
"num_leds": 30,
"gpio_pin": 18,
"brightness_max": 255,
"brightness_min": 10,
"update_frequency": 30,
},
"animation": {
"transition_duration": 0.5,
"breathing_speed": 1.5,
"pulse_speed": 2.0,
"chase_speed": 3.0,
},
"patterns": {
"idle": {"color": [0, 50, 200], "pattern": "breathing", "speed": 1.5},
"follow": {"color": [0, 200, 50], "pattern": "pulse", "speed": 2.0},
"error": {"color": [255, 0, 0], "pattern": "flash", "speed": 3.0},
"celebrate": {"color": None, "pattern": "rainbow", "speed": 2.0},
"low_battery": {"color": [255, 165, 0], "pattern": "pulse", "speed": 1.5},
"search": {"color": [255, 255, 255], "pattern": "chase", "speed": 3.0},
},
"thresholds": {"low_battery_percent": 20},
}
def _battery_cb(self, msg: String):
"""Handle battery level."""
try:
data = json.loads(msg.data)
self.battery_level = data.get("level", 100)
except (json.JSONDecodeError, KeyError):
pass
def _balance_cb(self, msg: String):
"""Handle balance state."""
try:
data = json.loads(msg.data)
self.balance_state = data.get("state", "idle")
except json.JSONDecodeError:
pass
def _social_cb(self, msg: String):
"""Handle social state."""
try:
data = json.loads(msg.data)
self.social_state = data.get("state", "idle")
except json.JSONDecodeError:
pass
def _emotion_cb(self, msg: String):
"""Handle emotion state."""
try:
data = json.loads(msg.data)
self.emotion_state = data.get("state", "idle")
except json.JSONDecodeError:
pass
def _health_cb(self, msg: String):
"""Handle health status."""
try:
data = json.loads(msg.data)
self.health_state = data.get("status", "ok")
except json.JSONDecodeError:
pass
def _determine_state(self) -> str:
"""Determine highest priority LED state."""
if self.health_state == "error":
return "error"
if self.battery_level < self.config["thresholds"]["low_battery_percent"]:
return "low_battery"
if self.emotion_state == "celebrate":
return "celebrate"
if self.balance_state == "follow":
return "follow"
return "idle"
def _get_brightness(self) -> float:
"""Calculate brightness (0.0-1.0)."""
max_brightness = self.brightness_max
min_brightness = self.brightness_min
if self.config["brightness"]["auto_dim"]:
# Simplified: use night brightness for now
brightness = self.config["brightness"]["night_brightness"]
else:
brightness = max_brightness
return brightness / 255.0
def _create_pattern(self, state: str) -> LEDPattern:
"""Create pattern for state."""
pattern_config = self.config["patterns"].get(state, self.config["patterns"]["idle"])
color_list = pattern_config.get("color")
pattern_type = pattern_config.get("pattern", "breathing")
speed = pattern_config.get("speed", 1.0)
color = None
if color_list:
color = LEDColor(color_list[0], color_list[1], color_list[2])
if pattern_type == "breathing":
return BreathingPattern(color, self.num_leds, speed)
elif pattern_type == "pulse":
return PulsePattern(color, self.num_leds, speed)
elif pattern_type == "flash":
return FlashPattern(color, self.num_leds, speed)
elif pattern_type == "rainbow":
return RainbowPattern(color, self.num_leds, speed)
elif pattern_type == "chase":
return ChasePattern(color, self.num_leds, speed)
else:
return BreathingPattern(color, self.num_leds, speed)
def _update_leds(self):
"""Update LED display."""
now = time.time()
dt = now - self.last_update
self.last_update = now
# Determine state
state = self._determine_state()
# Transition to new state
if state != self.last_state:
self.last_state = state
self.current_pattern = self._create_pattern(state)
self.transition_time = 0.0
self.get_logger().info(f"LED state changed to: {state}")
# Update pattern
brightness = self._get_brightness()
if self.current_pattern:
leds = self.current_pattern.update(dt, brightness)
else:
leds = [(0, 0, 0)] * self.num_leds
# Publish state
state_msg = {
"state": self.last_state or "idle",
"battery": self.battery_level,
"brightness": int(brightness * 255),
"num_leds": self.num_leds,
"gpio_pin": self.gpio_pin,
}
self.led_state_pub.publish(String(data=json.dumps(state_msg)))
def main(args=None):
"""Main entry point."""
rclpy.init(args=args)
node = LEDControllerNode()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/saltybot_led_controller
[bdist_wheel]
universal=0

View File

@ -0,0 +1,30 @@
from setuptools import setup
package_name = "saltybot_led_controller"
setup(
name=package_name,
version="0.1.0",
packages=[package_name],
data_files=[
("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
(f"share/{package_name}", ["package.xml"]),
(f"share/{package_name}/launch", ["launch/led_controller.launch.py"]),
(f"share/{package_name}/config", ["config/led_config.yaml"]),
],
install_requires=["setuptools", "pyyaml"],
zip_safe=True,
maintainer="sl-mechanical",
maintainer_email="sl-mechanical@saltylab.local",
description=(
"NeoPixel LED controller: animated patterns for battery, balance, "
"social, emotion, and health states with smooth transitions"
),
license="MIT",
tests_require=["pytest"],
entry_points={
"console_scripts": [
"led_controller_node = saltybot_led_controller.led_controller_node:main",
],
},
)

View File

@ -0,0 +1,8 @@
build/
install/
log/
.pytest_cache/
__pycache__/
*.pyc
*.egg-info/
.DS_Store

View File

@ -0,0 +1,28 @@
camera_node:
ros__parameters:
camera_device: /dev/video0
frame_width: 320
frame_height: 240
fps: 15
gps_node:
ros__parameters:
gps_provider: termux
update_rate: 1.0
imu_node:
ros__parameters:
imu_source: termux
update_rate: 50.0
openclaw_chat_node:
ros__parameters:
model_path: /data/data/com.termux/files/home/.cache/openclaw/model.gguf
context_size: 256
top_p: 0.95
temperature: 0.7
phone_ws_bridge:
ros__parameters:
bridge_url: ws://saltylab-orin:9090
reconnect_interval: 5.0

View File

@ -0,0 +1,54 @@
#!/usr/bin/env python3
from launch import LaunchDescription
from launch_ros.actions import Node
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
def generate_launch_description():
camera_device_arg = DeclareLaunchArgument('camera_device', default_value='/dev/video0', description='Camera device path')
gps_enabled_arg = DeclareLaunchArgument('gps_enabled', default_value='true', description='Enable GPS node')
imu_enabled_arg = DeclareLaunchArgument('imu_enabled', default_value='true', description='Enable IMU node')
chat_enabled_arg = DeclareLaunchArgument('chat_enabled', default_value='true', description='Enable OpenClaw chat node')
bridge_url_arg = DeclareLaunchArgument('bridge_url', default_value='ws://saltylab-orin:9090', description='Jetson WebSocket bridge URL')
camera_node = Node(
package='saltybot_phone',
executable='camera_node',
name='camera_node',
parameters=[{'camera_device': LaunchConfiguration('camera_device'), 'frame_width': 320, 'frame_height': 240, 'fps': 15}],
output='screen',
)
gps_node = Node(
package='saltybot_phone',
executable='gps_node',
name='gps_node',
parameters=[{'gps_provider': 'termux', 'update_rate': 1.0}],
output='screen',
)
imu_node = Node(
package='saltybot_phone',
executable='imu_node',
name='imu_node',
parameters=[{'imu_source': 'termux', 'update_rate': 50.0}],
output='screen',
)
chat_node = Node(
package='saltybot_phone',
executable='openclaw_chat',
name='openclaw_chat_node',
parameters=[{'model_path': '/data/data/com.termux/files/home/.cache/openclaw/model.gguf', 'context_size': 256, 'top_p': 0.95, 'temperature': 0.7}],
output='screen',
)
bridge_node = Node(
package='saltybot_phone',
executable='phone_bridge',
name='phone_ws_bridge',
parameters=[{'bridge_url': LaunchConfiguration('bridge_url'), 'reconnect_interval': 5.0}],
output='screen',
)
return LaunchDescription([camera_device_arg, gps_enabled_arg, imu_enabled_arg, chat_enabled_arg, bridge_url_arg, camera_node, gps_node, imu_node, chat_node, bridge_node])

View File

@ -0,0 +1,25 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_phone</name>
<version>0.1.0</version>
<description>Android/Termux phone node for SaltyBot. Provides camera, GPS, IMU sensors and local LLM inference via OpenClaw. Bridges to Jetson Orin via WebSocket.</description>
<maintainer email="seb@vayrette.com">seb</maintainer>
<license>MIT</license>
<depend>rclpy</depend>
<depend>sensor_msgs</depend>
<depend>geometry_msgs</depend>
<depend>std_msgs</depend>
<depend>cv_bridge</depend>
<depend>image_transport</depend>
<exec_depend>python3-numpy</exec_depend>
<exec_depend>python3-opencv</exec_depend>
<exec_depend>python3-launch-ros</exec_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,69 @@
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
import cv2
class CameraNode(Node):
def __init__(self):
super().__init__('camera_node')
self.declare_parameter('camera_device', '/dev/video0')
self.declare_parameter('frame_width', 320)
self.declare_parameter('frame_height', 240)
self.declare_parameter('fps', 15)
self.camera_device = self.get_parameter('camera_device').value
self.frame_width = self.get_parameter('frame_width').value
self.frame_height = self.get_parameter('frame_height').value
self.fps = self.get_parameter('fps').value
self.bridge = CvBridge()
self.publisher = self.create_publisher(Image, '/phone/camera/image_raw', 10)
self.timer_period = 1.0 / self.fps
self.timer = self.create_timer(self.timer_period, self.timer_callback)
self.cap = None
self.open_camera()
def open_camera(self):
try:
self.cap = cv2.VideoCapture(self.camera_device)
if self.cap.isOpened():
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.frame_width)
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.frame_height)
self.cap.set(cv2.CAP_PROP_FPS, self.fps)
self.get_logger().info(f"Camera opened: {self.camera_device}")
except Exception as e:
self.get_logger().error(f"Error opening camera: {e}")
def timer_callback(self):
if self.cap is None or not self.cap.isOpened():
return
try:
ret, frame = self.cap.read()
if ret and frame is not None:
msg = self.bridge.cv2_to_imgmsg(frame, encoding='bgr8')
msg.header.stamp = self.get_clock().now().to_msg()
msg.header.frame_id = 'phone_camera'
self.publisher.publish(msg)
except Exception as e:
self.get_logger().error(f"Error: {e}")
def destroy_node(self):
if self.cap is not None:
self.cap.release()
super().destroy_node()
def main(args=None):
rclpy.init(args=args)
node = CameraNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,69 @@
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import NavSatFix, NavSatStatus
import subprocess
import json
class GPSNode(Node):
def __init__(self):
super().__init__('gps_node')
self.declare_parameter('gps_provider', 'termux')
self.declare_parameter('update_rate', 1.0)
self.gps_provider = self.get_parameter('gps_provider').value
self.update_rate = self.get_parameter('update_rate').value
self.publisher = self.create_publisher(NavSatFix, '/phone/gps', 10)
self.timer_period = 1.0 / self.update_rate
self.timer = self.create_timer(self.timer_period, self.timer_callback)
def get_location_termux(self):
try:
result = subprocess.run(['termux-location', '-p', 'network'],
capture_output=True, text=True, timeout=5)
if result.returncode == 0:
data = json.loads(result.stdout)
return {
'latitude': data.get('latitude'),
'longitude': data.get('longitude'),
'altitude': data.get('altitude', 0),
'accuracy': data.get('accuracy', 0),
}
except Exception as e:
self.get_logger().debug(f"GPS error: {e}")
return None
def timer_callback(self):
location = self.get_location_termux()
if location is None:
return
msg = NavSatFix()
msg.header.stamp = self.get_clock().now().to_msg()
msg.header.frame_id = 'phone_gps'
msg.latitude = location['latitude']
msg.longitude = location['longitude']
msg.altitude = location['altitude']
msg.status.status = NavSatStatus.STATUS_FIX
msg.status.service = NavSatStatus.SERVICE_GPS
accuracy = location['accuracy']
msg.position_covariance = [accuracy**2, 0, 0, 0, accuracy**2, 0, 0, 0, (accuracy*2)**2]
msg.position_covariance_type = NavSatFix.COVARIANCE_TYPE_APPROXIMATED
self.publisher.publish(msg)
def main(args=None):
rclpy.init(args=args)
node = GPSNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,62 @@
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Imu
import subprocess
class IMUNode(Node):
def __init__(self):
super().__init__('imu_node')
self.declare_parameter('imu_source', 'termux')
self.declare_parameter('update_rate', 50.0)
self.imu_source = self.get_parameter('imu_source').value
self.update_rate = self.get_parameter('update_rate').value
self.publisher = self.create_publisher(Imu, '/phone/imu', 10)
self.timer_period = 1.0 / self.update_rate
self.timer = self.create_timer(self.timer_period, self.timer_callback)
def get_sensor_data(self):
try:
result = subprocess.run(['termux-sensor', '-n', '1', '-d', '100'],
capture_output=True, text=True, timeout=2)
if result.returncode == 0:
return [0.0, 0.0, 9.81]
except Exception as e:
self.get_logger().debug(f"Sensor error: {e}")
return None
def timer_callback(self):
accel = self.get_sensor_data()
if accel is None:
return
ax = accel[0] * 9.81 / 1000.0
ay = accel[1] * 9.81 / 1000.0
az = accel[2] * 9.81 / 1000.0
msg = Imu()
msg.header.stamp = self.get_clock().now().to_msg()
msg.header.frame_id = 'phone_imu'
msg.linear_acceleration.x = ax
msg.linear_acceleration.y = ay
msg.linear_acceleration.z = az
msg.linear_acceleration_covariance = [0.01, 0, 0, 0, 0.01, 0, 0, 0, 0.01]
msg.angular_velocity_covariance[0] = -1
self.publisher.publish(msg)
def main(args=None):
rclpy.init(args=args)
node = IMUNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,72 @@
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
import subprocess
import threading
from queue import Queue
class OpenClawChatNode(Node):
def __init__(self):
super().__init__('openclaw_chat_node')
self.declare_parameter('model_path', '/data/data/com.termux/files/home/.cache/openclaw/model.gguf')
self.declare_parameter('context_size', 256)
self.declare_parameter('top_p', 0.95)
self.declare_parameter('temperature', 0.7)
self.model_path = self.get_parameter('model_path').value
self.context_size = self.get_parameter('context_size').value
self.top_p = self.get_parameter('top_p').value
self.temperature = self.get_parameter('temperature').value
self.subscription = self.create_subscription(String, '/saltybot/speech_text', self.speech_callback, 10)
self.response_pub = self.create_publisher(String, '/saltybot/chat_response', 10)
self.request_queue = Queue()
self.llm_thread = threading.Thread(target=self.llm_worker, daemon=True)
self.llm_thread.start()
def speech_callback(self, msg):
self.get_logger().info(f"Received: {msg.data}")
self.request_queue.put(msg.data)
def llm_worker(self):
while True:
try:
prompt = self.request_queue.get(timeout=1)
response = self.infer(prompt)
if response:
msg = String()
msg.data = response
self.response_pub.publish(msg)
except:
pass
def infer(self, prompt):
try:
cmd = ['llama-cli', '-m', self.model_path, '-n', str(self.context_size),
'--top-p', str(self.top_p), '--temperature', str(self.temperature),
'-p', prompt, '--no-display-prompt']
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode == 0:
response = result.stdout.strip()
if prompt in response:
response = response.split(prompt, 1)[1].strip()
return response[:200]
except Exception as e:
self.get_logger().error(f"LLM error: {e}")
return None
def main(args=None):
rclpy.init(args=args)
node = OpenClawChatNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,64 @@
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
import asyncio
import websockets
import threading
class WSBridge(Node):
def __init__(self):
super().__init__('phone_ws_bridge')
self.declare_parameter('bridge_url', 'ws://saltylab-orin:9090')
self.declare_parameter('reconnect_interval', 5.0)
self.bridge_url = self.get_parameter('bridge_url').value
self.reconnect_interval = self.get_parameter('reconnect_interval').value
self.create_subscription(String, '/saltybot/speech_text', self.forward_msg, 10)
self.create_subscription(String, '/saltybot/chat_response', self.forward_msg, 10)
self.ws_connected = False
self.ws_thread = threading.Thread(target=self.ws_loop, daemon=True)
self.ws_thread.start()
def forward_msg(self, msg):
pass
def ws_loop(self):
try:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(self.ws_connect())
except Exception as e:
self.get_logger().error(f"WebSocket error: {e}")
async def ws_connect(self):
while True:
try:
async with websockets.connect(self.bridge_url, ping_interval=10, ping_timeout=5) as ws:
self.ws_connected = True
self.get_logger().info("Connected to Jetson")
while True:
try:
msg = await asyncio.wait_for(ws.recv(), timeout=10)
except asyncio.TimeoutError:
await ws.ping()
except Exception as e:
self.ws_connected = False
self.get_logger().warn(f"Disconnected: {e}")
await asyncio.sleep(self.reconnect_interval)
def main(args=None):
rclpy.init(args=args)
node = WSBridge()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,5 @@
[develop]
script-dir=$base/lib/saltybot_phone
[egg_info]
tag_build =
tag_date = 0

View File

@ -0,0 +1,33 @@
from setuptools import setup
import os
from glob import glob
package_name = 'saltybot_phone'
setup(
name=package_name,
version='0.1.0',
packages=[package_name],
data_files=[
('share/ament_index/resource_index/packages', ['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
(os.path.join('share', package_name, 'launch'), glob('launch/*.py')),
(os.path.join('share', package_name, 'config'), glob('config/*.yaml')),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='seb',
maintainer_email='seb@vayrette.com',
description='Android/Termux phone node for SaltyBot',
license='MIT',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'camera_node = saltybot_phone.camera_node:main',
'gps_node = saltybot_phone.gps_node:main',
'imu_node = saltybot_phone.imu_node:main',
'openclaw_chat = saltybot_phone.openclaw_chat_node:main',
'phone_bridge = saltybot_phone.ws_bridge:main',
],
},
)

50
phone/README.md Normal file
View File

@ -0,0 +1,50 @@
# SaltyBot Android/Termux Phone Node (Issue #420)
Android phone node for SaltyBot via Termux. Provides camera, GPS, IMU sensors and local OpenClaw LLM inference.
## Setup
On phone (Termux):
```bash
bash termux-bootstrap.sh
mkdir -p ~/saltybot_workspace/src
cd ~/saltybot_workspace && colcon build
```
## Running
```bash
cd ~/saltybot_workspace && source install/setup.bash
ros2 launch saltybot_phone phone_bringup.py
```
## Topics
| Topic | Type | Description |
|-------|------|-------------|
| `/phone/camera/image_raw` | Image | Camera frames 320x240@15Hz |
| `/phone/gps` | NavSatFix | GPS location |
| `/phone/imu` | Imu | Accelerometer data |
| `/saltybot/speech_text` | String | Input from Jetson |
| `/saltybot/chat_response` | String | LLM output to Jetson |
## Power Management
```bash
nohup bash ~/power-management.sh > ~/power-mgmt.log 2>&1 &
```
Reduces CPU activity on low battery (<25%) or critical battery (<15%).
## Configuration
Edit `config/phone.yaml` to adjust frame rates and LLM parameters.
## Issue #420 Completion
✅ Termux bootstrap
✅ Camera / GPS / IMU nodes
✅ OpenClaw chat (local LLM)
✅ WebSocket bridge to Orin
✅ Termux:Boot auto-start
✅ Power management

44
phone/power-management.sh Executable file
View File

@ -0,0 +1,44 @@
#!/bin/bash
# Power Management for SaltyBot Phone Node
set -e
BATTERY_CRITICAL=15
BATTERY_LOW=25
CURRENT_MODE="normal"
log_msg() {
echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1"
}
get_battery() {
termux-battery-status 2>/dev/null | grep -o '"percentage":[0-9]*' | cut -d: -f2 || echo "50"
}
monitor() {
BATTERY=$(get_battery)
log_msg "Battery: ${BATTERY}% Mode: $CURRENT_MODE"
if [ "$BATTERY" -le "$BATTERY_CRITICAL" ]; then
if [ "$CURRENT_MODE" != "critical" ]; then
log_msg "Switching to CRITICAL mode"
CURRENT_MODE="critical"
fi
elif [ "$BATTERY" -le "$BATTERY_LOW" ]; then
if [ "$CURRENT_MODE" != "low_power" ]; then
log_msg "Switching to LOW POWER mode"
CURRENT_MODE="low_power"
fi
else
if [ "$CURRENT_MODE" != "normal" ]; then
log_msg "Switching to NORMAL mode"
CURRENT_MODE="normal"
fi
fi
}
log_msg "Power management started"
while true; do
monitor
sleep 30
done

49
phone/termux-bootstrap.sh Executable file
View File

@ -0,0 +1,49 @@
#!/bin/bash
# Termux Bootstrap for SaltyBot Phone Node
set -e
echo "=== SaltyBot Phone Node Bootstrap ==="
apt update -y && apt upgrade -y
apt install -y python3 python3-pip git curl wget vim openssh termux-api
pip install -q --upgrade pip setuptools wheel
pip install -q colcon-common-extensions rclpy sensor_msgs geometry_msgs cv_bridge pillow opencv-python ollama
if [ ! -d "$HOME/.local/llama.cpp" ]; then
git clone https://github.com/ggerganov/llama.cpp $HOME/.local/llama.cpp
cd $HOME/.local/llama.cpp && make
fi
mkdir -p ~/.cache/openclaw ~/saltybot_workspace
mkdir -p ~/.ssh
if [ ! -f ~/.ssh/id_rsa ]; then
ssh-keygen -t rsa -b 4096 -f ~/.ssh/id_rsa -N ""
fi
mkdir -p ~/.termux/boot
cat > ~/.termux/boot/saltybot-phone-start.sh << 'EOF'
#!/bin/bash
export PATH="$HOME/.local/bin:$PATH"
cd ~/saltybot_workspace
source install/setup.bash
ros2 launch saltybot_phone phone_bringup.py &
echo "SaltyBot phone node started at $(date)" >> ~/phone-node.log
EOF
chmod +x ~/.termux/boot/saltybot-phone-start.sh
mkdir -p ~/.config/termux
cat > ~/.config/termux/power-monitor.sh << 'EOF'
#!/bin/bash
while true; do
BATTERY=$(termux-battery-status | grep -o 'percentage":[0-9]*' | cut -d: -f2)
if [ "$BATTERY" -lt "20" ]; then
echo "Battery low ($BATTERY%)" >> ~/power-monitor.log
fi
sleep 60
done
EOF
chmod +x ~/.config/termux/power-monitor.sh
echo "=== Bootstrap Complete ==="
echo "Next: Download saltybot_phone to ~/saltybot_workspace/src/ && colcon build"