Files
headmic/xvf3800.py
Alex 38d21ef53c Add multi-speaker tracking with beam steering (#5)
multi_speaker.py: Tracks up to 2 speakers simultaneously. When 2 distinct
DoA angles are detected (30°+ apart) for >1s, locks the XVF3800's fixed
beams onto each speaker. Releases back to auto mode when only 1 speaker
remains (3s timeout). Manages beam gating so only the speaking beam is active.

xvf3800.py: Added beam steering commands — enable_fixed_beams(),
set_beam_azimuths(), enable_beam_gating(), read_all_beams().
Manager gets steer_beams() and release_beams() convenience methods.

headmic.py: Wire multi-speaker tracker into DoA loop. New endpoint:
GET /speakers/tracked — current speaker positions, beam mode, lock state.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-12 21:37:49 -05:00

400 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
XVF3800 USB Control — DoA, LEDs, device identification.
Each ReSpeaker XVF3800 4-Mic Array is controlled via USB vendor commands (PyUSB).
Replaces the old pixel_ring / Tuning interface used by the XVF3000.
Reference: https://github.com/respeaker/reSpeaker_XVF3800_USB_4MIC_ARRAY/blob/master/python_control/xvf_host.py
"""
import logging
import struct
import time
from typing import Optional
try:
import usb.core
import usb.util
PYUSB_AVAILABLE = True
except ImportError:
PYUSB_AVAILABLE = False
# Module-level logger shared by every class and function in this file.
logger = logging.getLogger("headmic.xvf3800")
# USB vendor/product ID pair passed to usb.core.find() when discovering arrays.
VID = 0x2886
PID = 0x001A
# USB vendor control transfer parameters
# (both fall back to 0 when pyusb failed to import, so the module still loads)
CTRL_REQUEST_TYPE_OUT = usb.util.CTRL_OUT | usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_RECIPIENT_DEVICE if PYUSB_AVAILABLE else 0
CTRL_REQUEST_TYPE_IN = usb.util.CTRL_IN | usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_RECIPIENT_DEVICE if PYUSB_AVAILABLE else 0
# Resource IDs (wIndex in USB control transfer)
# GPO resource: used below as the target of all LED writes.
GPO_RESID = 20
# Command IDs (wValue in USB control transfer)
# Verified against official xvf_host.py WriteCMD output
# Reads OR the command ID with 0x80 before sending; writes send it unmodified.
DOA_VALUE_CMD = 18 # returns (angle 0-359, vad 0/1) — simple, sluggish
LED_EFFECT_CMD = 12 # 0=off, 1=breath, 2=rainbow, 3=solid, 4=doa, 5=ring
# AEC resource (resid=33) — beamformer-level commands, more responsive
AEC_RESID = 33
AEC_AZIMUTH_CMD = 75 # 4 floats: beam1, beam2, free-running, auto-select (radians)
AEC_SPENERGY_CMD = 80 # 4 floats: speech energy per beam (>0 = speech)
# Beam steering (resid=33)
AEC_FIXED_BEAMS_ON_CMD = 37 # rw uint8: 0=off, 1=on — enable fixed beam mode
AEC_FIXED_BEAMS_AZ_CMD = 81 # rw 2 floats (radians): beam 1 azimuth, beam 2 azimuth
AEC_FIXED_BEAMS_ELEV_CMD = 82 # rw 2 floats (radians): beam 1 elevation, beam 2 elevation
AEC_FIXED_BEAMS_GATING_CMD = 83 # rw uint8: 0=off, 1=on — silence beams without speech
# Audio manager resource: read_doa() queries the selected azimuths through it.
AUDIO_MGR_RESID = 35
AUDIO_MGR_SELECTED_AZ_CMD = 11 # 2 floats: processed DoA, auto-select DoA (radians)
AUDIO_MGR_SELECTED_CH_CMD = 12 # rw 2 uint8: which beam goes to L/R output channel
# LED parameter commands; the code below sends all of these to GPO_RESID.
LED_BRIGHTNESS_CMD = 14
LED_COLOR_CMD = 16 # single uint32 color (confirmed: xvf_host LED_COLOR cmdid=16)
LED_DOA_COLOR_CMD = 17 # two uint32 values: base + indicator
# NOTE(review): 18 is also the value of DOA_VALUE_CMD above. If both commands
# live on the same resource one of the two IDs is wrong — confirm against
# xvf_host.py before using either (neither is referenced in the code shown here).
LED_RING_COLOR_CMD = 18 # 12 × uint32
class XVF3800:
    """Control a single ReSpeaker XVF3800 via USB vendor commands.

    Wraps one PyUSB device handle and exposes DoA reads, fixed-beam steering
    and LED effects. Every transfer is best-effort: a USB failure is logged at
    debug level and surfaces as an empty read (or a dropped write) instead of
    an exception, so a flaky array cannot take down the caller's loop.
    """

    # First byte of every read response. 0 is the success code used by the
    # reference xvf_host.py; any other value (e.g. 64 = "retry") means the
    # payload that follows is not valid data.
    # NOTE(review): confirm the status codes against the deployed firmware.
    _STATUS_OK = 0

    def __init__(self, usb_device):
        """Bind to an already-enumerated PyUSB device.

        Args:
            usb_device: object exposing ``serial_number``, ``bus``, ``address``
                and ``ctrl_transfer`` (a ``usb.core.Device``).
        """
        self.dev = usb_device
        # A device that reports no serial string gets the placeholder "unknown".
        self.serial = usb_device.serial_number or "unknown"
        self.bus = usb_device.bus
        self.address = usb_device.address
        # Vendor control transfers work without claiming an interface —
        # don't call set_configuration() or detach_kernel_driver(),
        # the audio driver needs to keep its interface.

    # --- Raw transfers ---

    def _read(self, resid: int, cmdid: int, payload_len: int) -> bytes:
        """Issue one vendor IN transfer and return the raw response.

        Args:
            resid: resource ID (sent as wIndex).
            cmdid: command ID; OR-ed with 0x80 and sent as wValue.
            payload_len: number of payload bytes expected, excluding the
                leading status byte.

        Returns:
            The response bytes (status byte first), or ``b""`` when the
            transfer raised.
        """
        try:
            data = self.dev.ctrl_transfer(
                CTRL_REQUEST_TYPE_IN, 0,
                0x80 | cmdid, resid, payload_len + 1,  # +1 for status byte
                timeout=1000)
            return bytes(data)
        except Exception as e:
            # Deliberately broad: this is the hardware boundary, and pyusb's
            # exception types may not even be importable on this host.
            logger.debug("USB read error (resid=%d, cmd=%d): %s", resid, cmdid, e)
            return b""

    def _read_uint16(self, resid: int, cmdid: int, count: int) -> bytes:
        """Read uint16 parameters. Returns raw bytes including 1-byte status header."""
        return self._read(resid, cmdid, count * 2)

    def _read_float(self, resid: int, cmdid: int, count: int) -> bytes:
        """Read float32 parameters. Returns raw bytes including 1-byte status header."""
        return self._read(resid, cmdid, count * 4)

    def _write(self, resid: int, cmdid: int, data: bytes):
        """Write parameter via USB control transfer.

        wValue = cmdid, wIndex = resid. Failures are logged at debug level
        and otherwise ignored.
        """
        try:
            self.dev.ctrl_transfer(
                CTRL_REQUEST_TYPE_OUT, 0,
                cmdid,  # wValue: command ID
                resid,  # wIndex: resource ID
                data,
                timeout=1000
            )
        except Exception as e:
            logger.debug("USB write error (resid=%d, cmd=%d): %s", resid, cmdid, e)

    # --- Response decoding (pure helpers, no USB access) ---

    @staticmethod
    def _unpack_floats(data: bytes, count: int) -> Optional[tuple]:
        """Extract ``count`` little-endian float32 values from a read response.

        Returns None when the response is too short or its status byte is not
        the success code, so callers never decode an invalid payload.
        """
        if len(data) < 1 + 4 * count or data[0] != XVF3800._STATUS_OK:
            return None
        return struct.unpack_from(f"<{count}f", data, 1)

    @staticmethod
    def _decode_doa(data: bytes) -> tuple[int, bool]:
        """Turn a 2-float selected-azimuth response into ``(angle_deg, vad)``.

        Returns ``(0, False)`` for a short/failed response or when neither
        azimuth is a usable number.
        """
        import math
        values = XVF3800._unpack_floats(data, 2)
        if values is None:
            return 0, False
        processed_doa, auto_select_doa = values
        # VAD: processed_doa is NaN when no speech, real value when speech
        vad = not math.isnan(processed_doa)
        # Angle: prefer auto-select beam (always has a value). isfinite rather
        # than isnan: an infinite azimuth turns into NaN under `% 360`, and
        # int(NaN) raises ValueError.
        if math.isfinite(auto_select_doa):
            angle_rad = auto_select_doa
        elif math.isfinite(processed_doa):
            angle_rad = processed_doa
        else:
            return 0, False
        # The outer `% 360` folds the 360.0 that float modulo can yield for
        # tiny negative inputs back to 0.
        return int(math.degrees(angle_rad) % 360) % 360, vad

    @staticmethod
    def _decode_beams(data: bytes) -> dict:
        """Turn a 4-float beam-azimuth response into a dict of degrees.

        Returns ``{}`` for a short/failed response. A beam reported as NaN
        stays NaN in the result.
        """
        import math
        values = XVF3800._unpack_floats(data, 4)
        if values is None:
            return {}
        keys = ("beam1_deg", "beam2_deg", "free_running_deg", "auto_select_deg")
        # Trailing `% 360`: rounding e.g. 359.96 gives 360.0, which must wrap
        # to 0.0 to keep every angle inside [0, 360).
        return {
            key: round(math.degrees(rad) % 360, 1) % 360
            for key, rad in zip(keys, values)
        }

    # --- DoA ---

    def read_doa(self) -> tuple[int, bool]:
        """Read Direction of Arrival using the auto-select beam azimuth.

        Returns (angle 0-359 degrees, vad True/False); ``(0, False)`` when the
        read fails or carries no usable angle.

        Uses AUDIO_MGR_SELECTED_AZIMUTHS which returns two values:
        [0] processed_doa — NaN when no speech, real angle when speech detected (= VAD)
        [1] auto_select_doa — always tracks strongest source, even noise (= angle)
        """
        return self._decode_doa(
            self._read_float(AUDIO_MGR_RESID, AUDIO_MGR_SELECTED_AZ_CMD, 2))

    def read_all_beams(self) -> dict:
        """Read all 4 beam azimuths: beam1, beam2, free-running, auto-select.

        Returns a dict with keys ``beam1_deg``, ``beam2_deg``,
        ``free_running_deg`` and ``auto_select_deg`` (degrees, one decimal),
        or ``{}`` when the read fails.
        """
        return self._decode_beams(
            self._read_float(AEC_RESID, AEC_AZIMUTH_CMD, 4))

    # --- Beam steering ---

    def enable_fixed_beams(self, on: bool = True):
        """Enable/disable fixed beam mode. When on, beams lock to set azimuths."""
        self._write(AEC_RESID, AEC_FIXED_BEAMS_ON_CMD, bytes([1 if on else 0]))

    def set_beam_azimuths(self, beam1_deg: float, beam2_deg: float):
        """Set fixed beam directions in degrees (0=front, 90=right, 180=back, 270=left).

        The angles are converted to radians and sent as two little-endian
        float32 values.
        """
        import math
        payload = struct.pack("<ff", math.radians(beam1_deg), math.radians(beam2_deg))
        self._write(AEC_RESID, AEC_FIXED_BEAMS_AZ_CMD, payload)

    def enable_beam_gating(self, on: bool = True):
        """Enable/disable beam gating. When on, only the beam with speech is active."""
        self._write(AEC_RESID, AEC_FIXED_BEAMS_GATING_CMD, bytes([1 if on else 0]))

    def disable_fixed_beams(self):
        """Return to auto beam mode."""
        self.enable_fixed_beams(False)

    # --- LEDs ---

    def led_off(self):
        """Switch the LED ring off (effect 0)."""
        self._write(GPO_RESID, LED_EFFECT_CMD, bytes([0]))

    @staticmethod
    def _color_bytes(color: int) -> bytes:
        """Convert 0xRRGGBB to [R, G, B, 0] payload."""
        r = (color >> 16) & 0xFF
        g = (color >> 8) & 0xFF
        b = color & 0xFF
        return bytes([r, g, b, 0])

    def led_solid(self, color: int):
        """Solid color on all LEDs. color is 0xRRGGBB."""
        self._write(GPO_RESID, LED_COLOR_CMD, self._color_bytes(color))
        self._write(GPO_RESID, LED_EFFECT_CMD, bytes([3]))

    def led_breath(self, color: int, brightness: int = 128):
        """Breathing effect.

        Raises:
            ValueError: if ``brightness`` is outside 0-255 (it must fit one byte).
        """
        self._write(GPO_RESID, LED_COLOR_CMD, self._color_bytes(color))
        self._write(GPO_RESID, LED_BRIGHTNESS_CMD, bytes([brightness]))
        self._write(GPO_RESID, LED_EFFECT_CMD, bytes([1]))

    def led_doa(self, base_color: int = 0x003333, doa_color: int = 0x00FFFF):
        """DoA indicator mode — shows beam direction on LED ring."""
        data = self._color_bytes(base_color) + self._color_bytes(doa_color)
        self._write(GPO_RESID, LED_DOA_COLOR_CMD, data)
        self._write(GPO_RESID, LED_EFFECT_CMD, bytes([4]))

    def led_rainbow(self, brightness: int = 128):
        """Rainbow effect.

        Raises:
            ValueError: if ``brightness`` is outside 0-255 (it must fit one byte).
        """
        self._write(GPO_RESID, LED_BRIGHTNESS_CMD, bytes([brightness]))
        self._write(GPO_RESID, LED_EFFECT_CMD, bytes([2]))
class XVF3800Manager:
    """Manage two XVF3800 arrays, identified by USB serial number.

    Holds at most one "left" and one "right" device. Either slot may be None;
    every convenience method skips a missing device.
    """

    def __init__(self):
        self.left: Optional[XVF3800] = None
        self.right: Optional[XVF3800] = None
        self._serials: dict[str, str] = {}  # {"left": "SN...", "right": "SN..."}

    def set_serial_mapping(self, left_serial: str, right_serial: str):
        """Pin left/right assignment by USB serial number."""
        self._serials = {"left": left_serial, "right": right_serial}

    def _present(self) -> list:
        """Return the assigned devices, left first, skipping empty slots."""
        return [dev for dev in (self.left, self.right) if dev]

    def discover(self) -> "list[XVF3800]":
        """Find all connected XVF3800 devices.

        Returns an empty list (after logging a warning) when pyusb is not
        installed or USB enumeration itself fails. A device that cannot be
        wrapped is logged and skipped.
        """
        if not PYUSB_AVAILABLE:
            logger.warning("pyusb not installed — XVF3800 control disabled")
            return []
        try:
            # Materialise inside the try: find() raises NoBackendError when no
            # libusb backend is present, and iterating the result can raise
            # USBError.
            found = list(usb.core.find(idVendor=VID, idProduct=PID, find_all=True))
        except (usb.core.NoBackendError, usb.core.USBError) as e:
            logger.warning("USB enumeration failed — XVF3800 control disabled: %s", e)
            return []
        devices = []
        for dev in found:
            try:
                devices.append(XVF3800(dev))
            except Exception as e:
                logger.warning("Failed to init XVF3800 at bus %d addr %d: %s",
                               dev.bus, dev.address, e)
        return devices

    def assign(self):
        """Discover devices and assign left/right based on serial mapping.

        Both slots are recomputed on every call, so a device that disappeared
        since the previous call does not linger as a stale handle.
        """
        devices = self.discover()
        logger.info("Found %d XVF3800 device(s): %s",
                    len(devices), [d.serial for d in devices])
        # Build the new assignment in locals and publish it in one step.
        left = right = None
        if self._serials:
            want_left = self._serials.get("left")
            want_right = self._serials.get("right")
            for dev in devices:
                if dev.serial == want_left:
                    left = dev
                elif dev.serial == want_right:
                    right = dev
            if not left:
                logger.warning("Left XVF3800 (serial %s) not found", want_left)
            if not right:
                logger.warning("Right XVF3800 (serial %s) not found", want_right)
        else:
            # No serial mapping — assign by bus address order (unstable, but works for --learn)
            devices.sort(key=lambda d: (d.bus, d.address))
            if len(devices) >= 1:
                left = devices[0]
            if len(devices) >= 2:
                right = devices[1]
        self.left, self.right = left, right
        if self.left:
            logger.info("Left ear: serial=%s bus=%d addr=%d",
                        self.left.serial, self.left.bus, self.left.address)
        if self.right:
            logger.info("Right ear: serial=%s bus=%d addr=%d",
                        self.right.serial, self.right.bus, self.right.address)

    def serial_to_alsa(self, serial: str, sysfs_root: str = "/sys/class/sound") -> Optional[str]:
        """Find the ALSA card name for a device with a given USB serial number.

        Walks ``<sysfs_root>/card*``, resolves each card's ``device`` symlink
        and looks for a ``serial`` file one or two directory levels above it.

        Args:
            serial: USB serial number to look for.
            sysfs_root: directory holding the ``card*`` entries; overridable
                mainly for tests.

        Returns:
            The content of the matching card's ``id`` file, the bare card
            number when that file cannot be read, or None when no card matches.
        """
        import glob
        import os

        for card_dir in sorted(glob.glob(os.path.join(sysfs_root, "card*"))):
            card_num = os.path.basename(card_dir).replace("card", "")
            # Follow the device symlink up to the USB device
            device_path = os.path.join(card_dir, "device")
            if not os.path.islink(device_path):
                continue
            usb_path = os.path.realpath(device_path)
            serial_file = os.path.join(usb_path, "..", "serial")
            if not os.path.exists(serial_file):
                serial_file = os.path.join(usb_path, "..", "..", "serial")
            if not os.path.exists(serial_file):
                continue
            try:
                with open(serial_file) as fh:
                    dev_serial = fh.read().strip()
            except (OSError, ValueError) as e:
                # Unreadable or undecodable serial: skip this card, keep scanning.
                logger.debug("Cannot read %s: %s", serial_file, e)
                continue
            if dev_serial != serial:
                continue
            # Read the card ID (ALSA name); fall back to the card number.
            id_file = os.path.join(card_dir, "id")
            try:
                with open(id_file) as fh:
                    return fh.read().strip()
            except (OSError, ValueError):
                return card_num
        return None

    def get_alsa_devices(self) -> dict[str, Optional[str]]:
        """Return {"left": "plughw:Array,0", "right": "plughw:Array_1,0"} or similar.

        A side maps to None when no device is assigned to it or no ALSA card
        matches its serial.
        """
        result = {}
        for label, dev in (("left", self.left), ("right", self.right)):
            card_name = self.serial_to_alsa(dev.serial) if dev else None
            result[label] = f"plughw:{card_name},0" if card_name else None
        return result

    # --- Convenience: control both arrays ---

    def all_leds_off(self):
        """Switch the LEDs off on every assigned array."""
        for dev in self._present():
            dev.led_off()

    def all_leds_solid(self, color: int):
        """Show a solid 0xRRGGBB color on every assigned array."""
        for dev in self._present():
            dev.led_solid(color)

    def all_leds_breath(self, color: int, brightness: int = 128):
        """Start the breathing effect on every assigned array."""
        for dev in self._present():
            dev.led_breath(color, brightness)

    def all_leds_doa(self):
        """Put every assigned array into DoA indicator mode (default colors)."""
        for dev in self._present():
            dev.led_doa()

    def steer_beams(self, beam1_deg: float, beam2_deg: float):
        """Steer both arrays' fixed beams to the same directions.

        Per array: enable fixed-beam mode, set the two azimuths, then enable
        beam gating.
        """
        for dev in self._present():
            dev.enable_fixed_beams(True)
            dev.set_beam_azimuths(beam1_deg, beam2_deg)
            dev.enable_beam_gating(True)

    def release_beams(self):
        """Return both arrays to auto beam mode.

        NOTE(review): gating switched on by steer_beams() is left enabled
        here — confirm it has no effect outside fixed-beam mode.
        """
        for dev in self._present():
            dev.disable_fixed_beams()

    def read_both_doa(self) -> dict:
        """Read DoA from both arrays.

        Returns ``{"left": ..., "right": ...}`` where each value is
        ``{"angle": int, "vad": bool}`` or None for an unassigned side.
        """
        result = {}
        for label, dev in (("left", self.left), ("right", self.right)):
            if dev:
                angle, vad = dev.read_doa()
                result[label] = {"angle": angle, "vad": vad}
            else:
                result[label] = None
        return result
def learn_devices() -> dict:
    """Discover connected XVF3800 devices and return their serials for config.

    Returns a dict keyed by "left" / "right" (only for sides that were
    assigned). Each entry carries "usb_serial" and, when an ALSA card could be
    matched to that serial, "alsa_card".
    """
    manager = XVF3800Manager()
    manager.assign()
    found = {}
    for side in ("left", "right"):
        device = getattr(manager, side)
        if not device:
            continue
        entry = {"usb_serial": device.serial}
        card = manager.serial_to_alsa(device.serial)
        if card:
            entry["alsa_card"] = card
        found[side] = entry
    return found
# === CLI test ===
if __name__ == "__main__":
    # Manual hardware smoke tests, selected by flag:
    #   --learn      print discovered serials / ALSA cards as JSON
    #   --test-doa   poll DoA on both arrays 50 times, 0.1 s apart
    #   --test-leds  cycle a few solid colors, one second each, then switch off
    # With no recognised flag the script does nothing.
    import sys

    logging.basicConfig(level=logging.INFO)
    cli_args = sys.argv

    if "--learn" in cli_args:
        import json
        print(json.dumps(learn_devices(), indent=2))
        sys.exit(0)

    if "--test-doa" in cli_args:
        manager = XVF3800Manager()
        manager.assign()
        polls_left = 50
        while polls_left > 0:
            doa = manager.read_both_doa()
            # "\r" keeps the readout on a single, continuously rewritten line.
            print(f"DoA: left={doa.get('left')} right={doa.get('right')}", end="\r")
            time.sleep(0.1)
            polls_left -= 1
        print()
        sys.exit(0)

    if "--test-leds" in cli_args:
        manager = XVF3800Manager()
        manager.assign()
        palette = {
            "red": 0xFF0000,
            "green": 0x00FF00,
            "blue": 0x0000FF,
            "cyan": 0x00FFFF,
            "purple": 0x9400D3,
        }
        for name, color in palette.items():
            print(f" {name}")
            manager.all_leds_solid(color)
            time.sleep(1)
        manager.all_leds_off()
        sys.exit(0)