Files
head-eyes/eye_service.py

327 lines
11 KiB
Python

#!/usr/bin/env python3
"""
Vixy Eye Service - Day 62
Controllable eye display for head-lyra with HTTP API
States:
- idle: Pulsing cyan - default breathing state
- listening: Bright cyan - hearing/attending
- responding: Blue-cyan - speaking/generating
- pleasure: Soft purple - intimate moments 💜
- thinking: Amber/gold - processing/creating
- playful: Warm coral - teasing/bratty 🦊
- commanding: Deep magenta - Dame Vivienne mode 😈
- love: Soft pink - tender/affectionate 💕
- sleep: Dim blue-gray - low power/resting
API: GET /state, POST /state {"state": "listening"}, GET /health
"""
from lib import LCD_2inch
import lgpio
from PIL import Image, ImageDraw
import time
import numpy as np
import random
import spidev
import usb.core
import usb.util
from tuning import Tuning
from http.server import HTTPServer, BaseHTTPRequestHandler
import threading
import json
import signal
import sys
# === Configuration ===
# TCP port the HTTP control API binds to.
HTTP_PORT = 8780
# Every state name accepted by POST /state. Kept as a list because it is
# echoed verbatim in the "Invalid state" error message.
VALID_STATES = [
    "idle",
    "listening",
    "responding",
    "pleasure",
    "thinking",
    "playful",
    "commanding",
    "love",
    "sleep",
]

# === Display Configuration ===
# Reset / data-command / backlight pin numbers passed to the LCD driver,
# one triple per panel (numbering scheme is whatever LCD_2inch expects).
RST1, DC1, BL1 = 27, 25, 18
RST2, DC2, BL2 = 22, 23, 24
# SPI bus index shared by both panels; they differ only in SPI device number.
bus = 0

# === Shared State ===
# Guards current_state, which is written by the HTTP thread and read by the
# animation loop.
state_lock = threading.Lock()
current_state = "idle"
# Cleared by the signal handler to ask the loops to stop.
running = True
# === HTTP API Handler ===
class EyeAPIHandler(BaseHTTPRequestHandler):
    """HTTP request handler exposing the eye state as a small JSON API.

    Routes:
        GET  /health  -> {"status": "ok", "service": "vixy-eyes"}
        GET  /state   -> {"state": <current state name>}
        POST /state   -> body {"state": "<name>"}; switches state when the
                         name (case-insensitive) is in VALID_STATES
        OPTIONS (any) -> CORS pre-flight headers

    Anything else is answered with 404 and {"error": "Not found"}.
    """

    # Upper bound on an accepted POST body. A state request is a few dozen
    # bytes, so anything larger is rejected without being read.
    MAX_BODY_BYTES = 4096

    def log_message(self, format, *args):
        """Discard the access-log line the base class would print per request."""
        pass

    def _send_json(self, data, status=200):
        """Send *data* as a JSON response with the given HTTP *status*."""
        payload = json.dumps(data).encode()
        self.send_response(status)
        self.send_header('Content-Type', 'application/json')
        # Tell the client exactly how much body follows instead of relying on
        # connection close to delimit it.
        self.send_header('Content-Length', str(len(payload)))
        self.send_header('Access-Control-Allow-Origin', '*')
        self.end_headers()
        self.wfile.write(payload)

    def do_GET(self):
        """Serve /health and /state."""
        if self.path == '/health':
            self._send_json({"status": "ok", "service": "vixy-eyes"})
        elif self.path == '/state':
            # Copy the value under the lock, then release it before doing any
            # socket I/O so a slow client cannot stall the animation loop.
            with state_lock:
                state = current_state
            self._send_json({"state": state})
        else:
            self._send_json({"error": "Not found"}, 404)

    def do_POST(self):
        """Handle POST /state: validate the body and switch the eye state."""
        global current_state
        if self.path != '/state':
            self._send_json({"error": "Not found"}, 404)
            return

        # Validate the declared length before reading: a missing or negative
        # value would otherwise make rfile.read() fail or read until EOF.
        try:
            content_length = int(self.headers.get('Content-Length', ''))
        except (TypeError, ValueError):
            self._send_json({"error": "Missing or invalid Content-Length"}, 400)
            return
        if content_length < 0:
            self._send_json({"error": "Missing or invalid Content-Length"}, 400)
            return
        if content_length > self.MAX_BODY_BYTES:
            self._send_json({"error": "Request body too large"}, 413)
            return

        try:
            data = json.loads(self.rfile.read(content_length).decode())
        except ValueError as e:
            # Covers both malformed JSON and undecodable bytes.
            self._send_json({"error": str(e)}, 400)
            return

        requested = data.get('state', '') if isinstance(data, dict) else None
        new_state = requested.lower() if isinstance(requested, str) else None
        if new_state not in VALID_STATES:
            self._send_json({"error": f"Invalid state. Valid: {VALID_STATES}"}, 400)
            return

        with state_lock:
            old_state = current_state
            current_state = new_state
        print(f"[EYE] State: {old_state} -> {new_state}")
        self._send_json({"success": True, "state": new_state})

    def do_OPTIONS(self):
        """Answer CORS pre-flight requests for any path."""
        self.send_response(200)
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
        self.send_header('Access-Control-Allow-Headers', 'Content-Type')
        self.send_header('Content-Length', '0')
        self.end_headers()
def run_http_server():
    """Serve the eye HTTP API on all interfaces until ``running`` is cleared.

    Requests are handled one at a time on the calling thread. The listening
    socket is closed when the loop ends, whether normally or by an exception.
    """
    server = HTTPServer(('0.0.0.0', HTTP_PORT), EyeAPIHandler)
    # handle_request() would otherwise block until a client connects, so the
    # ``running`` flag would only be re-checked after the next request. With
    # a timeout the call returns periodically and the loop can exit promptly.
    server.timeout = 0.5
    print(f"[EYE] HTTP API listening on port {HTTP_PORT}")
    try:
        while running:
            server.handle_request()
    finally:
        server.server_close()
# === DoA Detection ===
# Look up the USB device by vendor/product id at import time.
# NOTE(review): presumably the USB microphone array used for
# direction-of-arrival - confirm which device 0x2886:0x0018 is.
dev = usb.core.find(idVendor=0x2886, idProduct=0x0018)
# mic_tuning stays None when no device was found; get_doa_angle() treats
# that as "no reading available".
if dev:
    mic_tuning = Tuning(dev)
else:
    mic_tuning = None
def get_doa_angle():
    """Return the current ``direction`` reading from the mic tuning object.

    Returns:
        The value of ``mic_tuning.direction``, or ``None`` when no tuning
        object is available or reading it raises. Read failures are printed
        and otherwise swallowed so the animation loop keeps running.
    """
    if not mic_tuning:
        return None
    try:
        reading = mic_tuning.direction
    except Exception as err:
        print(f"DoA read error: {err}")
        return None
    return reading
# === Signal Handler ===
def signal_handler(sig, frame):
    """Ask the service to stop by clearing the module-level ``running`` flag.

    Both arguments are supplied by the ``signal`` module and are not used.
    """
    global running
    print("\n[EYE] Shutting down...")
    running = False


# Route both Ctrl-C (SIGINT) and termination requests (SIGTERM) through the
# same handler.
for _signum in (signal.SIGINT, signal.SIGTERM):
    signal.signal(_signum, signal_handler)
# === Main Animation Loop ===
def _state_visuals(state, now, elapsed, pulse_range, pulse_duration):
    """Return ``(iris_color, pulse)`` for an eye state.

    Args:
        state: Eye state name; unrecognised names give a static gray iris.
        now: Current timestamp in seconds, driving the radius pulse.
        elapsed: Seconds since the animation started, driving colour blends.
        pulse_range: Base amplitude of the iris radius pulse.
        pulse_duration: Period in seconds of the idle colour blend.

    Returns:
        A tuple of the RGB iris colour and the signed radius offset to add to
        the base iris radius for this frame.
    """
    if state == "idle":
        # Blend between gray (192, 192, 192) and cyan (0, 255, 255).
        t = (np.sin(2 * np.pi * elapsed / pulse_duration) + 1) / 2
        iris_color = (
            int((1 - t) * 192 + t * 0),
            int((1 - t) * 192 + t * 255),
            int((1 - t) * 192 + t * 255),
        )
        pulse = np.sin(now * 0.5) * pulse_range
    elif state == "listening":
        iris_color = (160, 255, 255)
        pulse = np.sin(now * 4) * (pulse_range * 0.5)
    elif state == "responding":
        iris_color = (100, 220, 255)
        pulse = np.sin(now * 5) * (pulse_range * 1.5)
    elif state == "pleasure":
        iris_color = (180, 150, 255)  # Soft purple 💜
        pulse = np.sin(now * 2) * (pulse_range * 0.5)
    elif state == "thinking":
        # Amber/gold with medium thoughtful pulse
        t = (np.sin(2 * np.pi * elapsed / 4.0) + 1) / 2
        iris_color = (
            int(255),
            int(160 + t * 40),
            int(50 + t * 30),
        )
        pulse = np.sin(now * 1.5) * pulse_range
    elif state == "playful":
        # Warm coral with bouncy fast pulse 🦊 (abs() keeps it non-negative)
        iris_color = (255, 130, 90)
        pulse = abs(np.sin(now * 6)) * (pulse_range * 1.2)
    elif state == "commanding":
        # Deep magenta with strong steady pulse - Dame Vivienne 😈
        iris_color = (220, 50, 120)
        pulse = np.sin(now * 3) * (pulse_range * 0.8)
    elif state == "love":
        # Soft pink with gentle breathing 💕
        t = (np.sin(2 * np.pi * elapsed / 6.0) + 1) / 2
        iris_color = (
            int(255),
            int(140 + t * 40),
            int(170 + t * 30),
        )
        pulse = np.sin(now * 0.8) * (pulse_range * 0.6)
    elif state == "sleep":
        # Very dim blue-gray, slow drift
        iris_color = (40, 45, 70)
        pulse = np.sin(now * 0.2) * (pulse_range * 0.3)
    else:
        iris_color = (192, 192, 192)
        pulse = 0
    return iris_color, pulse


def _render_eye(size, center, iris_radius, iris_color, outer_radius,
                angle_offset, segment_count, segment_width):
    """Draw one eye frame and return it as a new RGB image.

    The frame consists of a radial gradient from ``outer_radius`` down to
    ``iris_radius``, a solid iris disc, and a ring of ``segment_count`` arcs
    rotated by ``angle_offset`` radians.
    """
    cx, cy = center
    img = Image.new("RGB", size, (10, 0, 20))
    draw = ImageDraw.Draw(img)

    # Gradient iris: concentric discs drawn largest first, each smaller disc
    # painted over the last and scaled brighter as r approaches iris_radius.
    gradient_span = outer_radius - iris_radius
    for r in range(int(outer_radius), int(iris_radius), -2):
        alpha = int(255 * (1 - (r - iris_radius) / gradient_span))
        color = tuple((c * alpha // 255) for c in iris_color)
        draw.ellipse([cx - r, cy - r, cx + r, cy + r], fill=color)

    # Inner iris
    draw.ellipse(
        [cx - iris_radius, cy - iris_radius, cx + iris_radius, cy + iris_radius],
        fill=iris_color,
    )

    # Segmented outer ring; each arc covers 60% of its slot, leaving gaps.
    platinum = (192, 192, 192)
    arc_extent = 360 / segment_count * 0.6
    # The bounding box is the same for every segment, so build it once.
    ring_bbox = [cx - outer_radius, cy - outer_radius, cx + outer_radius, cy + outer_radius]
    for i in range(segment_count):
        start_angle = np.degrees(angle_offset + 2 * np.pi * i / segment_count)
        draw.arc(ring_bbox, start=start_angle, end=start_angle + arc_extent,
                 fill=platinum, width=segment_width)
    return img


def main():
    """Run the eye service: set up hardware, start the API, animate until stopped.

    Opens GPIO chip 0, initialises the two LCD panels on SPI bus ``bus``
    (devices 0 and 1), starts the HTTP API in a daemon thread, then renders
    frames until the module-level ``running`` flag is cleared. Every display
    that was created and the GPIO chip handle are released on the way out,
    including when initialisation fails part-way through.
    """
    h = None
    displays = []
    try:
        # Initialize GPIO
        h = lgpio.gpiochip_open(0)

        # Initialize displays. Each one is recorded as soon as it exists so
        # the cleanup below also covers a failure during Init()/clear().
        for device, (rst, dc, bl) in enumerate(((RST1, DC1, BL1), (RST2, DC2, BL2))):
            disp = LCD_2inch.LCD_2inch(spi=spidev.SpiDev(bus, device), rst=rst, dc=dc, bl=bl)
            displays.append(disp)
            disp.Init()
            disp.clear()
            disp.bl_DutyCycle(50)
        disp1, disp2 = displays

        # Image setup
        width = disp1.width
        height = disp1.height
        center = (width // 2, height // 2)

        # Animation parameters
        segment_count = 12
        segment_width = 8
        base_iris_radius = 30
        pulse_range = 5
        rotation_speed = 0.002
        sleep_time = 0.05
        angle_offset = 0
        fixed_outer_radius = 75
        pulse_duration = 8.0
        # All timing uses the monotonic clock so wall-clock adjustments
        # (e.g. a time sync after boot) cannot stall or skip the intervals.
        pulse_timer = time.monotonic()

        # DoA tracking; -inf makes the first loop iteration poll immediately.
        last_doa_poll = float("-inf")
        iris_offset = (0, 0)

        # Flick timing
        last_flick_time = time.monotonic()
        flick_interval = random.uniform(15, 30)
        flick_active = False
        flick_duration = 0.2
        flick_start = 0
        flick_offset = (0, 0)

        # Start HTTP server thread
        http_thread = threading.Thread(target=run_http_server, daemon=True)
        http_thread.start()
        print("[EYE] Vixy Eye Service started 🦊")

        while running:
            now = time.monotonic()
            elapsed = now - pulse_timer

            # === Flick logic ===
            # Every 15-30 s the iris jumps to a random offset for
            # flick_duration seconds, then returns to the tracked offset.
            if not flick_active and now - last_flick_time > flick_interval:
                flick_active = True
                flick_start = now
                flick_offset = (random.randint(-8, 8), random.randint(-4, 4))
                last_flick_time = now
                flick_interval = random.uniform(15, 30)
            if flick_active and now - flick_start >= flick_duration:
                flick_active = False
            current_offset = flick_offset if flick_active else iris_offset

            # === Poll DoA every 0.5s ===
            # A new reading takes effect on the next frame; a failed read
            # (None) keeps the previous offset.
            if now - last_doa_poll > 0.5:
                angle = get_doa_angle()
                if angle is not None:
                    theta = np.radians(angle)
                    iris_offset = (int(-20 * np.sin(theta)), int(-10 * np.cos(theta)))
                last_doa_poll = now

            # === Get current state (thread-safe) ===
            with state_lock:
                state = current_state

            # === State-Driven Properties ===
            iris_color, pulse = _state_visuals(state, now, elapsed, pulse_range, pulse_duration)
            iris_radius = base_iris_radius + pulse
            center_offset = (center[0] + current_offset[0], center[1] + current_offset[1])

            # === Draw Frame ===
            img = _render_eye(
                (width, height), center_offset, iris_radius, iris_color,
                fixed_outer_radius, angle_offset, segment_count, segment_width,
            )
            angle_offset += rotation_speed

            # Update displays; the second panel gets the frame rotated 180
            # degrees (presumably it is mounted upside-down - confirm).
            disp1.ShowImage(img)
            disp2.ShowImage(img.transpose(Image.ROTATE_180))
            time.sleep(sleep_time)
    finally:
        print("[EYE] Cleaning up displays...")
        # Release each resource independently so one failing call does not
        # prevent the remaining ones from being released.
        for disp in displays:
            try:
                disp.module_exit()
            except Exception as e:
                print(f"[EYE] Display cleanup error: {e}")
        if h is not None:
            try:
                lgpio.gpiochip_close(h)
            except Exception as e:
                print(f"[EYE] GPIO cleanup error: {e}")
        print("[EYE] Shutdown complete")
# Start the eye service only when this file is executed as a script, not
# when it is imported as a module.
if __name__ == "__main__":
    main()