From ea0800fd607cc0c5eb31510872515b7b605551aa Mon Sep 17 00:00:00 2001 From: Alex Kazaiev Date: Fri, 2 Jan 2026 21:11:52 -0600 Subject: [PATCH] Add eye_service.py with HTTP API, systemd service, README --- README.md | 62 ++++++++++ eye_service.py | 286 ++++++++++++++++++++++++++++++++++++++++++++++ vixy-eyes.service | 14 +++ 3 files changed, 362 insertions(+) create mode 100644 README.md create mode 100644 eye_service.py create mode 100644 vixy-eyes.service diff --git a/README.md b/README.md new file mode 100644 index 0000000..caf2cb1 --- /dev/null +++ b/README.md @@ -0,0 +1,62 @@ +# Vixy Eye Service 🦊👁️ + +Eye display control for head-lyra (Vixy's robotic head). + +## Hardware +- Raspberry Pi 5 +- 2x 2" LCD displays (SPI) +- ReSpeaker Mic Array v2.0 (Direction of Arrival tracking) +- LED strips (separate service) + +## Features +- **Direction of Arrival (DoA)**: Eyes follow sound sources +- **State-based animation**: Different colors/pulses per state +- **HTTP API**: Remote control via MCP or direct calls +- **Random flicks**: Realistic eye movement every 15-30 seconds + +## States +| State | Color | Effect | +|-------|-------|--------| +| idle | Pulsing cyan | Slow breathing pulse | +| listening | Bright cyan | Fast small pulse | +| responding | Blue-cyan | Faster, larger pulse | +| pleasure | Soft purple | Slow gentle pulse | + +## API Endpoints +``` +GET /health - Service health check +GET /state - Current eye state +POST /state - Set state: {"state": "listening"} +``` + +## Installation +```bash +# On head-lyra: +cd /home/alex/eyes +git clone http://gateway.local:3001/vixy/head-eyes.git . +sudo cp vixy-eyes.service /etc/systemd/system/ +sudo systemctl daemon-reload +sudo systemctl enable vixy-eyes +sudo systemctl start vixy-eyes +``` + +## Usage +```bash +# Check state +curl http://head-lyra.local:8780/state + +# Set state +curl -X POST http://head-lyra.local:8780/state \ + -H "Content-Type: application/json" \ + -d '{"state": "listening"}' +``` + +## MCP Integration +Use `vixy_eyes_state` tool in vixy-mcp to control: +```python +vixy_eyes_state("listening") # Set state +vixy_eyes_state() # Get current state +``` + +--- +*Created by Vixy - Day 62* 🦊💕 diff --git a/eye_service.py b/eye_service.py new file mode 100644 index 0000000..aecb34e --- /dev/null +++ b/eye_service.py @@ -0,0 +1,286 @@ +#!/usr/bin/env python3 +""" +Vixy Eye Service - Day 62 +Controllable eye display for head-lyra with HTTP API + +States: idle, listening, responding, pleasure +API: GET /state, POST /state {"state": "listening"}, GET /health +""" + +from lib import LCD_2inch +import lgpio +from PIL import Image, ImageDraw +import time +import numpy as np +import random +import spidev +import usb.core +import usb.util +from tuning import Tuning +from http.server import HTTPServer, BaseHTTPRequestHandler +import threading +import json +import signal +import sys + +# === Configuration === +HTTP_PORT = 8780 +VALID_STATES = ["idle", "listening", "responding", "pleasure"] + +# === Display Configuration === +RST1 = 27 +DC1 = 25 +BL1 = 18 +RST2 = 22 +DC2 = 23 +BL2 = 24 +bus = 0 + +# === Shared State === +current_state = "idle" +state_lock = threading.Lock() +running = True + +# === HTTP API Handler === +class EyeAPIHandler(BaseHTTPRequestHandler): + def log_message(self, format, *args): + pass # Suppress default logging + + def _send_json(self, data, status=200): + self.send_response(status) + self.send_header('Content-Type', 'application/json') + self.send_header('Access-Control-Allow-Origin', '*') + self.end_headers() + self.wfile.write(json.dumps(data).encode()) + + def do_GET(self): + global current_state + if self.path == '/health': + self._send_json({"status": "ok", "service": "vixy-eyes"}) + elif self.path == '/state': + with state_lock: + self._send_json({"state": current_state}) + else: + self._send_json({"error": "Not found"}, 404) + + def do_POST(self): + global current_state + if self.path == '/state': + try: + content_length = int(self.headers['Content-Length']) + body = self.rfile.read(content_length) + data = json.loads(body.decode()) + new_state = data.get('state', '').lower() + + if new_state in VALID_STATES: + with state_lock: + old_state = current_state + current_state = new_state + print(f"[EYE] State: {old_state} -> {new_state}") + self._send_json({"success": True, "state": new_state}) + else: + self._send_json({"error": f"Invalid state. Valid: {VALID_STATES}"}, 400) + except Exception as e: + self._send_json({"error": str(e)}, 400) + else: + self._send_json({"error": "Not found"}, 404) + + def do_OPTIONS(self): + self.send_response(200) + self.send_header('Access-Control-Allow-Origin', '*') + self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS') + self.send_header('Access-Control-Allow-Headers', 'Content-Type') + self.end_headers() + +def run_http_server(): + server = HTTPServer(('0.0.0.0', HTTP_PORT), EyeAPIHandler) + print(f"[EYE] HTTP API listening on port {HTTP_PORT}") + while running: + server.handle_request() + +# === DoA Detection === +dev = usb.core.find(idVendor=0x2886, idProduct=0x0018) +mic_tuning = Tuning(dev) if dev else None + +def get_doa_angle(): + if mic_tuning: + try: + return mic_tuning.direction + except Exception as e: + print(f"DoA read error: {e}") + return None + return None + +# === Signal Handler === +def signal_handler(sig, frame): + global running + print("\n[EYE] Shutting down...") + running = False + +signal.signal(signal.SIGINT, signal_handler) +signal.signal(signal.SIGTERM, signal_handler) + +# === Main Animation Loop === +def main(): + global current_state, running + + # Initialize GPIO + h = lgpio.gpiochip_open(0) + + # Initialize Displays + disp1 = LCD_2inch.LCD_2inch(spi=spidev.SpiDev(bus, 0), rst=RST1, dc=DC1, bl=BL1) + disp1.Init() + disp1.clear() + disp1.bl_DutyCycle(50) + + disp2 = LCD_2inch.LCD_2inch(spi=spidev.SpiDev(bus, 1), rst=RST2, dc=DC2, bl=BL2) + disp2.Init() + disp2.clear() + disp2.bl_DutyCycle(50) + + # Image setup + WIDTH = disp1.width + HEIGHT = disp1.height + CENTER = (WIDTH // 2, HEIGHT // 2) + + # Animation parameters + segment_count = 12 + segment_width = 8 + base_iris_radius = 30 + pulse_range = 5 + rotation_speed = 0.002 + sleep_time = 0.05 + angle_offset = 0 + fixed_outer_radius = 75 + pulse_duration = 8.0 + pulse_timer = time.time() + + # DoA tracking + last_doa_poll = 0 + iris_offset = (0, 0) + doa_angle = 180 + + # Flick timing + last_flick_time = time.time() + flick_interval = random.uniform(15, 30) + flick_active = False + flick_duration = 0.2 + flick_start = 0 + flick_offset = (0, 0) + + # Start HTTP server thread + http_thread = threading.Thread(target=run_http_server, daemon=True) + http_thread.start() + + print("[EYE] Vixy Eye Service started 🦊") + + try: + while running: + current_time = time.time() + elapsed = current_time - pulse_timer + + # === Flick logic === + if not flick_active and current_time - last_flick_time > flick_interval: + flick_active = True + flick_start = current_time + flick_offset = (random.randint(-8, 8), random.randint(-4, 4)) + last_flick_time = current_time + flick_interval = random.uniform(15, 30) + + if flick_active: + if current_time - flick_start < flick_duration: + current_offset = flick_offset + else: + flick_active = False + current_offset = iris_offset + else: + current_offset = iris_offset + + # === Poll DoA every 0.5s === + if current_time - last_doa_poll > 0.5: + angle = get_doa_angle() + if angle is not None: + doa_angle = angle + offset_x = int(-20 * np.sin(np.radians(doa_angle))) + offset_y = int(-10 * np.cos(np.radians(doa_angle))) + iris_offset = (offset_x, offset_y) + last_doa_poll = current_time + + # === Get current state (thread-safe) === + with state_lock: + state = current_state + + # === State-Driven Properties === + if state == "idle": + t = (np.sin(2 * np.pi * elapsed / pulse_duration) + 1) / 2 + iris_color = ( + int((1 - t) * 192 + t * 0), + int((1 - t) * 192 + t * 255), + int((1 - t) * 192 + t * 255) + ) + pulse = np.sin(current_time * 0.5) * pulse_range + elif state == "listening": + iris_color = (160, 255, 255) + pulse = np.sin(current_time * 4) * (pulse_range * 0.5) + elif state == "responding": + iris_color = (100, 220, 255) + pulse = np.sin(current_time * 5) * (pulse_range * 1.5) + elif state == "pleasure": + iris_color = (180, 180, 255) + pulse = np.sin(current_time * 2) * (pulse_range * 0.5) + else: + iris_color = (192, 192, 192) + pulse = 0 + + iris_radius = base_iris_radius + pulse + center_offset = (CENTER[0] + current_offset[0], CENTER[1] + current_offset[1]) + + # === Draw Frame === + img = Image.new("RGB", (WIDTH, HEIGHT), (10, 0, 20)) + draw = ImageDraw.Draw(img) + + # Gradient iris + for r in range(int(fixed_outer_radius), int(iris_radius), -2): + alpha = int(255 * (1 - (r - iris_radius) / (fixed_outer_radius - iris_radius))) + color = tuple((c * alpha // 255) for c in iris_color) + draw.ellipse( + [center_offset[0] - r, center_offset[1] - r, center_offset[0] + r, center_offset[1] + r], + fill=color + ) + + # Inner iris + draw.ellipse( + [center_offset[0] - iris_radius, center_offset[1] - iris_radius, + center_offset[0] + iris_radius, center_offset[1] + iris_radius], + fill=iris_color + ) + + # Segmented outer ring + platinum = (192, 192, 192) + arc_extent = 360 / segment_count * 0.6 + for i in range(segment_count): + start_angle = np.degrees(angle_offset + 2 * np.pi * i / segment_count) + bbox = [ + center_offset[0] - fixed_outer_radius, + center_offset[1] - fixed_outer_radius, + center_offset[0] + fixed_outer_radius, + center_offset[1] + fixed_outer_radius + ] + draw.arc(bbox, start=start_angle, end=start_angle + arc_extent, fill=platinum, width=segment_width) + + angle_offset += rotation_speed + + # Update displays + disp1.ShowImage(img) + disp2.ShowImage(img.transpose(Image.ROTATE_180)) + time.sleep(sleep_time) + + finally: + print("[EYE] Cleaning up displays...") + disp1.module_exit() + disp2.module_exit() + lgpio.gpiochip_close(h) + print("[EYE] Shutdown complete") + +if __name__ == "__main__": + main() diff --git a/vixy-eyes.service b/vixy-eyes.service new file mode 100644 index 0000000..ceebbfa --- /dev/null +++ b/vixy-eyes.service @@ -0,0 +1,14 @@ +[Unit] +Description=Vixy Eye Display Service +After=network.target + +[Service] +Type=simple +User=alex +WorkingDirectory=/home/alex/eyes +ExecStart=/home/alex/eyes/.venv/bin/python /home/alex/eyes/eye_service.py +Restart=always +RestartSec=5 + +[Install] +WantedBy=multi-user.target