""" XVF3800 USB Control — DoA, LEDs, device identification. Each ReSpeaker XVF3800 4-Mic Array is controlled via USB vendor commands (PyUSB). Replaces the old pixel_ring / Tuning interface used by the XVF3000. Reference: https://github.com/respeaker/reSpeaker_XVF3800_USB_4MIC_ARRAY/blob/master/python_control/xvf_host.py """ import logging import struct import time from typing import Optional try: import usb.core import usb.util PYUSB_AVAILABLE = True except ImportError: PYUSB_AVAILABLE = False logger = logging.getLogger("headmic.xvf3800") VID = 0x2886 PID = 0x001A # USB vendor control transfer parameters CTRL_REQUEST_TYPE_OUT = usb.util.CTRL_OUT | usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_RECIPIENT_DEVICE if PYUSB_AVAILABLE else 0 CTRL_REQUEST_TYPE_IN = usb.util.CTRL_IN | usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_RECIPIENT_DEVICE if PYUSB_AVAILABLE else 0 # Resource IDs (wIndex in USB control transfer) GPO_RESID = 20 # Command IDs (wValue in USB control transfer) # Verified against official xvf_host.py WriteCMD output DOA_VALUE_CMD = 18 # returns (angle 0-359, vad 0/1) — simple, sluggish LED_EFFECT_CMD = 12 # 0=off, 1=breath, 2=rainbow, 3=solid, 4=doa, 5=ring # AEC resource (resid=33) — beamformer-level commands, more responsive AEC_RESID = 33 AEC_AZIMUTH_CMD = 75 # 4 floats: beam1, beam2, free-running, auto-select (radians) AEC_SPENERGY_CMD = 80 # 4 floats: speech energy per beam (>0 = speech) AUDIO_MGR_RESID = 35 AUDIO_MGR_SELECTED_AZ_CMD = 11 # 2 floats: processed DoA, auto-select DoA (radians) LED_BRIGHTNESS_CMD = 14 LED_COLOR_CMD = 16 # single uint32 color (confirmed: xvf_host LED_COLOR cmdid=16) LED_DOA_COLOR_CMD = 17 # two uint32 values: base + indicator LED_RING_COLOR_CMD = 18 # 12 × uint32 class XVF3800: """Control a single ReSpeaker XVF3800 via USB vendor commands.""" def __init__(self, usb_device): self.dev = usb_device self.serial = usb_device.serial_number or "unknown" self.bus = usb_device.bus self.address = usb_device.address # Vendor control transfers work without claiming an interface — # don't call set_configuration() or detach_kernel_driver(), # the audio driver needs to keep its interface. def _read_uint16(self, resid: int, cmdid: int, count: int) -> bytes: """Read uint16 parameters. Returns raw bytes including 1-byte status header.""" try: length = count * 2 + 1 # +1 for status byte data = self.dev.ctrl_transfer( CTRL_REQUEST_TYPE_IN, 0, 0x80 | cmdid, resid, length, timeout=1000) return bytes(data) except Exception as e: logger.debug("USB read error (resid=%d, cmd=%d): %s", resid, cmdid, e) return b"" def _read_float(self, resid: int, cmdid: int, count: int) -> bytes: """Read float32 parameters. Returns raw bytes including 1-byte status header.""" try: length = count * 4 + 1 # +1 for status byte data = self.dev.ctrl_transfer( CTRL_REQUEST_TYPE_IN, 0, 0x80 | cmdid, resid, length, timeout=1000) return bytes(data) except Exception as e: logger.debug("USB read error (resid=%d, cmd=%d): %s", resid, cmdid, e) return b"" def _write(self, resid: int, cmdid: int, data: bytes): """Write parameter via USB control transfer. wValue = cmdid, wIndex = resid.""" try: self.dev.ctrl_transfer( CTRL_REQUEST_TYPE_OUT, 0, cmdid, # wValue: command ID resid, # wIndex: resource ID data, timeout=1000 ) except Exception as e: logger.debug("USB write error (resid=%d, cmd=%d): %s", resid, cmdid, e) # --- DoA --- def read_doa(self) -> tuple[int, bool]: """Read Direction of Arrival using the auto-select beam azimuth. Returns (angle 0-359 degrees, vad True/False). Uses AUDIO_MGR_SELECTED_AZIMUTHS which returns two values: [0] processed_doa — NaN when no speech, real angle when speech detected (= VAD) [1] auto_select_doa — always tracks strongest source, even noise (= angle) """ import math data = self._read_float(AUDIO_MGR_RESID, AUDIO_MGR_SELECTED_AZ_CMD, 2) if len(data) < 9: # 1 status + 2*4 bytes return 0, False processed_doa, auto_select_doa = struct.unpack_from(" bytes: """Convert 0xRRGGBB to [R, G, B, 0] payload.""" r = (color >> 16) & 0xFF g = (color >> 8) & 0xFF b = color & 0xFF return bytes([r, g, b, 0]) def led_solid(self, color: int): """Solid color on all LEDs. color is 0xRRGGBB.""" self._write(GPO_RESID, LED_COLOR_CMD, self._color_bytes(color)) self._write(GPO_RESID, LED_EFFECT_CMD, bytes([3])) def led_breath(self, color: int, brightness: int = 128): """Breathing effect.""" self._write(GPO_RESID, LED_COLOR_CMD, self._color_bytes(color)) self._write(GPO_RESID, LED_BRIGHTNESS_CMD, bytes([brightness])) self._write(GPO_RESID, LED_EFFECT_CMD, bytes([1])) def led_doa(self, base_color: int = 0x003333, doa_color: int = 0x00FFFF): """DoA indicator mode — shows beam direction on LED ring.""" data = self._color_bytes(base_color) + self._color_bytes(doa_color) self._write(GPO_RESID, LED_DOA_COLOR_CMD, data) self._write(GPO_RESID, LED_EFFECT_CMD, bytes([4])) def led_rainbow(self, brightness: int = 128): self._write(GPO_RESID, LED_BRIGHTNESS_CMD, bytes([brightness])) self._write(GPO_RESID, LED_EFFECT_CMD, bytes([2])) class XVF3800Manager: """Manage two XVF3800 arrays, identified by USB serial number.""" def __init__(self): self.left: Optional[XVF3800] = None self.right: Optional[XVF3800] = None self._serials: dict[str, str] = {} # {"left": "SN...", "right": "SN..."} def set_serial_mapping(self, left_serial: str, right_serial: str): """Pin left/right assignment by USB serial number.""" self._serials = {"left": left_serial, "right": right_serial} def discover(self) -> list[XVF3800]: """Find all connected XVF3800 devices.""" if not PYUSB_AVAILABLE: logger.warning("pyusb not installed — XVF3800 control disabled") return [] devices = [] for dev in usb.core.find(idVendor=VID, idProduct=PID, find_all=True): try: devices.append(XVF3800(dev)) except Exception as e: logger.warning("Failed to init XVF3800 at bus %d addr %d: %s", dev.bus, dev.address, e) return devices def assign(self): """Discover devices and assign left/right based on serial mapping.""" devices = self.discover() logger.info("Found %d XVF3800 device(s): %s", len(devices), [d.serial for d in devices]) if self._serials: for dev in devices: if dev.serial == self._serials.get("left"): self.left = dev elif dev.serial == self._serials.get("right"): self.right = dev if not self.left: logger.warning("Left XVF3800 (serial %s) not found", self._serials.get("left")) if not self.right: logger.warning("Right XVF3800 (serial %s) not found", self._serials.get("right")) else: # No serial mapping — assign by bus address order (unstable, but works for --learn) devices.sort(key=lambda d: (d.bus, d.address)) if len(devices) >= 1: self.left = devices[0] if len(devices) >= 2: self.right = devices[1] if self.left: logger.info("Left ear: serial=%s bus=%d addr=%d", self.left.serial, self.left.bus, self.left.address) if self.right: logger.info("Right ear: serial=%s bus=%d addr=%d", self.right.serial, self.right.bus, self.right.address) def serial_to_alsa(self, serial: str) -> Optional[str]: """Find the ALSA card name for a device with a given USB serial number. Searches /proc/asound/cards and matches via sysfs.""" import os, glob # Walk /sys/class/sound/card*/device -> look for matching USB serial for card_dir in sorted(glob.glob("/sys/class/sound/card*")): card_num = os.path.basename(card_dir).replace("card", "") # Follow the device symlink up to the USB device device_path = os.path.join(card_dir, "device") if not os.path.islink(device_path): continue usb_path = os.path.realpath(device_path) serial_file = os.path.join(usb_path, "..", "serial") if not os.path.exists(serial_file): serial_file = os.path.join(usb_path, "..", "..", "serial") if os.path.exists(serial_file): try: dev_serial = open(serial_file).read().strip() if dev_serial == serial: # Read the card ID (ALSA name) id_file = os.path.join(card_dir, "id") if os.path.exists(id_file): return open(id_file).read().strip() return card_num except Exception: pass return None def get_alsa_devices(self) -> dict[str, Optional[str]]: """Return {"left": "plughw:Array,0", "right": "plughw:Array_1,0"} or similar.""" result = {} for label, dev in [("left", self.left), ("right", self.right)]: if dev: card_name = self.serial_to_alsa(dev.serial) result[label] = f"plughw:{card_name},0" if card_name else None else: result[label] = None return result # --- Convenience: control both arrays --- def all_leds_off(self): for dev in [self.left, self.right]: if dev: dev.led_off() def all_leds_solid(self, color: int): for dev in [self.left, self.right]: if dev: dev.led_solid(color) def all_leds_breath(self, color: int, brightness: int = 128): for dev in [self.left, self.right]: if dev: dev.led_breath(color, brightness) def all_leds_doa(self): for dev in [self.left, self.right]: if dev: dev.led_doa() def read_both_doa(self) -> dict: """Read DoA from both arrays.""" result = {} for label, dev in [("left", self.left), ("right", self.right)]: if dev: angle, vad = dev.read_doa() result[label] = {"angle": angle, "vad": vad} else: result[label] = None return result def learn_devices() -> dict: """Discover connected XVF3800 devices and return their serials for config.""" mgr = XVF3800Manager() mgr.assign() result = {} if mgr.left: result["left"] = {"usb_serial": mgr.left.serial} alsa = mgr.serial_to_alsa(mgr.left.serial) if alsa: result["left"]["alsa_card"] = alsa if mgr.right: result["right"] = {"usb_serial": mgr.right.serial} alsa = mgr.serial_to_alsa(mgr.right.serial) if alsa: result["right"]["alsa_card"] = alsa return result # === CLI test === if __name__ == "__main__": import sys logging.basicConfig(level=logging.INFO) if "--learn" in sys.argv: info = learn_devices() import json print(json.dumps(info, indent=2)) sys.exit(0) if "--test-doa" in sys.argv: mgr = XVF3800Manager() mgr.assign() for _ in range(50): doa = mgr.read_both_doa() print(f"DoA: left={doa.get('left')} right={doa.get('right')}", end="\r") time.sleep(0.1) print() sys.exit(0) if "--test-leds" in sys.argv: mgr = XVF3800Manager() mgr.assign() for color, name in [(0xFF0000, "red"), (0x00FF00, "green"), (0x0000FF, "blue"), (0x00FFFF, "cyan"), (0x9400D3, "purple")]: print(f" {name}") mgr.all_leds_solid(color) time.sleep(1) mgr.all_leds_off() sys.exit(0)