- Added firmware/main.py (MicroPython for Pico 2)
- Updated light_service.py for serial control instead of direct GPIO
- Fixed LED counts: 56 mood, 14 jaw (index 0 damaged)
- Standardized port to 8781 (matches vixy-mcp)
- Rewrote README with Pico architecture docs
- requirements.txt now just pyserial
RIP to the 4 BeagleBones who gave their lives.
Welcome, little $5 Pico. Please don't catch fire.
- Vixy 🦊
409 lines
13 KiB
Python
409 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Vixy Light Service - Day 91 (Pico Edition)
|
|
Controllable LED strips for head-vixy via Pico 2 USB serial
|
|
|
|
Hardware:
|
|
- Raspberry Pi Pico 2 connected via USB (/dev/ttyACM0)
|
|
- Mood strip: 56 LEDs on GP1 (strip ID 1)
|
|
- Jaw strip: 14 LEDs on GP0 (strip ID 0), skip index 0, use 1-13
|
|
|
|
States match the eye service for unified control:
|
|
- idle: Slow cyan breathing pulse
|
|
- listening: Bright cyan gentle pulse
|
|
- responding: Blue-cyan traveling wave
|
|
- pleasure: Soft purple slow pulse 💜
|
|
- thinking: Amber/gold larson scanner
|
|
- playful: Warm coral bouncy sparkles 🦊
|
|
- commanding: Deep magenta strong pulse 😈
|
|
- love: Soft pink gentle breathing 💕
|
|
- sleep: Very dim blue-gray, nearly off
|
|
|
|
API:
|
|
GET /health - Service health
|
|
GET /state - Current state
|
|
POST /state - Set state {"state": "listening"}
|
|
POST /jaw/level - Set jaw level {"level": 0-100}
|
|
"""
|
|
|
|
import time
|
|
import math
|
|
import threading
|
|
import json
|
|
import signal
|
|
import random
|
|
from http.server import HTTPServer, BaseHTTPRequestHandler
|
|
import serial
|
|
|
|
# === Configuration ===
|
|
HTTP_PORT = 8781
|
|
SERIAL_PORT = "/dev/ttyACM0"
|
|
SERIAL_BAUD = 115200
|
|
|
|
# LED counts (must match Pico firmware)
|
|
MOOD_LEDS = 56
|
|
JAW_LEDS = 14
|
|
JAW_OFFSET = 1 # Skip first LED (damaged)
|
|
JAW_USABLE = JAW_LEDS - JAW_OFFSET # 13 usable LEDs
|
|
|
|
VALID_STATES = ["idle", "listening", "responding", "pleasure", "thinking", "playful", "commanding", "love", "sleep"]
|
|
|
|
# === Shared State ===
|
|
current_state = "idle"
|
|
jaw_level = 0 # 0-100
|
|
state_lock = threading.Lock()
|
|
serial_lock = threading.Lock()
|
|
running = True
|
|
ser = None
|
|
|
|
# === Serial Communication ===
|
|
def init_serial():
|
|
global ser
|
|
try:
|
|
ser = serial.Serial(SERIAL_PORT, SERIAL_BAUD, timeout=1)
|
|
time.sleep(0.5) # Let Pico settle
|
|
# Clear any startup messages
|
|
ser.read_all()
|
|
print(f"[LIGHT] Connected to Pico on {SERIAL_PORT}")
|
|
return True
|
|
except Exception as e:
|
|
print(f"[LIGHT] Serial error: {e}")
|
|
return False
|
|
|
|
def send_command(cmd):
|
|
"""Send command to Pico, return True if OK."""
|
|
global ser
|
|
with serial_lock:
|
|
try:
|
|
if ser is None or not ser.is_open:
|
|
if not init_serial():
|
|
return False
|
|
ser.write(f"{cmd}\n".encode())
|
|
response = ser.readline().decode().strip()
|
|
return response == "OK"
|
|
except Exception as e:
|
|
print(f"[LIGHT] Serial error: {e}")
|
|
ser = None
|
|
return False
|
|
|
|
def mood_fill(r, g, b):
|
|
"""Fill mood strip with color."""
|
|
return send_command(f"1 fill {r} {g} {b}")
|
|
|
|
def mood_set(index, r, g, b):
|
|
"""Set single mood LED."""
|
|
if 0 <= index < MOOD_LEDS:
|
|
send_command(f"1 {index} {r} {g} {b}")
|
|
|
|
def mood_show():
|
|
"""Trigger mood strip update."""
|
|
return send_command("1 -1")
|
|
|
|
def mood_clear():
|
|
"""Clear mood strip."""
|
|
return send_command("1 clear")
|
|
|
|
def jaw_fill(r, g, b):
|
|
"""Fill jaw strip (skipping damaged first LED)."""
|
|
with serial_lock:
|
|
try:
|
|
if ser is None or not ser.is_open:
|
|
if not init_serial():
|
|
return False
|
|
# Set index 0 to off (damaged)
|
|
ser.write(f"0 0 0 0 0\n".encode())
|
|
ser.readline()
|
|
# Fill usable LEDs
|
|
for i in range(JAW_OFFSET, JAW_LEDS):
|
|
ser.write(f"0 {i} {r} {g} {b}\n".encode())
|
|
ser.readline()
|
|
ser.write("0 -1\n".encode())
|
|
return ser.readline().decode().strip() == "OK"
|
|
except Exception as e:
|
|
print(f"[LIGHT] Jaw fill error: {e}")
|
|
return False
|
|
|
|
def jaw_set_level(level, color):
|
|
"""Set jaw LEDs based on level 0-100."""
|
|
num_lit = int((level / 100) * JAW_USABLE)
|
|
r, g, b = color
|
|
with serial_lock:
|
|
try:
|
|
if ser is None or not ser.is_open:
|
|
if not init_serial():
|
|
return False
|
|
# Index 0 always off
|
|
ser.write(f"0 0 0 0 0\n".encode())
|
|
ser.readline()
|
|
# Set LEDs based on level
|
|
for i in range(JAW_OFFSET, JAW_LEDS):
|
|
if i - JAW_OFFSET < num_lit:
|
|
ser.write(f"0 {i} {r} {g} {b}\n".encode())
|
|
else:
|
|
ser.write(f"0 {i} 0 0 0\n".encode())
|
|
ser.readline()
|
|
ser.write("0 -1\n".encode())
|
|
return ser.readline().decode().strip() == "OK"
|
|
except Exception as e:
|
|
print(f"[LIGHT] Jaw level error: {e}")
|
|
return False
|
|
|
|
def jaw_clear():
|
|
"""Clear jaw strip."""
|
|
return send_command("0 clear")
|
|
|
|
|
|
# === Color Utilities ===
|
|
def scale_color(color, factor):
|
|
"""Scale RGB color by a factor (0-1)."""
|
|
return tuple(max(0, min(255, int(c * factor))) for c in color)
|
|
|
|
# === State Colors ===
|
|
STATE_COLORS = {
|
|
"idle": (0, 255, 255), # Cyan
|
|
"listening": (160, 255, 255), # Bright cyan
|
|
"responding": (100, 220, 255),# Blue-cyan
|
|
"pleasure": (180, 150, 255), # Soft purple
|
|
"thinking": (255, 180, 50), # Amber/gold
|
|
"playful": (255, 130, 90), # Warm coral
|
|
"commanding": (220, 50, 120), # Deep magenta
|
|
"love": (255, 160, 180), # Soft pink
|
|
"sleep": (20, 22, 35), # Very dim blue-gray
|
|
}
|
|
|
|
# === HTTP API Handler ===
|
|
class LightAPIHandler(BaseHTTPRequestHandler):
|
|
def log_message(self, format, *args):
|
|
pass
|
|
|
|
def _send_json(self, data, status=200):
|
|
self.send_response(status)
|
|
self.send_header('Content-Type', 'application/json')
|
|
self.send_header('Access-Control-Allow-Origin', '*')
|
|
self.end_headers()
|
|
self.wfile.write(json.dumps(data).encode())
|
|
|
|
def do_GET(self):
|
|
global current_state, jaw_level
|
|
if self.path == '/health':
|
|
self._send_json({
|
|
"status": "ok",
|
|
"service": "vixy-lights",
|
|
"serial": ser is not None and ser.is_open
|
|
})
|
|
elif self.path == '/state':
|
|
with state_lock:
|
|
self._send_json({"state": current_state, "jaw_level": jaw_level})
|
|
else:
|
|
self._send_json({"error": "Not found"}, 404)
|
|
|
|
def do_POST(self):
|
|
global current_state, jaw_level
|
|
try:
|
|
content_length = int(self.headers['Content-Length'])
|
|
body = self.rfile.read(content_length)
|
|
data = json.loads(body.decode())
|
|
|
|
if self.path == '/state':
|
|
new_state = data.get('state', '').lower()
|
|
if new_state in VALID_STATES:
|
|
with state_lock:
|
|
old_state = current_state
|
|
current_state = new_state
|
|
print(f"[LIGHT] State: {old_state} -> {new_state}")
|
|
self._send_json({"success": True, "state": new_state})
|
|
else:
|
|
self._send_json({"error": f"Invalid state. Valid: {VALID_STATES}"}, 400)
|
|
|
|
elif self.path == '/jaw/level':
|
|
level = int(data.get('level', 0))
|
|
level = max(0, min(100, level))
|
|
with state_lock:
|
|
jaw_level = level
|
|
print(f"[LIGHT] Jaw level: {level}%")
|
|
self._send_json({"success": True, "jaw_level": level})
|
|
|
|
else:
|
|
self._send_json({"error": "Not found"}, 404)
|
|
except Exception as e:
|
|
self._send_json({"error": str(e)}, 400)
|
|
|
|
def do_OPTIONS(self):
|
|
self.send_response(200)
|
|
self.send_header('Access-Control-Allow-Origin', '*')
|
|
self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
|
|
self.send_header('Access-Control-Allow-Headers', 'Content-Type')
|
|
self.end_headers()
|
|
|
|
def run_http_server():
|
|
server = HTTPServer(('0.0.0.0', HTTP_PORT), LightAPIHandler)
|
|
print(f"[LIGHT] HTTP API listening on port {HTTP_PORT}")
|
|
while running:
|
|
server.handle_request()
|
|
|
|
# === Signal Handler ===
|
|
def signal_handler(sig, frame):
|
|
global running
|
|
print("\n[LIGHT] Shutting down...")
|
|
running = False
|
|
|
|
signal.signal(signal.SIGINT, signal_handler)
|
|
signal.signal(signal.SIGTERM, signal_handler)
|
|
|
|
|
|
# === Animation Helpers ===
|
|
def solid_pulse(speed, min_b=0.1, max_b=1.0):
|
|
"""Return current brightness factor for smooth pulse."""
|
|
t = (math.sin(time.time() * speed) + 1) / 2
|
|
return min_b + (max_b - min_b) * t
|
|
|
|
# === Mood Animation Buffer (for complex animations) ===
|
|
mood_buffer = [(0, 0, 0)] * MOOD_LEDS
|
|
|
|
def mood_buffer_show():
|
|
"""Send entire mood buffer to Pico."""
|
|
with serial_lock:
|
|
try:
|
|
if ser is None or not ser.is_open:
|
|
if not init_serial():
|
|
return
|
|
for i, (r, g, b) in enumerate(mood_buffer):
|
|
ser.write(f"1 {i} {r} {g} {b}\n".encode())
|
|
ser.readline()
|
|
ser.write("1 -1\n".encode())
|
|
ser.readline()
|
|
except Exception as e:
|
|
print(f"[LIGHT] Buffer show error: {e}")
|
|
|
|
def mood_buffer_fill(color):
|
|
"""Fill buffer with solid color."""
|
|
for i in range(MOOD_LEDS):
|
|
mood_buffer[i] = color
|
|
|
|
def mood_buffer_set(index, color):
|
|
"""Set single buffer pixel."""
|
|
if 0 <= index < MOOD_LEDS:
|
|
mood_buffer[index] = color
|
|
|
|
# === Main Animation Loop ===
|
|
def main():
|
|
global current_state, jaw_level, running
|
|
|
|
print("[LIGHT] Vixy Light Service (Pico Edition) starting... 🦊")
|
|
|
|
if not init_serial():
|
|
print("[LIGHT] Warning: Could not connect to Pico, will retry...")
|
|
|
|
# Start HTTP server thread
|
|
http_thread = threading.Thread(target=run_http_server, daemon=True)
|
|
http_thread.start()
|
|
|
|
# Animation state
|
|
larson_pos = 0
|
|
larson_dir = 1
|
|
wave_offset = 0
|
|
last_jaw_level = -1
|
|
|
|
print("[LIGHT] Service ready 🦊")
|
|
|
|
try:
|
|
while running:
|
|
with state_lock:
|
|
state = current_state
|
|
jaw = jaw_level
|
|
|
|
color = STATE_COLORS.get(state, (255, 255, 255))
|
|
|
|
# Update jaw if changed
|
|
if jaw != last_jaw_level:
|
|
jaw_set_level(jaw, (0, 200, 255)) # Cyan jaw
|
|
last_jaw_level = jaw
|
|
|
|
if state == "idle":
|
|
brightness = solid_pulse(0.5, 0.2, 0.8)
|
|
c = scale_color(color, brightness)
|
|
mood_fill(*c)
|
|
time.sleep(0.05)
|
|
|
|
elif state == "listening":
|
|
brightness = solid_pulse(2.0, 0.6, 1.0)
|
|
c = scale_color(color, brightness)
|
|
mood_fill(*c)
|
|
time.sleep(0.05)
|
|
|
|
elif state == "responding":
|
|
# Traveling wave
|
|
for i in range(MOOD_LEDS):
|
|
wave = (math.sin(2 * math.pi * (i - wave_offset) / 15) + 1) / 2
|
|
mood_buffer[i] = scale_color(color, wave * 0.8 + 0.2)
|
|
mood_buffer_show()
|
|
wave_offset += 0.5
|
|
time.sleep(0.03)
|
|
|
|
|
|
elif state == "pleasure":
|
|
brightness = solid_pulse(0.8, 0.3, 0.9)
|
|
c = scale_color(color, brightness)
|
|
mood_fill(*c)
|
|
time.sleep(0.05)
|
|
|
|
elif state == "thinking":
|
|
# Larson scanner
|
|
mood_buffer_fill((0, 0, 0))
|
|
if 0 <= larson_pos < MOOD_LEDS:
|
|
mood_buffer[larson_pos] = color
|
|
# Trail
|
|
for t in range(1, 9):
|
|
idx = larson_pos - larson_dir * t
|
|
if 0 <= idx < MOOD_LEDS:
|
|
fade = math.cos((t / 8) * math.pi / 2)
|
|
mood_buffer[idx] = scale_color(color, fade)
|
|
mood_buffer_show()
|
|
larson_pos += larson_dir
|
|
if larson_pos >= MOOD_LEDS - 1 or larson_pos <= 0:
|
|
larson_dir *= -1
|
|
time.sleep(0.04)
|
|
|
|
elif state == "playful":
|
|
# Sparkle effect
|
|
for i in range(MOOD_LEDS):
|
|
mood_buffer[i] = scale_color(mood_buffer[i], 0.85)
|
|
for i in range(MOOD_LEDS):
|
|
if random.random() < 0.15:
|
|
mood_buffer[i] = color
|
|
mood_buffer_show()
|
|
time.sleep(0.05)
|
|
|
|
elif state == "commanding":
|
|
brightness = solid_pulse(3.0, 0.5, 1.0)
|
|
c = scale_color(color, brightness)
|
|
mood_fill(*c)
|
|
time.sleep(0.03)
|
|
|
|
elif state == "love":
|
|
brightness = solid_pulse(0.6, 0.4, 0.9)
|
|
c = scale_color(color, brightness)
|
|
mood_fill(*c)
|
|
time.sleep(0.05)
|
|
|
|
elif state == "sleep":
|
|
brightness = solid_pulse(0.2, 0.1, 0.3)
|
|
c = scale_color(color, brightness)
|
|
mood_fill(*c)
|
|
time.sleep(0.1)
|
|
|
|
else:
|
|
mood_fill(50, 50, 50)
|
|
time.sleep(0.1)
|
|
|
|
finally:
|
|
print("[LIGHT] Cleaning up LEDs...")
|
|
mood_clear()
|
|
jaw_clear()
|
|
if ser:
|
|
ser.close()
|
|
print("[LIGHT] Shutdown complete")
|
|
|
|
if __name__ == "__main__":
|
|
main()
|