from datetime import datetime, timezone

from fastapi import APIRouter, Depends, File, Form, HTTPException, Query, UploadFile
from sqlalchemy.ext.asyncio import AsyncSession

from deps.db import get_db
from services import classifier, extractor, storage
from tasks.document_tasks import extract_and_classify

router = APIRouter(prefix="/api/documents", tags=["documents"])

# NOTE(review): this set is not referenced by any handler in this module, so
# uploads are currently accepted regardless of MIME type. Confirm whether
# `upload_document` is meant to reject types outside this set before enforcing
# it -- doing so would start refusing uploads that succeed today.
ALLOWED_MIME_TYPES = {
    "application/pdf",
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
    "application/msword",
    "text/plain",
    "text/markdown",
    "image/png",
    "image/jpeg",
    "image/jpg",
    "image/tiff",
    "image/webp",
}


@router.post("/upload")
async def upload_document(
    file: UploadFile = File(...),
    auto_classify: bool = Form(True),
    session: AsyncSession = Depends(get_db),
):
    """Store an uploaded file, record its metadata and optionally queue classification.

    Args:
        file: The uploaded file; its whole content is read into memory.
        auto_classify: When true, enqueue the ``extract_and_classify`` task
            for the stored document.
        session: Database session injected by ``get_db``.

    Returns:
        The metadata dict that was passed to ``storage.save_metadata``.
        ``topics`` is always an empty list and ``classified_at`` is ``None``
        in this response.

    Raises:
        HTTPException: 400 when the uploaded file has no content.
    """
    content = await file.read()
    if not content:
        raise HTTPException(400, "Empty file")

    mime = file.content_type or "application/octet-stream"
    # Resolve the fallback name once so storage and metadata always agree.
    original_name = file.filename or "upload"
    saved = await storage.save_upload(session, content, original_name, mime)

    # Extract text from the in-memory bytes (avoid a second MinIO round-trip at upload time)
    text = extractor.extract_text_from_bytes(content, mime)

    now = datetime.now(timezone.utc).isoformat()
    meta = {
        "id": saved["id"],
        "original_name": original_name,
        "filename": saved["filename"],
        "mime_type": mime,
        "size_bytes": len(content),
        "extracted_text": text,
        # Phase 1 cutover: topics are empty at upload time; the Celery worker
        # fills them in asynchronously after extract_and_classify completes.
        "topics": [],
        "created_at": now,
        "classified_at": None,
    }
    await storage.save_metadata(session, meta)

    if auto_classify:
        # Queue the extract+classify task on the Celery documents queue (STORE-08).
        # The task re-fetches bytes from MinIO, extracts text, and classifies.
        # The upload response returns topics=[] — polling GET /api/documents/{id}
        # will show the populated topics once the worker completes.
        extract_and_classify.delay(str(saved["id"]))

    return meta


@router.get("")
async def list_documents(
    topic: str | None = Query(None),
    page: int = Query(1, ge=1),
    per_page: int = Query(20, ge=1, le=100),
    session: AsyncSession = Depends(get_db),
):
    """Return one page of document metadata, optionally filtered by topic.

    Args:
        topic: Passed through to ``storage.list_metadata`` as its ``topic``
            filter; ``None`` when the query parameter is omitted.
        page: 1-based page number.
        per_page: Page size, between 1 and 100.
        session: Database session injected by ``get_db``.

    Returns:
        A dict with ``items`` (the slice for the requested page), ``total``
        (count of all entries returned by storage), ``page`` and ``per_page``.
    """
    # NOTE(review): every matching entry is fetched and the page is sliced in
    # memory here; consider pushing offset/limit into storage if it supports it.
    docs = await storage.list_metadata(session, topic=topic)
    total = len(docs)
    start = (page - 1) * per_page
    return {
        "items": docs[start : start + per_page],
        "total": total,
        "page": page,
        "per_page": per_page,
    }


@router.get("/{doc_id}")
async def get_document(doc_id: str, session: AsyncSession = Depends(get_db)):
    """Return the stored metadata for a single document.

    Raises:
        HTTPException: 404 when ``storage.get_metadata`` returns ``None``.
    """
    meta = await storage.get_metadata(session, doc_id)
    if meta is None:
        raise HTTPException(404, "Document not found")
    return meta


@router.delete("/{doc_id}")
async def delete_document(doc_id: str, session: AsyncSession = Depends(get_db)):
    """Delete a document through the storage service.

    Returns:
        ``{"success": True}`` when storage reports the deletion succeeded.

    Raises:
        HTTPException: 404 when ``storage.delete_document`` returns a falsy
            result.
    """
    ok = await storage.delete_document(session, doc_id)
    if not ok:
        raise HTTPException(404, "Document not found")
    return {"success": True}


@router.post("/{doc_id}/classify")
async def classify_document(
    doc_id: str,
    body: dict | None = None,
    session: AsyncSession = Depends(get_db),
):
    """Classify an existing document synchronously and return its topics.

    Args:
        doc_id: Identifier of the document to classify.
        body: Optional JSON object. Its ``"topics"`` value, if present, is
            forwarded to the classifier; a missing or empty body forwards
            ``None``. The default is ``None`` rather than a shared mutable
            ``{}``.
        session: Database session injected by ``get_db``.

    Returns:
        ``{"topics": ...}`` holding whatever the classifier returned.

    Raises:
        HTTPException: 404 when the document has no stored metadata; 500 when
            the classifier raises any exception.
    """
    meta = await storage.get_metadata(session, doc_id)
    if meta is None:
        raise HTTPException(404, "Document not found")

    topic_names = body.get("topics") if body else None
    try:
        # The /classify endpoint calls classifier synchronously and returns the
        # topic list immediately — this preserves the historical behavior.
        # The upload-time path uses Celery .delay() instead (Phase 1 cutover).
        topics = await classifier.classify_document(session, doc_id, topic_names)
    except Exception as e:
        # Chain the original error so the root cause stays visible in tracebacks.
        raise HTTPException(500, f"Classification failed: {e}") from e
    return {"topics": topics}