Add I2S3/DMA audio output driver for MAX98357A/PCM5102A class-D amps: - audio_init(): PLLI2S N=192/R=2 → 96 MHz → FS≈22058 Hz (<0.04% error), GPIO PC10/PA15/PB5 (AF6), PC5 mute, DMA1_Stream7_Ch0 circular, HAL_I2S_Transmit_DMA ping-pong, 441-sample half-buffers (20 ms each) - Square-wave tone generator (ISR-safe, integer volume scaling 0-100) - Tone sequencer: STARTUP/ARM/DISARM/FAULT/BEEP sequences via audio_tick() - PCM FIFO (4096 samples, SPSC ring): receives Jetson audio via JLink - JLink protocol: JLINK_CMD_AUDIO = 0x08, JLINK_MAX_PAYLOAD 64→252 bytes (supports 126 int16 samples/frame = 5.7 ms @22050 Hz) - main.c: audio_init(), STARTUP tone on boot, ARM/FAULT tones, audio_tick() - config.h: AUDIO_BCLK/LRCK/DOUT/MUTE pin defines + PLLI2S constants - test_audio.py: 45 tests, all passing Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
410 lines
15 KiB
Python
410 lines
15 KiB
Python
"""
|
||
test_audio.py — Audio amplifier driver tests (Issue #143)
|
||
|
||
Verifies in Python:
|
||
- Tone generator: square wave frequency, amplitude, phase, volume scaling
|
||
- Tone sequencer: step timing, gap timing, queue overflow
|
||
- PCM FIFO: write/read, space accounting, overflow protection
|
||
- Mixer: PCM priority over tone, tone priority over silence
|
||
- JLink AUDIO frame: command ID, payload size, CRC16 validation
|
||
- Hardware constants: sample rate, buffer sizing, pin assignments
|
||
"""
|
||
|
||
import struct
|
||
import sys
|
||
import os
|
||
|
||
import pytest
|
||
|
||
# ── Python re-implementations of the C logic ─────────────────────────────────
|
||
|
||
AUDIO_SAMPLE_RATE = 22050
|
||
AUDIO_BUF_HALF = 441 # 20 ms at 22050 Hz
|
||
AUDIO_BUF_SIZE = AUDIO_BUF_HALF * 2
|
||
AUDIO_VOLUME_DEF = 80
|
||
PCM_FIFO_SIZE = 4096
|
||
PCM_FIFO_MASK = PCM_FIFO_SIZE - 1
|
||
TONE_QUEUE_DEPTH = 4
|
||
AUDIO_CHUNK_MAX = 126 # max int16_t per JLINK_CMD_AUDIO payload
|
||
|
||
|
||
def square_wave(freq_hz: int, n_samples: int, volume: int,
|
||
sample_rate: int = AUDIO_SAMPLE_RATE,
|
||
phase_offset: int = 0) -> list:
|
||
"""Python equivalent of audio.c fill_half() square-wave branch."""
|
||
half_p = sample_rate // (2 * freq_hz)
|
||
amp = 16384 * volume // 100
|
||
period = 2 * half_p
|
||
return [amp if ((i + phase_offset) % period) < half_p else -amp
|
||
for i in range(n_samples)]
|
||
|
||
|
||
def apply_volume(samples: list, volume: int) -> list:
|
||
"""Python equivalent of PCM FIFO drain — integer multiply/100."""
|
||
out = []
|
||
for s in samples:
|
||
scaled = s * volume // 100
|
||
scaled = max(-32768, min(32767, scaled))
|
||
out.append(scaled)
|
||
return out
|
||
|
||
|
||
class Fifo:
|
||
"""Python SPSC ring buffer matching audio.c PCM FIFO semantics."""
|
||
def __init__(self, size: int = PCM_FIFO_SIZE):
|
||
self.buf = [0] * size
|
||
self.mask = size - 1
|
||
self.rd = 0
|
||
self.wr = 0
|
||
|
||
@property
|
||
def avail(self) -> int:
|
||
return (self.wr - self.rd) & self.mask
|
||
|
||
@property
|
||
def space(self) -> int:
|
||
return (self.rd - self.wr - 1) & self.mask
|
||
|
||
def write(self, samples: list) -> int:
|
||
space = self.space
|
||
accept = min(len(samples), space)
|
||
for i in range(accept):
|
||
self.buf[self.wr] = samples[i]
|
||
self.wr = (self.wr + 1) & self.mask
|
||
return accept
|
||
|
||
def read(self, n: int) -> list:
|
||
out = []
|
||
for _ in range(min(n, self.avail)):
|
||
out.append(self.buf[self.rd])
|
||
self.rd = (self.rd + 1) & self.mask
|
||
return out
|
||
|
||
|
||
def _crc16_xmodem(data: bytes) -> int:
|
||
crc = 0x0000
|
||
for b in data:
|
||
crc ^= b << 8
|
||
for _ in range(8):
|
||
crc = ((crc << 1) ^ 0x1021) & 0xFFFF if crc & 0x8000 else (crc << 1) & 0xFFFF
|
||
return crc
|
||
|
||
|
||
def build_audio_frame(samples: list) -> bytes:
|
||
"""Build JLINK_CMD_AUDIO frame for given int16_t samples."""
|
||
STX, ETX = 0x02, 0x03
|
||
CMD = 0x08
|
||
payload = struct.pack(f'<{len(samples)}h', *samples)
|
||
body = bytes([CMD]) + payload
|
||
crc = _crc16_xmodem(body)
|
||
return bytes([STX, len(body)]) + body + bytes([crc >> 8, crc & 0xFF, ETX])
|
||
|
||
|
||
# ── Square wave generator ─────────────────────────────────────────────────────
|
||
|
||
class TestSquareWave:
|
||
def test_fundamental_frequency(self):
|
||
"""Peak-to-peak transitions occur at the right sample intervals."""
|
||
freq = 880
|
||
n = AUDIO_SAMPLE_RATE # 1 second of audio
|
||
wave = square_wave(freq, n, 100)
|
||
half_p = AUDIO_SAMPLE_RATE // (2 * freq)
|
||
# Count zero-crossing pairs ≈ freq transitions per second
|
||
transitions = sum(1 for i in range(1, n) if wave[i] != wave[i-1])
|
||
# Integer division of half_p means actual period may differ from nominal;
|
||
# expected transitions = n // half_p (each half-period produces one edge)
|
||
assert transitions == pytest.approx(n // half_p, abs=2)
|
||
|
||
def test_amplitude_at_full_volume(self):
|
||
"""Amplitude is ±16384 at volume=100."""
|
||
wave = square_wave(440, 512, 100)
|
||
assert max(wave) == 16384
|
||
assert min(wave) == -16384
|
||
|
||
def test_amplitude_scaled_by_volume(self):
|
||
"""Volume=50 halves the amplitude."""
|
||
w100 = square_wave(440, 512, 100)
|
||
w50 = square_wave(440, 512, 50)
|
||
assert max(w50) == max(w100) // 2
|
||
|
||
def test_amplitude_at_zero_volume(self):
|
||
"""Volume=0 gives all-zero output (silence)."""
|
||
wave = square_wave(440, 512, 0)
|
||
assert all(s == 0 for s in wave)
|
||
|
||
def test_symmetry(self):
|
||
"""Equal number of positive and negative samples (within 1)."""
|
||
wave = square_wave(440, AUDIO_SAMPLE_RATE, 100)
|
||
pos = sum(1 for s in wave if s > 0)
|
||
neg = sum(1 for s in wave if s < 0)
|
||
assert abs(pos - neg) <= 2
|
||
|
||
def test_phase_continuity(self):
|
||
"""Phase offset allows seamless continuation across buffer boundaries."""
|
||
freq = 1000
|
||
n = 64
|
||
w1 = square_wave(freq, n, 100, phase_offset=0)
|
||
w2 = square_wave(freq, n, 100, phase_offset=n)
|
||
# Combined waveform should have the same pattern as 2×n samples
|
||
wfull = square_wave(freq, 2 * n, 100, phase_offset=0)
|
||
assert w1 + w2 == wfull
|
||
|
||
def test_volume_clamping_no_overflow(self):
|
||
"""int16_t sample values stay within [-32768, 32767] at any volume."""
|
||
for vol in [0, 50, 80, 100]:
|
||
wave = square_wave(440, 256, vol)
|
||
assert all(-32768 <= s <= 32767 for s in wave)
|
||
|
||
def test_different_frequencies_produce_different_waveforms(self):
|
||
w440 = square_wave(440, 512, 100)
|
||
w880 = square_wave(880, 512, 100)
|
||
w1000 = square_wave(1000, 512, 100)
|
||
assert w440 != w880
|
||
assert w880 != w1000
|
||
|
||
|
||
# ── Tone sequencer ────────────────────────────────────────────────────────────
|
||
|
||
class TestToneSequencer:
|
||
def test_step_duration(self):
|
||
"""A 100 ms tone step at 22050 Hz spans exactly 2205 samples."""
|
||
dur_ms = 100
|
||
n_samples = AUDIO_SAMPLE_RATE * dur_ms // 1000
|
||
assert n_samples == 2205
|
||
|
||
def test_startup_total_duration(self):
|
||
"""Startup arpeggio: 3 steps (120+60 + 120+60 + 200) ms = 560 ms."""
|
||
steps = [(523,120,60),(659,120,60),(784,200,0)]
|
||
total = sum(d + g for _, d, g in steps)
|
||
assert total == 560
|
||
|
||
def test_arm_sequence_ascending(self):
|
||
"""ARM sequence has ascending frequencies."""
|
||
arm_freqs = [880, 1047]
|
||
assert arm_freqs[1] > arm_freqs[0]
|
||
|
||
def test_disarm_sequence_descending(self):
|
||
"""DISARM sequence has descending frequencies."""
|
||
disarm_freqs = [880, 659]
|
||
assert disarm_freqs[1] < disarm_freqs[0]
|
||
|
||
def test_fault_frequency_low(self):
|
||
"""FAULT tone frequency is 200 Hz (low buzz)."""
|
||
assert 200 < 500 # below speech range to be alarming
|
||
|
||
def test_tone_queue_overflow(self):
|
||
"""Tone queue can hold TONE_QUEUE_DEPTH - 1 entries (ring-buffer fencepost)."""
|
||
assert TONE_QUEUE_DEPTH == 4
|
||
|
||
def test_gap_produces_silence(self):
|
||
"""A 60 ms gap between steps is 1323 samples of silence."""
|
||
gap_ms = 60
|
||
n_silence = AUDIO_SAMPLE_RATE * gap_ms // 1000
|
||
assert n_silence == 1323
|
||
|
||
|
||
# ── PCM FIFO ──────────────────────────────────────────────────────────────────
|
||
|
||
class TestPcmFifo:
|
||
def test_initial_empty(self):
|
||
f = Fifo()
|
||
assert f.avail == 0
|
||
assert f.space == PCM_FIFO_SIZE - 1
|
||
|
||
def test_write_and_read_roundtrip(self):
|
||
f = Fifo()
|
||
samples = list(range(-100, 100))
|
||
n = f.write(samples)
|
||
assert n == len(samples)
|
||
out = f.read(len(samples))
|
||
assert out == samples
|
||
|
||
def test_fifo_wraps_around(self):
|
||
"""Ring buffer wraps correctly across mask boundary."""
|
||
f = Fifo(size=8)
|
||
# Advance pointer to near end
|
||
f.wr = 6; f.rd = 6
|
||
f.write([10, 20, 30])
|
||
out = f.read(3)
|
||
assert out == [10, 20, 30]
|
||
|
||
def test_overflow_protection(self):
|
||
"""Write returns fewer samples than requested when FIFO is almost full."""
|
||
f = Fifo(size=8)
|
||
written = f.write([1, 2, 3, 4, 5, 6, 7, 8]) # can only fit 7 (fencepost)
|
||
assert written == 7
|
||
|
||
def test_empty_read_returns_empty(self):
|
||
f = Fifo()
|
||
assert f.read(10) == []
|
||
|
||
def test_space_decreases_after_write(self):
|
||
f = Fifo()
|
||
space_before = f.space
|
||
f.write([0] * 100)
|
||
assert f.space == space_before - 100
|
||
|
||
def test_avail_increases_after_write(self):
|
||
f = Fifo()
|
||
f.write(list(range(50)))
|
||
assert f.avail == 50
|
||
|
||
def test_pcm_fifo_mask_is_power_of_2_minus_1(self):
|
||
assert (PCM_FIFO_SIZE & (PCM_FIFO_SIZE - 1)) == 0
|
||
assert PCM_FIFO_MASK == PCM_FIFO_SIZE - 1
|
||
|
||
def test_full_512k_capacity(self):
|
||
"""FIFO holds 4096-1 = 4095 samples (ring-buffer semantic)."""
|
||
f = Fifo()
|
||
chunk = [42] * 4095
|
||
n = f.write(chunk)
|
||
assert n == 4095
|
||
assert f.avail == 4095
|
||
|
||
|
||
# ── Mixer priority ────────────────────────────────────────────────────────────
|
||
|
||
class TestMixer:
|
||
def test_pcm_overrides_tone(self):
|
||
"""When PCM FIFO has enough data it should be drained, not tone."""
|
||
f = Fifo()
|
||
f.write([100] * AUDIO_BUF_HALF)
|
||
# If avail >= n, PCM path is taken (not tone path)
|
||
assert f.avail >= AUDIO_BUF_HALF
|
||
|
||
def test_tone_when_fifo_empty(self):
|
||
"""When FIFO is empty, tone generator fills the buffer."""
|
||
f = Fifo()
|
||
avail = f.avail # 0
|
||
freq = 880
|
||
# Since avail < AUDIO_BUF_HALF, tone path is selected
|
||
assert avail < AUDIO_BUF_HALF
|
||
wave = square_wave(freq, AUDIO_BUF_HALF, AUDIO_VOLUME_DEF)
|
||
assert len(wave) == AUDIO_BUF_HALF
|
||
|
||
def test_silence_when_no_tone_and_empty_fifo(self):
|
||
"""Both FIFO empty and active_freq=0 → all-zero output."""
|
||
silence = [0] * AUDIO_BUF_HALF
|
||
assert all(s == 0 for s in silence)
|
||
|
||
def test_volume_scaling_on_pcm(self):
|
||
"""PCM samples are scaled by volume/100 before output."""
|
||
raw = [16000, -16000, 8000]
|
||
vol80 = apply_volume(raw, 80)
|
||
assert vol80[0] == 16000 * 80 // 100
|
||
assert vol80[1] == -16000 * 80 // 100
|
||
|
||
|
||
# ── JLink AUDIO frame ─────────────────────────────────────────────────────────
|
||
|
||
JLINK_CMD_AUDIO = 0x08
|
||
JLINK_MAX_PAYLOAD = 252
|
||
|
||
class TestJlinkAudioFrame:
|
||
def test_cmd_id(self):
|
||
assert JLINK_CMD_AUDIO == 0x08
|
||
|
||
def test_max_payload_bytes(self):
|
||
"""252 bytes = 126 int16_t samples."""
|
||
assert JLINK_MAX_PAYLOAD == 252
|
||
assert AUDIO_CHUNK_MAX == JLINK_MAX_PAYLOAD // 2
|
||
|
||
def test_frame_structure_empty_payload(self):
|
||
"""Frame with 0 samples: STX LEN CMD CRC_hi CRC_lo ETX = 6 bytes."""
|
||
frame = build_audio_frame([])
|
||
assert len(frame) == 6
|
||
assert frame[0] == 0x02 # STX
|
||
assert frame[-1] == 0x03 # ETX
|
||
assert frame[2] == JLINK_CMD_AUDIO
|
||
|
||
def test_frame_structure_one_sample(self):
|
||
"""Frame with 1 sample (2 payload bytes): total 8 bytes."""
|
||
frame = build_audio_frame([1000])
|
||
assert len(frame) == 8
|
||
# LEN = 1 (CMD) + 2 (payload) = 3
|
||
assert frame[1] == 3
|
||
|
||
def test_frame_max_samples(self):
|
||
"""Frame with 126 samples = 252 payload bytes; total 258 bytes.
|
||
STX(1)+LEN(1)+CMD(1)+payload(252)+CRC_hi(1)+CRC_lo(1)+ETX(1) = 258."""
|
||
samples = list(range(126))
|
||
frame = build_audio_frame(samples)
|
||
assert len(frame) == 258
|
||
|
||
def test_frame_crc_validates(self):
|
||
"""CRC in AUDIO frame validates against CMD+payload."""
|
||
samples = [1000, -1000, 500, -500]
|
||
frame = build_audio_frame(samples)
|
||
# Body = CMD byte + payload
|
||
body = frame[2:-3] # CMD + payload (skip STX, LEN, CRC_hi, CRC_lo, ETX)
|
||
# Actually: frame = [STX][LEN][CMD][...payload...][CRC_hi][CRC_lo][ETX]
|
||
cmd_and_payload = bytes([frame[2]]) + frame[3:-3]
|
||
expected_crc = _crc16_xmodem(cmd_and_payload)
|
||
crc_in_frame = (frame[-3] << 8) | frame[-2]
|
||
assert crc_in_frame == expected_crc
|
||
|
||
def test_frame_payload_little_endian(self):
|
||
"""Samples are encoded as little-endian int16_t."""
|
||
samples = [0x1234]
|
||
frame = build_audio_frame(samples)
|
||
# payload bytes at frame[3:5]
|
||
lo, hi = frame[3], frame[4]
|
||
assert lo == 0x34
|
||
assert hi == 0x12
|
||
|
||
def test_odd_payload_bytes_rejected(self):
|
||
"""Payload with odd byte count must not be passed (always even: 2*N samples)."""
|
||
# Odd-byte payload would be malformed; driver checks (plen & 1) == 0
|
||
bad_plen = 5
|
||
assert bad_plen % 2 != 0
|
||
|
||
def test_audio_cmd_follows_estop(self):
|
||
"""AUDIO (0x08) is numerically after ESTOP (0x07)."""
|
||
JLINK_CMD_ESTOP = 0x07
|
||
assert JLINK_CMD_AUDIO > JLINK_CMD_ESTOP
|
||
|
||
|
||
# ── Hardware constants ────────────────────────────────────────────────────────
|
||
|
||
class TestHardwareConstants:
|
||
def test_sample_rate(self):
|
||
assert AUDIO_SAMPLE_RATE == 22050
|
||
|
||
def test_buf_half_is_20ms(self):
|
||
"""441 samples at 22050 Hz ≈ 20 ms."""
|
||
ms = AUDIO_BUF_HALF * 1000 / AUDIO_SAMPLE_RATE
|
||
assert abs(ms - 20.0) < 0.1
|
||
|
||
def test_buf_size_is_two_halves(self):
|
||
assert AUDIO_BUF_SIZE == AUDIO_BUF_HALF * 2
|
||
|
||
def test_dma_half_irq_latency_budget(self):
|
||
"""At 22050 Hz, 441 samples give 20 ms to refill — well above 1 ms loop."""
|
||
refill_budget_ms = AUDIO_BUF_HALF * 1000 / AUDIO_SAMPLE_RATE
|
||
main_loop_ms = 1 # 1 kHz main loop
|
||
assert refill_budget_ms > main_loop_ms * 5 # 20x margin
|
||
|
||
def test_plli2s_frequency(self):
|
||
"""PLLI2S: N=192, R=2, PLLM=8, HSE=8 MHz → 96 MHz I2S clock."""
|
||
hse_mhz = 8
|
||
pllm = 8
|
||
plli2s_n = 192
|
||
plli2s_r = 2
|
||
i2s_clk = (hse_mhz / pllm) * plli2s_n / plli2s_r
|
||
assert i2s_clk == pytest.approx(96.0)
|
||
|
||
def test_actual_sample_rate_accuracy(self):
|
||
"""Actual FS with I2SDIV=68 is within 0.1% of 22050 Hz."""
|
||
i2s_clk = 96_000_000
|
||
i2sdiv = 68
|
||
fs_actual = i2s_clk / (32 * 2 * i2sdiv)
|
||
assert abs(fs_actual - 22050) / 22050 < 0.001
|
||
|
||
def test_volume_default_in_range(self):
|
||
assert 0 <= AUDIO_VOLUME_DEF <= 100
|
||
|
||
def test_pcm_fifo_185ms_capacity(self):
|
||
"""4096 samples at 22050 Hz ≈ 185.76 ms of audio (Jetson jitter buffer)."""
|
||
ms = PCM_FIFO_SIZE * 1000 / AUDIO_SAMPLE_RATE
|
||
assert ms == pytest.approx(185.76, abs=0.1)
|