Add auto-cleanup of old unannotated events
🧹 New features: - Background thread cleans up old events automatically - Deletes unannotated events older than EVENT_EXPIRY_HOURS (default: 2h) - Also removes associated snapshot files - Runs every CLEANUP_INTERVAL_MINUTES (default: 5m) 📡 New endpoint: - POST /cleanup - Manually trigger cleanup ⚙️ Config (env vars): - EVENT_EXPIRY_HOURS: How long to keep unannotated events (default: 2.0) - CLEANUP_INTERVAL_MINUTES: How often to run cleanup (default: 5.0) Annotated events are kept forever 🦊
This commit is contained in:
@@ -5,6 +5,10 @@ vixy-vision Event Collector
|
|||||||
Receives motion events from camera servers and stores them
|
Receives motion events from camera servers and stores them
|
||||||
in SQLite database with snapshots saved to disk.
|
in SQLite database with snapshots saved to disk.
|
||||||
|
|
||||||
|
Features:
|
||||||
|
- Auto-cleanup of unannotated events after configurable time
|
||||||
|
- Snapshot storage with automatic cleanup
|
||||||
|
|
||||||
Runs as a service on Mac mini, listens for POSTs from Pis.
|
Runs as a service on Mac mini, listens for POSTs from Pis.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@@ -12,7 +16,9 @@ import os
|
|||||||
import sqlite3
|
import sqlite3
|
||||||
import base64
|
import base64
|
||||||
import logging
|
import logging
|
||||||
from datetime import datetime
|
import threading
|
||||||
|
import time
|
||||||
|
from datetime import datetime, timedelta
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
from contextlib import contextmanager
|
from contextlib import contextmanager
|
||||||
@@ -27,6 +33,10 @@ SNAPSHOTS_DIR = DATA_DIR / "snapshots"
|
|||||||
# --- Runtime configuration (environment-driven) ---
API_KEY = os.getenv("COLLECTOR_API_KEY", "")  # Optional auth; empty disables the check
PORT = int(os.getenv("COLLECTOR_PORT", "8780"))

# Auto-cleanup tuning: how stale an unannotated event may get, and how
# often the background sweep runs.
EVENT_EXPIRY_HOURS = float(os.getenv("EVENT_EXPIRY_HOURS", "2.0"))
CLEANUP_INTERVAL_MINUTES = float(os.getenv("CLEANUP_INTERVAL_MINUTES", "5.0"))

# Make sure the on-disk layout exists before anything touches it.
DATA_DIR.mkdir(parents=True, exist_ok=True)
SNAPSHOTS_DIR.mkdir(parents=True, exist_ok=True)
|
||||||
@@ -42,9 +52,13 @@ logger = logging.getLogger(__name__)
|
|||||||
app = FastAPI(
    title="vixy-vision Event Collector",
    description="Collects motion events for the fox 🦊",
    version="1.1.0",
)

# State for the background cleanup worker: the thread handle plus an
# Event used both as the stop signal and as an interruptible sleep.
_cleanup_thread: Optional[threading.Thread] = None
_cleanup_stop = threading.Event()
# === Database ===
|
# === Database ===
|
||||||
|
|
||||||
@@ -84,6 +98,77 @@ def get_db():
|
|||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
|
|
||||||
|
# === Auto-Cleanup ===
|
||||||
|
|
||||||
|
def cleanup_old_events():
    """Delete unannotated events older than EVENT_EXPIRY_HOURS.

    Removes both the database rows and any snapshot files referenced by
    them. Annotated events are never deleted.

    Returns:
        int: number of events removed (0 when nothing matched or on error).
    """
    # created_at is stored as an ISO-8601 UTC string, so a lexicographic
    # comparison against the cutoff string works.
    expiry_cutoff = datetime.utcnow() - timedelta(hours=EVENT_EXPIRY_HOURS)
    cutoff_str = expiry_cutoff.isoformat() + "Z"

    try:
        with get_db() as conn:
            stale = conn.execute("""
                SELECT event_id, snapshot_path FROM events
                WHERE annotation IS NULL
                AND created_at < ?
            """, (cutoff_str,)).fetchall()

            if not stale:
                return 0

            # Remove snapshot files first; a failure here is logged but
            # does not block deleting the DB rows.
            for record in stale:
                if not record["snapshot_path"]:
                    continue
                snapshot_file = DATA_DIR / record["snapshot_path"]
                try:
                    if snapshot_file.exists():
                        snapshot_file.unlink()
                        logger.debug(f"Deleted snapshot: {snapshot_file}")
                except Exception as e:
                    logger.warning(f"Failed to delete snapshot {snapshot_file}: {e}")

            # Single parameterized DELETE for all stale rows.
            stale_ids = [record["event_id"] for record in stale]
            placeholders = ",".join("?" * len(stale_ids))
            conn.execute(f"DELETE FROM events WHERE event_id IN ({placeholders})", stale_ids)
            conn.commit()

            logger.info(f"🧹 Cleaned up {len(stale)} old unannotated events")
            return len(stale)

    except Exception as e:
        logger.error(f"Cleanup error: {e}")
        return 0
|
|
||||||
|
|
||||||
|
def cleanup_loop():
    """Run cleanup_old_events on a fixed interval until signalled to stop."""
    logger.info(f"🧹 Cleanup thread started (expiry: {EVENT_EXPIRY_HOURS}h, interval: {CLEANUP_INTERVAL_MINUTES}m)")

    interval_seconds = CLEANUP_INTERVAL_MINUTES * 60
    while not _cleanup_stop.is_set():
        cleanup_old_events()
        # Event.wait doubles as an interruptible sleep: returns early
        # the moment stop_cleanup_thread() sets the flag.
        _cleanup_stop.wait(timeout=interval_seconds)

    logger.info("🧹 Cleanup thread stopped")
||||||
|
|
||||||
|
|
||||||
|
def start_cleanup_thread():
    """Start the background cleanup thread.

    Idempotent: if a cleanup thread is already running this is a no-op,
    so repeated calls (e.g. a re-fired startup hook) cannot spawn
    duplicate workers that would race each other over the same rows.
    """
    global _cleanup_thread
    if _cleanup_thread is not None and _cleanup_thread.is_alive():
        # Already running — don't start a second loop.
        return
    _cleanup_stop.clear()
    # daemon=True so a hung loop can never block process exit.
    _cleanup_thread = threading.Thread(target=cleanup_loop, daemon=True)
    _cleanup_thread.start()
|
|
||||||
|
|
||||||
|
def stop_cleanup_thread():
    """Stop the background cleanup thread.

    Signals the loop to exit and waits up to 5 seconds for it to finish.
    Safe to call when no thread is running. Clears the module-level
    handle afterwards so a later start_cleanup_thread() spawns a fresh
    worker instead of holding a reference to a dead thread.
    """
    global _cleanup_thread
    _cleanup_stop.set()
    if _cleanup_thread is not None:
        _cleanup_thread.join(timeout=5.0)
        _cleanup_thread = None
|
|
||||||
|
|
||||||
# === Models ===
|
# === Models ===
|
||||||
|
|
||||||
class EventData(BaseModel):
|
class EventData(BaseModel):
|
||||||
@@ -105,16 +190,25 @@ class IncomingEvent(BaseModel):
|
|||||||
@app.on_event("startup")
def startup():
    """Initialise the database and launch the background cleanup worker."""
    init_db()
    start_cleanup_thread()
    logger.info(f"🦊 Event collector started on port {PORT}")
    logger.info(f" Data directory: {DATA_DIR}")
    logger.info(f" Auto-cleanup: {EVENT_EXPIRY_HOURS}h expiry, {CLEANUP_INTERVAL_MINUTES}m interval")
||||||
|
|
||||||
|
|
||||||
|
@app.on_event("shutdown")
def shutdown():
    """Stop background work cleanly before the process exits."""
    stop_cleanup_thread()
    logger.info("🦊 Event collector stopped")
|
|
||||||
|
|
||||||
@app.get("/")
def root():
    """Report service identity and key configuration."""
    return {
        "service": "vixy-vision Event Collector",
        "version": "1.1.0",
        "data_dir": str(DATA_DIR),
        "event_expiry_hours": EVENT_EXPIRY_HOURS,
    }
||||||
|
|
||||||
|
|
||||||
@@ -237,11 +331,23 @@ def list_events(
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@app.post("/cleanup")
def trigger_cleanup(x_api_key: Optional[str] = Header(None)):
    """Manually trigger cleanup of old events"""
    # Optional shared-secret auth: only enforced when API_KEY is set.
    if API_KEY and x_api_key != API_KEY:
        raise HTTPException(status_code=403, detail="Invalid API key")

    removed = cleanup_old_events()
    return {"status": "ok", "cleaned_up": removed}
||||||
|
|
||||||
|
|
||||||
@app.get("/stats")
|
@app.get("/stats")
|
||||||
def get_stats():
|
def get_stats():
|
||||||
"""Get collector statistics"""
|
"""Get collector statistics"""
|
||||||
with get_db() as conn:
|
with get_db() as conn:
|
||||||
total = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
|
total = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
|
||||||
|
annotated = conn.execute("SELECT COUNT(*) FROM events WHERE annotation IS NOT NULL").fetchone()[0]
|
||||||
by_camera = conn.execute("""
|
by_camera = conn.execute("""
|
||||||
SELECT camera_id, COUNT(*) as count
|
SELECT camera_id, COUNT(*) as count
|
||||||
FROM events GROUP BY camera_id
|
FROM events GROUP BY camera_id
|
||||||
@@ -253,10 +359,13 @@ def get_stats():
|
|||||||
|
|
||||||
return {
|
return {
|
||||||
"total_events": total,
|
"total_events": total,
|
||||||
|
"annotated": annotated,
|
||||||
|
"unannotated": total - annotated,
|
||||||
"by_camera": {row[0]: row[1] for row in by_camera},
|
"by_camera": {row[0]: row[1] for row in by_camera},
|
||||||
"by_type": {row[0]: row[1] for row in by_type},
|
"by_type": {row[0]: row[1] for row in by_type},
|
||||||
"data_dir": str(DATA_DIR),
|
"data_dir": str(DATA_DIR),
|
||||||
"db_path": str(DB_PATH),
|
"db_path": str(DB_PATH),
|
||||||
|
"event_expiry_hours": EVENT_EXPIRY_HOURS,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
0
collector/setup-macos.sh
Normal file → Executable file
0
collector/setup-macos.sh
Normal file → Executable file
Reference in New Issue
Block a user