#!/usr/bin/env python3
"""
EliteDesk Stats Daemon
Collects system stats and sends to Pico display via serial

Vixy's sketch - Day 32
"""

import os
import time
import subprocess
import argparse
from pathlib import Path

try:
    import serial
except ImportError:
    # pyserial is only needed for the real serial link; without it the daemon
    # can still run in debug mode (stats printed to stdout).
    serial = None

# ========== Configuration ==========

DEFAULT_SERIAL_PORT = '/dev/ttyACM0'  # Pico default on Linux
DEFAULT_INTERVAL = 5  # seconds
DEFAULT_NODE_NAME = os.uname().nodename

# Thresholds for status
CPU_WARN = 70
CPU_CRIT = 90
MEM_WARN = 75
MEM_CRIT = 90
TEMP_WARN = 65
TEMP_CRIT = 80

# Baud rate used when opening the serial link to the Pico
SERIAL_BAUD = 115200


# ========== Stats Collection ==========

def get_cpu_usage() -> int:
    """Get CPU usage percentage since the previous call.

    Reads the aggregate ``cpu`` line of /proc/stat and compares it with the
    sample remembered from the last call (stored on ``get_cpu_usage.prev``).

    Returns:
        Usage as an integer in 0..100. The first call only records a
        baseline and returns 0; any read/parse error also returns 0.
    """
    try:
        with open('/proc/stat', 'r') as f:
            parts = f.readline().split()

        # cpu user nice system idle iowait irq softirq steal guest guest_nice
        idle = int(parts[4])
        # Sum user..steal only: guest and guest_nice are already accounted
        # for inside user and nice, so adding them would count them twice.
        total = sum(int(p) for p in parts[1:9])

        prev = getattr(get_cpu_usage, 'prev', None)
        get_cpu_usage.prev = (idle, total)
        if prev is None:
            return 0

        prev_idle, prev_total = prev
        idle_delta = idle - prev_idle
        total_delta = total - prev_total

        # No elapsed ticks (or counters went backwards): nothing to report.
        if total_delta <= 0:
            return 0

        usage = 100 * (1 - idle_delta / total_delta)
        # Individual counters are not strictly monotonic, so clamp the result.
        return max(0, min(100, int(usage)))
    except (OSError, ValueError, IndexError) as e:
        print(f"Error getting CPU: {e}")
        return 0


def get_memory_usage() -> int:
    """Get memory usage percentage.

    Computed from /proc/meminfo as ``(MemTotal - MemAvailable) / MemTotal``.
    When MemAvailable is absent, ``MemFree + Buffers + Cached`` is used as
    the estimate of available memory instead.

    Returns:
        Usage as an integer in 0..100, or 0 if the data cannot be read.
    """
    try:
        mem_info = {}
        with open('/proc/meminfo', 'r') as f:
            for line in f:
                parts = line.split()
                # Skip blank or malformed lines instead of failing the read.
                if len(parts) >= 2 and parts[1].isdigit():
                    mem_info[parts[0].rstrip(':')] = int(parts[1])

        total = mem_info.get('MemTotal', 0)
        if total <= 0:
            return 0

        if 'MemAvailable' in mem_info:
            available = mem_info['MemAvailable']
        else:
            available = sum(
                mem_info.get(key, 0) for key in ('MemFree', 'Buffers', 'Cached')
            )

        usage = 100 * (total - available) / total
        return max(0, min(100, int(usage)))
    except OSError as e:
        print(f"Error getting memory: {e}")
        return 0


def get_temperature() -> int:
    """Get CPU temperature.

    Tries thermal zone 0 first, then each hwmon device's ``temp1_input``.
    The raw reading is divided by 1000 and truncated to an integer. A
    sensor that is missing is skipped silently; one that fails to read or
    parse is reported and the next candidate is tried.

    Returns:
        The first successful reading, or 0 if no sensor could be read.
    """
    try:
        # Try thermal zone (common on most Linux)
        candidates = [Path('/sys/class/thermal/thermal_zone0/temp')]
        # Try hwmon (alternative); sorted so the choice is deterministic
        candidates.extend(
            sorted(
                hwmon / 'temp1_input'
                for hwmon in Path('/sys/class/hwmon').glob('hwmon*')
            )
        )
    except OSError as e:
        print(f"Error getting temperature: {e}")
        return 0

    for sensor in candidates:
        try:
            raw = int(sensor.read_text().strip())
        except FileNotFoundError:
            continue
        except (OSError, ValueError) as e:
            print(f"Error getting temperature: {e}")
            continue
        return int(raw / 1000)

    return 0


def get_pod_count() -> int:
    """Get number of pods on this node.

    Runs ``kubectl get pods`` across all namespaces, filtered to pods whose
    ``spec.nodeName`` equals this machine's hostname, and counts the
    returned names. No phase filter is applied, so every pod bound to the
    node is counted.

    Returns:
        The pod count, or 0 if kubectl is missing, times out (5 s) or exits
        with a non-zero status.
    """
    try:
        node_name = os.uname().nodename
        result = subprocess.run(
            [
                'kubectl', 'get', 'pods', '--all-namespaces',
                '--field-selector', f'spec.nodeName={node_name}',
                '-o', 'name',
            ],
            capture_output=True,
            text=True,
            timeout=5,
        )
    except (OSError, subprocess.SubprocessError) as e:
        print(f"Error getting pod count: {e}")
        return 0

    if result.returncode != 0:
        return 0

    # One pod name per non-empty output line.
    return sum(1 for name in result.stdout.splitlines() if name.strip())


def determine_status(cpu, mem, temp) -> str:
    """Determine overall status based on metrics.

    Returns 'critical' if any metric reaches its *_CRIT threshold,
    otherwise 'warning' if any reaches its *_WARN threshold, otherwise
    'healthy'.
    """
    if cpu >= CPU_CRIT or mem >= MEM_CRIT or temp >= TEMP_CRIT:
        return 'critical'
    if cpu >= CPU_WARN or mem >= MEM_WARN or temp >= TEMP_WARN:
        return 'warning'
    return 'healthy'


# ========== Serial Communication ==========

def send_stats(ser, node, cpu, mem, temp, pods, status):
    """Send stats to Pico via serial.

    Writes one ``KEY:VALUE`` field per line, each terminated by a newline,
    then flushes the port.

    Args:
        ser: Object with ``write(bytes)`` and ``flush()`` (a serial port).
        node, cpu, mem, temp, pods, status: Values to report.
    """
    # NOTE(review): one field per line with a trailing newline - confirm this
    # framing against the parser on the Pico side.
    fields = (
        f"NODE:{node}",
        f"CPU:{cpu}",
        f"MEM:{mem}",
        f"TEMP:{temp}",
        f"PODS:{pods}",
        f"STATUS:{status}",
    )
    message = "\n".join(fields) + "\n"
    ser.write(message.encode())
    ser.flush()


def _open_serial(port):
    """Open the serial port; log and return None if it cannot be opened."""
    if serial is None:
        print(" Serial: Failed to connect (pyserial is not installed)")
        return None
    try:
        ser = serial.Serial(port, SERIAL_BAUD, timeout=1)
    except (OSError, ValueError) as e:
        print(f" Serial: Failed to connect ({e})")
        return None
    print(" Serial: Connected")
    return ser


# ========== Main ==========

def main():
    """Parse arguments, then sample and report stats until interrupted.

    In debug mode (requested with --debug, or entered automatically when
    the serial port cannot be opened at startup) stats are printed to
    stdout. Otherwise they are written to the serial port; if a write
    fails, the port is closed and a reconnect is attempted on each
    following interval.
    """
    parser = argparse.ArgumentParser(description='EliteDesk Stats Daemon')
    parser.add_argument('--port', default=DEFAULT_SERIAL_PORT,
                        help=f'Serial port (default: {DEFAULT_SERIAL_PORT})')
    parser.add_argument('--interval', type=int, default=DEFAULT_INTERVAL,
                        help=f'Update interval in seconds (default: {DEFAULT_INTERVAL})')
    parser.add_argument('--node', default=DEFAULT_NODE_NAME,
                        help=f'Node name (default: {DEFAULT_NODE_NAME})')
    parser.add_argument('--debug', action='store_true',
                        help='Debug mode - print to stdout instead of serial')
    args = parser.parse_args()

    # time.sleep() raises ValueError on a negative value; fail early instead.
    if args.interval < 0:
        parser.error('--interval must not be negative')

    print("EliteDesk Stats Daemon starting...")
    print(f" Node: {args.node}")
    print(f" Port: {args.port}")
    print(f" Interval: {args.interval}s")

    # Open serial connection
    ser = None
    if not args.debug:
        ser = _open_serial(args.port)
        if ser is None:
            print(" Running in debug mode")
            args.debug = True

    # Prime CPU reading so the first reported value has a baseline
    get_cpu_usage()
    time.sleep(0.1)

    try:
        while True:
            # Collect stats
            cpu = get_cpu_usage()
            mem = get_memory_usage()
            temp = get_temperature()
            pods = get_pod_count()
            status = determine_status(cpu, mem, temp)

            if args.debug:
                print(f"[{args.node}] CPU:{cpu}% MEM:{mem}% TEMP:{temp}°C PODS:{pods} STATUS:{status}")
            else:
                if ser is None:
                    # A previous write failed; try to get the link back.
                    ser = _open_serial(args.port)
                if ser is not None:
                    try:
                        send_stats(ser, args.node, cpu, mem, temp, pods, status)
                    except OSError as e:
                        print(f" Serial: Write failed ({e}), will try to reconnect")
                        try:
                            ser.close()
                        except OSError:
                            pass  # port is already gone; nothing left to release
                        ser = None

            time.sleep(args.interval)

    except KeyboardInterrupt:
        print("\nShutting down...")
    finally:
        if ser is not None:
            ser.close()


if __name__ == '__main__':
    main()