Files
kite/backend/tasks/document_tasks.py
T
curo1305 a548266461 refactor(backend): extract shared helper modules per architecture rules
- Add backend/ai/utils.py — parse_classification, parse_suggestions, strip_code_fences
  shared by all AI providers; removes duplicated private functions from
  anthropic_provider.py and openai_provider.py
- Add backend/deps/utils.py — get_client_ip, parse_uuid request-parsing helpers;
  removes local _ip() variants from admin.py, auth.py, shares.py, folders.py
- Add backend/storage/exceptions.py — canonical CloudConnectionError definition;
  all routers and backends import from here instead of redefining
- Move validate_password_strength to backend/services/auth.py; removes duplicated
  _validate_password_strength from admin.py and auth.py

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-02 16:10:35 +02:00

173 lines
7.2 KiB
Python

"""
Celery tasks for document processing in DocuVault.
extract_and_classify — called via .delay(document_id) by the upload handler.
The task is a plain sync def (Celery workers have no asyncio event loop); it
bridges into the async service layer via asyncio.run().
Flow:
1. Open a fresh AsyncSession (one per task invocation — never share sessions)
2. Look up the Document row to get its object_key and storage backend
3. Retrieve file bytes via the matching storage backend (MinIO, or the user's cloud backend for non-MinIO documents)
4. Extract text from bytes using services.extractor
5. Persist extracted_text back to the Document row
6. Call services.classifier.classify_document to assign topics
7. Return a result dict (never raises — classification failures are non-fatal)
"""
import asyncio
from celery_app import celery_app
@celery_app.task(name="tasks.document_tasks.extract_and_classify")
def extract_and_classify(document_id: str) -> dict:
    """Run the extract-and-classify pipeline for a single document.

    Celery calls this as an ordinary synchronous function, so the async
    pipeline in ``_run`` is driven to completion with ``asyncio.run`` and
    its result dict is handed back unchanged.
    """
    pipeline = _run(document_id)
    outcome = asyncio.run(pipeline)
    return outcome
async def _run(document_id: str) -> dict:
    """Async body of extract_and_classify.

    Opens its own AsyncSession (not shared with the upload request) to avoid
    cross-thread session contamination.

    Cloud-aware: when doc.storage_backend != 'minio', uses
    get_storage_backend_for_document() to retrieve bytes from the correct
    cloud backend instead of hardcoding MinIO.

    Args:
        document_id: String form of the Document's UUID primary key.

    Returns:
        A dict that always contains "document_id" and "status". Possible
        status values are "invalid_id", "not_found", "missing_object",
        "missing_user", "extract_failed" (with "error"), "classified"
        (with "topics") and "classification_failed" (with "error").

    Retrieval, extraction and classification failures are reported through
    the returned status rather than raised. Errors from the initial row
    lookups are not caught here and propagate to the caller.
    """
    import logging
    import uuid as _uuid

    from db.session import AsyncSessionLocal
    from db.models import Document
    from services import extractor, classifier
    from storage import get_storage_backend, get_storage_backend_for_document

    async with AsyncSessionLocal() as session:
        # ── Step 1: fetch Document row ─────────────────────────────────────────
        try:
            doc_uuid = _uuid.UUID(document_id)
        except ValueError:
            return {"document_id": document_id, "status": "invalid_id"}

        doc = await session.get(Document, doc_uuid)
        if doc is None:
            return {"document_id": document_id, "status": "not_found"}
        if not doc.object_key:
            return {"document_id": document_id, "status": "missing_object"}

        # ── Resolve per-user AI config (D-14, D-15) ────────────────────────────
        # Per-user settings win; empty/missing values fall back to app defaults.
        from db.models import User
        from config import settings as app_settings

        user = await session.get(User, doc.user_id) if doc.user_id else None
        ai_provider = (user.ai_provider if user else None) or app_settings.default_ai_provider
        ai_model = (user.ai_model if user else None) or app_settings.default_ai_model

        # ── Step 2: retrieve bytes from the correct backend ────────────────────
        # Cloud-aware: routes to cloud backend for non-MinIO documents (Plan 09).
        # T-05-09-03: cloud credentials are loaded from DB inside this task's own
        # session — no credentials travel through the Celery broker message.
        try:
            if doc.storage_backend is None or doc.storage_backend == "minio":
                backend = get_storage_backend()
                file_bytes = await backend.get_object(doc.object_key)
            else:
                # Cloud path: user must be present (doc.user_id set at upload time)
                if user is None:
                    return {"document_id": document_id, "status": "missing_user"}
                from storage.exceptions import CloudConnectionError

                try:
                    backend = await get_storage_backend_for_document(doc, user, session)
                    file_bytes = await backend.get_object(doc.object_key)
                except CloudConnectionError:
                    # Deliberately generic message: no backend details in the result.
                    return {
                        "document_id": document_id,
                        "status": "extract_failed",
                        "error": "cloud backend error",
                    }
        except Exception as e:
            return {
                "document_id": document_id,
                "status": "extract_failed",
                "error": f"retrieval failed: {e}",
            }

        # ── Step 3: extract text from bytes ────────────────────────────────────
        # The extracted text is committed before classification so it survives
        # a later classification failure.
        try:
            text = extractor.extract_text_from_bytes(file_bytes, doc.content_type)
            doc.extracted_text = text
            await session.commit()
        except Exception as e:
            return {
                "document_id": document_id,
                "status": "extract_failed",
                "error": f"Text extraction failed: {e}",
            }

        # ── Step 4: classify document (non-fatal) ──────────────────────────────
        try:
            topics = await classifier.classify_document(
                session, document_id, ai_provider=ai_provider, ai_model=ai_model
            )
            return {
                "document_id": document_id,
                "status": "classified",
                "topics": topics,
            }
        except Exception as e:
            # Non-fatal — preserve existing convention from api/documents.py
            try:
                # The classifier shares this session; if it failed mid-flush the
                # transaction is unusable until rolled back, and committing the
                # status directly would raise out of this handler.
                await session.rollback()
                doc.status = "classification_failed"
                await session.commit()
            except Exception:
                # Keep the failure non-fatal even when the status write fails,
                # but leave a trace for operators.
                logging.getLogger(__name__).exception(
                    "Could not persist classification_failed status for document %s",
                    document_id,
                )
            return {
                "document_id": document_id,
                "status": "classification_failed",
                "error": str(e),
            }
@celery_app.task(name="tasks.document_tasks.cleanup_abandoned_uploads")
def cleanup_abandoned_uploads() -> dict:
    """Periodic Celery beat task that purges abandoned uploads (D-06).

    Removes Document rows still in status='pending' after more than 1 hour,
    together with their MinIO objects. Celery beat enqueues it every
    30 minutes (see beat_schedule in celery_app.py).

    Pending rows never have quota reserved, so no quota cleanup is required.
    """
    cleanup = _cleanup_abandoned()
    summary = asyncio.run(cleanup)
    return summary
async def _cleanup_abandoned() -> dict:
    """Async body for cleanup_abandoned_uploads.

    Selects Document rows with status='pending' older than 1 hour,
    removes their MinIO objects (best-effort), then deletes the DB rows.

    Object deletion stays best-effort: a failure never blocks removal of the
    DB row, but it is logged so that storage problems do not pass unnoticed.

    Returns:
        {"cleaned": N}, where N is the number of Document rows deleted.
    """
    import logging
    from datetime import datetime, timezone, timedelta

    from sqlalchemy import select

    from db.session import AsyncSessionLocal
    from db.models import Document
    from storage import get_storage_backend

    logger = logging.getLogger(__name__)

    # Timezone-aware cutoff: only rows created more than an hour ago qualify.
    cutoff = datetime.now(timezone.utc) - timedelta(hours=1)

    async with AsyncSessionLocal() as session:
        result = await session.execute(
            select(Document).where(
                Document.status == "pending",
                Document.created_at < cutoff,
            )
        )
        docs = result.scalars().all()

        backend = get_storage_backend()
        cleaned = 0
        for doc in docs:
            try:
                if doc.object_key:
                    await backend.delete_object(doc.object_key)
            except Exception:
                # MinIO object may not exist yet — safe to continue, but record
                # the failure instead of discarding it silently.
                logger.warning(
                    "Could not delete storage object %s for abandoned upload",
                    doc.object_key,
                    exc_info=True,
                )
            await session.delete(doc)
            cleaned += 1

        # One commit for the whole batch of row deletions.
        await session.commit()

    return {"cleaned": cleaned}