updates for dual mic array
This commit is contained in:
283
xvf3800.py
Normal file
283
xvf3800.py
Normal file
@@ -0,0 +1,283 @@
|
||||
"""
|
||||
XVF3800 USB Control — DoA, LEDs, device identification.
|
||||
|
||||
Each ReSpeaker XVF3800 4-Mic Array is controlled via USB vendor commands (PyUSB).
|
||||
Replaces the old pixel_ring / Tuning interface used by the XVF3000.
|
||||
|
||||
Reference: https://github.com/respeaker/reSpeaker_XVF3800_USB_4MIC_ARRAY/blob/master/python_control/xvf_host.py
|
||||
"""
|
||||
|
||||
import logging
|
||||
import struct
|
||||
import time
|
||||
from typing import Optional
|
||||
|
||||
try:
|
||||
import usb.core
|
||||
import usb.util
|
||||
PYUSB_AVAILABLE = True
|
||||
except ImportError:
|
||||
PYUSB_AVAILABLE = False
|
||||
|
||||
logger = logging.getLogger("headmic.xvf3800")
|
||||
|
||||
VID = 0x2886
|
||||
PID = 0x001A
|
||||
|
||||
# USB vendor control transfer parameters
|
||||
CTRL_REQUEST_TYPE_OUT = usb.util.CTRL_OUT | usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_RECIPIENT_DEVICE if PYUSB_AVAILABLE else 0
|
||||
CTRL_REQUEST_TYPE_IN = usb.util.CTRL_IN | usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_RECIPIENT_DEVICE if PYUSB_AVAILABLE else 0
|
||||
|
||||
# Resource IDs
|
||||
GPO_RESID = 20
|
||||
|
||||
# Parameter indices (within resource)
|
||||
DOA_VALUE_IDX = 18 # returns (angle 0-359, vad 0/1)
|
||||
LED_EFFECT_IDX = 0 # 0=off, 1=breath, 2=rainbow, 3=solid, 4=doa, 5=ring
|
||||
LED_BRIGHTNESS_IDX = 1
|
||||
LED_COLOR_IDX = 3 # single uint32 color
|
||||
LED_RING_COLOR_IDX = 5 # 12 × uint32
|
||||
|
||||
|
||||
class XVF3800:
|
||||
"""Control a single ReSpeaker XVF3800 via USB vendor commands."""
|
||||
|
||||
def __init__(self, usb_device):
|
||||
self.dev = usb_device
|
||||
self.serial = usb_device.serial_number or "unknown"
|
||||
self.bus = usb_device.bus
|
||||
self.address = usb_device.address
|
||||
|
||||
def _read(self, resid: int, param_idx: int, length: int) -> bytes:
|
||||
"""Read parameter via USB control transfer."""
|
||||
wValue = (resid << 8) | param_idx
|
||||
try:
|
||||
data = self.dev.ctrl_transfer(CTRL_REQUEST_TYPE_IN, 0, wValue, 0, length, timeout=1000)
|
||||
return bytes(data)
|
||||
except Exception as e:
|
||||
logger.debug("USB read error (resid=%d, param=%d): %s", resid, param_idx, e)
|
||||
return b""
|
||||
|
||||
def _write(self, resid: int, param_idx: int, data: bytes):
|
||||
"""Write parameter via USB control transfer."""
|
||||
wValue = (resid << 8) | param_idx
|
||||
try:
|
||||
self.dev.ctrl_transfer(CTRL_REQUEST_TYPE_OUT, 0, wValue, 0, data, timeout=1000)
|
||||
except Exception as e:
|
||||
logger.debug("USB write error (resid=%d, param=%d): %s", resid, param_idx, e)
|
||||
|
||||
# --- DoA ---
|
||||
|
||||
def read_doa(self) -> tuple[int, bool]:
|
||||
"""Read Direction of Arrival. Returns (angle 0-359, vad True/False)."""
|
||||
data = self._read(GPO_RESID, DOA_VALUE_IDX, 4)
|
||||
if len(data) < 4:
|
||||
return 0, False
|
||||
angle, vad = struct.unpack_from("<HH", data)
|
||||
return angle % 360, bool(vad)
|
||||
|
||||
# --- LEDs ---
|
||||
|
||||
def led_off(self):
|
||||
self._write(GPO_RESID, LED_EFFECT_IDX, struct.pack("<I", 0))
|
||||
|
||||
def led_solid(self, color: int):
|
||||
"""Solid color on all LEDs. color is 0xRRGGBB."""
|
||||
self._write(GPO_RESID, LED_COLOR_IDX, struct.pack("<I", color))
|
||||
self._write(GPO_RESID, LED_EFFECT_IDX, struct.pack("<I", 3))
|
||||
|
||||
def led_breath(self, color: int, brightness: int = 128):
|
||||
"""Breathing effect."""
|
||||
self._write(GPO_RESID, LED_COLOR_IDX, struct.pack("<I", color))
|
||||
self._write(GPO_RESID, LED_BRIGHTNESS_IDX, struct.pack("<I", brightness))
|
||||
self._write(GPO_RESID, LED_EFFECT_IDX, struct.pack("<I", 1))
|
||||
|
||||
def led_doa(self, base_color: int = 0x003333, doa_color: int = 0x00FFFF):
|
||||
"""DoA indicator mode — shows beam direction on LED ring."""
|
||||
# LED_DOA_COLOR takes two uint32 values: base + indicator
|
||||
data = struct.pack("<II", base_color, doa_color)
|
||||
self._write(GPO_RESID, 4, data) # param 4 = LED_DOA_COLOR
|
||||
self._write(GPO_RESID, LED_EFFECT_IDX, struct.pack("<I", 4))
|
||||
|
||||
def led_rainbow(self, brightness: int = 128):
|
||||
self._write(GPO_RESID, LED_BRIGHTNESS_IDX, struct.pack("<I", brightness))
|
||||
self._write(GPO_RESID, LED_EFFECT_IDX, struct.pack("<I", 2))
|
||||
|
||||
|
||||
class XVF3800Manager:
|
||||
"""Manage two XVF3800 arrays, identified by USB serial number."""
|
||||
|
||||
def __init__(self):
|
||||
self.left: Optional[XVF3800] = None
|
||||
self.right: Optional[XVF3800] = None
|
||||
self._serials: dict[str, str] = {} # {"left": "SN...", "right": "SN..."}
|
||||
|
||||
def set_serial_mapping(self, left_serial: str, right_serial: str):
|
||||
"""Pin left/right assignment by USB serial number."""
|
||||
self._serials = {"left": left_serial, "right": right_serial}
|
||||
|
||||
def discover(self) -> list[XVF3800]:
|
||||
"""Find all connected XVF3800 devices."""
|
||||
if not PYUSB_AVAILABLE:
|
||||
logger.warning("pyusb not installed — XVF3800 control disabled")
|
||||
return []
|
||||
devices = []
|
||||
for dev in usb.core.find(idVendor=VID, idProduct=PID, find_all=True):
|
||||
try:
|
||||
devices.append(XVF3800(dev))
|
||||
except Exception as e:
|
||||
logger.warning("Failed to init XVF3800 at bus %d addr %d: %s",
|
||||
dev.bus, dev.address, e)
|
||||
return devices
|
||||
|
||||
def assign(self):
|
||||
"""Discover devices and assign left/right based on serial mapping."""
|
||||
devices = self.discover()
|
||||
logger.info("Found %d XVF3800 device(s): %s",
|
||||
len(devices), [d.serial for d in devices])
|
||||
|
||||
if self._serials:
|
||||
for dev in devices:
|
||||
if dev.serial == self._serials.get("left"):
|
||||
self.left = dev
|
||||
elif dev.serial == self._serials.get("right"):
|
||||
self.right = dev
|
||||
if not self.left:
|
||||
logger.warning("Left XVF3800 (serial %s) not found", self._serials.get("left"))
|
||||
if not self.right:
|
||||
logger.warning("Right XVF3800 (serial %s) not found", self._serials.get("right"))
|
||||
else:
|
||||
# No serial mapping — assign by bus address order (unstable, but works for --learn)
|
||||
devices.sort(key=lambda d: (d.bus, d.address))
|
||||
if len(devices) >= 1:
|
||||
self.left = devices[0]
|
||||
if len(devices) >= 2:
|
||||
self.right = devices[1]
|
||||
|
||||
if self.left:
|
||||
logger.info("Left ear: serial=%s bus=%d addr=%d", self.left.serial, self.left.bus, self.left.address)
|
||||
if self.right:
|
||||
logger.info("Right ear: serial=%s bus=%d addr=%d", self.right.serial, self.right.bus, self.right.address)
|
||||
|
||||
def serial_to_alsa(self, serial: str) -> Optional[str]:
|
||||
"""Find the ALSA card name for a device with a given USB serial number.
|
||||
Searches /proc/asound/cards and matches via sysfs."""
|
||||
import os, glob
|
||||
# Walk /sys/class/sound/card*/device -> look for matching USB serial
|
||||
for card_dir in sorted(glob.glob("/sys/class/sound/card*")):
|
||||
card_num = os.path.basename(card_dir).replace("card", "")
|
||||
# Follow the device symlink up to the USB device
|
||||
device_path = os.path.join(card_dir, "device")
|
||||
if not os.path.islink(device_path):
|
||||
continue
|
||||
usb_path = os.path.realpath(device_path)
|
||||
serial_file = os.path.join(usb_path, "..", "serial")
|
||||
if not os.path.exists(serial_file):
|
||||
serial_file = os.path.join(usb_path, "..", "..", "serial")
|
||||
if os.path.exists(serial_file):
|
||||
try:
|
||||
dev_serial = open(serial_file).read().strip()
|
||||
if dev_serial == serial:
|
||||
# Read the card ID (ALSA name)
|
||||
id_file = os.path.join(card_dir, "id")
|
||||
if os.path.exists(id_file):
|
||||
return open(id_file).read().strip()
|
||||
return card_num
|
||||
except Exception:
|
||||
pass
|
||||
return None
|
||||
|
||||
def get_alsa_devices(self) -> dict[str, Optional[str]]:
|
||||
"""Return {"left": "plughw:Array,0", "right": "plughw:Array_1,0"} or similar."""
|
||||
result = {}
|
||||
for label, dev in [("left", self.left), ("right", self.right)]:
|
||||
if dev:
|
||||
card_name = self.serial_to_alsa(dev.serial)
|
||||
result[label] = f"plughw:{card_name},0" if card_name else None
|
||||
else:
|
||||
result[label] = None
|
||||
return result
|
||||
|
||||
# --- Convenience: control both arrays ---
|
||||
|
||||
def all_leds_off(self):
|
||||
for dev in [self.left, self.right]:
|
||||
if dev:
|
||||
dev.led_off()
|
||||
|
||||
def all_leds_solid(self, color: int):
|
||||
for dev in [self.left, self.right]:
|
||||
if dev:
|
||||
dev.led_solid(color)
|
||||
|
||||
def all_leds_breath(self, color: int, brightness: int = 128):
|
||||
for dev in [self.left, self.right]:
|
||||
if dev:
|
||||
dev.led_breath(color, brightness)
|
||||
|
||||
def all_leds_doa(self):
|
||||
for dev in [self.left, self.right]:
|
||||
if dev:
|
||||
dev.led_doa()
|
||||
|
||||
def read_both_doa(self) -> dict:
|
||||
"""Read DoA from both arrays."""
|
||||
result = {}
|
||||
for label, dev in [("left", self.left), ("right", self.right)]:
|
||||
if dev:
|
||||
angle, vad = dev.read_doa()
|
||||
result[label] = {"angle": angle, "vad": vad}
|
||||
else:
|
||||
result[label] = None
|
||||
return result
|
||||
|
||||
|
||||
def learn_devices() -> dict:
|
||||
"""Discover connected XVF3800 devices and return their serials for config."""
|
||||
mgr = XVF3800Manager()
|
||||
mgr.assign()
|
||||
result = {}
|
||||
if mgr.left:
|
||||
result["left"] = {"usb_serial": mgr.left.serial}
|
||||
alsa = mgr.serial_to_alsa(mgr.left.serial)
|
||||
if alsa:
|
||||
result["left"]["alsa_card"] = alsa
|
||||
if mgr.right:
|
||||
result["right"] = {"usb_serial": mgr.right.serial}
|
||||
alsa = mgr.serial_to_alsa(mgr.right.serial)
|
||||
if alsa:
|
||||
result["right"]["alsa_card"] = alsa
|
||||
return result
|
||||
|
||||
|
||||
# === CLI test ===
|
||||
if __name__ == "__main__":
|
||||
import sys
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
|
||||
if "--learn" in sys.argv:
|
||||
info = learn_devices()
|
||||
import json
|
||||
print(json.dumps(info, indent=2))
|
||||
sys.exit(0)
|
||||
|
||||
if "--test-doa" in sys.argv:
|
||||
mgr = XVF3800Manager()
|
||||
mgr.assign()
|
||||
for _ in range(50):
|
||||
doa = mgr.read_both_doa()
|
||||
print(f"DoA: left={doa.get('left')} right={doa.get('right')}", end="\r")
|
||||
time.sleep(0.1)
|
||||
print()
|
||||
sys.exit(0)
|
||||
|
||||
if "--test-leds" in sys.argv:
|
||||
mgr = XVF3800Manager()
|
||||
mgr.assign()
|
||||
for color, name in [(0xFF0000, "red"), (0x00FF00, "green"), (0x0000FF, "blue"),
|
||||
(0x00FFFF, "cyan"), (0x9400D3, "purple")]:
|
||||
print(f" {name}")
|
||||
mgr.all_leds_solid(color)
|
||||
time.sleep(1)
|
||||
mgr.all_leds_off()
|
||||
sys.exit(0)
|
||||
Reference in New Issue
Block a user