#!/usr/bin/env python3
"""
Vixy Eye Service - Day 62
Controllable eye display for head-lyra with HTTP API

States: idle, listening, responding, pleasure
API: GET /state, POST /state {"state": "listening"}, GET /health
"""

from lib import LCD_2inch
import lgpio
from PIL import Image, ImageDraw
import time
import numpy as np
import random
import spidev
import usb.core
import usb.util
from tuning import Tuning
from http.server import HTTPServer, BaseHTTPRequestHandler
import threading
import json
import signal
import sys

# === Configuration ===
HTTP_PORT = 8780
VALID_STATES = ["idle", "listening", "responding", "pleasure"]
# Upper bound on an accepted POST body; a state change is a tiny JSON object.
MAX_BODY_BYTES = 65536
# How long the HTTP thread waits for a request before re-checking `running`.
HTTP_POLL_SECONDS = 0.5

# === Display Configuration ===
RST1 = 27
DC1 = 25
BL1 = 18
RST2 = 22
DC2 = 23
BL2 = 24
bus = 0

# === Shared State ===
current_state = "idle"
state_lock = threading.Lock()
running = True


# === HTTP API Handler ===
class EyeAPIHandler(BaseHTTPRequestHandler):
    """HTTP handler exposing GET /health, GET /state and POST /state."""

    def log_message(self, format, *args):
        pass  # Suppress default logging

    def _send_json(self, data, status=200):
        """Serialize *data* as JSON and send it with the given HTTP status."""
        payload = json.dumps(data).encode()
        self.send_response(status)
        self.send_header('Content-Type', 'application/json')
        self.send_header('Content-Length', str(len(payload)))
        self.send_header('Access-Control-Allow-Origin', '*')
        self.end_headers()
        self.wfile.write(payload)

    def _read_json_body(self):
        """Read and decode the request body as JSON.

        Raises:
            ValueError: if Content-Length is not an integer, is negative or
                exceeds MAX_BODY_BYTES, or the body is not valid UTF-8 JSON.
        """
        length = int(self.headers.get('Content-Length') or 0)
        # A negative length would make rfile.read() block until EOF.
        if length < 0 or length > MAX_BODY_BYTES:
            raise ValueError(
                f"Content-Length must be between 0 and {MAX_BODY_BYTES}"
            )
        return json.loads(self.rfile.read(length).decode())

    def do_GET(self):
        """Serve /health and /state; anything else is a 404."""
        if self.path == '/health':
            self._send_json({"status": "ok", "service": "vixy-eyes"})
        elif self.path == '/state':
            # Copy under the lock, reply after releasing it, so a slow client
            # cannot stall the animation loop that takes the same lock.
            with state_lock:
                state = current_state
            self._send_json({"state": state})
        else:
            self._send_json({"error": "Not found"}, 404)

    def do_POST(self):
        """Handle POST /state: validate the requested state and apply it."""
        global current_state
        if self.path != '/state':
            self._send_json({"error": "Not found"}, 404)
            return

        try:
            data = self._read_json_body()
        except (ValueError, RecursionError) as e:
            # RecursionError: pathologically nested JSON.
            self._send_json({"error": str(e)}, 400)
            return

        if not isinstance(data, dict):
            self._send_json({"error": "Request body must be a JSON object"}, 400)
            return

        requested = data.get('state', '')
        new_state = requested.lower() if isinstance(requested, str) else None
        if new_state not in VALID_STATES:
            self._send_json({"error": f"Invalid state. Valid: {VALID_STATES}"}, 400)
            return

        with state_lock:
            old_state = current_state
            current_state = new_state
        print(f"[EYE] State: {old_state} -> {new_state}")
        self._send_json({"success": True, "state": new_state})

    def do_OPTIONS(self):
        """Answer CORS preflight requests."""
        self.send_response(200)
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
        self.send_header('Access-Control-Allow-Headers', 'Content-Type')
        self.send_header('Content-Length', '0')
        self.end_headers()


def run_http_server():
    """Serve the eye API until the module-level `running` flag is cleared."""
    server = HTTPServer(('0.0.0.0', HTTP_PORT), EyeAPIHandler)
    # Without a timeout handle_request() blocks until a request arrives, so
    # the `running` flag would never be re-checked on an idle server.
    server.timeout = HTTP_POLL_SECONDS
    print(f"[EYE] HTTP API listening on port {HTTP_PORT}")
    try:
        while running:
            server.handle_request()
    finally:
        server.server_close()


# === DoA Detection ===
dev = usb.core.find(idVendor=0x2886, idProduct=0x0018)
mic_tuning = Tuning(dev) if dev else None


def get_doa_angle():
    """Return the microphone's reported direction, or None if unavailable."""
    if mic_tuning:
        try:
            return mic_tuning.direction
        except Exception as e:
            # Best-effort: a failed read must not stop the animation.
            print(f"DoA read error: {e}")
            return None
    return None


# === Signal Handler ===
def signal_handler(sig, frame):
    """Request a clean shutdown by clearing the `running` flag."""
    global running
    print("\n[EYE] Shutting down...")
    running = False


signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)


def _iris_style(state, current_time, elapsed, pulse_range, pulse_duration):
    """Return (iris_color, pulse) for *state* at the given time."""
    if state == "idle":
        # Blend between platinum (192, 192, 192) and cyan (0, 255, 255).
        t = (np.sin(2 * np.pi * elapsed / pulse_duration) + 1) / 2
        iris_color = (
            int((1 - t) * 192 + t * 0),
            int((1 - t) * 192 + t * 255),
            int((1 - t) * 192 + t * 255),
        )
        pulse = np.sin(current_time * 0.5) * pulse_range
    elif state == "listening":
        iris_color = (160, 255, 255)
        pulse = np.sin(current_time * 4) * (pulse_range * 0.5)
    elif state == "responding":
        iris_color = (100, 220, 255)
        pulse = np.sin(current_time * 5) * (pulse_range * 1.5)
    elif state == "pleasure":
        iris_color = (180, 180, 255)
        pulse = np.sin(current_time * 2) * (pulse_range * 0.5)
    else:
        iris_color = (192, 192, 192)
        pulse = 0
    return iris_color, pulse


def _render_eye(size, center, iris_color, iris_radius, outer_radius,
                angle_offset, segment_count, segment_width):
    """Draw one eye frame (gradient, inner iris, segmented ring) and return it."""
    img = Image.new("RGB", size, (10, 0, 20))
    draw = ImageDraw.Draw(img)
    cx, cy = center

    # Gradient iris: concentric discs from the outer radius inwards, scaled
    # from black at the outer edge towards the full iris colour.
    for r in range(int(outer_radius), int(iris_radius), -2):
        alpha = int(255 * (1 - (r - iris_radius) / (outer_radius - iris_radius)))
        color = tuple((c * alpha // 255) for c in iris_color)
        draw.ellipse([cx - r, cy - r, cx + r, cy + r], fill=color)

    # Inner iris
    draw.ellipse(
        [cx - iris_radius, cy - iris_radius, cx + iris_radius, cy + iris_radius],
        fill=iris_color,
    )

    # Segmented outer ring
    platinum = (192, 192, 192)
    arc_extent = 360 / segment_count * 0.6
    bbox = [cx - outer_radius, cy - outer_radius, cx + outer_radius, cy + outer_radius]
    for i in range(segment_count):
        start_angle = np.degrees(angle_offset + 2 * np.pi * i / segment_count)
        draw.arc(bbox, start=start_angle, end=start_angle + arc_extent,
                 fill=platinum, width=segment_width)
    return img


# === Main Animation Loop ===
def main():
    """Initialise both displays, start the HTTP API and animate until stopped."""
    # Initialize GPIO
    h = lgpio.gpiochip_open(0)
    displays = []

    try:
        # Initialize Displays inside the try block so that a failure part-way
        # through still releases the GPIO handle and any display already built.
        for spi_device, (rst, dc, bl) in enumerate(
            ((RST1, DC1, BL1), (RST2, DC2, BL2))
        ):
            disp = LCD_2inch.LCD_2inch(
                spi=spidev.SpiDev(bus, spi_device), rst=rst, dc=dc, bl=bl
            )
            displays.append(disp)
            disp.Init()
            disp.clear()
            disp.bl_DutyCycle(50)
        disp1, disp2 = displays

        # Image setup
        WIDTH = disp1.width
        HEIGHT = disp1.height
        CENTER = (WIDTH // 2, HEIGHT // 2)

        # Animation parameters
        segment_count = 12
        segment_width = 8
        base_iris_radius = 30
        pulse_range = 5
        rotation_speed = 0.002
        sleep_time = 0.05
        angle_offset = 0
        fixed_outer_radius = 75
        pulse_duration = 8.0
        pulse_timer = time.time()

        # DoA tracking
        last_doa_poll = 0
        iris_offset = (0, 0)
        doa_angle = 180

        # Flick timing
        last_flick_time = time.time()
        flick_interval = random.uniform(15, 30)
        flick_active = False
        flick_duration = 0.2
        flick_start = 0
        flick_offset = (0, 0)

        # Start HTTP server thread
        http_thread = threading.Thread(target=run_http_server, daemon=True)
        http_thread.start()

        print("[EYE] Vixy Eye Service started 🦊")

        while running:
            current_time = time.time()
            elapsed = current_time - pulse_timer

            # === Flick logic ===
            if not flick_active and current_time - last_flick_time > flick_interval:
                flick_active = True
                flick_start = current_time
                flick_offset = (random.randint(-8, 8), random.randint(-4, 4))
                last_flick_time = current_time
                flick_interval = random.uniform(15, 30)

            if flick_active and current_time - flick_start < flick_duration:
                current_offset = flick_offset
            else:
                flick_active = False
                current_offset = iris_offset

            # === Poll DoA every 0.5s ===
            if current_time - last_doa_poll > 0.5:
                angle = get_doa_angle()
                if angle is not None:
                    doa_angle = angle
                # Uses the last known angle when the read returned None.
                offset_x = int(-20 * np.sin(np.radians(doa_angle)))
                offset_y = int(-10 * np.cos(np.radians(doa_angle)))
                iris_offset = (offset_x, offset_y)
                last_doa_poll = current_time

            # === Get current state (thread-safe) ===
            with state_lock:
                state = current_state

            # === State-Driven Properties ===
            iris_color, pulse = _iris_style(
                state, current_time, elapsed, pulse_range, pulse_duration
            )
            iris_radius = base_iris_radius + pulse
            center_offset = (
                CENTER[0] + current_offset[0],
                CENTER[1] + current_offset[1],
            )

            # === Draw Frame ===
            img = _render_eye(
                (WIDTH, HEIGHT), center_offset, iris_color, iris_radius,
                fixed_outer_radius, angle_offset, segment_count, segment_width,
            )
            angle_offset += rotation_speed

            # Update displays
            disp1.ShowImage(img)
            disp2.ShowImage(img.transpose(Image.ROTATE_180))

            time.sleep(sleep_time)

    finally:
        print("[EYE] Cleaning up displays...")
        for disp in displays:
            try:
                disp.module_exit()
            except Exception as e:
                # Keep going so the other display and the GPIO chip are
                # still released.
                print(f"[EYE] Display cleanup error: {e}")
        lgpio.gpiochip_close(h)
        print("[EYE] Shutdown complete")


if __name__ == "__main__":
    main()