Files
curo1305 c4f0c7ad49 Add priority queue to ai-service and STATUS.md workflow
- Introduce async priority queue service in ai-service; all /chat calls now route through it
- Refactor chat router to separate execute_chat (core logic) from the HTTP handler
- Add /queue endpoints (status, pause, resume, cancel) for queue management
- Update ai-service config to use Pydantic v2 model_config style
- Add STATUS.md files for backend, ai-service, doc-service, and frontend
- Document STATUS.md workflow in CLAUDE.md
- Update doc-service documents router and schemas; frontend DocumentsPage and API client

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 22:58:10 +02:00

170 lines
5.7 KiB
Python

"""
In-memory priority queue for AI requests.
Jobs are ordered by (priority, seq), so a waiting HIGH=1 job is always dequeued
before any NORMAL=3 or LOW=5 job, regardless of arrival order. Within the same
priority level, insertion order (FIFO) is preserved by the monotonically
increasing seq counter.
The QueueService runs a single async worker task. It can be paused (current job
finishes, no new jobs start), resumed, started, or stopped from outside.
Module-level singleton `queue_service` is imported by routers and the app lifespan.
"""
import asyncio
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import IntEnum
class Priority(IntEnum):
    """Scheduling priority of a queued job; a smaller value is dequeued first."""

    HIGH = 1    # served ahead of every waiting NORMAL and LOW job
    NORMAL = 3  # default priority used by QueueService.enqueue
    LOW = 5     # served only when no HIGH or NORMAL job is waiting
# Lower-case priority name ("high", "normal", "low") -> Priority member.
PRIORITY_MAP: dict[str, Priority] = {member.name.lower(): member for member in Priority}
@dataclass
class Job:
    """One queued AI request plus its lifecycle bookkeeping.

    ``future`` is settled by the queue: it receives the request's result or
    exception, or is cancelled when the job is cancelled. An enqueuer can
    therefore just ``await job.future``.
    """

    id: str  # random UUID4 string assigned at enqueue time
    priority: Priority
    seq: int  # monotonically increasing tie-breaker; keeps FIFO within a priority
    request: object  # ChatRequest — typed as object to avoid circular import
    future: asyncio.Future
    status: str = "pending"  # pending | processing | done | failed | cancelled
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    started_at: datetime | None = None
    finished_at: datetime | None = None
    result: object = None
    error: str | None = None  # str() of the raised exception when status is "failed"

    def __lt__(self, other: object) -> bool:
        """Order jobs by (priority, seq); asyncio.PriorityQueue needs orderable items.

        Returns NotImplemented for non-Job operands so Python raises the usual
        TypeError instead of an AttributeError on ``other.priority``.
        """
        if not isinstance(other, Job):
            return NotImplemented
        return (self.priority, self.seq) < (other.priority, other.seq)
class QueueService:
    """Single-worker asyncio priority queue for AI chat requests.

    One background task, created by ``start()``, pulls jobs in (priority, seq)
    order and runs them one at a time. The worker can be paused and resumed
    between jobs, or stopped. Queued jobs survive a stop and are picked up by
    the next ``start()``.
    """

    def __init__(self) -> None:
        self._queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
        # All jobs by id, including finished ones, so get_job() can report them.
        # NOTE(review): nothing in this class evicts entries, so this grows for
        # the life of the process.
        self._jobs: dict[str, Job] = {}
        self._seq: int = 0
        self._worker_task: asyncio.Task | None = None
        # Event: set = allowed to run; clear = paused
        self._resume_event: asyncio.Event = asyncio.Event()
        self._resume_event.set()
        self._running: bool = False
        self.current_job: Job | None = None

    # ── Public API ────────────────────────────────────────────────────────────

    async def enqueue(self, request: object, priority: Priority = Priority.NORMAL) -> Job:
        """Register *request* as a new pending job and put it on the queue.

        Returns the Job immediately; await ``job.future`` for the outcome.
        """
        self._seq += 1
        job = Job(
            id=str(uuid.uuid4()),
            priority=priority,
            seq=self._seq,
            request=request,
            # Inside a coroutine, get_running_loop() is the supported way to reach the loop.
            future=asyncio.get_running_loop().create_future(),
        )
        self._jobs[job.id] = job
        # seq is unique, so tuple comparison never falls through to the Job itself.
        await self._queue.put((int(priority), job.seq, job))
        return job

    def get_job(self, job_id: str) -> Job | None:
        """Return the job with *job_id*, or None if it was never enqueued."""
        return self._jobs.get(job_id)

    def cancel_job(self, job_id: str) -> bool:
        """Cancel a pending job. Returns False if not found or already started.

        The queue entry is left in place; the worker discards it when it is
        dequeued.
        """
        job = self._jobs.get(job_id)
        if job is None or job.status != "pending":
            return False
        job.status = "cancelled"
        job.finished_at = datetime.now(timezone.utc)
        if not job.future.done():
            job.future.cancel()
        return True

    def start(self) -> None:
        """Start the worker and clear any pause. No-op if already running.

        Uses asyncio.create_task, so it must be called while an event loop is running.
        """
        if not self._running or (self._worker_task and self._worker_task.done()):
            self._resume_event.set()
            self._running = True
            self._worker_task = asyncio.create_task(self._worker_loop())

    def pause(self) -> None:
        """Pause after the current job finishes. Does not cancel in-progress work."""
        self._resume_event.clear()

    def resume(self) -> None:
        """Resume from a paused state."""
        self._resume_event.set()

    def stop(self) -> None:
        """Stop the worker. Pending jobs remain in the queue; start() will resume them.

        A job that is mid-processing is interrupted: it is marked "cancelled" and
        its future is cancelled, so its awaiter does not hang.
        """
        self._running = False
        self._resume_event.set()  # unblock the wait so the loop can exit
        if self._worker_task and not self._worker_task.done():
            self._worker_task.cancel()

    @property
    def is_paused(self) -> bool:
        """True while pause() is in effect (cleared by resume(), start() or stop())."""
        return not self._resume_event.is_set()

    @property
    def queue_size(self) -> int:
        """Number of queued entries, including cancelled ones not yet discarded."""
        return self._queue.qsize()

    # ── Internal ──────────────────────────────────────────────────────────────

    async def _worker_loop(self) -> None:
        """Dequeue and process jobs one at a time until stopped."""
        while self._running:
            # Block here while paused
            await self._resume_event.wait()
            try:
                # Time out periodically so a cleared _running flag is noticed.
                priority, seq, job = await asyncio.wait_for(self._queue.get(), timeout=1.0)
            except asyncio.TimeoutError:
                continue
            except asyncio.CancelledError:
                break
            if job.status == "cancelled":
                # Lazily discard entries cancelled via cancel_job().
                self._queue.task_done()
                continue
            if not self._resume_event.is_set():
                # pause() happened while we were blocked in get(). Put the job
                # back under the same key, so it keeps its place, and wait.
                self._queue.put_nowait((priority, seq, job))
                self._queue.task_done()
                continue
            try:
                await self._process(job)
            finally:
                self._queue.task_done()

    async def _process(self, job: Job) -> None:
        """Run *job* through execute_chat and settle its status and future."""
        job.status = "processing"
        job.started_at = datetime.now(timezone.utc)
        self.current_job = job
        try:
            # Deferred import — avoids circular dependency with chat router.
            # It lives inside the try so an import failure fails this job
            # instead of killing the worker with the job left unresolved.
            from app.routers.chat import execute_chat  # noqa: PLC0415

            result = await execute_chat(job.request)
        except asyncio.CancelledError:
            # stop() cancelled the worker mid-job (CancelledError is not an
            # Exception subclass). Settle the job so its awaiter does not hang,
            # then let the cancellation propagate.
            job.status = "cancelled"
            if not job.future.done():
                job.future.cancel()
            raise
        except Exception as exc:
            job.status = "failed"
            job.error = str(exc)
            if not job.future.done():
                job.future.set_exception(exc)
        else:
            job.status = "done"
            job.result = result
            if not job.future.done():
                job.future.set_result(result)
        finally:
            job.finished_at = datetime.now(timezone.utc)
            # After a quick stop()/start(), a newer worker may already own current_job.
            if self.current_job is job:
                self.current_job = None
# Process-wide QueueService instance, shared by the routers and the app lifespan.
queue_service: QueueService = QueueService()