""" Document API endpoints for DocuVault — Phase 3 Wave 2. Implements the presigned PUT upload flow (D-04, D-05): POST /api/documents/upload-url — create pending Document row, return presigned URL POST /api/documents/{id}/confirm — stat MinIO for authoritative size, atomic quota UPDATE Preserved endpoints (auth guards added in Plan 03-03): GET /api/documents — list documents GET /api/documents/{id} — get document metadata DELETE /api/documents/{id} — delete document (decrements quota atomically) POST /api/documents/{id}/classify — reclassify document topics NOTE (Wave 2): No auth guards on any endpoint yet — Plan 03-03 adds get_current_user to all handlers. The doc.user_id=None guard in /confirm is a Wave 2 placeholder. """ from __future__ import annotations import uuid from pathlib import Path from typing import Optional from fastapi import APIRouter, Depends, HTTPException, Query, Request, status from fastapi.responses import StreamingResponse from pydantic import BaseModel from sqlalchemy import select, text, func from sqlalchemy.ext.asyncio import AsyncSession from db.models import Document, Quota, Share, User from deps.auth import get_regular_user from deps.db import get_db from services import classifier, storage from storage import get_storage_backend from tasks.document_tasks import extract_and_classify try: from minio.error import S3Error except ImportError: # Fallback for test environments where minio is not installed S3Error = Exception # type: ignore[assignment,misc] router = APIRouter(prefix="/api/documents", tags=["documents"]) # ── Request models ──────────────────────────────────────────────────────────── class UploadUrlRequest(BaseModel): filename: str content_type: str # ── POST /api/documents/upload-url ─────────────────────────────────────────── @router.post("/upload-url") async def request_upload_url( body: UploadUrlRequest, session: AsyncSession = Depends(get_db), current_user: User = Depends(get_regular_user), ): """Create a pending Document row and return a presigned PUT URL. D-05 step 1: FastAPI creates a Document row (status='pending'), generates a 15-minute presigned PUT URL, returns {upload_url, document_id}. Quota is NOT reserved at this step — quota enforcement happens at /confirm. T-03-04: object_key is computed server-side using str(current_user.id); filename stored in DB only (CLAUDE.md MinIO key schema). T-03-15: object_key prefix is always the authenticated user's id — never user-supplied. """ doc_id = uuid.uuid4() suffix = Path(body.filename).suffix.lower() object_key = f"{current_user.id}/{doc_id}/{uuid.uuid4()}{suffix}" doc = Document( id=doc_id, user_id=current_user.id, filename=body.filename, content_type=body.content_type, size_bytes=0, storage_backend="minio", status="pending", object_key=object_key, ) session.add(doc) await session.commit() upload_url = await get_storage_backend().generate_presigned_put_url( object_key, expires_minutes=15 ) return {"upload_url": upload_url, "document_id": str(doc_id)} # ── POST /api/documents/{doc_id}/confirm ───────────────────────────────────── @router.post("/{doc_id}/confirm") async def confirm_upload( doc_id: str, session: AsyncSession = Depends(get_db), current_user: User = Depends(get_regular_user), ): """Confirm a presigned PUT upload: stat MinIO for size, enforce quota atomically. D-05 step 3: FastAPI reads authoritative file size from MinIO stat_object (never from client), runs atomic quota UPDATE, sets status='uploaded', enqueues Celery task. Quota exceeded: HTTP 413 with {"used_bytes": N, "limit_bytes": M, "rejected_bytes": K} Upload not found: HTTP 422 (presigned URL may have expired) T-03-05: size always comes from backend.stat_object(doc.object_key) — never client. T-03-06: atomic SQL UPDATE prevents concurrent over-quota uploads (STORE-03 SC2). T-03-11: ownership assertion — cross-user access returns 404 (D-16). """ try: uid = uuid.UUID(doc_id) except ValueError: raise HTTPException(status_code=404, detail="Document not found") doc = await session.get(Document, uid) if doc is None or doc.user_id != current_user.id: raise HTTPException(status_code=404, detail="Document not found") # Get authoritative file size from MinIO (T-03-05 — never trust client-supplied size) try: size = await get_storage_backend().stat_object(doc.object_key) except Exception as exc: code = getattr(exc, "code", "") if code == "NoSuchKey": raise HTTPException( status_code=422, detail="Upload not found — presigned URL may have expired", ) raise HTTPException(status_code=502, detail=f"Storage error: {exc}") doc.size_bytes = size await session.flush() # Atomic quota enforcement — user_id is always set post-migration (Plan 03-03+) result = await session.execute( text( "UPDATE quotas " "SET used_bytes = used_bytes + :delta " "WHERE user_id = :uid " " AND (used_bytes + :delta) <= limit_bytes " "RETURNING used_bytes, limit_bytes" ), {"delta": size, "uid": str(doc.user_id)}, ) row = result.fetchone() if row is None: # Quota exceeded — fetch current quota state for the 413 body quota_result = await session.execute( text("SELECT used_bytes, limit_bytes FROM quotas WHERE user_id = :uid"), {"uid": str(doc.user_id)}, ) q = quota_result.fetchone() # Delete the pending Document row and best-effort remove the MinIO object await session.delete(doc) try: await get_storage_backend().delete_object(doc.object_key) except Exception: pass # MinIO cleanup is best-effort; object TTL will eventually expire await session.commit() raise HTTPException( status_code=413, detail={ "used_bytes": q.used_bytes if q else 0, "limit_bytes": q.limit_bytes if q else 0, "rejected_bytes": size, }, ) used_bytes = row.used_bytes doc.status = "uploaded" await session.commit() extract_and_classify.delay(str(doc.id)) return { "id": str(doc.id), "size_bytes": size, "used_bytes": used_bytes, "status": "uploaded", } # ── GET /api/documents ──────────────────────────────────────────────────────── @router.get("") async def list_documents( topic: Optional[str] = Query(None), page: int = Query(1, ge=1), per_page: int = Query(20, ge=1, le=100), sort: str = Query("date"), order: str = Query("desc"), folder_id: Optional[str] = Query(None), q: Optional[str] = Query(None), session: AsyncSession = Depends(get_db), current_user: User = Depends(get_regular_user), ): """List documents with optional sort, folder filter, and full-text search. D-16: requires authenticated regular user (get_regular_user rejects admins). Returns only documents belonging to the current user. FOLD-05: sort by name|date|size; order asc|desc; folder_id filter; q full-text search via plainto_tsquery (PostgreSQL only — silently skipped on SQLite when function is unavailable). FTS scope is always scoped to current_user.id (T-04-03-02). Backward-compat: when sort/order/folder_id/q are not provided, behaviour is identical to the pre-Phase-4 implementation. """ # If no new params used, fall through to the legacy storage.list_metadata path # to preserve full backward compatibility with topic filtering. if folder_id is None and q is None and sort == "date" and order == "desc": docs = await storage.list_metadata(session, user_id=current_user.id, topic=topic) total = len(docs) start = (page - 1) * per_page # Add is_shared field (Phase 4 addition) shared_result = await session.execute( select(Share.document_id).where(Share.owner_id == current_user.id) ) shared_ids = {row[0] for row in shared_result.fetchall()} items = [] for d in docs[start : start + per_page]: doc_id_str = d.get("id", "") try: doc_uuid = uuid.UUID(doc_id_str) except (ValueError, AttributeError): doc_uuid = None d["is_shared"] = doc_uuid in shared_ids if doc_uuid else False items.append(d) return {"items": items, "total": total, "page": page, "per_page": per_page} # New path: direct ORM query with sort/filter/FTS from db.models import DocumentTopic, Topic # noqa: PLC0415 (avoid circular at module top) stmt = select(Document).where(Document.user_id == current_user.id) # Topic filter (join-based, same as list_metadata) if topic is not None: stmt = ( stmt.join(DocumentTopic, DocumentTopic.document_id == Document.id) .join(Topic, Topic.id == DocumentTopic.topic_id) .where(Topic.name == topic) ) # Folder filter if folder_id is not None: try: folder_uuid = uuid.UUID(folder_id) except ValueError: raise HTTPException(status_code=404, detail="Folder not found") stmt = stmt.where(Document.folder_id == folder_uuid) # Full-text search — plainto_tsquery on extracted_text (PostgreSQL only) # Wrapped in try/except so unit tests on SQLite are not broken (FOLD-05) fts_requested = q is not None and len(q) >= 2 if fts_requested: try: stmt = stmt.where( func.to_tsvector("english", func.coalesce(Document.extracted_text, "")).op("@@")( func.plainto_tsquery("english", q) ) ) except Exception: pass # FTS not available (e.g. SQLite) — return unfiltered results # Sort sort_col = Document.created_at # default: date if sort == "name": sort_col = Document.filename elif sort == "size": sort_col = Document.size_bytes if order == "asc": stmt = stmt.order_by(sort_col.asc()) else: stmt = stmt.order_by(sort_col.desc()) result = await session.execute(stmt) docs_orm = result.scalars().all() # is_shared subquery shared_result = await session.execute( select(Share.document_id).where(Share.owner_id == current_user.id) ) shared_ids = {row[0] for row in shared_result.fetchall()} # Serialize all_items = [] for doc in docs_orm: from services.storage import _doc_to_dict, _load_topic_names # noqa: PLC0415 topic_names = await _load_topic_names(session, doc.id) d = _doc_to_dict(doc, topic_names) d["is_shared"] = doc.id in shared_ids all_items.append(d) total = len(all_items) start = (page - 1) * per_page return { "items": all_items[start : start + per_page], "total": total, "page": page, "per_page": per_page, } # ── GET /api/documents/{doc_id} ─────────────────────────────────────────────── @router.get("/{doc_id}") async def get_document( doc_id: str, session: AsyncSession = Depends(get_db), current_user: User = Depends(get_regular_user), ): """Return document metadata by ID. D-16: requires authenticated regular user. Asserts ownership — cross-user access returns 404 (not 403) to avoid information leakage (T-03-11). """ try: uid = uuid.UUID(doc_id) except ValueError: raise HTTPException(404, "Document not found") doc = await session.get(Document, uid) if doc is None or doc.user_id != current_user.id: raise HTTPException(404, "Document not found") meta = await storage.get_metadata(session, doc_id) if meta is None: raise HTTPException(404, "Document not found") return meta # ── DELETE /api/documents/{doc_id} ─────────────────────────────────────────── @router.delete("/{doc_id}") async def delete_document( doc_id: str, session: AsyncSession = Depends(get_db), current_user: User = Depends(get_regular_user), ): """Delete a document and decrement quota atomically. services.storage.delete_document handles the atomic quota decrement (STORE-06, D-07) via GREATEST(0, used_bytes - delta) SQL. D-16: requires authenticated regular user. Asserts ownership — cross-user delete returns 404 (not 403) to avoid information leakage (T-03-11). """ try: uid = uuid.UUID(doc_id) except ValueError: raise HTTPException(404, "Document not found") doc = await session.get(Document, uid) if doc is None or doc.user_id != current_user.id: raise HTTPException(404, "Document not found") ok = await storage.delete_document(session, doc_id) if not ok: raise HTTPException(404, "Document not found") return {"success": True} # ── POST /api/documents/{doc_id}/classify ──────────────────────────────────── @router.post("/{doc_id}/classify") async def classify_document( doc_id: str, body: dict = {}, session: AsyncSession = Depends(get_db), current_user: User = Depends(get_regular_user), ): """Reclassify a document's topics on demand. D-16: requires authenticated regular user. Asserts ownership — cross-user classify returns 404 (not 403) to avoid information leakage (T-03-11). """ try: uid = uuid.UUID(doc_id) except ValueError: raise HTTPException(404, "Document not found") doc = await session.get(Document, uid) if doc is None or doc.user_id != current_user.id: raise HTTPException(404, "Document not found") topic_names = body.get("topics") if body else None try: topics = await classifier.classify_document(session, doc_id, topic_names) except Exception as e: raise HTTPException(500, f"Classification failed: {e}") return {"topics": topics} # ── Range header parsing helper ─────────────────────────────────────────────── def _parse_range(range_header: str, file_size: int) -> tuple: """Parse a 'bytes=X-Y' Range header and return (start, end). Returns (start, end) where both are inclusive byte offsets. Raises HTTP 416 on any invalid or out-of-bounds range. T-04-05-03: validates start <= end, start >= 0, end < file_size. """ try: h = range_header.replace("bytes=", "").split("-") start = int(h[0]) if h[0] != "" else 0 end = int(h[1]) if h[1] != "" else file_size - 1 except (ValueError, IndexError): raise HTTPException(status.HTTP_416_RANGE_NOT_SATISFIABLE) if start > end or start < 0 or end >= file_size: raise HTTPException(status.HTTP_416_RANGE_NOT_SATISFIABLE) return start, end # ── GET /api/documents/{doc_id}/content ────────────────────────────────────── @router.get("/{doc_id}/content") async def stream_document_content( doc_id: str, request: Request, session: AsyncSession = Depends(get_db), current_user: User = Depends(get_regular_user), ): """Stream document bytes directly from MinIO (DOC-02). T-04-05-01: uses get_regular_user — admin role → 403 (critical security invariant). T-04-05-02: bytes fetched via get_object() ONLY — presigned_get_url() never called. T-04-05-03: Range header validated via _parse_range(); invalid range → 416. T-04-05-04: access gated on ownership OR active Share.recipient_id. Returns 200 (or 206 for Range requests) with: Content-Type: doc.content_type Content-Disposition: inline; filename="" Accept-Ranges: bytes Content-Length: """ try: uid = uuid.UUID(doc_id) except ValueError: raise HTTPException(status_code=404, detail="Document not found") doc = await session.get(Document, uid) if doc is None: raise HTTPException(status_code=404, detail="Document not found") # Access control: owner OR share recipient (T-04-05-04) if doc.user_id != current_user.id: result = await session.execute( select(Share).where( Share.document_id == doc.id, Share.recipient_id == current_user.id, ) ) share = result.scalar_one_or_none() if share is None: raise HTTPException(status_code=404, detail="Document not found") # Fetch bytes directly from MinIO — NEVER via presigned URL (T-04-05-02) file_bytes = await get_storage_backend().get_object(doc.object_key) file_size = len(file_bytes) headers = { "content-type": doc.content_type, "content-disposition": f'inline; filename="{doc.filename}"', "accept-ranges": "bytes", "content-length": str(file_size), } range_header = request.headers.get("range") if range_header: start, end = _parse_range(range_header, file_size) chunk = file_bytes[start : end + 1] headers["content-range"] = f"bytes {start}-{end}/{file_size}" headers["content-length"] = str(len(chunk)) return StreamingResponse( iter([chunk]), status_code=206, headers=headers, ) return StreamingResponse( iter([file_bytes]), status_code=200, headers=headers, )