feat(03-02): implement presigned upload flow, quota enforcement, cleanup task

- Replace POST /api/documents/upload with POST /api/documents/upload-url + /{id}/confirm
- upload-url: create pending Document row with user_id=None (Wave 2), return presigned PUT URL
- confirm: stat MinIO for authoritative size (T-03-05), atomic quota UPDATE (T-03-06, STORE-03)
- Confirm returns 413 with {used_bytes, limit_bytes, rejected_bytes} on quota exceeded (STORE-05)
- Wave 2 guard: skip quota UPDATE when doc.user_id is None (Plan 03-03 removes this)
- Add GET /api/auth/me/quota to api/auth.py (STORE-04)
- services/storage.py: remove save_upload (D-04); add GREATEST(0, used_bytes-delta) quota decrement to delete_document (STORE-06)
- tasks/document_tasks.py: add cleanup_abandoned_uploads Celery beat task (D-06)
- celery_app.py: add beat_schedule for cleanup-abandoned-uploads every 30 minutes
- tests/test_documents.py: replace legacy /upload tests with xfail; add real test logic for upload-url/confirm/get-quota
- tests/test_quota.py: implement real test logic with xfail for PostgreSQL-specific SQL
This commit is contained in:
curo1305
2026-05-23 14:32:12 +02:00
parent 3ed6dd494f
commit 0d51d023ce
7 changed files with 626 additions and 196 deletions
+196 -51
View File
@@ -1,71 +1,194 @@
from datetime import datetime, timezone
"""
Document API endpoints for DocuVault — Phase 3 Wave 2.
Implements the presigned PUT upload flow (D-04, D-05):
POST /api/documents/upload-url — create pending Document row, return presigned URL
POST /api/documents/{id}/confirm — stat MinIO for authoritative size, atomic quota UPDATE
Preserved endpoints (auth guards added in Plan 03-03):
GET /api/documents — list documents
GET /api/documents/{id} — get document metadata
DELETE /api/documents/{id} — delete document (decrements quota atomically)
POST /api/documents/{id}/classify — reclassify document topics
NOTE (Wave 2): No auth guards on any endpoint yet — Plan 03-03 adds get_current_user
to all handlers. The doc.user_id=None guard in /confirm is a Wave 2 placeholder.
"""
from __future__ import annotations
import uuid
from pathlib import Path
from typing import Optional
from fastapi import APIRouter, Depends, File, Form, HTTPException, Query, UploadFile
from fastapi import APIRouter, Depends, HTTPException, Query, status
from pydantic import BaseModel
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from db.models import Document, Quota
from deps.db import get_db
from services import classifier, extractor, storage
from services import classifier, storage
from storage import get_storage_backend
from tasks.document_tasks import extract_and_classify
try:
from minio.error import S3Error
except ImportError:
# Fallback for test environments where minio is not installed
S3Error = Exception # type: ignore[assignment,misc]
# Router for every document endpoint in this module; routes are registered
# under the /api/documents prefix.
router = APIRouter(prefix="/api/documents", tags=["documents"])
# Presumably the MIME types accepted for document uploads.
# NOTE(review): no handler visible in this view checks a content type against
# this set — confirm whether /upload-url is meant to validate against it.
ALLOWED_MIME_TYPES = {
    "application/pdf",
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
    "application/msword",
    "text/plain",
    "text/markdown",
    "image/png",
    "image/jpeg",
    "image/jpg",  # NOTE(review): non-standard alias of image/jpeg — confirm it is intentional
    "image/tiff",
    "image/webp",
}
# ── Request models ────────────────────────────────────────────────────────────
class UploadUrlRequest(BaseModel):
    """JSON body accepted by POST /api/documents/upload-url."""

    # Client-side file name; the upload-url handler stores it on the Document
    # row and uses only its suffix when building the object key.
    filename: str
    # MIME type declared by the client; stored as the Document's content_type.
    content_type: str
@router.post("/upload")
async def upload_document(
file: UploadFile = File(...),
auto_classify: bool = Form(True),
# ── POST /api/documents/upload-url ───────────────────────────────────────────
@router.post("/upload-url")
async def request_upload_url(
body: UploadUrlRequest,
session: AsyncSession = Depends(get_db),
):
content = await file.read()
if len(content) == 0:
raise HTTPException(400, "Empty file")
"""Create a pending Document row and return a presigned PUT URL.
mime = file.content_type or "application/octet-stream"
D-05 step 1: FastAPI creates a Document row (status='pending'), generates a
15-minute presigned PUT URL, returns {upload_url, document_id}.
Quota is NOT reserved at this step — quota enforcement happens at /confirm.
saved = await storage.save_upload(session, content, file.filename or "upload", mime)
Wave 2 placeholder: user_id=None. Plan 03-03 replaces with current_user.id
and computes object_key as f"{current_user.id}/{doc_id}/{uuid4()}{suffix}".
# Extract text from the in-memory bytes (avoid a second MinIO round-trip at upload time)
text = extractor.extract_text_from_bytes(content, mime)
T-03-04: object_key is computed server-side; filename stored in DB only.
"""
doc_id = uuid.uuid4()
suffix = Path(body.filename).suffix.lower()
# Wave 2 placeholder — Plan 03-03 replaces "null-user" with str(current_user.id)
object_key = f"null-user/{doc_id}/{uuid.uuid4()}{suffix}"
now = datetime.now(timezone.utc).isoformat()
meta = {
"id": saved["id"],
"original_name": file.filename or "upload",
"filename": saved["filename"],
"mime_type": mime,
"size_bytes": len(content),
"extracted_text": text,
# Phase 1 cutover: topics are empty at upload time; the Celery worker
# fills them in asynchronously after extract_and_classify completes.
"topics": [],
"created_at": now,
"classified_at": None,
doc = Document(
id=doc_id,
user_id=None, # Wave 2 — Plan 03-03 replaces with current_user.id
filename=body.filename,
content_type=body.content_type,
size_bytes=0,
storage_backend="minio",
status="pending",
object_key=object_key,
)
session.add(doc)
await session.commit()
upload_url = await get_storage_backend().generate_presigned_put_url(
object_key, expires_minutes=15
)
return {"upload_url": upload_url, "document_id": str(doc_id)}
# ── POST /api/documents/{doc_id}/confirm ─────────────────────────────────────
@router.post("/{doc_id}/confirm")
async def confirm_upload(
    doc_id: str,
    session: AsyncSession = Depends(get_db),
):
    """Confirm a presigned PUT upload: stat storage for size, enforce quota atomically.

    D-05 step 3: the authoritative file size is read from the storage backend's
    ``stat_object`` (never from the client, T-03-05), the quota row is updated
    with a single conditional UPDATE (T-03-06, STORE-03 SC2), the document is
    marked ``status='uploaded'`` and the extract/classify Celery task is enqueued.

    Wave 2: ``doc.user_id`` is None, so the quota update is skipped entirely and
    ``used_bytes`` is reported as 0. Plan 03-03 removes this guard once user_id
    is always set.

    Args:
        doc_id: Document UUID as a string, taken from the URL path.
        session: Async database session injected via ``get_db``.

    Returns:
        A dict with ``id``, ``size_bytes``, ``used_bytes`` and ``status`` for
        the confirmed document.

    Raises:
        HTTPException: 404 if ``doc_id`` is not a valid UUID or no row exists;
            409 if the document is not in ``pending`` state (e.g. it was
            already confirmed);
            422 if the object is missing in storage (the presigned URL may
            have expired);
            502 for any other storage error;
            413 with ``{"used_bytes", "limit_bytes", "rejected_bytes"}`` when
            the upload would exceed the quota.
    """
    try:
        uid = uuid.UUID(doc_id)
    except ValueError:
        # A malformed ID cannot match any row; report it like a missing document.
        raise HTTPException(status_code=404, detail="Document not found") from None

    doc = await session.get(Document, uid)
    if doc is None:
        raise HTTPException(status_code=404, detail="Document not found")

    # Only a pending upload may be confirmed. Without this guard a repeated
    # confirm would add the file size to used_bytes a second time and enqueue
    # the extract/classify task again.
    if doc.status != "pending":
        raise HTTPException(
            status_code=409,
            detail=f"Document is not awaiting confirmation (status: {doc.status})",
        )

    # Keep the key in a local so it remains available after the row is deleted.
    object_key = doc.object_key

    # Get authoritative file size from storage (T-03-05 — never trust client-supplied size)
    try:
        size = await get_storage_backend().stat_object(object_key)
    except Exception as exc:
        # Backends are duck-typed here: any error carrying code == "NoSuchKey"
        # means the client never completed the PUT.
        code = getattr(exc, "code", "")
        if code == "NoSuchKey":
            raise HTTPException(
                status_code=422,
                detail="Upload not found — presigned URL may have expired",
            ) from exc
        raise HTTPException(status_code=502, detail=f"Storage error: {exc}") from exc

    doc.size_bytes = size
    await session.flush()

    # Wave 2: skip quota update if user_id is None (placeholder until Plan 03-03)
    if doc.user_id is not None:
        # Single conditional UPDATE: the row is only changed (and returned)
        # when the new total still fits within limit_bytes.
        result = await session.execute(
            text(
                "UPDATE quotas "
                "SET used_bytes = used_bytes + :delta "
                "WHERE user_id = :uid "
                "  AND (used_bytes + :delta) <= limit_bytes "
                "RETURNING used_bytes, limit_bytes"
            ),
            {"delta": size, "uid": str(doc.user_id)},
        )
        row = result.fetchone()
        if row is None:
            # Quota exceeded — fetch current quota state for the 413 body
            quota_result = await session.execute(
                text("SELECT used_bytes, limit_bytes FROM quotas WHERE user_id = :uid"),
                {"uid": str(doc.user_id)},
            )
            q = quota_result.fetchone()
            # Delete the pending Document row and best-effort remove the stored object
            await session.delete(doc)
            try:
                await get_storage_backend().delete_object(object_key)
            except Exception:
                pass  # Storage cleanup is best-effort; object TTL will eventually expire
            await session.commit()
            raise HTTPException(
                status_code=413,
                detail={
                    "used_bytes": q.used_bytes if q else 0,
                    "limit_bytes": q.limit_bytes if q else 0,
                    "rejected_bytes": size,
                },
            )
        used_bytes = row.used_bytes
    else:
        # Wave 2 placeholder: no quota row to update when user_id is None
        used_bytes = 0

    doc.status = "uploaded"
    await session.commit()
    # Enqueue only after the commit so the worker can see the uploaded row.
    extract_and_classify.delay(str(doc.id))
    return {
        "id": str(doc.id),
        "size_bytes": size,
        "used_bytes": used_bytes,
        "status": "uploaded",
    }
await storage.save_metadata(session, meta)
if auto_classify:
# Queue the extract+classify task on the Celery documents queue (STORE-08).
# The task re-fetches bytes from MinIO, extracts text, and classifies.
# The upload response returns topics=[] — polling GET /api/documents/{id}
# will show the populated topics once the worker completes.
extract_and_classify.delay(str(saved["id"]))
return meta
# ── GET /api/documents ────────────────────────────────────────────────────────
@router.get("")
async def list_documents(
@@ -74,43 +197,65 @@ async def list_documents(
per_page: int = Query(20, ge=1, le=100),
session: AsyncSession = Depends(get_db),
):
"""List documents, optionally filtered by topic.
NOTE (Wave 2): No auth guard — Plan 03-03 adds get_regular_user dependency.
"""
docs = await storage.list_metadata(session, topic=topic)
total = len(docs)
start = (page - 1) * per_page
return {"items": docs[start : start + per_page], "total": total, "page": page, "per_page": per_page}
# ── GET /api/documents/{doc_id} ───────────────────────────────────────────────
@router.get("/{doc_id}")
async def get_document(doc_id: str, session: AsyncSession = Depends(get_db)):
    """Look up a single document's metadata by its ID.

    Responds with HTTP 404 when ``storage.get_metadata`` yields ``None``.

    NOTE (Wave 2): No auth guard — Plan 03-03 adds get_regular_user dependency.
    """
    record = await storage.get_metadata(session, doc_id)
    if record is not None:
        return record
    raise HTTPException(404, "Document not found")
# ── DELETE /api/documents/{doc_id} ───────────────────────────────────────────
@router.delete("/{doc_id}")
async def delete_document(doc_id: str, session: AsyncSession = Depends(get_db)):
    """Remove a document; the quota decrement is delegated to the storage service.

    Per this module's notes, services.storage.delete_document performs the
    atomic quota decrement (STORE-06, D-07) via GREATEST(0, used_bytes - delta)
    SQL. A falsy result from that call is reported as HTTP 404.

    NOTE (Wave 2): No auth guard — Plan 03-03 adds get_regular_user dependency.
    """
    deleted = await storage.delete_document(session, doc_id)
    if deleted:
        return {"success": True}
    raise HTTPException(404, "Document not found")
# ── POST /api/documents/{doc_id}/classify ────────────────────────────────────
@router.post("/{doc_id}/classify")
async def classify_document(
doc_id: str,
body: dict = {},
session: AsyncSession = Depends(get_db),
):
"""Reclassify a document's topics on demand.
NOTE (Wave 2): No auth guard — Plan 03-03 adds get_regular_user dependency.
"""
meta = await storage.get_metadata(session, doc_id)
if meta is None:
raise HTTPException(404, "Document not found")
topic_names = body.get("topics") if body else None
try:
# The /classify endpoint calls classifier synchronously and returns the
# topic list immediately — this preserves the historical behavior.
# The upload-time path uses Celery .delay() instead (Phase 1 cutover).
topics = await classifier.classify_document(session, doc_id, topic_names)
except Exception as e:
raise HTTPException(500, f"Classification failed: {e}")