Add priority queue to ai-service and STATUS.md workflow
- Introduce async priority queue service in ai-service; all /chat calls now route through it - Refactor chat router to separate execute_chat (core logic) from the HTTP handler - Add /queue endpoints (status, pause, resume, cancel) for queue management - Update ai-service config to use Pydantic v2 model_config style - Add STATUS.md files for backend, ai-service, doc-service, and frontend - Document STATUS.md workflow in CLAUDE.md - Update doc-service documents router and schemas; frontend DocumentsPage and API client Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -5,8 +5,7 @@ class Settings(BaseSettings):
|
||||
PROJECT_NAME: str = "ai-service"
|
||||
CONFIG_PATH: str = "/config/ai_service_config.json"
|
||||
|
||||
class Config:
|
||||
env_file = ".env"
|
||||
model_config = {"env_file": ".env", "extra": "ignore"}
|
||||
|
||||
|
||||
settings = Settings()
|
||||
|
||||
@@ -5,7 +5,9 @@ from fastapi import FastAPI
|
||||
|
||||
from app.core.config import settings
|
||||
from app.routers import chat, health
|
||||
from app.routers import queue as queue_router
|
||||
from app.services.config_reader import load_ai_config
|
||||
from app.services.queue import queue_service
|
||||
|
||||
logger = logging.getLogger("ai-service")
|
||||
|
||||
@@ -16,10 +18,18 @@ async def lifespan(app: FastAPI):
|
||||
provider = config.get("provider", "lmstudio")
|
||||
model = config.get(provider, {}).get("model", "unknown")
|
||||
logger.info("[ai-service] active provider: %s model: %s", provider, model)
|
||||
|
||||
queue_service.start()
|
||||
logger.info("[ai-service] queue worker started")
|
||||
|
||||
yield
|
||||
|
||||
queue_service.stop()
|
||||
logger.info("[ai-service] queue worker stopped")
|
||||
|
||||
|
||||
app = FastAPI(title=settings.PROJECT_NAME, lifespan=lifespan)
|
||||
|
||||
app.include_router(chat.router, tags=["chat"])
|
||||
app.include_router(health.router, tags=["health"])
|
||||
app.include_router(queue_router.router)
|
||||
|
||||
@@ -1,3 +1,10 @@
|
||||
"""
|
||||
POST /chat — synchronous chat endpoint.
|
||||
|
||||
All requests are submitted to the priority queue at NORMAL priority and the caller
|
||||
waits for the result. This keeps the contract identical to the original endpoint
|
||||
while ensuring all AI traffic flows through one ordered queue.
|
||||
"""
|
||||
import asyncio
|
||||
import re
|
||||
|
||||
@@ -21,8 +28,11 @@ def _strip_fences(text: str) -> str:
|
||||
return m.group(1).strip() if m else text.strip()
|
||||
|
||||
|
||||
@router.post("/chat", response_model=ChatResponse)
|
||||
async def chat(request: ChatRequest) -> ChatResponse:
|
||||
async def execute_chat(request: ChatRequest) -> ChatResponse:
|
||||
"""
|
||||
Core provider call — invoked by the queue worker.
|
||||
Raises HTTPException on provider errors so the queue worker stores the message.
|
||||
"""
|
||||
config = await load_ai_config()
|
||||
|
||||
provider_name = config.get("provider", "lmstudio")
|
||||
@@ -36,7 +46,6 @@ async def chat(request: ChatRequest) -> ChatResponse:
|
||||
|
||||
timeout = config.get("timeout_seconds", 60)
|
||||
max_retries = config.get("max_retries", 2)
|
||||
last_exc: Exception | None = None
|
||||
|
||||
for attempt in range(max_retries + 1):
|
||||
try:
|
||||
@@ -46,11 +55,8 @@ async def chat(request: ChatRequest) -> ChatResponse:
|
||||
)
|
||||
break
|
||||
except asyncio.TimeoutError as exc:
|
||||
last_exc = exc
|
||||
# Don't retry on timeout — the model is busy; fail fast
|
||||
raise HTTPException(status_code=504, detail="AI provider timed out") from exc
|
||||
except (AnthropicConnError, OpenAIConnError) as exc:
|
||||
last_exc = exc
|
||||
if attempt < max_retries:
|
||||
await asyncio.sleep(0.5 * (attempt + 1))
|
||||
continue
|
||||
@@ -68,3 +74,28 @@ async def chat(request: ChatRequest) -> ChatResponse:
|
||||
input_tokens=input_tokens,
|
||||
output_tokens=output_tokens,
|
||||
)
|
||||
|
||||
|
||||
@router.post("/chat", response_model=ChatResponse)
|
||||
async def chat(request: ChatRequest) -> ChatResponse:
|
||||
"""
|
||||
Submit at NORMAL priority and block until the queue processes the job.
|
||||
If the queue is paused or stopped, the call blocks until resumed (or times out).
|
||||
"""
|
||||
from app.services.queue import Priority, queue_service # deferred — avoids circular import
|
||||
|
||||
job = await queue_service.enqueue(request, Priority.NORMAL)
|
||||
config = await load_ai_config()
|
||||
timeout = float(config.get("timeout_seconds", 60)) + 5.0 # +5s buffer over provider timeout
|
||||
|
||||
try:
|
||||
return await asyncio.wait_for(asyncio.shield(job.future), timeout=timeout)
|
||||
except asyncio.TimeoutError:
|
||||
queue_service.cancel_job(job.id)
|
||||
raise HTTPException(status_code=504, detail="Timed out waiting for queue to process job")
|
||||
except asyncio.CancelledError:
|
||||
raise HTTPException(status_code=503, detail="Job was cancelled")
|
||||
except Exception as exc:
|
||||
if isinstance(exc, HTTPException):
|
||||
raise
|
||||
raise HTTPException(status_code=502, detail=str(exc)) from exc
|
||||
|
||||
@@ -0,0 +1,104 @@
|
||||
"""
|
||||
Queue management router.
|
||||
|
||||
POST /queue/jobs — enqueue a job, return immediately with job metadata
|
||||
GET /queue/jobs/{id} — poll job status / result
|
||||
DELETE /queue/jobs/{id} — cancel a pending job
|
||||
|
||||
GET /queue/status — worker state + queue depth
|
||||
POST /queue/pause — finish current job, stop picking new ones
|
||||
POST /queue/resume — resume from pause
|
||||
POST /queue/start — start (or restart) the worker
|
||||
POST /queue/stop — stop worker immediately (pending jobs stay queued)
|
||||
"""
|
||||
from fastapi import APIRouter, HTTPException
|
||||
|
||||
from app.schemas.queue import JobStatus, QueueRequest, QueueStatus
|
||||
from app.services.queue import PRIORITY_MAP, Job, Priority, queue_service
|
||||
|
||||
router = APIRouter(prefix="/queue", tags=["queue"])
|
||||
|
||||
|
||||
# ── Job endpoints ─────────────────────────────────────────────────────────────
|
||||
|
||||
@router.post("/jobs", response_model=JobStatus, status_code=202)
|
||||
async def enqueue_job(request: QueueRequest) -> JobStatus:
|
||||
priority = PRIORITY_MAP[request.priority]
|
||||
job = await queue_service.enqueue(request, priority)
|
||||
return _job_to_status(job)
|
||||
|
||||
|
||||
@router.get("/jobs/{job_id}", response_model=JobStatus)
|
||||
async def get_job(job_id: str) -> JobStatus:
|
||||
job = queue_service.get_job(job_id)
|
||||
if not job:
|
||||
raise HTTPException(status_code=404, detail="Job not found")
|
||||
return _job_to_status(job)
|
||||
|
||||
|
||||
@router.delete("/jobs/{job_id}", status_code=204)
|
||||
async def cancel_job(job_id: str) -> None:
|
||||
if not queue_service.cancel_job(job_id):
|
||||
raise HTTPException(status_code=404, detail="Job not found or already started")
|
||||
|
||||
|
||||
# ── Worker control endpoints ──────────────────────────────────────────────────
|
||||
|
||||
@router.get("/status", response_model=QueueStatus)
|
||||
async def get_status() -> QueueStatus:
|
||||
cur = queue_service.current_job
|
||||
return QueueStatus(
|
||||
running=queue_service._running,
|
||||
paused=queue_service.is_paused,
|
||||
queue_size=queue_service.queue_size,
|
||||
current_job_id=cur.id if cur else None,
|
||||
)
|
||||
|
||||
|
||||
@router.post("/pause", status_code=204)
|
||||
async def pause() -> None:
|
||||
"""Pause after the current job finishes."""
|
||||
queue_service.pause()
|
||||
|
||||
|
||||
@router.post("/resume", status_code=204)
|
||||
async def resume() -> None:
|
||||
"""Resume from a paused state."""
|
||||
queue_service.resume()
|
||||
|
||||
|
||||
@router.post("/start", status_code=204)
|
||||
async def start() -> None:
|
||||
"""Start (or restart) the worker task."""
|
||||
queue_service.start()
|
||||
|
||||
|
||||
@router.post("/stop", status_code=204)
|
||||
async def stop() -> None:
|
||||
"""Stop the worker. Pending jobs remain in queue; POST /queue/start to resume."""
|
||||
queue_service.stop()
|
||||
|
||||
|
||||
# ── Helper ────────────────────────────────────────────────────────────────────
|
||||
|
||||
def _job_to_status(job: Job) -> JobStatus:
|
||||
pos: int | None = None
|
||||
if job.status == "pending":
|
||||
# Count jobs that are ahead: same or higher priority AND earlier seq
|
||||
pos = sum(
|
||||
1
|
||||
for j in queue_service._jobs.values()
|
||||
if j.status == "pending"
|
||||
and (int(j.priority), j.seq) < (int(job.priority), job.seq)
|
||||
)
|
||||
return JobStatus(
|
||||
id=job.id,
|
||||
status=job.status,
|
||||
priority=Priority(job.priority).name.lower(),
|
||||
position=pos,
|
||||
created_at=job.created_at,
|
||||
started_at=job.started_at,
|
||||
finished_at=job.finished_at,
|
||||
result=job.result,
|
||||
error=job.error,
|
||||
)
|
||||
@@ -0,0 +1,40 @@
|
||||
from datetime import datetime
|
||||
from typing import Literal
|
||||
|
||||
from pydantic import BaseModel, field_validator
|
||||
|
||||
from app.schemas.chat import ChatMessage, ChatResponse
|
||||
|
||||
|
||||
class QueueRequest(BaseModel):
|
||||
messages: list[ChatMessage]
|
||||
max_tokens: int = 2048
|
||||
temperature: float = 0.0
|
||||
response_format: Literal["json", "text"] = "text"
|
||||
priority: Literal["high", "normal", "low"] = "normal"
|
||||
|
||||
@field_validator("messages")
|
||||
@classmethod
|
||||
def messages_not_empty(cls, v: list) -> list:
|
||||
if not v:
|
||||
raise ValueError("messages must not be empty")
|
||||
return v
|
||||
|
||||
|
||||
class JobStatus(BaseModel):
|
||||
id: str
|
||||
status: str
|
||||
priority: str
|
||||
position: int | None = None # number of jobs ahead; None when not pending
|
||||
created_at: datetime
|
||||
started_at: datetime | None = None
|
||||
finished_at: datetime | None = None
|
||||
result: ChatResponse | None = None
|
||||
error: str | None = None
|
||||
|
||||
|
||||
class QueueStatus(BaseModel):
|
||||
running: bool
|
||||
paused: bool
|
||||
queue_size: int
|
||||
current_job_id: str | None = None
|
||||
@@ -23,7 +23,7 @@ _DEFAULT_CONFIG: dict = {
|
||||
"max_retries": 2,
|
||||
"anthropic": {"api_key": "", "model": "claude-haiku-4-5-20251001"},
|
||||
"ollama": {"base_url": "http://host.docker.internal:11434/v1", "model": "llama3.2", "api_key": "ollama"},
|
||||
"lmstudio": {"base_url": "http://host.docker.internal:1234/v1", "model": "local-model", "api_key": "lm-studio"},
|
||||
"lmstudio": {"base_url": "http://host.docker.internal:1234/v1", "model": "gemma-4-e4b-it", "api_key": "lm-studio"},
|
||||
}
|
||||
|
||||
_cache: dict | None = None
|
||||
|
||||
@@ -0,0 +1,169 @@
|
||||
"""
|
||||
In-memory priority queue for AI requests.
|
||||
|
||||
Jobs are ordered by (priority, sequence_number) so HIGH=1 jobs always run before
|
||||
NORMAL=3 and LOW=5 regardless of arrival order. Within the same priority level
|
||||
insertion order (FIFO) is preserved via the monotonically incrementing seq counter.
|
||||
|
||||
The QueueService runs a single async worker task. It can be paused (current job
|
||||
finishes, no new jobs start), resumed, started, or stopped from outside.
|
||||
|
||||
Module-level singleton `queue_service` is imported by routers and the app lifespan.
|
||||
"""
|
||||
import asyncio
|
||||
import uuid
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, timezone
|
||||
from enum import IntEnum
|
||||
|
||||
|
||||
class Priority(IntEnum):
|
||||
HIGH = 1
|
||||
NORMAL = 3
|
||||
LOW = 5
|
||||
|
||||
|
||||
PRIORITY_MAP: dict[str, Priority] = {
|
||||
"high": Priority.HIGH,
|
||||
"normal": Priority.NORMAL,
|
||||
"low": Priority.LOW,
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
class Job:
|
||||
id: str
|
||||
priority: Priority
|
||||
seq: int
|
||||
request: object # ChatRequest — typed as object to avoid circular import
|
||||
future: asyncio.Future
|
||||
status: str = "pending" # pending | processing | done | failed | cancelled
|
||||
created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
|
||||
started_at: datetime | None = None
|
||||
finished_at: datetime | None = None
|
||||
result: object = None
|
||||
error: str | None = None
|
||||
|
||||
def __lt__(self, other: "Job") -> bool:
|
||||
# asyncio.PriorityQueue requires items to be orderable
|
||||
return (self.priority, self.seq) < (other.priority, other.seq)
|
||||
|
||||
|
||||
class QueueService:
|
||||
def __init__(self) -> None:
|
||||
self._queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
|
||||
self._jobs: dict[str, Job] = {}
|
||||
self._seq: int = 0
|
||||
self._worker_task: asyncio.Task | None = None
|
||||
# Event: set = allowed to run; clear = paused
|
||||
self._resume_event: asyncio.Event = asyncio.Event()
|
||||
self._resume_event.set()
|
||||
self._running: bool = False
|
||||
self.current_job: Job | None = None
|
||||
|
||||
# ── Public API ────────────────────────────────────────────────────────────
|
||||
|
||||
async def enqueue(self, request: object, priority: Priority = Priority.NORMAL) -> Job:
|
||||
self._seq += 1
|
||||
job = Job(
|
||||
id=str(uuid.uuid4()),
|
||||
priority=priority,
|
||||
seq=self._seq,
|
||||
request=request,
|
||||
future=asyncio.get_event_loop().create_future(),
|
||||
)
|
||||
self._jobs[job.id] = job
|
||||
await self._queue.put((int(priority), self._seq, job))
|
||||
return job
|
||||
|
||||
def get_job(self, job_id: str) -> Job | None:
|
||||
return self._jobs.get(job_id)
|
||||
|
||||
def cancel_job(self, job_id: str) -> bool:
|
||||
"""Cancel a pending job. Returns False if not found or already started."""
|
||||
job = self._jobs.get(job_id)
|
||||
if job and job.status == "pending":
|
||||
job.status = "cancelled"
|
||||
if not job.future.done():
|
||||
job.future.cancel()
|
||||
return True
|
||||
return False
|
||||
|
||||
def start(self) -> None:
|
||||
"""Start the worker. No-op if already running."""
|
||||
if not self._running or (self._worker_task and self._worker_task.done()):
|
||||
self._resume_event.set()
|
||||
self._running = True
|
||||
self._worker_task = asyncio.create_task(self._worker_loop())
|
||||
|
||||
def pause(self) -> None:
|
||||
"""Pause after the current job finishes. Does not cancel in-progress work."""
|
||||
self._resume_event.clear()
|
||||
|
||||
def resume(self) -> None:
|
||||
"""Resume from a paused state."""
|
||||
self._resume_event.set()
|
||||
|
||||
def stop(self) -> None:
|
||||
"""Stop the worker. Pending jobs remain in the queue; start() will resume them."""
|
||||
self._running = False
|
||||
self._resume_event.set() # unblock the wait so the loop can exit
|
||||
if self._worker_task and not self._worker_task.done():
|
||||
self._worker_task.cancel()
|
||||
|
||||
@property
|
||||
def is_paused(self) -> bool:
|
||||
return not self._resume_event.is_set()
|
||||
|
||||
@property
|
||||
def queue_size(self) -> int:
|
||||
return self._queue.qsize()
|
||||
|
||||
# ── Internal ──────────────────────────────────────────────────────────────
|
||||
|
||||
async def _worker_loop(self) -> None:
|
||||
while self._running:
|
||||
# Block here while paused
|
||||
await self._resume_event.wait()
|
||||
|
||||
try:
|
||||
_, _, job = await asyncio.wait_for(self._queue.get(), timeout=1.0)
|
||||
except asyncio.TimeoutError:
|
||||
continue
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
|
||||
if job.status == "cancelled":
|
||||
self._queue.task_done()
|
||||
continue
|
||||
|
||||
try:
|
||||
await self._process(job)
|
||||
finally:
|
||||
self._queue.task_done()
|
||||
|
||||
async def _process(self, job: Job) -> None:
|
||||
# Deferred import — avoids circular dependency with chat router
|
||||
from app.routers.chat import execute_chat # noqa: PLC0415
|
||||
|
||||
job.status = "processing"
|
||||
job.started_at = datetime.now(timezone.utc)
|
||||
self.current_job = job
|
||||
try:
|
||||
result = await execute_chat(job.request)
|
||||
job.status = "done"
|
||||
job.result = result
|
||||
if not job.future.done():
|
||||
job.future.set_result(result)
|
||||
except Exception as exc:
|
||||
job.status = "failed"
|
||||
job.error = str(exc)
|
||||
if not job.future.done():
|
||||
job.future.set_exception(exc)
|
||||
finally:
|
||||
job.finished_at = datetime.now(timezone.utc)
|
||||
self.current_job = None
|
||||
|
||||
|
||||
# Singleton used throughout the app
|
||||
queue_service = QueueService()
|
||||
Reference in New Issue
Block a user