"""
Job Queue Manager

In-memory job queue for managing image generation requests.
"""

import asyncio
import uuid
from datetime import datetime, timedelta
from typing import Dict, Optional, Literal
from dataclasses import dataclass, field
import logging

import config

logger = logging.getLogger(__name__)


@dataclass
class Job:
    """Represents a single generation job.

    Attributes:
        job_id: Unique identifier assigned at submission.
        prompt: Text prompt for the generation.
        negative_prompt: Optional negative prompt.
        params: Generation parameters, stored as given by the submitter.
        status: Lifecycle state of the job.
        progress: Completion percentage, kept within 0-100 by the manager.
        created_at: Naive UTC timestamp taken when the job object is built.
        started_at: Naive UTC timestamp set when processing starts.
        completed_at: Naive UTC timestamp set on completion or failure.
        error: Error description, set only when the job fails.
        result_path: Path of the result, set only when the job completes.
    """

    job_id: str
    prompt: str
    negative_prompt: Optional[str]
    params: Dict
    status: Literal["queued", "processing", "completed", "failed"]
    progress: int = 0
    created_at: datetime = field(default_factory=datetime.utcnow)
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    error: Optional[str] = None
    result_path: Optional[str] = None


class QueueManager:
    """Manages the job queue and job lifecycle.

    Job records live in ``self.jobs`` (keyed by job id); pending job ids
    wait in a bounded ``asyncio.Queue``. All mutations of job records and
    of the ``active_jobs`` counter happen under ``self._lock``.
    """

    def __init__(self):
        self.jobs: Dict[str, Job] = {}
        self.queue: asyncio.Queue = asyncio.Queue(maxsize=config.MAX_QUEUE_SIZE)
        self.active_jobs: int = 0
        self._lock = asyncio.Lock()

    async def submit_job(
        self,
        prompt: str,
        negative_prompt: Optional[str],
        params: Dict
    ) -> str:
        """
        Submit a new generation job.

        Args:
            prompt: Text prompt
            negative_prompt: Negative prompt
            params: Generation parameters

        Returns:
            job_id: Unique job identifier

        Raises:
            asyncio.QueueFull: If queue is at capacity. In that case the
                job is not recorded in ``self.jobs``.
        """
        job_id = str(uuid.uuid4())
        job = Job(
            job_id=job_id,
            prompt=prompt,
            negative_prompt=negative_prompt,
            params=params,
            status="queued"
        )

        async with self._lock:
            # put_nowait() raises QueueFull immediately instead of
            # suspending, so the lock is never held while waiting for a
            # free slot and a rejected job leaves no orphan record behind.
            self.queue.put_nowait(job_id)
            # No await between enqueue and registration, so no consumer can
            # observe the id before its record exists.
            self.jobs[job_id] = job

        logger.info("Job %s submitted: '%s...'", job_id, prompt[:50])
        return job_id

    async def get_next_job(self) -> Optional[str]:
        """
        Get the next job from the queue (blocks until available).

        Returns:
            The next job_id, or None if the wait was cancelled.
        """
        try:
            return await self.queue.get()
        except asyncio.CancelledError:
            # NOTE(review): cancellation is swallowed and reported as None;
            # kept because callers may rely on the None sentinel, but the
            # caller must stop its loop on None or the task cannot be
            # cancelled cleanly.
            return None

    async def start_job(self, job_id: str):
        """Mark a job as started. Unknown job ids are ignored."""
        async with self._lock:
            job = self.jobs.get(job_id)
            if job is None:
                return
            # Count the job only on a real transition into "processing" so
            # a repeated start_job() call cannot inflate the counter.
            if job.status != "processing":
                self.active_jobs += 1
            job.status = "processing"
            job.started_at = datetime.utcnow()
        logger.info("Job %s started processing", job_id)

    async def update_progress(self, job_id: str, progress: int):
        """Update job progress, clamped to 0-100. Unknown ids are ignored."""
        async with self._lock:
            job = self.jobs.get(job_id)
            if job is not None:
                job.progress = min(100, max(0, progress))

    def _finish(self, job: Job, status: str) -> None:
        """Move *job* to a terminal status; caller must hold ``self._lock``."""
        # Release an active slot only if the job actually held one;
        # otherwise finishing a never-started (or already finished) job
        # would steal the slot of another job that is still processing.
        if job.status == "processing":
            self.active_jobs = max(0, self.active_jobs - 1)
        job.status = status
        job.completed_at = datetime.utcnow()

    async def complete_job(self, job_id: str, result_path: str):
        """Mark a job as completed successfully. Unknown ids are ignored."""
        async with self._lock:
            job = self.jobs.get(job_id)
            if job is None:
                return
            self._finish(job, "completed")
            job.progress = 100
            job.result_path = result_path
        logger.info("Job %s completed successfully", job_id)

    async def fail_job(self, job_id: str, error: str):
        """Mark a job as failed with *error*. Unknown ids are ignored."""
        async with self._lock:
            job = self.jobs.get(job_id)
            if job is None:
                return
            self._finish(job, "failed")
            job.error = error
        logger.error("Job %s failed: %s", job_id, error)

    def get_job(self, job_id: str) -> Optional[Job]:
        """Get job by ID, or None if no such job is recorded."""
        return self.jobs.get(job_id)

    def get_queue_size(self) -> int:
        """Get current queue size (number of job ids waiting in the queue)."""
        return self.queue.qsize()

    def get_active_jobs(self) -> int:
        """Get number of currently processing jobs."""
        return self.active_jobs

    async def cleanup_old_jobs(self, max_age_hours: int = 24):
        """Remove old completed/failed jobs from memory.

        Args:
            max_age_hours: Jobs that finished more than this many hours
                ago are dropped from ``self.jobs``.
        """
        # Compare naive UTC datetimes directly: calling .timestamp() on a
        # naive datetime interprets it as *local* time, which is only
        # accidentally consistent and misbehaves around DST transitions.
        cutoff = datetime.utcnow() - timedelta(hours=max_age_hours)

        async with self._lock:
            expired = [
                job_id
                for job_id, job in self.jobs.items()
                if job.status in ("completed", "failed")
                and job.completed_at is not None
                and job.completed_at < cutoff
            ]
            for job_id in expired:
                del self.jobs[job_id]

        if expired:
            logger.info("Cleaned up %d old jobs from memory", len(expired))


# Global queue manager instance
queue_manager = QueueManager()