#!/usr/bin/env python3 """ Vixy Eye Service Mk2 - USB Bridge HTTP API (port 8780) → USB serial → ESP32 Co5300 eyes Protocol: S:idle\\n → set state G:127:127\\n → set gaze x:y (0-255) T:1234567\\n → sync animation clock Usage: python3 eye_service_mk2.py python3 eye_service_mk2.py --learn # record current USB serials as left/right python3 eye_service_mk2.py --left /dev/tty.usbmodem1 --right /dev/tty.usbmodem2 """ import serial, serial.tools.list_ports import threading, time, argparse, random, os, json, math, signal, sys from http.server import HTTPServer, BaseHTTPRequestHandler HTTP_PORT = 8780 BAUD = 115200 SYNC_INTERVAL = 10.0 # seconds between T: sync commands RECONNECT_INTERVAL = 2.0 # seconds between reconnect attempts SACCADE_MIN_INTERVAL = 15.0 SACCADE_MAX_INTERVAL = 30.0 SACCADE_DURATION = 0.2 CONFIG_DIR = os.path.expanduser("~/.vixy") CONFIG_PATH = os.path.join(CONFIG_DIR, "eyes.json") VALID_STATES = [ "idle","listening","responding","pleasure", "thinking","playful","commanding","love","sleep" ] # === Shared state === current_state = "idle" baseline_gaze = [127, 127] # user-commanded gaze (center = 127) state_lock = threading.Lock() running = True class EyePort: """One eye's serial connection, identified by USB serial number for stability.""" def __init__(self, label: str, usb_serial: str = None, device: str = None): self.label = label # "left" or "right" self.usb_serial = usb_serial # stable USB serial number (preferred) self.device = device # /dev path (fallback, unstable) self.ser = None self.lock = threading.Lock() def resolve_device(self) -> str | None: """Find the /dev path for this eye. Prefer USB serial match; fall back to device path.""" if self.usb_serial: for p in serial.tools.list_ports.comports(): if p.serial_number and p.serial_number == self.usb_serial: return p.device return None return self.device def open(self) -> bool: path = self.resolve_device() if not path: return False try: with self.lock: self.ser = serial.Serial(path, BAUD, timeout=1, write_timeout=1) time.sleep(2) # ESP32 USB CDC enumeration print(f"[EYE] {self.label}: opened {path}" + (f" (USB serial {self.usb_serial})" if self.usb_serial else "")) return True except Exception as e: print(f"[EYE] {self.label}: cannot open {path}: {e}") return False def close(self): with self.lock: if self.ser: try: self.ser.close() except Exception: pass self.ser = None def write(self, data: bytes) -> bool: with self.lock: if not self.ser or not self.ser.is_open: return False try: self.ser.write(data) self.ser.flush() return True except Exception as e: print(f"[EYE] {self.label}: write error: {e}") try: self.ser.close() except Exception: pass self.ser = None return False def is_connected(self) -> bool: with self.lock: return bool(self.ser and self.ser.is_open) eyes: list[EyePort] = [] # populated in main() def send_cmd(cmd: str): """Send a command to all eyes. Holds each port's lock independently.""" data = (cmd + "\n").encode() for eye in eyes: eye.write(data) def send_state(state: str): send_cmd(f"S:{state}") def send_gaze(x: int, y: int): send_cmd(f"G:{x}:{y}") def send_sync(): ms = int(time.time() * 1000) % (2**31) send_cmd(f"T:{ms}") def sync_loop(): """Periodically resync animation clocks.""" while running: time.sleep(SYNC_INTERVAL) if not running: break send_sync() def reconnect_loop(): """Reopen any eye that's currently disconnected.""" while running: time.sleep(RECONNECT_INTERVAL) if not running: break for eye in eyes: if not eye.is_connected(): if eye.open(): # Restore full state on the reconnected eye with state_lock: state = current_state gx, gy = baseline_gaze send_sync() # cheap, safe to send to both send_state(state) send_gaze(gx, gy) def saccade_loop(): """Random micro-flicks to keep the eyes feeling alive. Returns to baseline after each.""" while running: interval = random.uniform(SACCADE_MIN_INTERVAL, SACCADE_MAX_INTERVAL) # Sleep in small chunks so shutdown is responsive slept = 0.0 while slept < interval and running: time.sleep(0.5) slept += 0.5 if not running: break dx = random.randint(-20, 20) dy = random.randint(-10, 10) with state_lock: gx, gy = baseline_gaze fx = max(0, min(255, gx + dx)) fy = max(0, min(255, gy + dy)) send_gaze(fx, fy) time.sleep(SACCADE_DURATION) # Return to baseline (re-read in case it changed mid-saccade) with state_lock: gx, gy = baseline_gaze send_gaze(gx, gy) def doa_to_gaze(doa_angle: float) -> tuple[int, int]: """Convert DoA angle (0-360) to gaze x:y (0-255).""" rad = math.radians(doa_angle) x = int(127 - 80 * math.sin(rad)) y = int(127 - 40 * math.cos(rad)) return max(0, min(255, x)), max(0, min(255, y)) # === HTTP API === class EyeAPIHandler(BaseHTTPRequestHandler): def log_message(self, fmt, *args): pass def _json(self, data, status=200): body = json.dumps(data).encode() self.send_response(status) self.send_header("Content-Type", "application/json") self.send_header("Content-Length", str(len(body))) self.send_header("Access-Control-Allow-Origin", "*") self.end_headers() self.wfile.write(body) def do_OPTIONS(self): self.send_response(200) self.send_header("Access-Control-Allow-Origin", "*") self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS") self.send_header("Access-Control-Allow-Headers", "Content-Type") self.end_headers() def do_GET(self): if self.path == "/health": self._json({ "status": "ok", "service": "vixy-eyes-mk2", "eyes": {e.label: e.is_connected() for e in eyes}, }) elif self.path == "/state": with state_lock: self._json({"state": current_state, "gaze": list(baseline_gaze)}) else: self._json({"error": "not found"}, 404) def do_POST(self): global current_state try: length = int(self.headers.get("Content-Length", 0)) data = json.loads(self.rfile.read(length).decode()) except Exception as e: self._json({"error": str(e)}, 400) return if self.path == "/state": new_state = data.get("state", "").lower() if new_state in VALID_STATES: with state_lock: old, current_state = current_state, new_state save_persistent_state() print(f"[EYE] {old} → {new_state}") send_state(new_state) self._json({"ok": True, "state": new_state}) else: self._json({"error": f"invalid state. valid: {VALID_STATES}"}, 400) elif self.path == "/gaze": if "doa" in data: x, y = doa_to_gaze(float(data["doa"])) else: x = max(0, min(255, int(data.get("x", 127)))) y = max(0, min(255, int(data.get("y", 127)))) with state_lock: baseline_gaze[0], baseline_gaze[1] = x, y save_persistent_state() send_gaze(x, y) self._json({"ok": True, "gaze": [x, y]}) else: self._json({"error": "not found"}, 404) # === Port identification === def list_candidate_ports(): """Return serial ports that look like ESP32 USB CDC.""" candidates = [] for p in serial.tools.list_ports.comports(): d = (p.description or "").lower() if (any(x in d for x in ["cp210", "ch340", "esp32", "usb serial"]) or "usbmodem" in p.device or "ttyacm" in p.device.lower()): candidates.append(p) return candidates def load_eye_config() -> dict: if not os.path.exists(CONFIG_PATH): return {} try: with open(CONFIG_PATH) as f: return json.load(f) except Exception as e: print(f"[EYE] failed to read {CONFIG_PATH}: {e}") return {} def save_eye_config(cfg: dict): os.makedirs(CONFIG_DIR, exist_ok=True) with open(CONFIG_PATH, "w") as f: json.dump(cfg, f, indent=2) def learn_eyes(): """Record the currently-connected ESP32 USB serial numbers as left/right.""" candidates = list_candidate_ports() if len(candidates) < 2: print(f"[EYE] learn: need 2 candidate ports, found {len(candidates)}") return False # Deterministic assignment: sort by device path so "learn" is repeatable candidates.sort(key=lambda p: p.device) cfg = load_eye_config() cfg.setdefault("eyes", {}) cfg["eyes"]["left"] = {"usb_serial": candidates[0].serial_number, "device": candidates[0].device} cfg["eyes"]["right"] = {"usb_serial": candidates[1].serial_number, "device": candidates[1].device} save_eye_config(cfg) print(f"[EYE] learned:") print(f" left = {candidates[0].device} (USB serial {candidates[0].serial_number})") print(f" right = {candidates[1].device} (USB serial {candidates[1].serial_number})") return True # === Persistent state === def save_persistent_state(): """Save current_state + baseline_gaze so restarts restore mood/gaze.""" cfg = load_eye_config() with state_lock: cfg["last_state"] = current_state cfg["last_gaze"] = list(baseline_gaze) try: save_eye_config(cfg) except Exception as e: print(f"[EYE] failed to save state: {e}") def load_persistent_state(): global current_state cfg = load_eye_config() last_state = cfg.get("last_state") last_gaze = cfg.get("last_gaze") if last_state in VALID_STATES: current_state = last_state if isinstance(last_gaze, list) and len(last_gaze) == 2: baseline_gaze[0], baseline_gaze[1] = int(last_gaze[0]), int(last_gaze[1]) # === Main === def main(): global running parser = argparse.ArgumentParser() parser.add_argument("--left", default=None, help="Override left eye device path (disables USB serial matching)") parser.add_argument("--right", default=None, help="Override right eye device path") parser.add_argument("--port", type=int, default=HTTP_PORT) parser.add_argument("--learn", action="store_true", help="Record currently-connected ESP32 USB serials as left/right and exit") args = parser.parse_args() if args.learn: sys.exit(0 if learn_eyes() else 1) load_persistent_state() # Build EyePort objects — prefer CLI overrides, then learned USB serials, then positional auto-detect cfg = load_eye_config().get("eyes", {}) if args.left and args.right: eyes.append(EyePort("left", device=args.left)) eyes.append(EyePort("right", device=args.right)) elif cfg.get("left") and cfg.get("right"): eyes.append(EyePort("left", usb_serial=cfg["left"].get("usb_serial"), device=cfg["left"].get("device"))) eyes.append(EyePort("right", usb_serial=cfg["right"].get("usb_serial"), device=cfg["right"].get("device"))) else: detected = list_candidate_ports() detected.sort(key=lambda p: p.device) print(f"[EYE] no learned config; detected ports: {[p.device for p in detected]}") print(f"[EYE] run with --learn to pin left/right to USB serial numbers") if len(detected) >= 2: eyes.append(EyePort("left", device=detected[0].device)) eyes.append(EyePort("right", device=detected[1].device)) elif len(detected) == 1: eyes.append(EyePort("left", device=detected[0].device)) else: print("[EYE] no ESP32 ports found") sys.exit(1) # Open what we can; reconnect loop will retry anything that fails for eye in eyes: eye.open() # Restore state on connected eyes send_sync() with state_lock: state = current_state gx, gy = baseline_gaze send_state(state) send_gaze(gx, gy) # Background threads threading.Thread(target=sync_loop, daemon=True).start() threading.Thread(target=reconnect_loop, daemon=True).start() threading.Thread(target=saccade_loop, daemon=True).start() # Signal handlers for clean shutdown under systemd/k8s def handle_shutdown(sig, frame): global running if not running: return print(f"\n[EYE] signal {sig} received, shutting down") running = False try: server.shutdown() except Exception: pass signal.signal(signal.SIGINT, handle_shutdown) signal.signal(signal.SIGTERM, handle_shutdown) server = HTTPServer(("0.0.0.0", args.port), EyeAPIHandler) print(f"[EYE] Vixy Eye Mk2 on port {args.port}") print(f"[EYE] connected: {[e.label for e in eyes if e.is_connected()]}") try: server.serve_forever() finally: running = False for eye in eyes: eye.close() print("[EYE] shutdown complete") if __name__ == "__main__": main()