Listens on UDP :8782 for bare integer amplitude values (0-100) and forwards them directly to the Pico over serial as fire-and-forget jaw commands. Runs as a daemon thread alongside the HTTP server. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
218 lines
6.8 KiB
Python
218 lines
6.8 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Vixy Light Service - HTTP-to-Serial Bridge
|
|
|
|
Thin bridge between HTTP API and Pico 2 animation engine.
|
|
All animations run on the Pico. This service just forwards mode commands.
|
|
|
|
Hardware:
|
|
- Raspberry Pi Pico 2 connected via USB (/dev/ttyACM0)
|
|
- Pico runs all LED animations locally
|
|
|
|
API:
|
|
GET /health - Service health
|
|
GET /state - Current state
|
|
POST /state - Set state {"state": "listening"}
|
|
POST /jaw/level - Set jaw level {"level": 0-100}
|
|
POST /jaw/mode - Set jaw mode {"mode": "talking"}
|
|
UDP :8782 - Jaw amplitude (bare integer, fire-and-forget lip sync)
|
|
"""
|
|
|
|
import time
|
|
import socket
|
|
import threading
|
|
import json
|
|
import signal
|
|
from http.server import HTTPServer, BaseHTTPRequestHandler
|
|
import serial
|
|
|
|
# === Configuration ===
# Network endpoints exposed by this service.
HTTP_PORT = 8781  # TCP port of the HTTP API
UDP_PORT = 8782  # UDP port receiving bare-integer jaw amplitudes

# Serial link to the Pico.
SERIAL_PORT = "/dev/ttyACM0"
SERIAL_BAUD = 115200

# Names accepted by POST /state (sent to the Pico as "mood <name>").
VALID_STATES = [
    "idle",
    "listening",
    "responding",
    "pleasure",
    "thinking",
    "playful",
    "commanding",
    "love",
    "sleep",
]

# Names accepted by POST /jaw/mode (sent to the Pico as "jaw <name>").
VALID_JAW_MODES = [
    "idle",
    "talking",
    "off",
]
|
|
|
|
# === Shared State ===
# Module-level state shared between the HTTP handler, the UDP listener
# thread and the signal handler.

# Handle to the open serial port, or None while disconnected.
ser = None
# Guards every read/write on ``ser`` so commands are not interleaved.
serial_lock = threading.Lock()

# Loop flag for the HTTP and UDP loops; the signal handler clears it.
running = True

# Last values accepted through the HTTP API (reported by GET /state).
current_state = "idle"
jaw_level = 0
|
|
|
|
# === Serial Communication ===
|
|
def init_serial():
    """Open the serial port to the Pico and store it in the global ``ser``.

    After opening, waits half a second and discards whatever is already
    sitting in the input buffer before reporting success.

    Returns:
        True when the port was opened and drained, False when any step
        raised (the error is printed, not propagated).
    """
    global ser
    connected = False
    try:
        ser = serial.Serial(SERIAL_PORT, SERIAL_BAUD, timeout=1)
        # Give the device a moment, then throw away any pending input.
        time.sleep(0.5)
        ser.read_all()
        print(f"[LIGHT] Connected to Pico on {SERIAL_PORT}")
        connected = True
    except Exception as exc:
        print(f"[LIGHT] Serial error: {exc}")
    return connected
|
|
|
|
def send_mode_command(cmd):
    """Send a newline-terminated mode command to the Pico and check the reply.

    The serial port is (re)opened on demand via ``init_serial()``. The whole
    exchange happens under ``serial_lock`` so it cannot interleave with
    other writers.

    Args:
        cmd: Command text without the trailing newline (e.g. "mood idle").

    Returns:
        True only if the line read back from the port is exactly "OK".
        False if the port could not be opened, the reply differs, or any
        serial operation raised. In the last case the global ``ser`` is
        reset to None so the next call attempts a reconnect.
    """
    global ser
    with serial_lock:
        try:
            if ser is None or not ser.is_open:
                if not init_serial():
                    return False
            ser.write(f"{cmd}\n".encode())
            # Decode leniently: a stray non-UTF-8 byte on the line should
            # count as "not OK", not raise and tear down a working port.
            response = ser.readline().decode(errors="replace").strip()
            return response == "OK"
        except Exception as e:
            print(f"[LIGHT] Serial error: {e}")
            # Forget the handle so the next call goes through init_serial().
            ser = None
            return False
|
|
|
|
def send_amplitude(level):
    """Write a bare jaw amplitude value to the Pico without reading a reply.

    Args:
        level: Value formatted into the line sent to the Pico.

    Returns:
        True once the line has been written. False if the port could not
        be opened or the write raised, in which case the global ``ser`` is
        reset to None so a later call reconnects.
    """
    global ser
    with serial_lock:
        try:
            port_ready = ser is not None and ser.is_open
            # Only attempt a (re)connect when there is no usable port.
            if not port_ready and not init_serial():
                return False
            line = f"{level}\n".encode()
            ser.write(line)
        except Exception as exc:
            print(f"[LIGHT] Serial error: {exc}")
            ser = None
            return False
        return True
|
|
|
|
# === HTTP API Handler ===
|
|
class LightAPIHandler(BaseHTTPRequestHandler):
    """HTTP request handler exposing the light/jaw control API.

    Routes:
        GET  /health     - service status and whether the serial port is open
        GET  /state      - current state name and last jaw level
        POST /state      - {"state": <name>} -> forwards "mood <name>"
        POST /jaw/level  - {"level": <number>} -> clamped to 0-100, forwarded
        POST /jaw/mode   - {"mode": <name>} -> forwards "jaw <name>"
        OPTIONS *        - CORS preflight

    All responses are JSON and carry a permissive CORS header.
    """

    # Upper bound on accepted POST bodies. Valid requests are tiny JSON
    # objects, so anything larger is rejected rather than read into memory.
    MAX_BODY_BYTES = 64 * 1024

    def log_message(self, format, *args):
        """Silence the base class's per-request logging."""
        pass

    def _send_json(self, data, status=200):
        """Send *data* serialized as JSON with the given HTTP status code."""
        body = json.dumps(data).encode()
        self.send_response(status)
        self.send_header('Content-Type', 'application/json')
        # Explicit length lets clients know where the body ends without
        # waiting for the connection to close.
        self.send_header('Content-Length', str(len(body)))
        self.send_header('Access-Control-Allow-Origin', '*')
        self.end_headers()
        self.wfile.write(body)

    def _read_json_object(self):
        """Read the request body and decode it as a JSON object.

        Returns:
            The decoded body as a dict.

        Raises:
            ValueError: if Content-Length is missing, malformed, negative or
                above ``MAX_BODY_BYTES``, or the body is not valid UTF-8
                JSON, or the JSON value is not an object.
        """
        raw_length = self.headers.get('Content-Length')
        if raw_length is None:
            raise ValueError("Missing Content-Length header")
        try:
            length = int(raw_length)
        except ValueError:
            raise ValueError("Invalid Content-Length header") from None
        # A negative length would make rfile.read() read until EOF and stall
        # the single-threaded server until the client disconnects.
        if length < 0 or length > self.MAX_BODY_BYTES:
            raise ValueError("Content-Length out of range")
        body = self.rfile.read(length)
        # UnicodeDecodeError and JSONDecodeError are both ValueError subclasses.
        payload = json.loads(body.decode())
        if not isinstance(payload, dict):
            raise ValueError("JSON body must be an object")
        return payload

    def do_GET(self):
        """Serve /health and /state; anything else is a 404."""
        if self.path == '/health':
            self._send_json({
                "status": "ok",
                "service": "vixy-lights",
                "serial": ser is not None and ser.is_open
            })
        elif self.path == '/state':
            self._send_json({"state": current_state, "jaw_level": jaw_level})
        else:
            self._send_json({"error": "Not found"}, 404)

    def do_POST(self):
        """Handle state, jaw level and jaw mode updates.

        Malformed requests get a 400 with an ``error`` message; unknown
        paths get a 404. Valid requests update the module-level state and
        forward the command to the Pico.
        """
        global current_state, jaw_level
        try:
            data = self._read_json_object()
        except ValueError as exc:
            self._send_json({"error": str(exc)}, 400)
            return

        if self.path == '/state':
            new_state = data.get('state', '')
            # Non-string values simply fail the membership test below.
            if isinstance(new_state, str):
                new_state = new_state.lower()
            if new_state in VALID_STATES:
                old_state = current_state
                current_state = new_state
                send_mode_command(f"mood {new_state}")
                print(f"[LIGHT] State: {old_state} -> {new_state}")
                self._send_json({"success": True, "state": new_state})
            else:
                self._send_json({"error": f"Invalid state. Valid: {VALID_STATES}"}, 400)

        elif self.path == '/jaw/level':
            try:
                level = int(data.get('level', 0))
            except (TypeError, ValueError, OverflowError):
                self._send_json({"error": "Invalid level. Expected a number 0-100"}, 400)
                return
            # Clamp instead of rejecting so out-of-range values still move the jaw.
            level = max(0, min(100, level))
            jaw_level = level
            send_amplitude(level)
            self._send_json({"success": True, "jaw_level": level})

        elif self.path == '/jaw/mode':
            mode = data.get('mode', '')
            if isinstance(mode, str):
                mode = mode.lower()
            if mode in VALID_JAW_MODES:
                send_mode_command(f"jaw {mode}")
                self._send_json({"success": True, "jaw_mode": mode})
            else:
                self._send_json({"error": f"Invalid jaw mode. Valid: {VALID_JAW_MODES}"}, 400)

        else:
            self._send_json({"error": "Not found"}, 404)

    def do_OPTIONS(self):
        """Answer CORS preflight requests."""
        self.send_response(200)
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
        self.send_header('Access-Control-Allow-Headers', 'Content-Type')
        self.send_header('Content-Length', '0')
        self.end_headers()
|
|
|
|
# === UDP Amplitude Listener ===
|
|
def run_udp_listener():
    """Receive jaw amplitudes over UDP and forward them to the Pico.

    Each datagram is expected to hold a bare integer. The value is clamped
    to 0-100 and passed to ``send_amplitude``. Datagrams that do not parse
    as an integer are dropped. The loop re-checks the module-level
    ``running`` flag at least once per second (the receive timeout) and
    the socket is closed on every exit path.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.bind(('0.0.0.0', UDP_PORT))
        # Timeout so the loop notices ``running`` going False.
        sock.settimeout(1.0)
        print(f"[LIGHT] UDP listener on port {UDP_PORT}")
        while running:
            try:
                data, _ = sock.recvfrom(16)
            except socket.timeout:
                continue
            except OSError as exc:
                # Report socket failures and back off briefly so a
                # persistent error cannot turn into a silent busy loop.
                print(f"[LIGHT] UDP receive error: {exc}")
                time.sleep(1.0)
                continue
            try:
                level = int(data)
            except ValueError:
                # Malformed datagram: fire-and-forget input, just drop it.
                continue
            send_amplitude(max(0, min(100, level)))
|
|
|
|
# === Signal Handler ===
|
|
def signal_handler(sig, frame):
    """Announce shutdown and clear the module-level ``running`` flag.

    Args:
        sig: Signal number supplied by the signal machinery (unused).
        frame: Interrupted stack frame (unused).
    """
    global running
    print("\n[LIGHT] Shutting down...")
    running = False


# Ctrl-C and termination requests share the same shutdown path.
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
|
|
|
|
# === Main ===
|
|
def main():
    """Run the light service until a shutdown signal is received.

    Opens the serial link (continuing with a warning if that fails), sets
    the initial mood, starts the UDP amplitude listener as a daemon thread
    and serves HTTP requests one at a time while ``running`` is True. On
    exit it turns the mood and jaw off and closes the HTTP socket and the
    serial port.
    """
    global running

    print("[LIGHT] Vixy Light Service starting...")

    if not init_serial():
        print("[LIGHT] Warning: Could not connect to Pico, will retry...")

    # Set initial mood on Pico
    send_mode_command("mood idle")

    # Start UDP listener for lip sync amplitude
    udp_thread = threading.Thread(target=run_udp_listener, daemon=True)
    udp_thread.start()

    server = HTTPServer(('0.0.0.0', HTTP_PORT), LightAPIHandler)
    # Without a timeout handle_request() blocks until the next HTTP request,
    # so a shutdown signal (which only clears ``running``) would not take
    # effect until another client connects.
    server.timeout = 1.0
    print(f"[LIGHT] HTTP API listening on port {HTTP_PORT}")
    print("[LIGHT] Service ready")

    try:
        while running:
            server.handle_request()
    finally:
        print("[LIGHT] Cleaning up...")
        # Also stop the UDP loop when leaving because of an exception.
        running = False
        server.server_close()
        send_mode_command("mood off")
        send_mode_command("jaw off")
        # Close under the lock so an in-flight write is not cut off.
        with serial_lock:
            if ser is not None:
                ser.close()
        print("[LIGHT] Shutdown complete")


if __name__ == "__main__":
    main()
|