6849ebd1e6
- config.py: Remove the SETTINGS_FILE, DEFAULT_SYSTEM_PROMPT and DEFAULT_SETTINGS constants; add system_prompt, default_ai_provider and default_ai_model to Settings.
- services/classifier.py: Add the _DEFAULT_SYSTEM_PROMPT module constant; classify_document and suggest_topics_for_document accept ai_provider/ai_model kwargs; they no longer call storage.load_settings() and instead use app_settings defaults with DB-supplied overrides (D-14, D-15).
- services/storage.py: Delete load_settings, save_settings, mask_api_key and settings_masked, and remove them from __all__; remove the imports of copy, json, DEFAULT_SETTINGS and SETTINGS_FILE (D-12).
- tasks/document_tasks.py: _run resolves user.ai_provider/ai_model via session.get(User, doc.user_id) and passes them through to the classifier; the task signature is unchanged (T-03-19).
- api/settings.py: Deleted — the /api/settings endpoint is removed (D-12).
- main.py: Remove the settings_router import and the include_router call.
- tests/test_settings.py: Replace all tests with test_settings_endpoint_removed (404, green).
- tests/test_classifier.py: Implement test_per_user_provider, test_celery_task_uses_user_provider and test_default_provider_fallback; remove the xfail markers (DOC-03, DOC-05).
150 lines
6.0 KiB
Python
150 lines
6.0 KiB
Python
"""
|
|
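Celery tasks for document processing in DocuVault.

extract_and_classify — called via .delay(document_id) by the upload handler.
The task is a plain sync def (Celery workers have no asyncio event loop); it
bridges into the async service layer via asyncio.run().

Flow:
  1. Open a fresh AsyncSession (one per task invocation — never share sessions)
  2. Look up the Document row to get the MinIO object_key
  3. Resolve the owning user's AI provider/model, falling back to the
     application defaults when the user has none set (D-14, D-15)
  4. Retrieve file bytes from MinIO via the storage backend
  5. Extract text from bytes using services.extractor
  6. Persist extracted_text back to the Document row
  7. Call services.classifier.classify_document with the resolved
     provider/model to assign topics
  8. Return a result dict whose "status" key reports the outcome; retrieval,
     extraction and classification failures are reported there rather than
     raised (classification failures are non-fatal)

cleanup_abandoned_uploads — periodic Celery beat task that deletes Document
rows still in status='pending' more than an hour after creation, together
with their MinIO objects.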
|
|
"""
|
|
import asyncio
|
|
|
|
from celery_app import celery_app
|
|
|
|
|
|
@celery_app.task(name="tasks.document_tasks.extract_and_classify")
def extract_and_classify(document_id: str) -> dict:
    """Celery entry point for post-upload text extraction and classification.

    Celery calls this synchronously, so the async pipeline in ``_run`` is
    driven to completion on a new event loop created by ``asyncio.run``.

    Args:
        document_id: String form of the Document's UUID primary key.

    Returns:
        The result dict produced by ``_run``.
    """
    pipeline = _run(document_id)
    return asyncio.run(pipeline)
|
|
|
|
|
|
async def _run(document_id: str) -> dict:
    """Async body of extract_and_classify.

    Opens its own AsyncSession (not shared with the upload request) to avoid
    cross-thread session contamination, then:

    1. Parses ``document_id`` and loads the Document row.
    2. Resolves the owning user's AI provider/model, falling back to the
       application defaults when the user has none set (D-14, D-15).
    3. Fetches the file bytes from the storage backend.
    4. Extracts text and commits it to ``Document.extracted_text``.
    5. Classifies the document; failures here are non-fatal.

    Args:
        document_id: String form of the Document's UUID primary key.

    Returns:
        A dict with ``document_id`` and ``status``, where ``status`` is one of
        ``"invalid_id"``, ``"not_found"``, ``"missing_object"``,
        ``"extract_failed"`` (plus ``error``), ``"classified"`` (plus
        ``topics``) or ``"classification_failed"`` (plus ``error``).

    Raises:
        Exceptions from the initial Document/User lookups (e.g. database
        errors) are not caught and propagate to Celery.
    """
    import logging
    import uuid as _uuid

    from db.session import AsyncSessionLocal
    from db.models import Document, User
    from services import extractor, classifier
    from storage import get_storage_backend
    from config import settings as app_settings

    # Validate the id before touching the database at all.
    try:
        doc_uuid = _uuid.UUID(document_id)
    except ValueError:
        return {"document_id": document_id, "status": "invalid_id"}

    async with AsyncSessionLocal() as session:
        # ── Step 1: fetch Document row ─────────────────────────────────────
        doc = await session.get(Document, doc_uuid)
        if doc is None:
            return {"document_id": document_id, "status": "not_found"}
        if not doc.object_key:
            return {"document_id": document_id, "status": "missing_object"}

        # ── Resolve per-user AI config (D-14, D-15) ────────────────────────
        # A missing user, or a falsy per-user value, falls back to app defaults.
        user = await session.get(User, doc.user_id) if doc.user_id else None
        ai_provider = (user.ai_provider if user else None) or app_settings.default_ai_provider
        ai_model = (user.ai_model if user else None) or app_settings.default_ai_model

        # ── Step 2: retrieve bytes from MinIO ──────────────────────────────
        try:
            backend = get_storage_backend()
            file_bytes = await backend.get_object(doc.object_key)
        except Exception as e:
            return {
                "document_id": document_id,
                "status": "extract_failed",
                "error": f"MinIO retrieval failed: {e}",
            }

        # ── Step 3: extract text from bytes ────────────────────────────────
        try:
            text = extractor.extract_text_from_bytes(file_bytes, doc.content_type)
            doc.extracted_text = text
            await session.commit()
        except Exception as e:
            return {
                "document_id": document_id,
                "status": "extract_failed",
                "error": f"Text extraction failed: {e}",
            }

        # ── Step 4: classify document (non-fatal) ──────────────────────────
        try:
            topics = await classifier.classify_document(
                session, document_id, ai_provider=ai_provider, ai_model=ai_model
            )
        except Exception as e:
            # Non-fatal — preserve existing convention from api/documents.py.
            # The classifier may have left the transaction unusable (e.g. a
            # failed flush), in which case a bare commit would raise out of
            # the task and the failure would never be recorded. Roll back
            # first, re-load the row, then record the status.
            try:
                await session.rollback()
                failed_doc = await session.get(Document, doc_uuid)
                if failed_doc is not None:
                    failed_doc.status = "classification_failed"
                    await session.commit()
            except Exception:
                logging.getLogger(__name__).exception(
                    "Could not mark document %s as classification_failed",
                    document_id,
                )
            return {
                "document_id": document_id,
                "status": "classification_failed",
                "error": str(e),
            }

        return {
            "document_id": document_id,
            "status": "classified",
            "topics": topics,
        }
|
|
|
|
|
|
@celery_app.task(name="tasks.document_tasks.cleanup_abandoned_uploads")
def cleanup_abandoned_uploads() -> dict:
    """Celery beat task that purges abandoned uploads (D-06).

    An upload counts as abandoned when its Document row is still
    ``status='pending'`` more than one hour after creation. The row and its
    MinIO object are removed. Celery beat enqueues this every 30 minutes
    (see ``beat_schedule`` in celery_app.py). Quota is never reserved for
    pending rows, so there is no quota to release here.

    Returns:
        The ``{"cleaned": N}`` dict produced by ``_cleanup_abandoned``.
    """
    cleanup = _cleanup_abandoned()
    return asyncio.run(cleanup)
|
|
|
|
|
|
async def _cleanup_abandoned() -> dict:
    """Async body for cleanup_abandoned_uploads.

    Selects Document rows with status='pending' created more than one hour
    ago. For each row it removes the storage object (best-effort), then
    deletes the row. All row deletions are committed together.

    Returns:
        ``{"cleaned": N}`` where N is the number of rows deleted.
    """
    import logging
    from datetime import datetime, timezone, timedelta

    from sqlalchemy import select

    from db.session import AsyncSessionLocal
    from db.models import Document
    from storage import get_storage_backend

    logger = logging.getLogger(__name__)
    cutoff = datetime.now(timezone.utc) - timedelta(hours=1)

    async with AsyncSessionLocal() as session:
        stale_query = select(Document).where(
            Document.status == "pending",
            Document.created_at < cutoff,
        )
        docs = (await session.execute(stale_query)).scalars().all()

        backend = get_storage_backend()
        for doc in docs:
            if doc.object_key:
                try:
                    await backend.delete_object(doc.object_key)
                except Exception:
                    # Best-effort: the object may never have been uploaded.
                    # Log rather than silently pass. A genuine storage outage
                    # would otherwise go unnoticed while the row (the only
                    # reference to the object) is still deleted below.
                    logger.warning(
                        "Could not delete storage object %s for abandoned upload",
                        doc.object_key,
                        exc_info=True,
                    )
            await session.delete(doc)

        await session.commit()
        # Every selected row is deleted, so the count is simply len(docs).
        return {"cleaned": len(docs)}
|