"""
Celery tasks for document processing in DocuVault.

extract_and_classify — called via .delay(document_id) by the upload handler.
The task is a plain sync def (Celery workers have no asyncio event loop); it
bridges into the async service layer via asyncio.run().

Flow:
  1. Open a fresh AsyncSession (one per task invocation — never share sessions)
  2. Look up the Document row to get the MinIO object_key
  3. Resolve the owning user's AI provider/model (falling back to app defaults)
  4. Retrieve file bytes from MinIO via the storage backend
  5. Extract text from bytes using services.extractor
  6. Persist extracted_text back to the Document row
  7. Call services.classifier.classify_document to assign topics
  8. Return a result dict. Expected failure modes (bad id, missing row/object,
     storage, extraction, classification) are reported via "status" rather
     than raised; classification failures are non-fatal.

cleanup_abandoned_uploads — periodic beat task that deletes stale
status='pending' Document rows and (best-effort) their MinIO objects.
"""
import asyncio
import logging

from celery_app import celery_app

logger = logging.getLogger(__name__)


@celery_app.task(name="tasks.document_tasks.extract_and_classify")
def extract_and_classify(document_id: str) -> dict:
    """Synchronous Celery entry-point — delegates to async _run via asyncio.run."""
    # NOTE(review): asyncio.run() creates a brand-new event loop per invocation.
    # If AsyncSessionLocal's engine keeps a connection pool, connections opened
    # on a previous loop may not be reusable on this one — confirm db.session
    # uses NullPool (or disposes the engine) in worker processes.
    return asyncio.run(_run(document_id))


async def _run(document_id: str) -> dict:
    """Async body of extract_and_classify.

    Opens its own AsyncSession (not shared with the upload request) to avoid
    cross-thread session contamination.

    Args:
        document_id: String form of the Document primary key (parsed as a UUID).

    Returns:
        A dict with "document_id" and "status". status is one of
        "invalid_id", "not_found", "missing_object", "extract_failed"
        (plus "error"), "classified" (plus "topics"), or
        "classification_failed" (plus "error").
    """
    import uuid as _uuid

    from db.session import AsyncSessionLocal
    from db.models import Document
    from services import extractor, classifier
    from storage import get_storage_backend

    async with AsyncSessionLocal() as session:
        # ── Step 1: fetch Document row ─────────────────────────────────────────
        try:
            doc_uuid = _uuid.UUID(document_id)
        except (ValueError, TypeError, AttributeError):
            # ValueError: malformed string; TypeError/AttributeError: None or a
            # non-string argument (uuid.UUID only accepts a hex string here).
            return {"document_id": document_id, "status": "invalid_id"}

        doc = await session.get(Document, doc_uuid)
        if doc is None:
            return {"document_id": document_id, "status": "not_found"}
        if not doc.object_key:
            return {"document_id": document_id, "status": "missing_object"}

        # ── Resolve per-user AI config (D-14, D-15) ────────────────────────────
        from db.models import User
        from config import settings as app_settings

        user = await session.get(User, doc.user_id) if doc.user_id else None
        ai_provider = (
            (user.ai_provider if user else None) or app_settings.default_ai_provider
        )
        ai_model = (user.ai_model if user else None) or app_settings.default_ai_model

        # ── Step 2: retrieve bytes from MinIO ──────────────────────────────────
        try:
            backend = get_storage_backend()
            file_bytes = await backend.get_object(doc.object_key)
        except Exception as e:
            logger.warning(
                "MinIO retrieval failed for document %s", document_id, exc_info=True
            )
            return {
                "document_id": document_id,
                "status": "extract_failed",
                "error": f"MinIO retrieval failed: {e}",
            }

        # ── Step 3: extract text from bytes ────────────────────────────────────
        try:
            text = extractor.extract_text_from_bytes(file_bytes, doc.content_type)
            doc.extracted_text = text
            await session.commit()
        except Exception as e:
            logger.warning(
                "Text extraction failed for document %s", document_id, exc_info=True
            )
            return {
                "document_id": document_id,
                "status": "extract_failed",
                "error": f"Text extraction failed: {e}",
            }

        # ── Step 4: classify document (non-fatal) ──────────────────────────────
        try:
            topics = await classifier.classify_document(
                session, document_id, ai_provider=ai_provider, ai_model=ai_model
            )
        except Exception as e:
            # Non-fatal — preserve existing convention from api/documents.py
            logger.warning(
                "Classification failed for document %s", document_id, exc_info=True
            )
            try:
                # The classifier shares this session. If it failed mid-flush the
                # session refuses to commit until rolled back; rolling back also
                # discards any partial, uncommitted classifier writes.
                # extracted_text was already committed in step 3.
                await session.rollback()
                doc.status = "classification_failed"
                await session.commit()
            except Exception:
                # Don't let a secondary DB error turn a non-fatal failure fatal.
                logger.exception(
                    "Could not record classification_failed for document %s",
                    document_id,
                )
            return {
                "document_id": document_id,
                "status": "classification_failed",
                "error": str(e),
            }

        return {
            "document_id": document_id,
            "status": "classified",
            "topics": topics,
        }


@celery_app.task(name="tasks.document_tasks.cleanup_abandoned_uploads")
def cleanup_abandoned_uploads() -> dict:
    """Periodic Celery beat task — deletes Document rows with status='pending'
    older than 1 hour and their MinIO objects (D-06).

    Enqueued by Celery beat every 30 minutes (celery_app.py beat_schedule).
    Quota is never reserved for pending rows — no quota cleanup needed.
    """
    return asyncio.run(_cleanup_abandoned())


async def _cleanup_abandoned() -> dict:
    """Async body for cleanup_abandoned_uploads.

    Selects Document rows with status='pending' older than 1 hour, removes
    their MinIO objects (best-effort), then deletes the DB rows in a single
    commit.

    Returns:
        {"cleaned": N}, where N is the number of rows deleted.
    """
    from datetime import datetime, timezone, timedelta

    from sqlalchemy import select

    from db.session import AsyncSessionLocal
    from db.models import Document
    from storage import get_storage_backend

    cutoff = datetime.now(timezone.utc) - timedelta(hours=1)

    async with AsyncSessionLocal() as session:
        result = await session.execute(
            select(Document).where(
                Document.status == "pending",
                Document.created_at < cutoff,
            )
        )
        docs = result.scalars().all()

        backend = get_storage_backend()
        cleaned = 0
        for doc in docs:
            if doc.object_key:
                try:
                    await backend.delete_object(doc.object_key)
                except Exception:
                    # Best-effort: the object may never have been uploaded. Log
                    # rather than swallow so real storage outages are visible —
                    # the row is still deleted below, which can orphan the object.
                    logger.warning(
                        "Could not delete MinIO object %s for abandoned document %s",
                        doc.object_key,
                        doc.id,
                        exc_info=True,
                    )
            await session.delete(doc)
            cleaned += 1

        await session.commit()
        return {"cleaned": cleaned}