""" POST /chat — synchronous chat endpoint. All requests are submitted to the priority queue at NORMAL priority and the caller waits for the result. This keeps the contract identical to the original endpoint while ensuring all AI traffic flows through one ordered queue. """ import asyncio import re from fastapi import APIRouter, HTTPException from app.providers import get_provider from app.providers.anthropic_provider import ProviderConnectionError as AnthropicConnError from app.providers.anthropic_provider import ProviderTimeoutError as AnthropicTimeoutError from app.providers.openai_compat import ProviderConnectionError as OpenAIConnError from app.providers.openai_compat import ProviderTimeoutError as OpenAITimeoutError from app.schemas.chat import ChatRequest, ChatResponse from app.services.config_reader import load_ai_config router = APIRouter() _FENCE_RE = re.compile(r"^```[a-z]*\n?(.*?)\n?```$", re.DOTALL) def _strip_fences(text: str) -> str: m = _FENCE_RE.match(text.strip()) return m.group(1).strip() if m else text.strip() async def execute_chat(request: ChatRequest) -> ChatResponse: """ Core provider call — invoked by the queue worker. Raises HTTPException on provider errors so the queue worker stores the message. """ config = await load_ai_config() provider_name = config.get("provider", "lmstudio") if provider_name not in ("anthropic", "ollama", "lmstudio"): raise HTTPException(status_code=503, detail=f"Unknown provider configured: {provider_name!r}") try: provider = get_provider(config) except ValueError as exc: raise HTTPException(status_code=503, detail=str(exc)) timeout = config.get("timeout_seconds", 60) max_retries = config.get("max_retries", 2) for attempt in range(max_retries + 1): try: content, input_tokens, output_tokens = await asyncio.wait_for( provider.chat(request.messages, request.max_tokens, request.temperature), timeout=float(timeout), ) break except asyncio.TimeoutError as exc: raise HTTPException(status_code=504, detail="AI provider timed out") from exc except (AnthropicConnError, OpenAIConnError) as exc: if attempt < max_retries: await asyncio.sleep(0.5 * (attempt + 1)) continue raise HTTPException(status_code=502, detail=f"AI provider error: {exc}") from exc except (AnthropicTimeoutError, OpenAITimeoutError) as exc: raise HTTPException(status_code=504, detail="AI provider timed out") from exc if request.response_format == "json": content = _strip_fences(content) return ChatResponse( content=content, provider=provider.provider_name, model=provider.model_name, input_tokens=input_tokens, output_tokens=output_tokens, ) @router.post("/chat", response_model=ChatResponse) async def chat(request: ChatRequest) -> ChatResponse: """ Submit at NORMAL priority and block until the queue processes the job. If the queue is paused or stopped, the call blocks until resumed (or times out). """ from app.services.queue import Priority, queue_service # deferred — avoids circular import job = await queue_service.enqueue(request, Priority.NORMAL) config = await load_ai_config() timeout = float(config.get("timeout_seconds", 60)) + 5.0 # +5s buffer over provider timeout try: return await asyncio.wait_for(asyncio.shield(job.future), timeout=timeout) except asyncio.TimeoutError: queue_service.cancel_job(job.id) raise HTTPException(status_code=504, detail="Timed out waiting for queue to process job") except asyncio.CancelledError: raise HTTPException(status_code=503, detail="Job was cancelled") except Exception as exc: if isinstance(exc, HTTPException): raise raise HTTPException(status_code=502, detail=str(exc)) from exc