#!/usr/bin/env python3
"""
MageDok Audio Router
Routes HDMI audio from DisplayPort adapter to internal speakers via PulseAudio

The node picks a PulseAudio sink (the configured ``hdmi_sink`` if it exists,
otherwise the first sink whose name contains "hdmi", otherwise the first sink
reported by ``pactl``), optionally makes it the default sink, and then
periodically publishes the name of the current default sink on
``/magedok/audio_status``.
"""

import subprocess

import rclpy
from rclpy.node import Node
from std_msgs.msg import String

# Upper bound, in seconds, for any single ``pactl`` invocation.
PACTL_TIMEOUT_SEC = 5
# Interval, in seconds, between default-sink status publications.
STATUS_PERIOD_SEC = 5.0
# Failures expected from running ``pactl``: a missing binary (OSError),
# a timeout, or a non-zero exit status (both SubprocessError).
PACTL_ERRORS = (OSError, subprocess.SubprocessError)


class AudioRouter(Node):
    """ROS 2 node that selects a PulseAudio sink and reports the default sink.

    Parameters (ROS):
        hdmi_sink (str): Name of the preferred PulseAudio sink.
        default_sink (bool): When true, the selected sink is made the
            PulseAudio default sink at start-up.

    Publishes:
        /magedok/audio_status (std_msgs/String): Name of the current default
            sink, every ``STATUS_PERIOD_SEC`` seconds.
    """

    def __init__(self):
        super().__init__('audio_router')

        self.declare_parameter('hdmi_sink', 'alsa_output.pci-0000_00_1d.0.hdmi-stereo')
        self.declare_parameter('default_sink', True)

        self.hdmi_sink = self.get_parameter('hdmi_sink').value
        self.set_default = self.get_parameter('default_sink').value

        self.audio_status_pub = self.create_publisher(String, '/magedok/audio_status', 10)

        self.get_logger().info('Audio Router: Configuring HDMI audio routing...')
        self.setup_pulseaudio()

        # Check status every 5 seconds
        self._status_timer = self.create_timer(STATUS_PERIOD_SEC, self.check_audio_status)

    def _run_pactl(self, *args):
        """Run ``pactl`` with *args* and return its standard output.

        Raises:
            OSError: ``pactl`` could not be started (e.g. not installed).
            subprocess.TimeoutExpired: The call exceeded PACTL_TIMEOUT_SEC.
            subprocess.SubprocessError: ``pactl`` exited with a non-zero
                status; the message carries its stderr when available.
        """
        completed = subprocess.run(
            ['pactl', *args],
            capture_output=True,
            text=True,
            timeout=PACTL_TIMEOUT_SEC,
        )
        if completed.returncode != 0:
            detail = completed.stderr.strip() or f'exit status {completed.returncode}'
            raise subprocess.SubprocessError(f"pactl {' '.join(args)} failed: {detail}")
        return completed.stdout

    def _select_sink(self, sink_names):
        """Choose the sink to route audio to from a non-empty list of names.

        Preference order: the configured ``hdmi_sink`` parameter, then the
        first name containing "hdmi" (case-insensitive), then the first name.

        Returns:
            tuple: ``(sink_name, reason)`` where reason is ``'configured'``,
            ``'hdmi'`` or ``'fallback'``.
        """
        if self.hdmi_sink in sink_names:
            return self.hdmi_sink, 'configured'
        for name in sink_names:
            if 'hdmi' in name.lower():
                return name, 'hdmi'
        return sink_names[0], 'fallback'

    def setup_pulseaudio(self):
        """Configure PulseAudio to route HDMI audio.

        Lists the available sinks, selects one (see ``_select_sink``) and,
        when the ``default_sink`` parameter is true, makes it the default
        sink. Failures of ``pactl`` are logged and never raised, so node
        construction still succeeds without a working PulseAudio.
        """
        logger = self.get_logger()
        try:
            sinks = self._parse_pa_sinks(self._run_pactl('list', 'sinks'))
            if not sinks:
                logger.warn('No PulseAudio sinks detected')
                return

            logger.info(f'Available sinks: {", ".join(sinks)}')

            target, reason = self._select_sink(list(sinks))
            if reason == 'configured':
                logger.info(f'✓ Using configured sink: {target}')
            elif reason == 'hdmi':
                logger.info(f'✓ HDMI sink identified: {target}')
            else:
                logger.warn(f'HDMI sink not found, using: {target}')

            # Set as default if requested; success is only logged when pactl
            # actually exited cleanly (_run_pactl raises otherwise).
            if self.set_default:
                self._run_pactl('set-default-sink', target)
                logger.info(f'✓ Audio routed to: {target}')
        except PACTL_ERRORS as e:
            logger.error(f'PulseAudio setup failed: {e}')

    def _parse_pa_sinks(self, pactl_output):
        """Parse 'pactl list sinks' output.

        Args:
            pactl_output: Raw text printed by ``pactl list sinks``.

        Returns:
            dict: Maps each sink name to its sink index (as a string), in the
            order the sinks appear in the output.
        """
        sinks = {}
        current_index = None
        for line in pactl_output.splitlines():
            if line.startswith('Sink #'):
                current_index = line.partition('#')[2].strip()
                continue
            stripped = line.strip()
            if current_index and stripped.startswith('Name: '):
                sinks[stripped.partition('Name: ')[2].strip()] = current_index
                # Only the first "Name:" after a sink header names the sink.
                current_index = None
        return sinks

    def check_audio_status(self):
        """Verify audio is properly routed.

        Timer callback: queries the current default sink and publishes its
        name. When ``pactl`` fails, a warning is logged and nothing is
        published, so subscribers never receive an empty status.
        """
        try:
            current = self._run_pactl('get-default-sink').strip()
        except PACTL_ERRORS as e:
            self.get_logger().warn(f'Audio status check failed: {e}')
            return

        status = String()
        status.data = current
        self.audio_status_pub.publish(status)
        self.get_logger().debug(f'Current audio sink: {status.data}')


def main(args=None):
    """Entry point: spin the AudioRouter node until shutdown or Ctrl-C."""
    rclpy.init(args=args)
    router = None
    try:
        router = AudioRouter()
        rclpy.spin(router)
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the node; exit quietly.
        pass
    finally:
        if router is not None:
            router.destroy_node()
        # The signal handler may already have shut the context down.
        if rclpy.ok():
            rclpy.shutdown()


if __name__ == '__main__':
    main()