Initial commit: EliteDesk + Pi display daemons

- EliteDesk: 240x320 color ST7789 via Pico (Day 32 design)
- Pi: 128x64 mono SSD1306 horizontal bars (Day 85 redesign)

🦊 Built with love by Vixy
This commit is contained in:
2026-01-25 14:44:32 -06:00
commit 7901088155
8 changed files with 1072 additions and 0 deletions

View File

@@ -0,0 +1,218 @@
#!/usr/bin/env python3
"""
EliteDesk Stats Daemon
Collects system stats and sends to Pico display via serial
Vixy's sketch - Day 32
"""
import os
import time
import serial
import subprocess
import argparse
from pathlib import Path
# ========== Configuration ==========
DEFAULT_SERIAL_PORT = '/dev/ttyACM0' # Pico default on Linux
DEFAULT_INTERVAL = 5 # seconds
DEFAULT_NODE_NAME = os.uname().nodename
# Thresholds for status
CPU_WARN = 70
CPU_CRIT = 90
MEM_WARN = 75
MEM_CRIT = 90
TEMP_WARN = 65
TEMP_CRIT = 80
# ========== Stats Collection ==========
def get_cpu_usage():
"""Get CPU usage percentage"""
try:
# Read from /proc/stat
with open('/proc/stat', 'r') as f:
line = f.readline()
parts = line.split()
# cpu user nice system idle iowait irq softirq steal guest guest_nice
idle = int(parts[4])
total = sum(int(p) for p in parts[1:])
# Store for delta calculation
if not hasattr(get_cpu_usage, 'prev'):
get_cpu_usage.prev = (idle, total)
return 0
prev_idle, prev_total = get_cpu_usage.prev
get_cpu_usage.prev = (idle, total)
idle_delta = idle - prev_idle
total_delta = total - prev_total
if total_delta == 0:
return 0
usage = 100 * (1 - idle_delta / total_delta)
return int(usage)
except Exception as e:
print(f"Error getting CPU: {e}")
return 0
def get_memory_usage():
"""Get memory usage percentage"""
try:
with open('/proc/meminfo', 'r') as f:
lines = f.readlines()
mem_info = {}
for line in lines:
parts = line.split()
key = parts[0].rstrip(':')
value = int(parts[1])
mem_info[key] = value
total = mem_info.get('MemTotal', 1)
available = mem_info.get('MemAvailable', 0)
used = total - available
usage = 100 * used / total
return int(usage)
except Exception as e:
print(f"Error getting memory: {e}")
return 0
def get_temperature():
"""Get CPU temperature"""
try:
# Try thermal zone (common on most Linux)
thermal_path = Path('/sys/class/thermal/thermal_zone0/temp')
if thermal_path.exists():
with open(thermal_path, 'r') as f:
temp = int(f.read().strip()) / 1000
return int(temp)
# Try hwmon (alternative)
for hwmon in Path('/sys/class/hwmon').glob('hwmon*'):
temp_file = hwmon / 'temp1_input'
if temp_file.exists():
with open(temp_file, 'r') as f:
temp = int(f.read().strip()) / 1000
return int(temp)
return 0
except Exception as e:
print(f"Error getting temperature: {e}")
return 0
def get_pod_count():
"""Get number of pods running on this node"""
try:
# Use kubectl to count pods on this node
node_name = os.uname().nodename
result = subprocess.run(
['kubectl', 'get', 'pods', '--all-namespaces',
'--field-selector', f'spec.nodeName={node_name}',
'-o', 'name'],
capture_output=True,
text=True,
timeout=5
)
if result.returncode == 0:
pods = [p for p in result.stdout.strip().split('\n') if p]
return len(pods)
return 0
except Exception as e:
print(f"Error getting pod count: {e}")
return 0
def determine_status(cpu, mem, temp):
"""Determine overall status based on metrics"""
if cpu >= CPU_CRIT or mem >= MEM_CRIT or temp >= TEMP_CRIT:
return 'critical'
elif cpu >= CPU_WARN or mem >= MEM_WARN or temp >= TEMP_WARN:
return 'warning'
else:
return 'healthy'
# ========== Serial Communication ==========
def send_stats(ser, node, cpu, mem, temp, pods, status):
"""Send stats to Pico via serial"""
message = f"""NODE:{node}
CPU:{cpu}
MEM:{mem}
TEMP:{temp}
PODS:{pods}
STATUS:{status}
"""
ser.write(message.encode())
ser.flush()
# ========== Main ==========
def main():
parser = argparse.ArgumentParser(description='EliteDesk Stats Daemon')
parser.add_argument('--port', default=DEFAULT_SERIAL_PORT,
help=f'Serial port (default: {DEFAULT_SERIAL_PORT})')
parser.add_argument('--interval', type=int, default=DEFAULT_INTERVAL,
help=f'Update interval in seconds (default: {DEFAULT_INTERVAL})')
parser.add_argument('--node', default=DEFAULT_NODE_NAME,
help=f'Node name (default: {DEFAULT_NODE_NAME})')
parser.add_argument('--debug', action='store_true',
help='Debug mode - print to stdout instead of serial')
args = parser.parse_args()
print(f"EliteDesk Stats Daemon starting...")
print(f" Node: {args.node}")
print(f" Port: {args.port}")
print(f" Interval: {args.interval}s")
# Open serial connection
ser = None
if not args.debug:
try:
ser = serial.Serial(args.port, 115200, timeout=1)
print(f" Serial: Connected")
except Exception as e:
print(f" Serial: Failed to connect ({e})")
print(f" Running in debug mode")
args.debug = True
# Prime CPU reading
get_cpu_usage()
time.sleep(0.1)
try:
while True:
# Collect stats
cpu = get_cpu_usage()
mem = get_memory_usage()
temp = get_temperature()
pods = get_pod_count()
status = determine_status(cpu, mem, temp)
if args.debug:
print(f"[{args.node}] CPU:{cpu}% MEM:{mem}% TEMP:{temp}°C PODS:{pods} STATUS:{status}")
else:
send_stats(ser, args.node, cpu, mem, temp, pods, status)
time.sleep(args.interval)
except KeyboardInterrupt:
print("\nShutting down...")
finally:
if ser:
ser.close()
if __name__ == '__main__':
main()