#!/usr/bin/env python3
"""
Pi Cluster Node Status Display
128x64 I2C OLED (SSD1306/SH1106) via luma.oled

Horizontal bar design - matches EliteDesk aesthetic
Shows: hostname, pod status, CPU/MEM/TEMP bars, uptime

Vixy's design - Day 85 (2026-01-25)
"""

import math
import os
import subprocess
import time

import psutil
from PIL import Image, ImageDraw, ImageFont
from luma.core.interface.serial import i2c


# ---------- Config ----------
def _env_int(name, default):
    """Return environment variable *name* as an int, or *default* if unset/invalid."""
    try:
        return int(os.getenv(name, ""))
    except ValueError:
        return default


HOSTNAME = os.uname().nodename
ROTATE_180 = os.getenv("ROTATE_180", "0") == "1"
# Clamped to >= 0 because time.sleep() rejects negative durations.
REFRESH_MS = max(0, _env_int("REFRESH_MS", 500))
# Seconds between kubectl polls; 0 polls on every frame.
POD_REFRESH_S = max(0, _env_int("POD_REFRESH_S", 10))

# Thresholds for warning/critical
CPU_WARN, CPU_CRIT = 70, 90
MEM_WARN, MEM_CRIT = 75, 90
TEMP_WARN, TEMP_CRIT = 65, 80


# ---------- I2C / Device ----------
def _to_7bit(addr):
    """Map the 8-bit style addresses 0x78/0x7A to 0x3C/0x3D; pass others through."""
    return addr >> 1 if addr in (0x78, 0x7A) else addr


def open_device():
    """Probe candidate I2C addresses on port 1 and return the first working OLED.

    An address from the OLED_ADDR environment variable (hex) is tried first,
    followed by 0x3C, 0x3D, 0x78 and 0x7A. For each address the SSD1306 driver
    is tried, then SH1106 if that fails.

    Raises:
        SystemExit: if no candidate address yields a device.
    """
    rotate_val = 2 if ROTATE_180 else 0

    addrs = []
    if "OLED_ADDR" in os.environ:
        try:
            addrs.append(int(os.environ["OLED_ADDR"], 16))
        except ValueError:
            pass  # Malformed override: fall back to the default candidates.
    addrs += [0x3C, 0x3D, 0x78, 0x7A]

    for raw in addrs:
        addr = _to_7bit(raw)
        # Broad catches are deliberate here: any failure while probing simply
        # means "try the next driver / address".
        try:
            serial = i2c(port=1, address=addr)
            try:
                from luma.oled.device import ssd1306
                return ssd1306(serial, width=128, height=64, rotate=rotate_val)
            except Exception:
                from luma.oled.device import sh1106
                return sh1106(serial, width=128, height=64, rotate=rotate_val)
        except Exception:
            continue

    raise SystemExit("No OLED found. Check wiring & i2cdetect.")


device = open_device()
W, H = device.width, device.height  # 128x64

# ---------- Fonts ----------
FONT_PATHS = [
    "/usr/share/fonts/truetype/dejavu/DejaVuSansMono.ttf",
    "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
]


def best_font(size):
    """Return the first loadable font from FONT_PATHS at *size*, else PIL's default."""
    for p in FONT_PATHS:
        try:
            return ImageFont.truetype(p, size)
        except Exception:
            continue
    return ImageFont.load_default()


FONT_SM = best_font(9)
FONT_MD = best_font(11)


# ---------- Helpers ----------
def get_temp_c():
    """Get CPU temperature in Celsius.

    Sources are tried in order: ``vcgencmd measure_temp``, psutil's
    temperature sensors, then the sysfs thermal zones. Returns NaN when
    none of them produces a reading.
    """
    try:
        out = subprocess.check_output(
            ["vcgencmd", "measure_temp"], timeout=0.4
        ).decode()
        return float(out.split("=")[1].split("'")[0])
    except (OSError, subprocess.SubprocessError, ValueError, IndexError):
        pass  # Tool missing, timed out, or unexpected output: try next source.

    try:
        temps = psutil.sensors_temperatures()
        for key in ("cpu-thermal", "cpu_thermal", "soc_thermal"):
            if key in temps and temps[key]:
                return float(temps[key][0].current)
    except Exception:
        pass  # Best-effort: fall through to sysfs on any psutil failure.

    for zone in ("thermal_zone0", "thermal_zone1"):
        p = f"/sys/class/thermal/{zone}/temp"
        if os.path.exists(p):
            try:
                # Context manager so the handle is closed on every call.
                with open(p) as f:
                    return int(f.read().strip()) / 1000.0
            except (OSError, ValueError):
                pass

    return float("nan")


def get_uptime_hours():
    """Get system uptime in whole hours (0 if /proc/uptime cannot be read)."""
    try:
        with open("/proc/uptime", "r") as f:
            return int(float(f.read().split()[0]) // 3600)
    except (OSError, ValueError, IndexError):
        return 0


def get_pod_count():
    """Get number of pods running on this node.

    Runs ``kubectl get pods`` filtered on this host's node name and counts
    the non-empty output lines. Returns 0 if kubectl fails, times out (5 s),
    or exits non-zero.
    """
    try:
        result = subprocess.run(
            ['kubectl', 'get', 'pods', '--all-namespaces',
             '--field-selector', f'spec.nodeName={HOSTNAME}',
             '-o', 'name'],
            capture_output=True, text=True, timeout=5
        )
        if result.returncode == 0:
            pods = [p for p in result.stdout.strip().split('\n') if p]
            return len(pods)
    except (OSError, subprocess.SubprocessError, ValueError):
        pass
    return 0


def get_status(cpu, mem, temp):
    """Determine overall health status: 'CRIT', 'WARN' or 'OK'.

    Any single metric at or above its critical threshold yields 'CRIT';
    otherwise any metric at or above its warning threshold yields 'WARN'.
    """
    if cpu >= CPU_CRIT or mem >= MEM_CRIT or temp >= TEMP_CRIT:
        return 'CRIT'
    if cpu >= CPU_WARN or mem >= MEM_WARN or temp >= TEMP_WARN:
        return 'WARN'
    return 'OK'


def draw_bar(draw, x, y, width, height, percent, warn_thresh, crit_thresh):
    """Draw a horizontal progress bar.

    Args:
        draw: ImageDraw-style object to draw on.
        x, y: top-left corner of the bar outline.
        width, height: size of the bar outline.
        percent: fill level; values outside 0-100 are clamped.
        warn_thresh, crit_thresh: accepted from callers but currently not
            used by the drawing code.
    """
    # Background (empty bar)
    draw.rectangle([x, y, x + width, y + height], outline=255, fill=0)

    # Fill based on percentage, leaving the 1px outline intact
    clamped = max(0, min(100, percent))
    fill_width = int((width - 2) * clamped / 100)
    if fill_width > 0:
        draw.rectangle(
            [x + 1, y + 1, x + 1 + fill_width, y + height - 1], fill=255
        )


def draw_pod_indicators(draw, x, y, count, expected=2):
    """Draw pod status circles.

    Draws *expected* circles, 10px apart starting at (x, y); the first
    *count* are filled (running) and the rest are hollow (missing).
    """
    for i in range(expected):
        cx = x + i * 10
        fill = 255 if i < count else 0
        draw.ellipse([cx, y, cx + 6, y + 6], fill=fill, outline=255)


# ---------- Main Loop ----------
psutil.cpu_percent(None)  # Prime CPU sampler

# Layout constants
BAR_X = 28   # Bar start X (after label)
BAR_W = 70   # Bar width
BAR_H = 7    # Bar height
LINE_H = 14  # Line height

# kubectl is far slower than a display frame, so the pod count is polled on
# its own interval and the last value is reused in between.
pods = 0
next_pod_poll = time.monotonic()

while True:
    # Gather stats
    cpu = psutil.cpu_percent(interval=None)
    mem = psutil.virtual_memory().percent
    temp = get_temp_c()
    temp_missing = math.isnan(temp)
    temp_val = 0 if temp_missing else int(round(temp))
    uptime_h = get_uptime_hours()
    if time.monotonic() >= next_pod_poll:
        pods = get_pod_count()
        next_pod_poll = time.monotonic() + POD_REFRESH_S
    status = get_status(cpu, mem, temp_val)

    # Create frame
    image = Image.new("1", (W, H), 0)
    draw = ImageDraw.Draw(image)

    # === Row 0: Header ===
    # Hostname left, pods center, uptime+status right
    draw.text((0, 0), HOSTNAME, font=FONT_MD, fill=255)

    # Pod indicators (center-ish)
    draw_pod_indicators(draw, 58, 2, pods, expected=2)

    # Uptime + status indicator
    uptime_str = f"{uptime_h}h"
    status_char = "●" if status == "OK" else ("◐" if status == "WARN" else "○")
    draw.text((90, 0), f"{uptime_str}{status_char}", font=FONT_SM, fill=255)

    # === Separator line ===
    draw.line([(0, 12), (W, 12)], fill=255)

    # === Row 1: CPU ===
    y = 16
    draw.text((0, y), "CPU", font=FONT_SM, fill=255)
    draw_bar(draw, BAR_X, y + 1, BAR_W, BAR_H, cpu, CPU_WARN, CPU_CRIT)
    draw.text((BAR_X + BAR_W + 3, y), f"{int(cpu):2d}%", font=FONT_SM, fill=255)

    # === Row 2: MEM ===
    y = 16 + LINE_H
    draw.text((0, y), "MEM", font=FONT_SM, fill=255)
    draw_bar(draw, BAR_X, y + 1, BAR_W, BAR_H, mem, MEM_WARN, MEM_CRIT)
    draw.text((BAR_X + BAR_W + 3, y), f"{int(mem):2d}%", font=FONT_SM, fill=255)

    # === Row 3: TMP ===
    y = 16 + LINE_H * 2
    draw.text((0, y), "TMP", font=FONT_SM, fill=255)
    # Temp bar: 0-100°C range
    temp_pct = min(100, max(0, temp_val))
    draw_bar(draw, BAR_X, y + 1, BAR_W, BAR_H, temp_pct, TEMP_WARN, TEMP_CRIT)
    # Show a placeholder rather than a fake "0°" when no sensor could be read.
    temp_label = "--°" if temp_missing else f"{temp_val:2d}°"
    draw.text((BAR_X + BAR_W + 3, y), temp_label, font=FONT_SM, fill=255)

    # === Bottom separator ===
    draw.line([(0, 51), (W, 51)], fill=255)

    # === Row 4: Status summary ===
    y = 54
    if status == "OK":
        draw.text((W // 2 - 10, y), "● OK", font=FONT_SM, fill=255)
    elif status == "WARN":
        draw.text((W // 2 - 16, y), "◐ WARN", font=FONT_SM, fill=255)
    else:
        draw.text((W // 2 - 16, y), "○ CRIT", font=FONT_SM, fill=255)

    # Display
    device.display(image)
    time.sleep(REFRESH_MS / 1000.0)