#!/usr/bin/env python3
"""
Vixy Light Service - HTTP-to-Serial Bridge

Thin bridge between HTTP API and Pico 2 animation engine.
All animations run on the Pico. This service just forwards mode commands.

Hardware:
- Raspberry Pi Pico 2 connected via USB (/dev/ttyACM0)
- Pico runs all LED animations locally

API:
  GET  /health     - Service health
  GET  /state      - Current state
  POST /state      - Set state {"state": "listening"}
  POST /jaw/level  - Set jaw level {"level": 0-100}
  POST /jaw/mode   - Set jaw mode {"mode": "talking"}
  UDP  :8782       - Jaw amplitude (bare integer, fire-and-forget lip sync)
"""

import time
import socket
import threading
import json
import signal
from http.server import HTTPServer, BaseHTTPRequestHandler

import serial

# === Configuration ===
HTTP_PORT = 8781
UDP_PORT = 8782
SERIAL_PORT = "/dev/ttyACM0"
SERIAL_BAUD = 115200

VALID_STATES = ["idle", "listening", "responding", "pleasure", "thinking",
                "playful", "commanding", "love", "sleep"]
VALID_JAW_MODES = ["idle", "talking", "off"]

# === Shared State ===
current_state = "idle"
jaw_level = 0
serial_lock = threading.Lock()  # guards every read/write on `ser`
running = True                  # cleared by signal_handler to stop the loops
ser = None                      # open serial.Serial, or None when disconnected


# === Serial Communication ===
def init_serial():
    """Open the serial port to the Pico and discard any pending input.

    Returns:
        True when the port was opened, False when opening failed (the error
        is printed, not raised).
    """
    global ser
    try:
        ser = serial.Serial(SERIAL_PORT, SERIAL_BAUD, timeout=1)
        time.sleep(0.5)
        ser.read_all()  # drop whatever is already buffered on the port
        print(f"[LIGHT] Connected to Pico on {SERIAL_PORT}")
        return True
    except Exception as e:
        print(f"[LIGHT] Serial error: {e}")
        return False


def _ensure_serial():
    """Return True if a port is open, reconnecting first when it is not.

    Caller must hold ``serial_lock``.
    """
    if ser is not None and ser.is_open:
        return True
    return init_serial()


def _drop_serial():
    """Close and forget the current port so the next send reconnects.

    Caller must hold ``serial_lock``. Closing is best-effort: a failure to
    close is logged and otherwise ignored, because the port is already
    considered broken at this point.
    """
    global ser
    port, ser = ser, None
    if port is None:
        return
    try:
        port.close()
    except Exception as e:
        print(f"[LIGHT] Serial close error: {e}")


def send_mode_command(cmd):
    """Send a mode command to Pico, wait for OK response.

    Args:
        cmd: Command text; a newline is appended before it is written.

    Returns:
        True only when the line read back equals "OK". False when the port
        cannot be opened, the reply differs, or any serial error occurs.
    """
    with serial_lock:
        try:
            if not _ensure_serial():
                return False
            ser.write(f"{cmd}\n".encode())
            response = ser.readline().decode().strip()
            return response == "OK"
        except Exception as e:
            print(f"[LIGHT] Serial error: {e}")
            _drop_serial()  # close the handle instead of leaking it
            return False


def send_amplitude(level):
    """Send bare integer for jaw amplitude. Fire-and-forget.

    Args:
        level: Value written as a decimal line; no reply is read.

    Returns:
        True when the write was issued, False when the port cannot be
        opened or the write raised.
    """
    with serial_lock:
        try:
            if not _ensure_serial():
                return False
            ser.write(f"{level}\n".encode())
            return True
        except Exception as e:
            print(f"[LIGHT] Serial error: {e}")
            _drop_serial()  # close the handle instead of leaking it
            return False


# === HTTP API Handler ===
class LightAPIHandler(BaseHTTPRequestHandler):
    """HTTP handler exposing the health, state and jaw endpoints."""

    def log_message(self, format, *args):
        """Suppress the default per-request logging."""
        pass

    def _send_json(self, data, status=200):
        """Write ``data`` as a JSON response with a permissive CORS header."""
        self.send_response(status)
        self.send_header('Content-Type', 'application/json')
        self.send_header('Access-Control-Allow-Origin', '*')
        self.end_headers()
        self.wfile.write(json.dumps(data).encode())

    def do_GET(self):
        """Serve /health and /state; anything else is a 404."""
        if self.path == '/health':
            # Snapshot the global once: another thread may reset it to None
            # between the None check and the is_open read.
            port = ser
            self._send_json({
                "status": "ok",
                "service": "vixy-lights",
                "serial": port is not None and port.is_open
            })
        elif self.path == '/state':
            self._send_json({"state": current_state, "jaw_level": jaw_level})
        else:
            self._send_json({"error": "Not found"}, 404)

    def do_POST(self):
        """Handle POST /state, /jaw/level and /jaw/mode.

        The body must be a JSON object. Any parsing or validation failure
        is answered with HTTP 400 and an ``error`` message.
        """
        global current_state, jaw_level

        try:
            content_length = int(self.headers['Content-Length'])
            body = self.rfile.read(content_length)
            data = json.loads(body.decode())

            if not isinstance(data, dict):
                # Lists/scalars are valid JSON but have no fields to read.
                self._send_json({"error": "JSON body must be an object"}, 400)
                return

            if self.path == '/state':
                new_state = data.get('state', '').lower()
                if new_state in VALID_STATES:
                    old_state = current_state
                    current_state = new_state
                    if not send_mode_command(f"mood {new_state}"):
                        print("[LIGHT] Warning: Pico did not acknowledge "
                              f"mood {new_state}")
                    print(f"[LIGHT] State: {old_state} -> {new_state}")
                    self._send_json({"success": True, "state": new_state})
                else:
                    self._send_json(
                        {"error": f"Invalid state. Valid: {VALID_STATES}"}, 400)

            elif self.path == '/jaw/level':
                level = int(data.get('level', 0))
                level = max(0, min(100, level))  # clamp to 0-100
                jaw_level = level
                send_amplitude(level)
                self._send_json({"success": True, "jaw_level": level})

            elif self.path == '/jaw/mode':
                mode = data.get('mode', '').lower()
                if mode in VALID_JAW_MODES:
                    if not send_mode_command(f"jaw {mode}"):
                        print("[LIGHT] Warning: Pico did not acknowledge "
                              f"jaw {mode}")
                    self._send_json({"success": True, "jaw_mode": mode})
                else:
                    self._send_json(
                        {"error": f"Invalid jaw mode. Valid: {VALID_JAW_MODES}"},
                        400)

            else:
                self._send_json({"error": "Not found"}, 404)

        except Exception as e:
            # Request boundary: report any failure to the client as a 400.
            self._send_json({"error": str(e)}, 400)

    def do_OPTIONS(self):
        """Answer CORS preflight requests."""
        self.send_response(200)
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
        self.send_header('Access-Control-Allow-Headers', 'Content-Type')
        self.end_headers()


# === UDP Amplitude Listener ===
def run_udp_listener():
    """Listen for jaw amplitude via UDP (fire-and-forget from voice service).

    Each datagram is parsed as an integer, clamped to 0-100 and forwarded
    with send_amplitude(). Runs until the module-level ``running`` flag is
    cleared; the 1 second socket timeout bounds how long that takes.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.bind(('0.0.0.0', UDP_PORT))
        sock.settimeout(1.0)
        print(f"[LIGHT] UDP listener on port {UDP_PORT}")

        while running:
            try:
                data, _ = sock.recvfrom(16)
            except socket.timeout:
                continue  # wake up periodically to re-check `running`
            except OSError as e:
                print(f"[LIGHT] UDP error: {e}")
                time.sleep(0.1)  # avoid spinning if the socket keeps failing
                continue

            try:
                level = max(0, min(100, int(data)))
            except ValueError:
                continue  # malformed datagram: drop it

            send_amplitude(level)


# === Signal Handler ===
def signal_handler(sig, frame):
    """Request shutdown by clearing the ``running`` flag."""
    global running
    print("\n[LIGHT] Shutting down...")
    running = False


signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)


# === Main ===
def main():
    """Connect to the Pico, start the UDP listener and serve HTTP until stopped."""
    print("[LIGHT] Vixy Light Service starting...")

    if not init_serial():
        print("[LIGHT] Warning: Could not connect to Pico, will retry...")

    # Set initial mood on Pico
    send_mode_command("mood idle")

    # Start UDP listener for lip sync amplitude
    udp_thread = threading.Thread(target=run_udp_listener, daemon=True)
    udp_thread.start()

    server = HTTPServer(('0.0.0.0', HTTP_PORT), LightAPIHandler)
    # Without a timeout handle_request() blocks until the next request, so a
    # SIGINT/SIGTERM would not take effect until some client connects.
    server.timeout = 1.0
    print(f"[LIGHT] HTTP API listening on port {HTTP_PORT}")
    print("[LIGHT] Service ready")

    try:
        while running:
            server.handle_request()
    finally:
        print("[LIGHT] Cleaning up...")
        server.server_close()  # release the listening socket
        send_mode_command("mood off")
        send_mode_command("jaw off")
        with serial_lock:
            # Hold the lock so the UDP thread is not mid-write during close.
            if ser:
                ser.close()
        print("[LIGHT] Shutdown complete")


if __name__ == "__main__":
    main()