Files
curo1305 c4f0c7ad49 Add priority queue to ai-service and STATUS.md workflow
- Introduce async priority queue service in ai-service; all /chat calls now route through it
- Refactor chat router to separate execute_chat (core logic) from the HTTP handler
- Add /queue endpoints (status, pause, resume, cancel) for queue management
- Update ai-service config to use Pydantic v2 model_config style
- Add STATUS.md files for backend, ai-service, doc-service, and frontend
- Document STATUS.md workflow in CLAUDE.md
- Update doc-service documents router and schemas; frontend DocumentsPage and API client

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 22:58:10 +02:00

102 lines
3.9 KiB
Python

"""
POST /chat — synchronous chat endpoint.
All requests are submitted to the priority queue at NORMAL priority and the caller
waits for the result. This keeps the contract identical to the original endpoint
while ensuring all AI traffic flows through one ordered queue.
"""
import asyncio
import re
from fastapi import APIRouter, HTTPException
from app.providers import get_provider
from app.providers.anthropic_provider import ProviderConnectionError as AnthropicConnError
from app.providers.anthropic_provider import ProviderTimeoutError as AnthropicTimeoutError
from app.providers.openai_compat import ProviderConnectionError as OpenAIConnError
from app.providers.openai_compat import ProviderTimeoutError as OpenAITimeoutError
from app.schemas.chat import ChatRequest, ChatResponse
from app.services.config_reader import load_ai_config
# FastAPI router carrying the POST /chat route declared in this module;
# presumably included into the application elsewhere — verify in the app setup.
router = APIRouter()
_FENCE_RE = re.compile(r"^```[a-z]*\n?(.*?)\n?```$", re.DOTALL)
def _strip_fences(text: str) -> str:
m = _FENCE_RE.match(text.strip())
return m.group(1).strip() if m else text.strip()
async def execute_chat(request: ChatRequest) -> ChatResponse:
    """
    Core provider call — invoked by the queue worker.

    Loads the AI config, builds the configured provider and sends the request's
    messages. Connection errors are retried up to ``max_retries`` times with a
    linear backoff (0.5s, 1.0s, 1.5s, ...); timeouts are not retried.
    Raises HTTPException on provider errors so the queue worker stores the message.

    Args:
        request: The chat request (messages, max_tokens, temperature, response_format).

    Returns:
        ChatResponse with the provider/model names, content and token counts.
        When ``request.response_format == "json"`` a surrounding code fence is removed.

    Raises:
        HTTPException: 503 if the configured provider is unknown or cannot be built,
            502 if connection errors persist after all retries, 504 on timeout.
    """
    config = await load_ai_config()
    provider_name = config.get("provider", "lmstudio")
    if provider_name not in ("anthropic", "ollama", "lmstudio"):
        raise HTTPException(status_code=503, detail=f"Unknown provider configured: {provider_name!r}")
    try:
        provider = get_provider(config)
    except ValueError as exc:
        raise HTTPException(status_code=503, detail=str(exc)) from exc

    timeout = float(config.get("timeout_seconds", 60))
    # Coerce and clamp: a negative value would make range() empty, the loop would
    # never run, and `content` below would be unbound (UnboundLocalError).
    max_retries = max(0, int(config.get("max_retries", 2)))

    for attempt in range(max_retries + 1):
        try:
            content, input_tokens, output_tokens = await asyncio.wait_for(
                provider.chat(request.messages, request.max_tokens, request.temperature),
                timeout=timeout,
            )
            break
        except asyncio.TimeoutError as exc:
            # Our own wall-clock limit fired; timeouts are not retried.
            raise HTTPException(status_code=504, detail="AI provider timed out") from exc
        except (AnthropicConnError, OpenAIConnError) as exc:
            if attempt < max_retries:
                # Linear backoff before the next attempt.
                await asyncio.sleep(0.5 * (attempt + 1))
                continue
            raise HTTPException(status_code=502, detail=f"AI provider error: {exc}") from exc
        except (AnthropicTimeoutError, OpenAITimeoutError) as exc:
            raise HTTPException(status_code=504, detail="AI provider timed out") from exc

    if request.response_format == "json":
        content = _strip_fences(content)
    return ChatResponse(
        content=content,
        provider=provider.provider_name,
        model=provider.model_name,
        input_tokens=input_tokens,
        output_tokens=output_tokens,
    )
@router.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest) -> ChatResponse:
    """
    Submit at NORMAL priority and block until the queue processes the job.
    If the queue is paused or stopped, the call blocks until resumed (or times out).

    The wait budget covers every attempt execute_chat may make with the current
    config: (max_retries + 1) attempts of up to timeout_seconds each, plus the
    0.5s-step backoff sleeps between them, plus a 5s buffer. Provider-level errors
    can therefore surface before this handler gives up.

    Raises:
        HTTPException: 504 if the job does not finish within the budget (the job is
            then cancelled), 503 if the job itself was cancelled in the queue, and
            502 for unexpected non-HTTP errors. HTTPExceptions raised by the job
            pass through unchanged.
    """
    from app.services.queue import Priority, queue_service  # deferred — avoids circular import

    # Read config before enqueueing so a config failure cannot leave an orphaned job.
    config = await load_ai_config()
    provider_timeout = float(config.get("timeout_seconds", 60))
    attempts = max(0, int(config.get("max_retries", 2))) + 1
    backoff = sum(0.5 * n for n in range(1, attempts))  # sleeps between retry attempts
    timeout = provider_timeout * attempts + backoff + 5.0  # +5s buffer over provider work

    job = await queue_service.enqueue(request, Priority.NORMAL)
    try:
        # shield() keeps the job's future alive if only this wait is cancelled.
        return await asyncio.wait_for(asyncio.shield(job.future), timeout=timeout)
    except asyncio.TimeoutError:
        queue_service.cancel_job(job.id)
        raise HTTPException(status_code=504, detail="Timed out waiting for queue to process job")
    except asyncio.CancelledError:
        if job.future.cancelled():
            # The job itself was cancelled (e.g. through the queue management API).
            raise HTTPException(status_code=503, detail="Job was cancelled")
        # This request's own task was cancelled, not the job: drop the job nobody
        # is waiting for and let the cancellation propagate as asyncio requires.
        queue_service.cancel_job(job.id)
        raise
    except HTTPException:
        raise
    except Exception as exc:
        raise HTTPException(status_code=502, detail=str(exc)) from exc