Files
headmic/xvf3800.py
2026-04-11 15:58:56 -05:00

305 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
XVF3800 USB Control — DoA, LEDs, device identification.
Each ReSpeaker XVF3800 4-Mic Array is controlled via USB vendor commands (PyUSB).
Replaces the old pixel_ring / Tuning interface used by the XVF3000.
Reference: https://github.com/respeaker/reSpeaker_XVF3800_USB_4MIC_ARRAY/blob/master/python_control/xvf_host.py
"""
import logging
import struct
import time
from typing import Optional
# pyusb is an optional dependency: when it cannot be imported the module still
# loads, PYUSB_AVAILABLE is False and the request-type constants fall back to 0.
try:
    import usb.core
    import usb.util
except ImportError:
    PYUSB_AVAILABLE = False
else:
    PYUSB_AVAILABLE = True

logger = logging.getLogger("headmic.xvf3800")

# USB vendor / product ID used to find the arrays on the bus.
VID = 0x2886
PID = 0x001A

# bmRequestType values for vendor control transfers addressed to the device.
if PYUSB_AVAILABLE:
    CTRL_REQUEST_TYPE_OUT = (
        usb.util.CTRL_OUT | usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_RECIPIENT_DEVICE
    )
    CTRL_REQUEST_TYPE_IN = (
        usb.util.CTRL_IN | usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_RECIPIENT_DEVICE
    )
else:
    CTRL_REQUEST_TYPE_OUT = 0
    CTRL_REQUEST_TYPE_IN = 0

# Resource ID (sent as wIndex of the control transfer).
GPO_RESID = 20

# Command IDs (sent as wValue of the control transfer).
# The original author notes these were checked against the official
# xvf_host.py WriteCMD output.
# NOTE(review): DOA_VALUE_CMD and LED_RING_COLOR_CMD are both 18 — two distinct
# commands on the same resource sharing one ID looks like a typo; confirm both
# (and LED_BRIGHTNESS_CMD) against the xvf_host.py parameter table.
# LED_RING_COLOR_CMD is not referenced anywhere in the visible code.
DOA_VALUE_CMD = 18       # returns (angle 0-359, vad 0/1)
LED_EFFECT_CMD = 12      # 0=off, 1=breath, 2=rainbow, 3=solid, 4=doa, 5=ring
LED_BRIGHTNESS_CMD = 14
LED_COLOR_CMD = 16       # single uint32 color
LED_DOA_COLOR_CMD = 17   # two uint32 values: base + indicator
LED_RING_COLOR_CMD = 18  # 12 x uint32
class XVF3800:
    """Handle for one ReSpeaker XVF3800 array, driven by USB vendor control transfers.

    All DoA/LED methods funnel through ``_read`` / ``_write``, which log USB
    failures at debug level instead of raising.
    """

    def __init__(self, usb_device):
        """Wrap *usb_device* and cache its serial number, bus and address.

        A falsy ``serial_number`` (``None`` or empty string) is stored as
        ``"unknown"``.
        """
        self.dev = usb_device
        reported_serial = usb_device.serial_number
        self.serial = reported_serial if reported_serial else "unknown"
        self.bus = usb_device.bus
        self.address = usb_device.address

    # --- low-level transfers ---

    def _read(self, resid: int, cmdid: int, length: int) -> bytes:
        """Read a parameter with a vendor IN control transfer.

        wValue is ``cmdid`` with bit 0x80 set (read flag), wIndex is ``resid``
        and ``length * 4`` bytes are requested. Returns the received bytes, or
        ``b""`` if anything in the transfer raised.
        """
        try:
            received = self.dev.ctrl_transfer(
                CTRL_REQUEST_TYPE_IN,
                0,
                cmdid | 0x80,
                resid,
                4 * length,
                timeout=1000,
            )
            return bytes(received)
        except Exception as err:
            logger.debug("USB read error (resid=%d, cmd=%d): %s", resid, cmdid, err)
            return b""

    def _write(self, resid: int, cmdid: int, data: bytes):
        """Write *data* with a vendor OUT control transfer.

        wValue is ``cmdid`` and wIndex is ``resid``. Errors are logged at debug
        level and otherwise ignored.
        """
        try:
            self.dev.ctrl_transfer(
                CTRL_REQUEST_TYPE_OUT,
                0,
                cmdid,
                resid,
                data,
                timeout=1000,
            )
        except Exception as err:
            logger.debug("USB write error (resid=%d, cmd=%d): %s", resid, cmdid, err)

    def _write_u32(self, cmdid: int, value: int):
        """Send *value* as one little-endian uint32 to *cmdid* on the GPO resource."""
        self._write(GPO_RESID, cmdid, struct.pack("<I", value))

    # --- DoA ---

    def read_doa(self) -> tuple[int, bool]:
        """Read Direction of Arrival as ``(angle, vad)``.

        Only the first four bytes of the reply are decoded, as two
        little-endian uint16 values. The angle is reduced modulo 360 and the
        second value is converted to ``bool``. A reply shorter than four bytes
        (including the empty reply ``_read`` gives on error) yields
        ``(0, False)``.
        """
        reply = self._read(GPO_RESID, DOA_VALUE_CMD, 2)
        if len(reply) >= 4:
            raw_angle, raw_vad = struct.unpack_from("<HH", reply)
            return raw_angle % 360, bool(raw_vad)
        return 0, False

    # --- LEDs ---

    def led_off(self):
        """Select LED effect 0 (off)."""
        self._write_u32(LED_EFFECT_CMD, 0)

    @staticmethod
    def _color_bytes(color: int) -> bytes:
        """Split a 0xRRGGBB integer into the 4-byte payload ``[R, G, B, 0]``."""
        channels = bytes((color >> shift) & 0xFF for shift in (16, 8, 0))
        return channels + b"\x00"

    def led_solid(self, color: int):
        """Show one solid color (0xRRGGBB) by setting the color, then effect 3."""
        self._write(GPO_RESID, LED_COLOR_CMD, self._color_bytes(color))
        self._write_u32(LED_EFFECT_CMD, 3)

    def led_breath(self, color: int, brightness: int = 128):
        """Breathing effect: set color, then brightness, then effect 1."""
        self._write(GPO_RESID, LED_COLOR_CMD, self._color_bytes(color))
        self._write_u32(LED_BRIGHTNESS_CMD, brightness)
        self._write_u32(LED_EFFECT_CMD, 1)

    def led_doa(self, base_color: int = 0x003333, doa_color: int = 0x00FFFF):
        """DoA indicator mode: send base + indicator colors, then effect 4."""
        payload = self._color_bytes(base_color) + self._color_bytes(doa_color)
        self._write(GPO_RESID, LED_DOA_COLOR_CMD, payload)
        self._write_u32(LED_EFFECT_CMD, 4)

    def led_rainbow(self, brightness: int = 128):
        """Rainbow effect: set brightness, then effect 2."""
        self._write_u32(LED_BRIGHTNESS_CMD, brightness)
        self._write_u32(LED_EFFECT_CMD, 2)
class XVF3800Manager:
    """Manage two XVF3800 arrays ("left" and "right"), identified by USB serial number.

    Typical use: optionally call ``set_serial_mapping``, then ``assign``; after
    that ``left`` / ``right`` hold the matching device handles (or ``None``).
    """

    def __init__(self):
        self.left: Optional[XVF3800] = None
        self.right: Optional[XVF3800] = None
        self._serials: dict[str, str] = {}  # {"left": "SN...", "right": "SN..."}

    def set_serial_mapping(self, left_serial: str, right_serial: str):
        """Pin left/right assignment by USB serial number."""
        self._serials = {"left": left_serial, "right": right_serial}

    def discover(self) -> list["XVF3800"]:
        """Find all connected XVF3800 devices.

        Returns an empty list (after logging a warning) when pyusb is not
        installed or when pyusb cannot find a libusb backend. A device that
        fails to initialise is logged and skipped.
        """
        if not PYUSB_AVAILABLE:
            logger.warning("pyusb not installed — XVF3800 control disabled")
            return []
        try:
            candidates = usb.core.find(idVendor=VID, idProduct=PID, find_all=True)
        except usb.core.NoBackendError as e:
            # Treat a missing libusb backend like missing pyusb instead of crashing.
            logger.warning("No USB backend available — XVF3800 control disabled: %s", e)
            return []
        devices = []
        for dev in candidates:
            try:
                devices.append(XVF3800(dev))
            except Exception as e:
                logger.warning("Failed to init XVF3800 at bus %d addr %d: %s",
                               dev.bus, dev.address, e)
        return devices

    def assign(self):
        """Discover devices and assign ``left`` / ``right``.

        With a serial mapping, devices are matched by serial number. Without
        one, the first two devices in (bus, address) order become left and
        right. Previous assignments are cleared first, so a device that has
        disappeared since the last call does not linger as a stale handle.
        """
        self.left = None
        self.right = None

        devices = self.discover()
        logger.info("Found %d XVF3800 device(s): %s",
                    len(devices), [d.serial for d in devices])

        if self._serials:
            left_serial = self._serials.get("left")
            right_serial = self._serials.get("right")
            for dev in devices:
                if dev.serial == left_serial:
                    self.left = dev
                elif dev.serial == right_serial:
                    self.right = dev
            if not self.left:
                logger.warning("Left XVF3800 (serial %s) not found", left_serial)
            if not self.right:
                logger.warning("Right XVF3800 (serial %s) not found", right_serial)
        else:
            # No serial mapping — assign by bus address order (unstable, but works for --learn)
            devices.sort(key=lambda d: (d.bus, d.address))
            if len(devices) >= 1:
                self.left = devices[0]
            if len(devices) >= 2:
                self.right = devices[1]

        if self.left:
            logger.info("Left ear: serial=%s bus=%d addr=%d",
                        self.left.serial, self.left.bus, self.left.address)
        if self.right:
            logger.info("Right ear: serial=%s bus=%d addr=%d",
                        self.right.serial, self.right.bus, self.right.address)

    @staticmethod
    def _read_sysfs_text(path: str) -> Optional[str]:
        """Return the stripped contents of *path*, or ``None`` if it cannot be read."""
        try:
            # Context manager so the handle is closed deterministically.
            with open(path, encoding="utf-8", errors="replace") as fh:
                return fh.read().strip()
        except OSError as e:
            logger.debug("Could not read %s: %s", path, e)
            return None

    def serial_to_alsa(self, serial: str, sysfs_root: str = "/sys/class/sound") -> Optional[str]:
        """Find the ALSA card for the device with the given USB serial number.

        Walks ``<sysfs_root>/card*``, resolves each card's ``device`` symlink
        and looks for a ``serial`` file one directory above the resolved path,
        then two above if the first does not exist. On a match, returns the
        contents of the card's ``id`` file, or the card number (the directory
        name without the ``card`` prefix) when ``id`` is missing or unreadable.
        Returns ``None`` if no card matches.

        *sysfs_root* exists so the lookup can be pointed at another tree
        (e.g. in tests); the default is the real sysfs location.
        """
        import glob
        import os

        for card_dir in sorted(glob.glob(os.path.join(sysfs_root, "card*"))):
            device_link = os.path.join(card_dir, "device")
            if not os.path.islink(device_link):
                continue

            # Presumably the link resolves to the USB interface node, with the
            # serial attribute on an ancestor — verify against the target kernel.
            usb_path = os.path.realpath(device_link)
            serial_file = os.path.join(usb_path, "..", "serial")
            if not os.path.exists(serial_file):
                serial_file = os.path.join(usb_path, "..", "..", "serial")
            if not os.path.exists(serial_file):
                continue

            dev_serial = self._read_sysfs_text(serial_file)
            if dev_serial is None or dev_serial != serial:
                continue

            # Prefer the card ID (ALSA name); fall back to the card number.
            id_file = os.path.join(card_dir, "id")
            if os.path.exists(id_file):
                card_id = self._read_sysfs_text(id_file)
                if card_id is not None:
                    return card_id
            return os.path.basename(card_dir).removeprefix("card")
        return None

    def get_alsa_devices(self) -> dict[str, Optional[str]]:
        """Return {"left": "plughw:Array,0", "right": "plughw:Array_1,0"} or similar.

        A side with no assigned device, or whose card cannot be found, maps to
        ``None``.
        """
        result = {}
        for label, dev in self._labelled():
            card_name = self.serial_to_alsa(dev.serial) if dev else None
            result[label] = f"plughw:{card_name},0" if card_name else None
        return result

    # --- Convenience: control both arrays ---

    def _labelled(self):
        """Return ``[("left", dev_or_None), ("right", dev_or_None)]``."""
        return [("left", self.left), ("right", self.right)]

    def _present(self):
        """Return the assigned devices, left first, skipping unassigned sides."""
        return [dev for _, dev in self._labelled() if dev]

    def all_leds_off(self):
        """Turn the LEDs off on every assigned array."""
        for dev in self._present():
            dev.led_off()

    def all_leds_solid(self, color: int):
        """Show a solid 0xRRGGBB color on every assigned array."""
        for dev in self._present():
            dev.led_solid(color)

    def all_leds_breath(self, color: int, brightness: int = 128):
        """Start the breathing effect on every assigned array."""
        for dev in self._present():
            dev.led_breath(color, brightness)

    def all_leds_doa(self, base_color: int = 0x003333, doa_color: int = 0x00FFFF):
        """Start DoA indicator mode on every assigned array.

        The defaults mirror ``XVF3800.led_doa``, so calling with no arguments
        behaves as before.
        """
        for dev in self._present():
            dev.led_doa(base_color, doa_color)

    def all_leds_rainbow(self, brightness: int = 128):
        """Start the rainbow effect on every assigned array."""
        for dev in self._present():
            dev.led_rainbow(brightness)

    def read_both_doa(self) -> dict:
        """Read DoA from both arrays.

        Each side maps to ``{"angle": int, "vad": bool}``, or ``None`` when no
        device is assigned to it.
        """
        result = {}
        for label, dev in self._labelled():
            if dev:
                angle, vad = dev.read_doa()
                result[label] = {"angle": angle, "vad": vad}
            else:
                result[label] = None
        return result
def learn_devices() -> dict:
    """Discover connected XVF3800 devices and return their serials for config.

    Runs ``XVF3800Manager.assign()`` and, for each side that got a device,
    returns ``{"usb_serial": ...}`` plus ``"alsa_card"`` when
    ``serial_to_alsa`` finds a card. Sides without a device are omitted.
    """
    manager = XVF3800Manager()
    manager.assign()

    config = {}
    for side in ("left", "right"):
        array = getattr(manager, side)
        if not array:
            continue
        entry = {"usb_serial": array.serial}
        card = manager.serial_to_alsa(array.serial)
        if card:
            entry["alsa_card"] = card
        config[side] = entry
    return config
# === CLI test ===
def _main(argv: list) -> int:
    """Run the manual hardware checks selected by command-line flags.

    Flags are checked in this order and only the first match runs:

    * ``--learn``     print discovered serials / ALSA cards as JSON
    * ``--test-doa``  poll and print DoA from both arrays 50 times, 0.1 s apart
    * ``--test-leds`` cycle both arrays through a few solid colors, then off

    With no recognised flag a usage line is printed to stderr. Returns the
    process exit status, which is 0 in every case (as before).
    """
    import sys

    logging.basicConfig(level=logging.INFO)

    if "--learn" in argv:
        import json
        print(json.dumps(learn_devices(), indent=2))
        return 0

    if "--test-doa" in argv:
        mgr = XVF3800Manager()
        mgr.assign()
        for _ in range(50):
            doa = mgr.read_both_doa()
            print(f"DoA: left={doa.get('left')} right={doa.get('right')}", end="\r")
            time.sleep(0.1)
        print()
        return 0

    if "--test-leds" in argv:
        mgr = XVF3800Manager()
        mgr.assign()
        palette = [(0xFF0000, "red"), (0x00FF00, "green"), (0x0000FF, "blue"),
                   (0x00FFFF, "cyan"), (0x9400D3, "purple")]
        try:
            for color, name in palette:
                print(f" {name}")
                mgr.all_leds_solid(color)
                time.sleep(1)
        finally:
            # Runs on Ctrl-C or error too, so the LEDs are not left lit.
            mgr.all_leds_off()
        return 0

    print("usage: xvf3800.py [--learn | --test-doa | --test-leds]", file=sys.stderr)
    return 0


if __name__ == "__main__":
    import sys
    sys.exit(_main(sys.argv))