import asyncio import json import math import uuid from datetime import datetime, timezone import aiofiles import pdfplumber from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Query, UploadFile from fastapi.responses import StreamingResponse from sqlalchemy import func, or_, select from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload from app.database import AsyncSessionLocal, get_db from app.deps import get_user_id from app.models.category import DocumentCategory from app.models.category_assignment import CategoryAssignment from app.models.document import Document from app.schemas.document import DocumentOut, DocumentPage, DocumentStatusOut, DocumentTypeUpdate, TagsUpdate, TitleUpdate from app.services.ai_client import AIServiceError, classify_document from app.services.config_reader import load_doc_config from app.services.storage import delete_file, get_upload_path, save_upload router = APIRouter() _DEFAULT_MAX_BYTES = 20 * 1024 * 1024 # ── Helpers ─────────────────────────────────────────────────────────────────── async def _get_user_doc(doc_id: str, user_id: str, db: AsyncSession) -> Document: result = await db.execute( select(Document) .where(Document.id == doc_id, Document.user_id == user_id) .options( selectinload(Document.category_assignments) .selectinload(CategoryAssignment.category) ) ) doc = result.scalar_one_or_none() if doc is None: raise HTTPException(status_code=404, detail="Document not found") return doc def _doc_with_categories(doc: Document) -> DocumentOut: from app.schemas.document import CategoryOut cats = [CategoryOut(id=a.category.id, name=a.category.name) for a in doc.category_assignments] return DocumentOut( id=doc.id, user_id=doc.user_id, filename=doc.filename, title=doc.title, file_size=doc.file_size, status=doc.status, document_type=doc.document_type, extracted_data=doc.extracted_data, tags=doc.tags, error_message=doc.error_message, created_at=doc.created_at, processed_at=doc.processed_at, categories=cats, ) def _extract_pdf_text(file_path: str) -> str: """Synchronous — must be called via asyncio.to_thread.""" text_parts = [] with pdfplumber.open(file_path) as pdf: for page in pdf.pages: page_text = page.extract_text() if page_text: text_parts.append(page_text) return "\n".join(text_parts) # ── Background processing ───────────────────────────────────────────────────── async def process_document(doc_id: str) -> None: """ Runs after the upload response is sent. Opens its own DB session — never use the request's Depends session here. Loads AI config fresh from the config file so settings changes apply without restart. """ async with AsyncSessionLocal() as db: doc = await db.get(Document, doc_id) if doc is None: return doc.status = "processing" await db.commit() try: text = await asyncio.to_thread(_extract_pdf_text, doc.file_path) result = await classify_document(text) doc.raw_text = text[:500_000] # cap stored text at 500k chars doc.extracted_data = json.dumps(result) doc.title = result.get("title") or None doc.document_type = result.get("document_type", "unknown") doc.tags = json.dumps(result.get("tags", [])) doc.status = "done" doc.processed_at = datetime.now(timezone.utc) except Exception as exc: doc.status = "failed" doc.error_message = str(exc)[:500] await db.commit() # ── Routes ──────────────────────────────────────────────────────────────────── @router.post("/upload", response_model=DocumentOut, status_code=202) async def upload_document( file: UploadFile, background_tasks: BackgroundTasks, user_id: str = Depends(get_user_id), db: AsyncSession = Depends(get_db), ) -> DocumentOut: if file.content_type not in ("application/pdf", "application/octet-stream"): if not (file.filename or "").lower().endswith(".pdf"): raise HTTPException(status_code=415, detail="Only PDF files are accepted") config = await load_doc_config() max_bytes = config.get("documents", {}).get("max_pdf_bytes", _DEFAULT_MAX_BYTES) file_data = await file.read() if len(file_data) > max_bytes: raise HTTPException( status_code=413, detail=f"File exceeds maximum size of {max_bytes // (1024*1024)} MB", ) doc_id = str(uuid.uuid4()) dest = await save_upload(file_data, user_id, doc_id) doc = Document( id=doc_id, user_id=user_id, filename=file.filename or "upload.pdf", file_path=str(dest), file_size=len(file_data), status="pending", ) db.add(doc) await db.commit() background_tasks.add_task(process_document, doc_id) # Re-query with selectinload so category_assignments is eagerly loaded. # A new doc has no categories yet, but we need the relationship populated # to avoid MissingGreenlet in the async session. doc = await _get_user_doc(doc_id, user_id, db) return _doc_with_categories(doc) _SORT_COLUMNS = { "created_at": Document.created_at, "processed_at": Document.processed_at, "filename": Document.filename, "title": Document.title, "file_size": Document.file_size, "status": Document.status, "document_type": Document.document_type, } @router.get("", response_model=DocumentPage) async def list_documents( page: int = Query(default=1, ge=1), per_page: int = Query(default=20, ge=1, le=100), sort: str = Query(default="created_at"), order: str = Query(default="desc", pattern="^(asc|desc)$"), status: str | None = Query(default=None), document_type: str | None = Query(default=None), search: str | None = Query(default=None), category_id: str | None = Query(default=None), user_id: str = Depends(get_user_id), db: AsyncSession = Depends(get_db), ) -> DocumentPage: sort_col = _SORT_COLUMNS.get(sort, Document.created_at) sort_expr = sort_col.desc() if order == "desc" else sort_col.asc() # Build filter conditions once and reuse for both count + items queries. conditions = [Document.user_id == user_id] if status: conditions.append(Document.status == status) if document_type: conditions.append(Document.document_type == document_type) if search: like = f"%{search}%" conditions.append( or_( Document.title.ilike(like), Document.filename.ilike(like), Document.tags.ilike(like), Document.document_type.ilike(like), ) ) if category_id: subq = select(CategoryAssignment.document_id).where( CategoryAssignment.category_id == category_id ) conditions.append(Document.id.in_(subq)) count_result = await db.execute( select(func.count(Document.id)).where(*conditions) ) total = count_result.scalar_one() items_result = await db.execute( select(Document) .where(*conditions) .options( selectinload(Document.category_assignments) .selectinload(CategoryAssignment.category) ) .order_by(sort_expr) .offset((page - 1) * per_page) .limit(per_page) ) items = [_doc_with_categories(d) for d in items_result.scalars().all()] return DocumentPage( items=items, total=total, page=page, pages=max(1, math.ceil(total / per_page)), ) @router.get("/{doc_id}", response_model=DocumentOut) async def get_document( doc_id: str, user_id: str = Depends(get_user_id), db: AsyncSession = Depends(get_db), ) -> DocumentOut: doc = await _get_user_doc(doc_id, user_id, db) return _doc_with_categories(doc) @router.get("/{doc_id}/status", response_model=DocumentStatusOut) async def get_document_status( doc_id: str, user_id: str = Depends(get_user_id), db: AsyncSession = Depends(get_db), ) -> Document: result = await db.execute( select(Document).where(Document.id == doc_id, Document.user_id == user_id) ) doc = result.scalar_one_or_none() if doc is None: raise HTTPException(status_code=404, detail="Document not found") return doc @router.patch("/{doc_id}/type", response_model=DocumentOut) async def update_document_type( doc_id: str, body: DocumentTypeUpdate, user_id: str = Depends(get_user_id), db: AsyncSession = Depends(get_db), ) -> DocumentOut: doc = await _get_user_doc(doc_id, user_id, db) doc.document_type = body.document_type await db.commit() await db.refresh(doc) return _doc_with_categories(doc) @router.patch("/{doc_id}/tags", response_model=DocumentOut) async def update_document_tags( doc_id: str, body: TagsUpdate, user_id: str = Depends(get_user_id), db: AsyncSession = Depends(get_db), ) -> DocumentOut: doc = await _get_user_doc(doc_id, user_id, db) # Normalise: strip whitespace, drop empties, deduplicate while preserving order seen: set[str] = set() clean: list[str] = [] for t in body.tags: t = t.strip() if t and t.lower() not in seen: seen.add(t.lower()) clean.append(t) doc.tags = json.dumps(clean) await db.commit() doc = await _get_user_doc(doc_id, user_id, db) return _doc_with_categories(doc) @router.patch("/{doc_id}/title", response_model=DocumentOut) async def update_document_title( doc_id: str, body: TitleUpdate, user_id: str = Depends(get_user_id), db: AsyncSession = Depends(get_db), ) -> DocumentOut: doc = await _get_user_doc(doc_id, user_id, db) doc.title = body.title.strip() or None await db.commit() doc = await _get_user_doc(doc_id, user_id, db) return _doc_with_categories(doc) @router.delete("/{doc_id}", status_code=204) async def delete_document( doc_id: str, user_id: str = Depends(get_user_id), db: AsyncSession = Depends(get_db), ) -> None: result = await db.execute( select(Document).where(Document.id == doc_id, Document.user_id == user_id) ) doc = result.scalar_one_or_none() if doc is None: raise HTTPException(status_code=404, detail="Document not found") delete_file(doc.file_path) await db.delete(doc) await db.commit() @router.get("/{doc_id}/file") async def download_file( doc_id: str, user_id: str = Depends(get_user_id), db: AsyncSession = Depends(get_db), ) -> StreamingResponse: result = await db.execute( select(Document).where(Document.id == doc_id, Document.user_id == user_id) ) doc = result.scalar_one_or_none() if doc is None: raise HTTPException(status_code=404, detail="Document not found") async def file_generator(): async with aiofiles.open(doc.file_path, "rb") as f: while chunk := await f.read(64 * 1024): yield chunk return StreamingResponse( file_generator(), media_type="application/pdf", headers={"Content-Disposition": f'inline; filename="{doc.filename}"'}, ) # ── Category assignment ─────────────────────────────────────────────────────── @router.post("/{doc_id}/categories/{cat_id}", status_code=204) async def assign_category( doc_id: str, cat_id: str, user_id: str = Depends(get_user_id), db: AsyncSession = Depends(get_db), ) -> None: # Verify both belong to this user doc_result = await db.execute( select(Document).where(Document.id == doc_id, Document.user_id == user_id) ) if doc_result.scalar_one_or_none() is None: raise HTTPException(status_code=404, detail="Document not found") cat_result = await db.execute( select(DocumentCategory).where( DocumentCategory.id == cat_id, DocumentCategory.user_id == user_id ) ) if cat_result.scalar_one_or_none() is None: raise HTTPException(status_code=404, detail="Category not found") # Upsert — ignore if already assigned existing = await db.execute( select(CategoryAssignment).where( CategoryAssignment.document_id == doc_id, CategoryAssignment.category_id == cat_id, ) ) if existing.scalar_one_or_none() is None: db.add(CategoryAssignment(document_id=doc_id, category_id=cat_id)) await db.commit() @router.delete("/{doc_id}/categories/{cat_id}", status_code=204) async def remove_category( doc_id: str, cat_id: str, user_id: str = Depends(get_user_id), db: AsyncSession = Depends(get_db), ) -> None: result = await db.execute( select(CategoryAssignment).where( CategoryAssignment.document_id == doc_id, CategoryAssignment.category_id == cat_id, ) ) assignment = result.scalar_one_or_none() if assignment: await db.delete(assignment) await db.commit()