Files
curo1305 c4f0c7ad49 Add priority queue to ai-service and STATUS.md workflow
- Introduce async priority queue service in ai-service; all /chat calls now route through it
- Refactor chat router to separate execute_chat (core logic) from the HTTP handler
- Add /queue endpoints (status, pause, resume, cancel) for queue management
- Update ai-service config to use Pydantic v2 model_config style
- Add STATUS.md files for backend, ai-service, doc-service, and frontend
- Document STATUS.md workflow in CLAUDE.md
- Update doc-service documents router and schemas; frontend DocumentsPage and API client

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 22:58:10 +02:00

105 lines
3.7 KiB
Python

"""
Queue management router.
POST /queue/jobs — enqueue a job, return immediately with job metadata
GET /queue/jobs/{id} — poll job status / result
DELETE /queue/jobs/{id} — cancel a pending job
GET /queue/status — worker state + queue depth
POST /queue/pause — finish current job, stop picking new ones
POST /queue/resume — resume from pause
POST /queue/start — start (or restart) the worker
POST /queue/stop — stop worker immediately (pending jobs stay queued)
"""
from fastapi import APIRouter, HTTPException
from app.schemas.queue import JobStatus, QueueRequest, QueueStatus
from app.services.queue import PRIORITY_MAP, Job, Priority, queue_service
router = APIRouter(prefix="/queue", tags=["queue"])
# ── Job endpoints ─────────────────────────────────────────────────────────────
@router.post("/jobs", response_model=JobStatus, status_code=202)
async def enqueue_job(request: QueueRequest) -> JobStatus:
    """Enqueue a job and return its status snapshot.

    The request's ``priority`` value is translated through ``PRIORITY_MAP``
    and passed, together with the request itself, to
    ``queue_service.enqueue``. The job object that call returns is converted
    to a ``JobStatus`` for the response (the route is declared with HTTP 202).

    NOTE(review): a ``priority`` value missing from ``PRIORITY_MAP`` would
    raise ``KeyError`` here; presumably the ``QueueRequest`` schema restricts
    the field to valid keys -- confirm.
    """
    queued = await queue_service.enqueue(request, PRIORITY_MAP[request.priority])
    return _job_to_status(queued)
@router.get("/jobs/{job_id}", response_model=JobStatus)
async def get_job(job_id: str) -> JobStatus:
    """Return the current status of the job identified by ``job_id``.

    Raises:
        HTTPException: 404 when ``queue_service.get_job`` yields a falsy
            value for the given id.
    """
    found = queue_service.get_job(job_id)
    if found:
        return _job_to_status(found)
    raise HTTPException(status_code=404, detail="Job not found")
@router.delete("/jobs/{job_id}", status_code=204)
async def cancel_job(job_id: str) -> None:
    """Cancel the job identified by ``job_id``.

    Raises:
        HTTPException: 404 when ``queue_service.cancel_job`` reports failure
            (a falsy return), which this endpoint describes to the client as
            "not found or already started".
    """
    cancelled = queue_service.cancel_job(job_id)
    if cancelled:
        return None
    raise HTTPException(status_code=404, detail="Job not found or already started")
# ── Worker control endpoints ──────────────────────────────────────────────────
@router.get("/status", response_model=QueueStatus)
async def get_status() -> QueueStatus:
    """Report the worker's running/paused flags, queue size and current job id.

    ``current_job_id`` is ``None`` when the queue service has no current job.
    """
    active = queue_service.current_job
    # Fields are collected in the same order the response model is populated.
    snapshot = {
        # NOTE(review): reads a private attribute of the queue service.
        "running": queue_service._running,
        "paused": queue_service.is_paused,
        "queue_size": queue_service.queue_size,
        "current_job_id": None if not active else active.id,
    }
    return QueueStatus(**snapshot)
@router.post("/pause", status_code=204)
async def pause() -> None:
    """Ask the queue service to pause once the current job has finished."""
    queue_service.pause()
    return None
@router.post("/resume", status_code=204)
async def resume() -> None:
    """Ask the queue service to leave the paused state."""
    queue_service.resume()
    return None
@router.post("/start", status_code=204)
async def start() -> None:
    """Ask the queue service to start (or restart) its worker task."""
    queue_service.start()
    return None
@router.post("/stop", status_code=204)
async def stop() -> None:
    """Ask the queue service to stop its worker.

    Pending jobs are left in the queue; POST /queue/start resumes processing.
    """
    queue_service.stop()
    return None
# ── Helper ────────────────────────────────────────────────────────────────────
def _job_to_status(job: Job) -> JobStatus:
    """Convert an internal ``Job`` into the ``JobStatus`` response schema.

    For a job whose status is ``"pending"``, ``position`` is the number of
    other pending jobs known to the queue service whose
    ``(int(priority), seq)`` pair sorts strictly before this job's pair.
    The comparison is lexicographic: a smaller priority number wins outright,
    and ``seq`` only breaks ties between equal priorities. For any other
    status ``position`` is ``None``.

    The priority is reported as the lower-cased name of the matching
    ``Priority`` member.
    """

    def _is_ahead(other: Job) -> bool:
        # Only jobs still waiting can be ahead in the queue.
        if other.status != "pending":
            return False
        return (int(other.priority), other.seq) < (int(job.priority), job.seq)

    position: int | None = None
    if job.status == "pending":
        # Summing booleans counts the jobs for which the predicate holds.
        # NOTE(review): iterates the service's private ``_jobs`` mapping.
        position = sum(map(_is_ahead, queue_service._jobs.values()))

    priority_label = Priority(job.priority).name.lower()
    return JobStatus(
        id=job.id,
        status=job.status,
        priority=priority_label,
        position=position,
        created_at=job.created_at,
        started_at=job.started_at,
        finished_at=job.finished_at,
        result=job.result,
        error=job.error,
    )