feat(05-09): PATCH /documents/{id} endpoint + cloud-aware Celery re-analyze
- Add DocumentPatch Pydantic model with filename and folder_id optional fields
- Add PATCH /api/documents/{doc_id} endpoint: ownership guard; uses model_fields_set
to distinguish an absent folder_id from an explicit null; returns the updated metadata dict
- Update _run() in document_tasks.py to call get_storage_backend_for_document()
for non-MinIO backends instead of always using the hardcoded MinIO path
- Catch CloudConnectionError in the cloud path and return extract_failed status
- Update test to use pure unit mocks (no PostgreSQL) for _run() cloud routing
- All 3 plan tests pass; 23 test_cloud.py tests pass
This commit is contained in:
@@ -30,13 +30,17 @@ async def _run(document_id: str) -> dict:
|
||||
|
||||
Opens its own AsyncSession (not shared with the upload request) to avoid
|
||||
cross-thread session contamination.
|
||||
|
||||
Cloud-aware: when doc.storage_backend is set and is not 'minio', uses
|
||||
get_storage_backend_for_document() to retrieve bytes from the correct
|
||||
cloud backend instead of hardcoding MinIO.
|
||||
"""
|
||||
import uuid as _uuid
|
||||
|
||||
from db.session import AsyncSessionLocal
|
||||
from db.models import Document
|
||||
from services import extractor, classifier
|
||||
from storage import get_storage_backend
|
||||
from storage import get_storage_backend, get_storage_backend_for_document
|
||||
|
||||
async with AsyncSessionLocal() as session:
|
||||
# ── Step 1: fetch Document row ─────────────────────────────────────────
|
||||
@@ -59,15 +63,39 @@ async def _run(document_id: str) -> dict:
|
||||
ai_provider = (user.ai_provider if user else None) or app_settings.default_ai_provider
|
||||
ai_model = (user.ai_model if user else None) or app_settings.default_ai_model
|
||||
|
||||
# ── Step 2: retrieve bytes from MinIO ──────────────────────────────────
|
||||
# ── Step 2: retrieve bytes from the correct backend ────────────────────
|
||||
# Cloud-aware: routes to cloud backend for non-MinIO documents (Plan 09).
|
||||
# T-05-09-03: cloud credentials are loaded from DB inside this task's own
|
||||
# session — no credentials travel through the Celery broker message.
|
||||
try:
|
||||
backend = get_storage_backend()
|
||||
file_bytes = await backend.get_object(doc.object_key)
|
||||
if doc.storage_backend is None or doc.storage_backend == "minio":
|
||||
backend = get_storage_backend()
|
||||
file_bytes = await backend.get_object(doc.object_key)
|
||||
else:
|
||||
# Cloud path: user must be present (doc.user_id set at upload time)
|
||||
if user is None:
|
||||
return {"document_id": document_id, "status": "missing_user"}
|
||||
|
||||
try:
|
||||
from storage.google_drive_backend import CloudConnectionError
|
||||
except ImportError:
|
||||
class CloudConnectionError(Exception): # type: ignore[no-redef]
|
||||
pass
|
||||
|
||||
try:
|
||||
backend = await get_storage_backend_for_document(doc, user, session)
|
||||
file_bytes = await backend.get_object(doc.object_key)
|
||||
except CloudConnectionError:
|
||||
return {
|
||||
"document_id": document_id,
|
||||
"status": "extract_failed",
|
||||
"error": "cloud backend error",
|
||||
}
|
||||
except Exception as e:
|
||||
return {
|
||||
"document_id": document_id,
|
||||
"status": "extract_failed",
|
||||
"error": f"MinIO retrieval failed: {e}",
|
||||
"error": f"retrieval failed: {e}",
|
||||
}
|
||||
|
||||
# ── Step 3: extract text from bytes ────────────────────────────────────
|
||||
|
||||
Reference in New Issue
Block a user