feat(05-06): extend upload and content-proxy endpoints for cloud backends
- Add POST /api/documents/upload multipart endpoint with target_backend form field
- Cloud backends (google_drive, onedrive, nextcloud, webdav) use direct put_object()
- MinIO path generates presigned PUT URL (unchanged flow)
- Cloud uploads do NOT touch quota (D-11: separate backend)
- GET /api/documents/{id}/content now uses get_storage_backend_for_document
- CloudConnectionError from any cloud op raises HTTPException(503) with safe message
- target_backend validated against _CLOUD_PROVIDERS allowlist (T-05-06-01)
- Import CloudConnectionError with fallback stub for envs without google-auth deps
This commit is contained in:
+187
-6
@@ -1,15 +1,20 @@
|
||||
"""
|
||||
Document API endpoints for DocuVault — Phase 3 Wave 2.
|
||||
Document API endpoints for DocuVault — Phase 3 Wave 2 / Phase 5 Plan 06.
|
||||
|
||||
Implements the presigned PUT upload flow (D-04, D-05):
|
||||
POST /api/documents/upload-url — create pending Document row, return presigned URL
|
||||
POST /api/documents/{id}/confirm — stat MinIO for authoritative size, atomic quota UPDATE
|
||||
|
||||
Cloud upload path (D-10, D-14, D-15 — Phase 5 Plan 06):
|
||||
POST /api/documents/upload — multipart upload with target_backend parameter;
|
||||
cloud backends bypass presigned URL and use direct put_object()
|
||||
|
||||
Preserved endpoints (auth guards added in Plan 03-03):
|
||||
GET /api/documents — list documents
|
||||
GET /api/documents/{id} — get document metadata
|
||||
DELETE /api/documents/{id} — delete document (decrements quota atomically)
|
||||
POST /api/documents/{id}/classify — reclassify document topics
|
||||
GET /api/documents/{id}/content — stream document bytes (all backends, Phase 5 Plan 06)
|
||||
|
||||
NOTE (Wave 2, historical): endpoints initially shipped without auth guards; Plan 03-03
|
||||
added them (see "Preserved endpoints" above). The doc.user_id=None guard in /confirm is a Wave 2 placeholder.
|
||||
@@ -20,18 +25,20 @@ import uuid
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
|
||||
from fastapi import APIRouter, Depends, Form, HTTPException, Query, Request, UploadFile, File, status
|
||||
from fastapi.responses import StreamingResponse
|
||||
from pydantic import BaseModel
|
||||
from sqlalchemy import select, text, func
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from db.models import Document, Quota, Share, User
|
||||
from config import settings
|
||||
from db.models import CloudConnection, Document, Quota, Share, User
|
||||
from deps.auth import get_regular_user
|
||||
from deps.db import get_db
|
||||
from services import classifier, storage
|
||||
from services.audit import write_audit_log
|
||||
from storage import get_storage_backend
|
||||
from storage import get_storage_backend, get_storage_backend_for_document
|
||||
from storage.cloud_utils import decrypt_credentials
|
||||
from tasks.document_tasks import extract_and_classify
|
||||
|
||||
try:
|
||||
@@ -40,6 +47,18 @@ except ImportError:
|
||||
# Fallback for test environments where minio is not installed
|
||||
S3Error = Exception # type: ignore[assignment,misc]
|
||||
|
||||
# Prefer the real exception type exported by the Google Drive backend module.
try:
    from storage.google_drive_backend import CloudConnectionError
except ImportError:
    # The backend module could not be imported (e.g. google deps absent).
    # Provide a local stand-in so the `except CloudConnectionError` clauses
    # in this module still compile.
    class CloudConnectionError(Exception):  # type: ignore[no-redef]
        """Fallback stand-in for the cloud backend's connection error.

        Args:
            msg: Human-readable message forwarded to ``Exception``.
            reason: Keyword-only string stored on the instance as ``reason``.
        """

        def __init__(self, msg: str = "", *, reason: str = "") -> None:
            self.reason = reason
            super().__init__(msg)
|
||||
|
||||
# Valid cloud backend slugs (T-05-06-01: validated against allowlist, not user-supplied string)
|
||||
_CLOUD_PROVIDERS = frozenset({"google_drive", "onedrive", "nextcloud", "webdav"})
|
||||
|
||||
router = APIRouter(prefix="/api/documents", tags=["documents"])
|
||||
|
||||
|
||||
@@ -91,6 +110,159 @@ async def request_upload_url(
|
||||
return {"upload_url": upload_url, "document_id": str(doc_id)}
|
||||
|
||||
|
||||
# ── POST /api/documents/upload ────────────────────────────────────────────────
|
||||
|
||||
@router.post("/upload")
async def upload_document(
    file: UploadFile = File(...),
    target_backend: str = Form("minio"),
    request: Request = None,
    session: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_regular_user),
):
    """Handle a multipart upload for MinIO or a cloud backend (D-10, D-14, D-15).

    MinIO (``target_backend == "minio"``):
        A ``pending`` Document row with ``size_bytes=0`` is committed and a
        presigned PUT URL (15 minute expiry) is returned. The multipart body
        is not read in this branch.

    Cloud (``google_drive``, ``onedrive``, ``nextcloud``, ``webdav``):
        The caller's ACTIVE CloudConnection for the provider is loaded, its
        credentials are decrypted, the file bytes are read and pushed with
        ``put_object()``, and an ``uploaded`` Document row plus an audit log
        entry are committed. ``extract_and_classify.delay`` is then called
        with the document id. No quota update is performed on this path
        (D-11).

    Returns:
        ``{"upload_url", "document_id"}`` for MinIO, or
        ``{"document_id", "storage_backend"}`` for cloud backends.

    Raises:
        HTTPException: 422 when ``target_backend`` is neither ``minio`` nor in
            ``_CLOUD_PROVIDERS`` (T-05-06-01); 404 when the user has no ACTIVE
            connection for the provider; 503 when ``put_object()`` raises
            ``CloudConnectionError`` — the detail text is fixed and carries no
            provider error detail (T-05-06-02).
    """
    wants_minio = target_backend == "minio"

    # T-05-06-01: anything that is neither MinIO nor an allowlisted provider is rejected.
    if not wants_minio and target_backend not in _CLOUD_PROVIDERS:
        raise HTTPException(
            status_code=422,
            detail=f"Invalid target_backend '{target_backend}'. Valid values: minio, {', '.join(sorted(_CLOUD_PROVIDERS))}",
        )

    if wants_minio:
        # Existing MinIO flow: register a pending row, then hand the client a
        # presigned URL to PUT the bytes itself.
        new_id = uuid.uuid4()
        ext = Path(file.filename or "file").suffix.lower()
        key = f"{current_user.id}/{new_id}/{uuid.uuid4()}{ext}"

        pending = Document(
            id=new_id,
            user_id=current_user.id,
            filename=file.filename or "upload",
            content_type=file.content_type or "application/octet-stream",
            size_bytes=0,
            storage_backend="minio",
            status="pending",
            object_key=key,
        )
        session.add(pending)
        await session.commit()

        minio_backend = get_storage_backend()
        presigned = await minio_backend.generate_presigned_put_url(
            key, expires_minutes=15
        )
        return {"upload_url": presigned, "document_id": str(new_id)}

    # ── Cloud backend path ──────────────────────────────────────────────────

    # The lookup is scoped to the calling user, the requested provider and ACTIVE status.
    connection_query = select(CloudConnection).where(
        CloudConnection.user_id == current_user.id,
        CloudConnection.provider == target_backend,
        CloudConnection.status == "ACTIVE",
    )
    connection = (await session.execute(connection_query)).scalar_one_or_none()
    if connection is None:
        raise HTTPException(
            status_code=404,
            detail=f"No active {target_backend} connection found. Please connect in Settings.",
        )

    # Per-user credentials are decrypted with the configured master key.
    credentials = decrypt_credentials(
        settings.cloud_creds_key.encode(),
        str(current_user.id),
        connection.credentials_enc,
    )

    # The whole upload is read into memory before being forwarded.
    payload = await file.read()
    payload_size = len(payload)
    filename = file.filename or "upload"
    content_type = file.content_type or "application/octet-stream"
    extension = Path(filename).suffix.lower()

    doc_id = uuid.uuid4()

    # Backend modules are imported lazily, only for the provider actually requested.
    dav_keys = ("server_url", "username", "password")
    if target_backend == "google_drive":
        from storage.google_drive_backend import GoogleDriveBackend

        cloud_backend = GoogleDriveBackend(credentials)
    elif target_backend == "onedrive":
        from storage.onedrive_backend import OneDriveBackend

        cloud_backend = OneDriveBackend(credentials)
    elif target_backend == "nextcloud":
        from storage.nextcloud_backend import NextcloudBackend

        cloud_backend = NextcloudBackend(*[credentials[k] for k in dav_keys])
    elif target_backend == "webdav":
        from storage.webdav_backend import WebDAVBackend

        cloud_backend = WebDAVBackend(*[credentials[k] for k in dav_keys])

    try:
        object_key = await cloud_backend.put_object(
            str(current_user.id),
            str(doc_id),
            payload,
            extension,
            content_type,
        )
    except CloudConnectionError as exc:
        # T-05-06-02: fixed, provider-agnostic message.
        raise HTTPException(
            status_code=503,
            detail="Cloud connection requires re-authentication. Please reconnect in Settings.",
        ) from exc

    doc = Document(
        id=doc_id,
        user_id=current_user.id,
        filename=filename,
        content_type=content_type,
        size_bytes=payload_size,
        storage_backend=target_backend,
        status="uploaded",
        object_key=object_key,
    )
    session.add(doc)

    # Client IP for the audit trail: the forwarded header wins, then the socket peer.
    client_ip = None
    if request:
        client_ip = request.headers.get("X-Forwarded-For")
        if not client_ip:
            client_ip = request.client.host if request.client else None

    await write_audit_log(
        session,
        event_type="document.uploaded",
        user_id=current_user.id,
        actor_id=current_user.id,
        resource_id=doc.id,
        ip_address=client_ip,
        metadata_={"size_bytes": payload_size, "storage_backend": target_backend},
    )
    await session.commit()

    # Hand the document id to the extract_and_classify task once the row is committed.
    extract_and_classify.delay(str(doc.id))

    return {"document_id": str(doc.id), "storage_backend": target_backend}
|
||||
|
||||
|
||||
# ── POST /api/documents/{doc_id}/confirm ─────────────────────────────────────
|
||||
|
||||
@router.post("/{doc_id}/confirm")
|
||||
@@ -494,8 +666,17 @@ async def stream_document_content(
|
||||
if share is None:
|
||||
raise HTTPException(status_code=404, detail="Document not found")
|
||||
|
||||
# Fetch bytes directly from MinIO — NEVER via presigned URL (T-04-05-02)
|
||||
file_bytes = await get_storage_backend().get_object(doc.object_key)
|
||||
# Fetch bytes from the correct backend — get_storage_backend_for_document handles
|
||||
# all backends (MinIO, Google Drive, OneDrive, Nextcloud, WebDAV) transparently
|
||||
# (D-15, T-04-05-02). NEVER via presigned URL for cloud backends (D-14).
|
||||
try:
|
||||
storage_backend = await get_storage_backend_for_document(doc, current_user, session)
|
||||
file_bytes = await storage_backend.get_object(doc.object_key)
|
||||
except CloudConnectionError as exc:
|
||||
raise HTTPException(
|
||||
status_code=503,
|
||||
detail="Cloud connection requires re-authentication. Please reconnect in Settings.",
|
||||
) from exc
|
||||
file_size = len(file_bytes)
|
||||
|
||||
headers = {
|
||||
|
||||
Reference in New Issue
Block a user