"""
Dual audio stream manager for two XVF3800 mic arrays.

Runs two arecord subprocesses (one per array) and provides best-beam selection:
the stream with higher energy is considered "active" (facing the speaker).
"""

import collections
import logging
import struct
import subprocess
import threading
import time
from typing import Optional, Generator

import numpy as np

logger = logging.getLogger("headmic.audio")

SAMPLE_RATE = 16000
FRAME_SIZE = 512                    # Porcupine requires 512 samples per frame
BYTES_PER_FRAME = FRAME_SIZE * 2    # 16-bit mono = 2 bytes per sample
ENERGY_WINDOW = 10                  # frames averaged for energy comparison


class MicStream:
    """Audio stream from a single ALSA device via an arecord subprocess.

    A daemon reader thread keeps ``current_frame`` updated with the most
    recent 512-sample PCM frame and maintains a rolling RMS-energy average
    (0..1) used by DualAudioStream for best-beam selection.
    """

    def __init__(self, label: str, alsa_device: str):
        self.label = label
        self.alsa_device = alsa_device
        self.proc: Optional[subprocess.Popen] = None
        self.running = False
        self.current_frame: Optional[bytes] = None   # latest raw PCM frame
        self.energy: float = 0.0                     # rolling mean RMS, 0..1
        # deque(maxlen=...) gives the sliding window without manual pop(0)
        self._energy_history: collections.deque = collections.deque(maxlen=ENERGY_WINDOW)
        self._lock = threading.Lock()
        self._thread: Optional[threading.Thread] = None

    def start(self):
        """Spawn arecord on this device and start the reader thread."""
        cmd = [
            "arecord",
            "-D", self.alsa_device,
            "-f", "S16_LE",
            "-r", str(SAMPLE_RATE),
            "-c", "1",
            "-t", "raw",
            "-q",
            "-"
        ]
        logger.info("[%s] Starting: %s", self.label, " ".join(cmd))
        self.proc = subprocess.Popen(
            cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL,
            bufsize=BYTES_PER_FRAME
        )
        self.running = True
        self._thread = threading.Thread(target=self._read_loop, daemon=True)
        self._thread.start()

    def _read_loop(self):
        """Read fixed-size frames from arecord; track latest frame + energy."""
        try:
            while self.running and self.proc:
                data = self.proc.stdout.read(BYTES_PER_FRAME)
                # Empty/short read means the pipe closed (arecord died/stopped).
                if not data or len(data) < BYTES_PER_FRAME:
                    break
                # Per-frame RMS, normalized by int16 full scale to 0..1.
                samples = np.frombuffer(data, dtype=np.int16).astype(np.float32)
                rms = float(np.sqrt(np.mean(samples * samples))) / 32768.0

                with self._lock:
                    self.current_frame = data
                    self._energy_history.append(rms)
                    self.energy = sum(self._energy_history) / len(self._energy_history)
        except Exception as e:
            logger.error("[%s] Read error: %s", self.label, e)
        finally:
            logger.info("[%s] Stream ended", self.label)

    def get_frame(self) -> Optional[bytes]:
        """Latest frame (may repeat until a new one arrives), or None."""
        with self._lock:
            return self.current_frame

    def get_energy(self) -> float:
        """Rolling mean RMS energy over the last ENERGY_WINDOW frames."""
        with self._lock:
            return self.energy

    def stop(self):
        """Stop the reader thread and terminate arecord (kill as fallback)."""
        self.running = False
        if self.proc:
            try:
                self.proc.terminate()
                self.proc.wait(timeout=2)
            except Exception:
                try:
                    self.proc.kill()
                except Exception:
                    pass
            self.proc = None
        # Terminating the process unblocks the reader's pipe read, so a
        # short join is enough to reap the thread cleanly.
        if self._thread:
            self._thread.join(timeout=1)
            self._thread = None


class DualAudioStream:
    """
    Manages two MicStreams and provides best-beam selection.

    Usage:
        stream = DualAudioStream(left_alsa, right_alsa)
        stream.start()
        for frame_data, side in stream.frames():
            # frame_data is 512 samples (1024 bytes) of int16 PCM
            # side is "left" or "right" (whichever has more energy)
            ...
        stream.stop()
    """

    def __init__(self, left_device: str, right_device: Optional[str] = None):
        self.left = MicStream("left", left_device)
        self.right = MicStream("right", right_device) if right_device else None
        self.active_side: str = "left"
        self._running = False

    def start(self):
        """Start both capture subprocesses (right is optional)."""
        self._running = True
        self.left.start()
        if self.right:
            self.right.start()
        # Short delay so first frames are populated
        time.sleep(0.1)

    def stop(self):
        """Stop the frame generator and both capture subprocesses."""
        self._running = False
        self.left.stop()
        if self.right:
            self.right.stop()

    def frames(self) -> Generator[tuple[bytes, str], None, None]:
        """
        Yield (frame_bytes, side) at Porcupine's expected rate (~32ms).
        Always yields from the higher-energy side (best beam) with a 10%
        hysteresis margin so the selection doesn't flap between ears.
        Falls back to whichever side actually has data; never yields None.
        """
        interval = FRAME_SIZE / SAMPLE_RATE  # 0.032s = 32ms
        last_frame_left = None
        last_frame_right = None

        while self._running:
            t0 = time.monotonic()

            frame_left = self.left.get_frame()
            frame_right = self.right.get_frame() if self.right else None

            # Wait for at least one new frame
            if frame_left is None and frame_right is None:
                time.sleep(0.005)
                continue

            # Skip if no new data since last yield
            if frame_left == last_frame_left and frame_right == last_frame_right:
                time.sleep(0.002)
                continue

            last_frame_left = frame_left
            last_frame_right = frame_right

            # Pick best beam
            if frame_right is None:
                self.active_side = "left"
                yield frame_left, "left"
            else:
                left_energy = self.left.get_energy()
                right_energy = self.right.get_energy()
                if right_energy > left_energy * 1.1:  # 10% hysteresis
                    self.active_side = "right"
                elif left_energy > right_energy * 1.1:
                    self.active_side = "left"
                # else: keep current active_side (hysteresis prevents flapping)

                # BUGFIX: never yield None — if the preferred side has no
                # frame yet, fall back to the side that does.
                if self.active_side == "right":
                    yield frame_right, "right"
                elif frame_left is not None:
                    yield frame_left, "left"
                else:
                    yield frame_right, "right"

            # Pace to ~32ms per frame
            elapsed = time.monotonic() - t0
            if elapsed < interval:
                time.sleep(interval - elapsed)

    def get_side_frame(self, side: str) -> Optional[bytes]:
        """Get the latest frame from a specific side (left fallback)."""
        if side == "right" and self.right:
            return self.right.get_frame()
        return self.left.get_frame()
Continuous audio stream from ReSpeaker - 2. Feed frames to Porcupine for wake word detection - 3. On "Hey Vivi" → start buffering audio - 4. Use VAD to detect end of speech - 5. Send buffer to EarTail for transcription - 6. Return to listening mode + 1. Dual audio streams from two XVF3800 arrays + 2. Best-beam selection (higher energy side) + 3. Feed frames to Porcupine for wake word detection + 4. On "Hey Vivi" → start buffering from active side + 5. Use VAD to detect end of speech + 6. Send buffer to EarTail for transcription + 7. Return to listening mode + +Hardware: 2× ReSpeaker XVF3800 4-Mic Array (USB, 2-channel firmware) +DoA + LEDs via USB vendor control (xvf3800.py) Built by Vixy on Day 77 (January 17, 2026) 💜 +Upgraded to dual XVF3800 on Day 160 (April 2026) """ import asyncio import collections import io +import json import logging import os import struct -import subprocess import threading import time import wave @@ -53,7 +58,8 @@ PORCUPINE_ACCESS_KEY = os.environ.get("PORCUPINE_ACCESS_KEY", "") WAKE_WORD_PATH = os.environ.get("WAKE_WORD_PATH", "/home/alex/headmic/Hey-Vivi_en_raspberry-pi_v4_0_0.ppn") SAMPLE_RATE = 16000 -ALSA_DEVICE = "plughw:ArrayUAC10,0" # ReSpeaker 4 Mic Array - by name, not card number (survives reboot order changes) +CONFIG_DIR = os.path.expanduser("~/.vixy") +CONFIG_PATH = os.path.join(CONFIG_DIR, "headmic.json") VAD_AGGRESSIVENESS = 2 # 0-3, higher = more aggressive SILENCE_FRAMES = 50 # ~1.5 sec of silence to stop (at 30ms frames) @@ -61,54 +67,73 @@ MAX_RECORDING_FRAMES = 1000 # ~30 sec max EARTAIL_URL = os.environ.get("EARTAIL_URL", "http://bigorin.local:8764") +DOA_POLL_HZ = 10 # DoA polling rate +EYE_SERVICE_URL = os.environ.get("EYE_SERVICE_URL", "http://localhost:8780") + # ============================================================================ -# LED Control +# Config persistence # ============================================================================ -try: - from pixel_ring import pixel_ring - LEDS_AVAILABLE 
# ============================================================================
# Config persistence
# ============================================================================

def load_config() -> dict:
    """Load the persisted headmic config (ear serial mapping).

    Returns {} when the file is missing or unreadable — callers treat an
    empty config as "not yet learned".
    """
    if not os.path.exists(CONFIG_PATH):
        return {}
    try:
        with open(CONFIG_PATH) as f:
            return json.load(f)
    except Exception as e:
        logger.warning("Failed to read config: %s", e)
        return {}


def save_config(cfg: dict):
    """Persist config to CONFIG_PATH, creating the directory if needed."""
    os.makedirs(CONFIG_DIR, exist_ok=True)
    with open(CONFIG_PATH, "w") as f:
        json.dump(cfg, f, indent=2)


# ============================================================================
# XVF3800 + LED Control
# ============================================================================

from xvf3800 import XVF3800Manager, learn_devices

xvf_manager = XVF3800Manager()

# Flipped to True at startup once at least one array is assigned.
LEDS_AVAILABLE = False


def leds_wakeup():
    """Solid white on both ears: wake word just detected."""
    if LEDS_AVAILABLE:
        try:
            xvf_manager.all_leds_solid(0xFFFFFF)
        # BUGFIX: was a bare `except:` which also swallows SystemExit /
        # KeyboardInterrupt; LEDs are best-effort but must not mask those.
        except Exception:
            pass


def leds_listening():
    """DoA-tracking LED mode: actively recording a command."""
    if LEDS_AVAILABLE:
        try:
            xvf_manager.all_leds_doa()
        except Exception:
            pass


def leds_processing():
    """Purple breathing: transcription in flight."""
    if LEDS_AVAILABLE:
        try:
            xvf_manager.all_leds_breath(0x9400D3)
        except Exception:
            pass


def leds_enrolling():
    """Solid orange: speaker enrollment recording."""
    if LEDS_AVAILABLE:
        try:
            xvf_manager.all_leds_solid(0xFF8C00)
        except Exception:
            pass


def leds_off():
    """Turn all ear LEDs off."""
    if LEDS_AVAILABLE:
        try:
            xvf_manager.all_leds_off()
        except Exception:
            pass
============================================================================ -# Audio Stream using ALSA directly (arecord) -# ============================================================================ - -def read_audio_stream(): - """ - Generator that yields audio frames from ALSA using arecord. - Each frame is 512 samples (32ms at 16kHz) as required by Porcupine. - """ - frame_size = 512 # Porcupine requires 512 samples - bytes_per_frame = frame_size * 2 # 16-bit = 2 bytes per sample - - cmd = [ - "arecord", - "-D", ALSA_DEVICE, - "-f", "S16_LE", - "-r", str(SAMPLE_RATE), - "-c", "1", # Mono - "-t", "raw", - "-q", # Quiet - "-" - ] - - logger.info(f"Starting audio stream: {' '.join(cmd)}") - - proc = subprocess.Popen( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.DEVNULL, - bufsize=bytes_per_frame - ) - - try: - while state.running: - data = proc.stdout.read(bytes_per_frame) - if len(data) < bytes_per_frame: - break - yield data - finally: - proc.terminate() - proc.wait() +# Audio stream +dual_stream = None # DualAudioStream instance # ============================================================================ @@ -198,22 +185,22 @@ async def transcribe_audio(audio_data: bytes) -> str: files = {"audio": ("recording.wav", audio_data, "audio/wav")} response = await client.post(f"{EARTAIL_URL}/transcribe/submit", files=files) response.raise_for_status() - + job_id = response.json().get("job_id") logger.info(f"Transcription job: {job_id}") - + for _ in range(120): status = await client.get(f"{EARTAIL_URL}/transcribe/status/{job_id}") data = status.json() - + if data.get("status") == "SUCCESS": result = await client.get(f"{EARTAIL_URL}/transcribe/result/{job_id}") return result.json().get("transcription", "") elif data.get("status") == "FAILURE": raise Exception(f"Transcription failed: {data.get('error')}") - + await asyncio.sleep(1) - + raise Exception("Transcription timeout") @@ -227,7 +214,7 @@ def transcribe_sync(audio_data: bytes) -> str: # 
============================================================================ -# Main Listener Loop +# Main Listener Loop (dual-stream) # ============================================================================ def audio_to_wav(frames: List[bytes]) -> bytes: @@ -243,9 +230,9 @@ def audio_to_wav(frames: List[bytes]) -> bytes: def listener_loop(): - """Main audio processing loop.""" - global state - + """Main audio processing loop with dual-stream best-beam selection.""" + global state, dual_stream + logger.info("Initializing Porcupine...") try: porcupine = pvporcupine.create( @@ -256,26 +243,27 @@ def listener_loop(): logger.error(f"Failed to init Porcupine: {e}") state.error = str(e) return - + vad = webrtcvad.Vad(VAD_AGGRESSIVENESS) - + # VAD needs 10/20/30ms frames. 30ms at 16kHz = 480 samples - # Porcupine needs 512 samples. We'll use 480 for VAD. - vad_frame_size = 480 - vad_frame_bytes = vad_frame_size * 2 - + vad_frame_bytes = 480 * 2 + state.listening = True logger.info("🦊 Wake word listener active - say 'Hey Vivi'!") - + recording_buffer: List[bytes] = [] silence_count = 0 is_recording = False - + recording_side: str = "left" + try: - for frame_data in read_audio_stream(): + for frame_data, side in dual_stream.frames(): if not state.running: break - + + state.active_side = side + # Convert bytes to int16 array for Porcupine pcm = struct.unpack_from("h" * 512, frame_data) @@ -289,52 +277,56 @@ def listener_loop(): # Check for wake word keyword_index = porcupine.process(pcm) - + if keyword_index >= 0 and not is_recording: - logger.info("🦊 Wake word detected: 'Hey Vivi'!") + logger.info("🦊 Wake word detected: 'Hey Vivi'! 
(from %s ear)", side) state.wake_count += 1 state.last_wake_time = time.time() - + recording_side = side + leds_wakeup() time.sleep(0.2) leds_listening() - + is_recording = True state.recording = True recording_buffer = [] silence_count = 0 - logger.info("Recording started...") + logger.info("Recording started (using %s ear)...", recording_side) continue - + if is_recording: - recording_buffer.append(frame_data) - + # During recording, use frames from the side that heard the wake word + rec_frame = dual_stream.get_side_frame(recording_side) + if rec_frame: + recording_buffer.append(rec_frame) + # Check VAD (use first 480 samples of the 512 frame) - vad_data = frame_data[:vad_frame_bytes] + vad_data = (rec_frame or frame_data)[:vad_frame_bytes] try: is_speech = vad.is_speech(vad_data, SAMPLE_RATE) except: - is_speech = True # Assume speech on VAD error - + is_speech = True + if is_speech: silence_count = 0 else: silence_count += 1 - + # Stop conditions should_stop = ( (len(recording_buffer) > 10 and silence_count >= SILENCE_FRAMES) or len(recording_buffer) >= MAX_RECORDING_FRAMES ) - + if should_stop: logger.info(f"Recording stopped: {len(recording_buffer)} frames") is_recording = False state.recording = False - + leds_processing() state.processing = True - + try: wav_data = audio_to_wav(recording_buffer) transcription = transcribe_sync(wav_data) @@ -346,9 +338,9 @@ def listener_loop(): finally: state.processing = False leds_off() - + recording_buffer = [] - + except Exception as e: logger.error(f"Listener error: {e}") state.error = str(e) @@ -396,20 +388,82 @@ def sound_classifier_loop(): logger.info("Sound classifier thread stopped") +# ============================================================================ +# DoA Polling Thread +# ============================================================================ + +def doa_poll_loop(): + """Poll Direction of Arrival from both XVF3800 arrays.""" + interval = 1.0 / DOA_POLL_HZ + while state.running: + try: + 
def doa_to_gaze(doa: Optional[dict] = None, side: Optional[str] = None) -> Optional[tuple[int, int]]:
    """Convert a DoA angle to gaze coordinates for the eye service.

    Args:
        doa: mapping like ``{"left": {"angle": 0-359, "vad": bool} | None, ...}``
             as produced by ``XVF3800Manager.read_both_doa()``.  Defaults to the
             latest polled ``state.doa`` (backward compatible with the original
             zero-argument call).
        side: which ear's reading to use; defaults to ``state.active_side``.

    Returns:
        ``(x, y)`` gaze coordinates clamped to 0-255, or ``None`` when there is
        no reading for the chosen side or no voice activity.
    """
    import math

    if doa is None:
        doa = state.doa
    if side is None:
        side = state.active_side
    if not doa or doa.get(side) is None:
        return None
    reading = doa[side]
    if not reading.get("vad"):
        return None
    rad = math.radians(reading["angle"])
    # Map the polar DoA angle onto the eye service's 0-255 gaze plane,
    # centred at (127, 127); horizontal swing (80) is wider than vertical (40).
    x = int(127 - 80 * math.sin(rad))
    y = int(127 - 40 * math.cos(rad))
    return max(0, min(255, x)), max(0, min(255, y))
Check USB connections and firmware.") + state.error = "No left ear audio device" + else: + logger.info("Left ear ALSA: %s", left_dev) + if right_dev: + logger.info("Right ear ALSA: %s", right_dev) + else: + logger.warning("Right ear ALSA device not found — running with left ear only") + + # --- Dual audio stream --- + from audio_stream import DualAudioStream + dual_stream = DualAudioStream(left_dev or "plughw:0,0", right_dev) + dual_stream.start() + + # --- Sound classifier (optional) --- model_dir = Path(__file__).parent / "models" model_path = model_dir / "yamnet.tflite" class_map_path = model_dir / "yamnet_class_map.csv" @@ -417,7 +471,6 @@ async def startup(): try: from sound_id import SoundClassifier sound_classifier = SoundClassifier(str(model_path), str(class_map_path)) - # 31 frames of 512 samples = ~0.99s at 16kHz sound_ring_buffer = collections.deque(maxlen=31) state.sound_classification_enabled = True logger.info("Sound classification enabled (YAMNet)") @@ -429,7 +482,7 @@ async def startup(): else: logger.info("Sound classification models not found, skipping") - # Init speaker recognizer (optional — graceful if resemblyzer not installed) + # --- Speaker recognizer (optional) --- try: from speaker_id import SpeakerRecognizer db_path = Path(__file__).parent / "voices.db" @@ -439,22 +492,32 @@ async def startup(): except Exception as e: logger.warning("Speaker recognition unavailable: %s", e) + # --- DoA polling --- + if xvf_manager.left or xvf_manager.right: + threading.Thread(target=doa_poll_loop, daemon=True).start() + logger.info("DoA polling started at %d Hz", DOA_POLL_HZ) + + # --- Main listener --- thread = threading.Thread(target=listener_loop, daemon=True) thread.start() - logger.info("HeadMic started") + logger.info("HeadMic started (dual XVF3800)") @app.on_event("shutdown") async def shutdown(): state.running = False leds_off() + if dual_stream: + dual_stream.stop() +# --- Info endpoints --- + @app.get("/") async def root(): return { "service": 
@app.get("/doa")
async def doa():
    """Direction of Arrival from both mic arrays."""
    payload = {"doa": state.doa}
    payload["active_side"] = state.active_side
    payload["gaze"] = doa_to_gaze()
    return payload


@app.get("/devices")
async def devices():
    """Status of both XVF3800 arrays."""
    alsa = xvf_manager.get_alsa_devices()

    def describe(label, dev):
        # One ear's connection summary; serial/alsa are None when absent.
        return {
            "connected": bool(dev),
            "serial": dev.serial if dev else None,
            "alsa": alsa.get(label),
        }

    return {
        "left": describe("left", xvf_manager.left),
        "right": describe("right", xvf_manager.right),
        "active_side": state.active_side,
    }
# ============================================================================
# CLI
# ============================================================================

if __name__ == "__main__":
    import sys

    if "--learn" not in sys.argv:
        # Normal service mode: serve the FastAPI app.
        import uvicorn
        uvicorn.run(app, host="0.0.0.0", port=8446)
    else:
        # One-shot calibration: record which USB serial is which ear.
        logging.basicConfig(level=logging.INFO)
        info = learn_devices()
        if info.get("left") and info.get("right"):
            cfg = load_config()
            cfg["ears"] = info
            save_config(cfg)
            print(f"[HEADMIC] Learned ear config → {CONFIG_PATH}")
            print(json.dumps(info, indent=2))
            sys.exit(0)
        # Calibration needs both arrays present to tell left from right.
        print("[HEADMIC] Need 2 XVF3800 arrays connected for --learn")
        sys.exit(1)
+ +Reference: https://github.com/respeaker/reSpeaker_XVF3800_USB_4MIC_ARRAY/blob/master/python_control/xvf_host.py +""" + +import logging +import struct +import time +from typing import Optional + +try: + import usb.core + import usb.util + PYUSB_AVAILABLE = True +except ImportError: + PYUSB_AVAILABLE = False + +logger = logging.getLogger("headmic.xvf3800") + +VID = 0x2886 +PID = 0x001A + +# USB vendor control transfer parameters +CTRL_REQUEST_TYPE_OUT = usb.util.CTRL_OUT | usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_RECIPIENT_DEVICE if PYUSB_AVAILABLE else 0 +CTRL_REQUEST_TYPE_IN = usb.util.CTRL_IN | usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_RECIPIENT_DEVICE if PYUSB_AVAILABLE else 0 + +# Resource IDs +GPO_RESID = 20 + +# Parameter indices (within resource) +DOA_VALUE_IDX = 18 # returns (angle 0-359, vad 0/1) +LED_EFFECT_IDX = 0 # 0=off, 1=breath, 2=rainbow, 3=solid, 4=doa, 5=ring +LED_BRIGHTNESS_IDX = 1 +LED_COLOR_IDX = 3 # single uint32 color +LED_RING_COLOR_IDX = 5 # 12 × uint32 + + +class XVF3800: + """Control a single ReSpeaker XVF3800 via USB vendor commands.""" + + def __init__(self, usb_device): + self.dev = usb_device + self.serial = usb_device.serial_number or "unknown" + self.bus = usb_device.bus + self.address = usb_device.address + + def _read(self, resid: int, param_idx: int, length: int) -> bytes: + """Read parameter via USB control transfer.""" + wValue = (resid << 8) | param_idx + try: + data = self.dev.ctrl_transfer(CTRL_REQUEST_TYPE_IN, 0, wValue, 0, length, timeout=1000) + return bytes(data) + except Exception as e: + logger.debug("USB read error (resid=%d, param=%d): %s", resid, param_idx, e) + return b"" + + def _write(self, resid: int, param_idx: int, data: bytes): + """Write parameter via USB control transfer.""" + wValue = (resid << 8) | param_idx + try: + self.dev.ctrl_transfer(CTRL_REQUEST_TYPE_OUT, 0, wValue, 0, data, timeout=1000) + except Exception as e: + logger.debug("USB write error (resid=%d, param=%d): %s", resid, param_idx, e) + + 
# --- XVF3800Manager helpers: ALSA resolution, LED fan-out, DoA fan-out ---
# (methods of XVF3800Manager; `self` is the manager instance)

def serial_to_alsa(self, serial: str) -> Optional[str]:
    """Find the ALSA card name for the device with the given USB serial.

    Walks /sys/class/sound/card*/device up to the owning USB device and
    compares its `serial` attribute; returns the ALSA card id (preferred)
    or the card number, or None when no card matches.
    """
    import os, glob
    for card_path in sorted(glob.glob("/sys/class/sound/card*")):
        number = os.path.basename(card_path).replace("card", "")
        link = os.path.join(card_path, "device")
        if not os.path.islink(link):
            continue
        usb_node = os.path.realpath(link)
        # The serial file lives one or two levels above the interface node.
        candidate = os.path.join(usb_node, "..", "serial")
        if not os.path.exists(candidate):
            candidate = os.path.join(usb_node, "..", "..", "serial")
        if not os.path.exists(candidate):
            continue
        try:
            with open(candidate) as fh:
                if fh.read().strip() != serial:
                    continue
            id_path = os.path.join(card_path, "id")
            if os.path.exists(id_path):
                with open(id_path) as fh:
                    return fh.read().strip()
            return number
        except Exception:
            pass
    return None


def get_alsa_devices(self) -> dict[str, Optional[str]]:
    """Return {"left": "plughw:<card>,0", "right": ...} (None when absent)."""
    def resolve(dev):
        if not dev:
            return None
        card = self.serial_to_alsa(dev.serial)
        return f"plughw:{card},0" if card else None

    return {"left": resolve(self.left), "right": resolve(self.right)}


# --- Convenience: control both arrays ---

def all_leds_off(self):
    """Turn LEDs off on every connected ear."""
    for ear in (self.left, self.right):
        if ear:
            ear.led_off()


def all_leds_solid(self, color: int):
    """Set a solid color on every connected ear."""
    for ear in (self.left, self.right):
        if ear:
            ear.led_solid(color)


def all_leds_breath(self, color: int, brightness: int = 128):
    """Breathing effect on every connected ear."""
    for ear in (self.left, self.right):
        if ear:
            ear.led_breath(color, brightness)


def all_leds_doa(self):
    """DoA-tracking LED mode on every connected ear."""
    for ear in (self.left, self.right):
        if ear:
            ear.led_doa()


def read_both_doa(self) -> dict:
    """Read DoA from both arrays; missing ears map to None."""
    readings = {}
    for name, ear in (("left", self.left), ("right", self.right)):
        if ear:
            angle, vad = ear.read_doa()
            readings[name] = {"angle": angle, "vad": vad}
        else:
            readings[name] = None
    return readings


def learn_devices() -> dict:
    """Discover connected XVF3800 devices and return their serials for config."""
    mgr = XVF3800Manager()
    mgr.assign()
    info: dict = {}
    for label, dev in (("left", mgr.left), ("right", mgr.right)):
        if not dev:
            continue
        entry = {"usb_serial": dev.serial}
        card = mgr.serial_to_alsa(dev.serial)
        if card:
            entry["alsa_card"] = card
        info[label] = entry
    return info