""" In-memory priority queue for AI requests. Jobs are ordered by (priority, sequence_number) so HIGH=1 jobs always run before NORMAL=3 and LOW=5 regardless of arrival order. Within the same priority level insertion order (FIFO) is preserved via the monotonically incrementing seq counter. The QueueService runs a single async worker task. It can be paused (current job finishes, no new jobs start), resumed, started, or stopped from outside. Module-level singleton `queue_service` is imported by routers and the app lifespan. """ import asyncio import uuid from dataclasses import dataclass, field from datetime import datetime, timezone from enum import IntEnum class Priority(IntEnum): HIGH = 1 NORMAL = 3 LOW = 5 PRIORITY_MAP: dict[str, Priority] = { "high": Priority.HIGH, "normal": Priority.NORMAL, "low": Priority.LOW, } @dataclass class Job: id: str priority: Priority seq: int request: object # ChatRequest — typed as object to avoid circular import future: asyncio.Future status: str = "pending" # pending | processing | done | failed | cancelled created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) started_at: datetime | None = None finished_at: datetime | None = None result: object = None error: str | None = None def __lt__(self, other: "Job") -> bool: # asyncio.PriorityQueue requires items to be orderable return (self.priority, self.seq) < (other.priority, other.seq) class QueueService: def __init__(self) -> None: self._queue: asyncio.PriorityQueue = asyncio.PriorityQueue() self._jobs: dict[str, Job] = {} self._seq: int = 0 self._worker_task: asyncio.Task | None = None # Event: set = allowed to run; clear = paused self._resume_event: asyncio.Event = asyncio.Event() self._resume_event.set() self._running: bool = False self.current_job: Job | None = None # ── Public API ──────────────────────────────────────────────────────────── async def enqueue(self, request: object, priority: Priority = Priority.NORMAL) -> Job: self._seq += 1 job = Job( id=str(uuid.uuid4()), priority=priority, seq=self._seq, request=request, future=asyncio.get_event_loop().create_future(), ) self._jobs[job.id] = job await self._queue.put((int(priority), self._seq, job)) return job def get_job(self, job_id: str) -> Job | None: return self._jobs.get(job_id) def cancel_job(self, job_id: str) -> bool: """Cancel a pending job. Returns False if not found or already started.""" job = self._jobs.get(job_id) if job and job.status == "pending": job.status = "cancelled" if not job.future.done(): job.future.cancel() return True return False def start(self) -> None: """Start the worker. No-op if already running.""" if not self._running or (self._worker_task and self._worker_task.done()): self._resume_event.set() self._running = True self._worker_task = asyncio.create_task(self._worker_loop()) def pause(self) -> None: """Pause after the current job finishes. Does not cancel in-progress work.""" self._resume_event.clear() def resume(self) -> None: """Resume from a paused state.""" self._resume_event.set() def stop(self) -> None: """Stop the worker. Pending jobs remain in the queue; start() will resume them.""" self._running = False self._resume_event.set() # unblock the wait so the loop can exit if self._worker_task and not self._worker_task.done(): self._worker_task.cancel() @property def is_paused(self) -> bool: return not self._resume_event.is_set() @property def queue_size(self) -> int: return self._queue.qsize() # ── Internal ────────────────────────────────────────────────────────────── async def _worker_loop(self) -> None: while self._running: # Block here while paused await self._resume_event.wait() try: _, _, job = await asyncio.wait_for(self._queue.get(), timeout=1.0) except asyncio.TimeoutError: continue except asyncio.CancelledError: break if job.status == "cancelled": self._queue.task_done() continue try: await self._process(job) finally: self._queue.task_done() async def _process(self, job: Job) -> None: # Deferred import — avoids circular dependency with chat router from app.routers.chat import execute_chat # noqa: PLC0415 job.status = "processing" job.started_at = datetime.now(timezone.utc) self.current_job = job try: result = await execute_chat(job.request) job.status = "done" job.result = result if not job.future.done(): job.future.set_result(result) except Exception as exc: job.status = "failed" job.error = str(exc) if not job.future.done(): job.future.set_exception(exc) finally: job.finished_at = datetime.now(timezone.utc) self.current_job = None # Singleton used throughout the app queue_service = QueueService()