diff --git a/ear_mcp.py b/ear_mcp.py
index 4954a0c..09cbb9a 100644
--- a/ear_mcp.py
+++ b/ear_mcp.py
@@ -21,6 +21,7 @@ mcp = FastMCP("Ear STT Transcriber")
 
 # Configuration from environment
 EARTAIL_BASE_URL = os.getenv("EARTAIL_BASE_URL", "http://bigorin.local:8764")
+HEADMIC_BASE_URL = os.getenv("HEADMIC_BASE_URL", "http://head-vixy.local:8446")
 DEFAULT_POLL_INTERVAL = 2 # seconds
 DEFAULT_TIMEOUT = 300 # seconds (5 minutes - transcription is usually fast)
 
@@ -178,5 +179,208 @@ async def ear_health() -> dict:
             return {"status": "error", "error": str(e)}
 
 
+# ============================================================================
+# HeadMic - Sound Classification & Speaker Identification
+# ============================================================================
+
+@mcp.tool()
+async def ear_sounds() -> dict:
+    """
+    Get current audio scene from HeadMic's sound classifier.
+
+    Returns what the microphone is currently hearing: speech, music, alert,
+    animal, household sounds, environment, or silence. Also includes speaker
+    identification if someone is talking.
+
+    Returns:
+        Dictionary with category, top_classes, dominant_category,
+        recognized_speaker, speaker_confidence
+
+    Raises:
+        httpx.HTTPStatusError: If HeadMic responds with an error status
+            other than 503
+    """
+    async with httpx.AsyncClient(timeout=10.0) as client:
+        try:
+            response = await client.get(f"{HEADMIC_BASE_URL}/sounds")
+            response.raise_for_status()
+            return response.json()
+        except httpx.HTTPStatusError as e:
+            if e.response.status_code == 503:
+                return {"error": "Sound classification not available on HeadMic"}
+            raise
+        except Exception as e:
+            return {"error": f"HeadMic unavailable: {e}"}
+
+
+@mcp.tool()
+async def ear_speakers() -> dict:
+    """
+    List all enrolled speakers for voice identification.
+
+    Returns:
+        Dictionary with speaker names and their enrollment sample counts
+
+    Raises:
+        httpx.HTTPStatusError: If HeadMic responds with an error status
+            other than 503
+    """
+    async with httpx.AsyncClient(timeout=10.0) as client:
+        try:
+            response = await client.get(f"{HEADMIC_BASE_URL}/speakers")
+            response.raise_for_status()
+            return response.json()
+        except httpx.HTTPStatusError as e:
+            if e.response.status_code == 503:
+                return {"error": "Speaker recognition not available on HeadMic"}
+            raise
+        except Exception as e:
+            return {"error": f"HeadMic unavailable: {e}"}
+
+
+@mcp.tool()
+async def ear_enroll_speaker(name: str) -> dict:
+    """
+    Enroll a speaker by recording 5 seconds from the live microphone.
+
+    The person should speak naturally during the recording. The ReSpeaker LEDs
+    will pulse orange while recording. Multiple enrollments of the same person
+    improve recognition accuracy.
+
+    Args:
+        name: Name to associate with the voice (e.g. "Alex")
+
+    Returns:
+        Enrollment result with duration and updated speaker list
+
+    Raises:
+        httpx.HTTPStatusError: If HeadMic responds with an error status
+            other than 503 or 409
+    """
+    async with httpx.AsyncClient(timeout=15.0) as client:
+        try:
+            # NOTE(review): print() writes to stdout; if the server runs over
+            # the stdio transport this may interfere with it - confirm.
+            print(f"🎙️ Recording 5 seconds for '{name}'...")
+            response = await client.post(
+                f"{HEADMIC_BASE_URL}/speakers/enroll-from-mic",
+                params={"name": name},
+            )
+            response.raise_for_status()
+            result = response.json()
+            print(f"✓ Enrolled '{name}' ({result.get('seconds', '?')}s)")
+            return result
+        except httpx.HTTPStatusError as e:
+            if e.response.status_code == 503:
+                return {"error": "Speaker recognition not available on HeadMic"}
+            if e.response.status_code == 409:
+                return {"error": "Enrollment already in progress"}
+            raise
+        except Exception as e:
+            return {"error": f"HeadMic unavailable: {e}"}
+
+
+@mcp.tool()
+async def ear_enroll_speaker_from_file(name: str, audio_path: str) -> dict:
+    """
+    Enroll a speaker from an audio file.
+
+    Args:
+        name: Name to associate with the voice (e.g. "Alex")
+        audio_path: Path to audio file with the person speaking
+
+    Returns:
+        Enrollment result with updated speaker list
+
+    Raises:
+        FileNotFoundError: If audio_path does not point to an existing file
+        httpx.HTTPStatusError: If HeadMic responds with an error status
+            other than 503
+    """
+    path = Path(audio_path).expanduser()
+    # is_file() also rejects directories, which would otherwise fail inside
+    # the request block and be misreported as "HeadMic unavailable".
+    if not path.is_file():
+        raise FileNotFoundError(f"Audio file not found: {audio_path}")
+
+    async with httpx.AsyncClient(timeout=15.0) as client:
+        try:
+            with open(path, "rb") as f:
+                # NOTE(review): the upload is always labelled audio/wav,
+                # whatever the file's real format - confirm HeadMic accepts it.
+                response = await client.post(
+                    f"{HEADMIC_BASE_URL}/speakers/enroll",
+                    data={"name": name},
+                    files={"audio": (path.name, f, "audio/wav")},
+                )
+            response.raise_for_status()
+            result = response.json()
+            print(f"✓ Enrolled '{name}' from {path.name}")
+            return result
+        except httpx.HTTPStatusError as e:
+            if e.response.status_code == 503:
+                return {"error": "Speaker recognition not available on HeadMic"}
+            raise
+        except Exception as e:
+            return {"error": f"HeadMic unavailable: {e}"}
+
+
+@mcp.tool()
+async def ear_delete_speaker(name: str) -> dict:
+    """
+    Remove an enrolled speaker and all their voice samples.
+
+    Args:
+        name: Name of the speaker to remove
+
+    Returns:
+        Deletion result with number of samples removed
+
+    Raises:
+        httpx.HTTPStatusError: If HeadMic responds with an error status
+            other than 503 or 404
+    """
+    from urllib.parse import quote
+
+    # Percent-encode the name so characters such as "/", "?" or "#" cannot
+    # change which URL the request targets.
+    speaker_path = quote(name, safe="")
+
+    async with httpx.AsyncClient(timeout=10.0) as client:
+        try:
+            response = await client.delete(
+                f"{HEADMIC_BASE_URL}/speakers/{speaker_path}"
+            )
+            response.raise_for_status()
+            result = response.json()
+            print(f"✓ Deleted speaker '{name}' ({result.get('samples_removed', 0)} samples)")
+            return result
+        except httpx.HTTPStatusError as e:
+            if e.response.status_code == 503:
+                return {"error": "Speaker recognition not available on HeadMic"}
+            if e.response.status_code == 404:
+                return {"error": f"Speaker '{name}' not found"}
+            raise
+        except Exception as e:
+            return {"error": f"HeadMic unavailable: {e}"}
+
+
+@mcp.tool()
+async def ear_headmic_status() -> dict:
+    """
+    Get full HeadMic status: wake word, audio scene, and speaker identification.
+
+    Returns:
+        Dictionary with listening state, audio scene, recognized speaker, etc.
+    """
+    async with httpx.AsyncClient(timeout=10.0) as client:
+        try:
+            response = await client.get(f"{HEADMIC_BASE_URL}/status")
+            response.raise_for_status()
+            return response.json()
+        except Exception as e:
+            return {"error": f"HeadMic unavailable: {e}"}
+
+
 if __name__ == "__main__":
     mcp.run()