Files
head-lights/light_service.py
Alex c74371a24a Move all animations to Pico firmware, Pi becomes HTTP-serial bridge
Pico runs all 9 mood animations and jaw modes locally instead of
receiving per-LED serial commands from the Pi. New protocol: bare
integers for fire-and-forget jaw amplitude, "mood X" / "jaw X" for
mode switches. Cuts light_service.py from 409 to 193 lines.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-31 21:34:03 -06:00

193 lines
6.0 KiB
Python

#!/usr/bin/env python3
"""
Vixy Light Service - HTTP-to-Serial Bridge
Thin bridge between HTTP API and Pico 2 animation engine.
All animations run on the Pico. This service just forwards mode commands.
Hardware:
- Raspberry Pi Pico 2 connected via USB (/dev/ttyACM0)
- Pico runs all LED animations locally
API:
GET /health - Service health
GET /state - Current state
POST /state - Set state {"state": "listening"}
POST /jaw/level - Set jaw level {"level": 0-100}
POST /jaw/mode - Set jaw mode {"mode": "talking"}
"""
import time
import threading
import json
import signal
from http.server import HTTPServer, BaseHTTPRequestHandler
import serial
# === Configuration ===
HTTP_PORT = 8781
SERIAL_PORT = "/dev/ttyACM0"
SERIAL_BAUD = 115200
VALID_STATES = ["idle", "listening", "responding", "pleasure", "thinking", "playful", "commanding", "love", "sleep"]
VALID_JAW_MODES = ["idle", "talking", "off"]
# === Shared State ===
current_state = "idle"
jaw_level = 0
serial_lock = threading.Lock()
running = True
ser = None
# === Serial Communication ===
def init_serial():
global ser
try:
ser = serial.Serial(SERIAL_PORT, SERIAL_BAUD, timeout=1)
time.sleep(0.5)
ser.read_all()
print(f"[LIGHT] Connected to Pico on {SERIAL_PORT}")
return True
except Exception as e:
print(f"[LIGHT] Serial error: {e}")
return False
def send_mode_command(cmd):
"""Send a mode command to Pico, wait for OK response."""
global ser
with serial_lock:
try:
if ser is None or not ser.is_open:
if not init_serial():
return False
ser.write(f"{cmd}\n".encode())
response = ser.readline().decode().strip()
return response == "OK"
except Exception as e:
print(f"[LIGHT] Serial error: {e}")
ser = None
return False
def send_amplitude(level):
"""Send bare integer for jaw amplitude. Fire-and-forget."""
global ser
with serial_lock:
try:
if ser is None or not ser.is_open:
if not init_serial():
return False
ser.write(f"{level}\n".encode())
return True
except Exception as e:
print(f"[LIGHT] Serial error: {e}")
ser = None
return False
# === HTTP API Handler ===
class LightAPIHandler(BaseHTTPRequestHandler):
def log_message(self, format, *args):
pass
def _send_json(self, data, status=200):
self.send_response(status)
self.send_header('Content-Type', 'application/json')
self.send_header('Access-Control-Allow-Origin', '*')
self.end_headers()
self.wfile.write(json.dumps(data).encode())
def do_GET(self):
if self.path == '/health':
self._send_json({
"status": "ok",
"service": "vixy-lights",
"serial": ser is not None and ser.is_open
})
elif self.path == '/state':
self._send_json({"state": current_state, "jaw_level": jaw_level})
else:
self._send_json({"error": "Not found"}, 404)
def do_POST(self):
global current_state, jaw_level
try:
content_length = int(self.headers['Content-Length'])
body = self.rfile.read(content_length)
data = json.loads(body.decode())
if self.path == '/state':
new_state = data.get('state', '').lower()
if new_state in VALID_STATES:
old_state = current_state
current_state = new_state
send_mode_command(f"mood {new_state}")
print(f"[LIGHT] State: {old_state} -> {new_state}")
self._send_json({"success": True, "state": new_state})
else:
self._send_json({"error": f"Invalid state. Valid: {VALID_STATES}"}, 400)
elif self.path == '/jaw/level':
level = int(data.get('level', 0))
level = max(0, min(100, level))
jaw_level = level
send_amplitude(level)
self._send_json({"success": True, "jaw_level": level})
elif self.path == '/jaw/mode':
mode = data.get('mode', '').lower()
if mode in VALID_JAW_MODES:
send_mode_command(f"jaw {mode}")
self._send_json({"success": True, "jaw_mode": mode})
else:
self._send_json({"error": f"Invalid jaw mode. Valid: {VALID_JAW_MODES}"}, 400)
else:
self._send_json({"error": "Not found"}, 404)
except Exception as e:
self._send_json({"error": str(e)}, 400)
def do_OPTIONS(self):
self.send_response(200)
self.send_header('Access-Control-Allow-Origin', '*')
self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
self.send_header('Access-Control-Allow-Headers', 'Content-Type')
self.end_headers()
# === Signal Handler ===
def signal_handler(sig, frame):
global running
print("\n[LIGHT] Shutting down...")
running = False
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
# === Main ===
def main():
global running
print("[LIGHT] Vixy Light Service starting...")
if not init_serial():
print("[LIGHT] Warning: Could not connect to Pico, will retry...")
# Set initial mood on Pico
send_mode_command("mood idle")
server = HTTPServer(('0.0.0.0', HTTP_PORT), LightAPIHandler)
print(f"[LIGHT] HTTP API listening on port {HTTP_PORT}")
print("[LIGHT] Service ready")
try:
while running:
server.handle_request()
finally:
print("[LIGHT] Cleaning up...")
send_mode_command("mood off")
send_mode_command("jaw off")
if ser:
ser.close()
print("[LIGHT] Shutdown complete")
if __name__ == "__main__":
main()