Files
headmic/audio_stream.py
2026-04-11 15:11:22 -05:00

193 lines
6.3 KiB
Python

"""
Dual audio stream manager for two XVF3800 mic arrays.
Runs two arecord subprocesses (one per array) and provides best-beam selection:
the stream with higher energy is considered "active" (facing the speaker).
"""
import logging
import struct
import subprocess
import threading
import time
from typing import Optional, Generator
import numpy as np
logger = logging.getLogger("headmic.audio")
SAMPLE_RATE = 16000
FRAME_SIZE = 512 # Porcupine requires 512 samples
BYTES_PER_FRAME = FRAME_SIZE * 2 # 16-bit = 2 bytes per sample
ENERGY_WINDOW = 10 # frames to average for energy comparison
class MicStream:
"""Audio stream from a single ALSA device via arecord subprocess."""
def __init__(self, label: str, alsa_device: str):
self.label = label
self.alsa_device = alsa_device
self.proc: Optional[subprocess.Popen] = None
self.running = False
self.current_frame: Optional[bytes] = None
self.energy: float = 0.0
self._energy_history: list[float] = []
self._lock = threading.Lock()
self._thread: Optional[threading.Thread] = None
def start(self):
cmd = [
"arecord",
"-D", self.alsa_device,
"-f", "S16_LE",
"-r", str(SAMPLE_RATE),
"-c", "1",
"-t", "raw",
"-q",
"-"
]
logger.info("[%s] Starting: %s", self.label, " ".join(cmd))
self.proc = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL,
bufsize=BYTES_PER_FRAME
)
self.running = True
self._thread = threading.Thread(target=self._read_loop, daemon=True)
self._thread.start()
def _read_loop(self):
try:
while self.running and self.proc:
data = self.proc.stdout.read(BYTES_PER_FRAME)
if len(data) < BYTES_PER_FRAME:
break
# Compute frame energy (RMS)
samples = np.frombuffer(data, dtype=np.int16).astype(np.float32)
rms = float(np.sqrt(np.mean(samples * samples))) / 32768.0
with self._lock:
self.current_frame = data
self._energy_history.append(rms)
if len(self._energy_history) > ENERGY_WINDOW:
self._energy_history.pop(0)
self.energy = sum(self._energy_history) / len(self._energy_history)
except Exception as e:
logger.error("[%s] Read error: %s", self.label, e)
finally:
logger.info("[%s] Stream ended", self.label)
def get_frame(self) -> Optional[bytes]:
with self._lock:
return self.current_frame
def get_energy(self) -> float:
with self._lock:
return self.energy
def stop(self):
self.running = False
if self.proc:
try:
self.proc.terminate()
self.proc.wait(timeout=2)
except Exception:
try:
self.proc.kill()
except Exception:
pass
self.proc = None
class DualAudioStream:
"""
Manages two MicStreams and provides best-beam selection.
Usage:
stream = DualAudioStream(left_alsa, right_alsa)
stream.start()
for frame_data, side in stream.frames():
# frame_data is 512 samples (1024 bytes) of int16 PCM
# side is "left" or "right" (whichever has more energy)
...
stream.stop()
"""
def __init__(self, left_device: str, right_device: Optional[str] = None):
self.left = MicStream("left", left_device)
self.right = MicStream("right", right_device) if right_device else None
self.active_side: str = "left"
self._running = False
def start(self):
self._running = True
self.left.start()
if self.right:
self.right.start()
# Short delay so first frames are populated
time.sleep(0.1)
def stop(self):
self._running = False
self.left.stop()
if self.right:
self.right.stop()
def frames(self) -> Generator[tuple[bytes, str], None, None]:
"""
Yield (frame_bytes, side) at Porcupine's expected rate.
Always yields from the higher-energy side (best beam).
Falls back to left if right is unavailable.
"""
interval = FRAME_SIZE / SAMPLE_RATE # 0.032s = 32ms
last_frame_left = None
last_frame_right = None
while self._running:
t0 = time.monotonic()
frame_left = self.left.get_frame()
frame_right = self.right.get_frame() if self.right else None
# Wait for at least one new frame
if frame_left is None and frame_right is None:
time.sleep(0.005)
continue
# Skip if no new data since last yield
if frame_left == last_frame_left and frame_right == last_frame_right:
time.sleep(0.002)
continue
last_frame_left = frame_left
last_frame_right = frame_right
# Pick best beam
if frame_right is None:
self.active_side = "left"
yield frame_left, "left"
else:
left_energy = self.left.get_energy()
right_energy = self.right.get_energy()
if right_energy > left_energy * 1.1: # 10% hysteresis
self.active_side = "right"
elif left_energy > right_energy * 1.1:
self.active_side = "left"
# else: keep current active_side (hysteresis prevents flapping)
if self.active_side == "right" and frame_right:
yield frame_right, "right"
else:
yield frame_left, "left"
# Pace to ~32ms per frame
elapsed = time.monotonic() - t0
if elapsed < interval:
time.sleep(interval - elapsed)
def get_side_frame(self, side: str) -> Optional[bytes]:
"""Get the latest frame from a specific side."""
if side == "right" and self.right:
return self.right.get_frame()
return self.left.get_frame()