kite/backend/celery_app.py
curo1305 32d67de1ca feat(01-05): introduce celery_app + tasks/document_tasks + session-aware classifier
- Add backend/celery_app.py: Celery("docuvault") with Redis broker, JSON
  serialization, and tasks.document_tasks.* routed to documents queue;
  reads REDIS_URL directly from os.environ (no config import — Pitfall 7)
- Add backend/tasks/__init__.py: empty package marker
- Add backend/tasks/document_tasks.py: sync extract_and_classify Celery task
  that calls asyncio.run(_run()) to retrieve bytes from MinIO, extract text
  via extractor, and classify via classifier; classification failure is non-fatal
- Update backend/services/classifier.py: classify_document and
  suggest_topics_for_document now accept session: AsyncSession as first arg;
  all storage.* calls updated to async session-injection pattern
- Add extract_text_from_bytes helper to services/extractor.py for bytes-based
  extraction (used by Celery worker, which retrieves bytes from MinIO)
2026-05-22 09:45:33 +02:00
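
The sync-over-async pattern described in the commit message can be sketched roughly as follows. This is a minimal, stdlib-only illustration and not the repository's code: `fetch_bytes`, `extract_text`, and `classify` are hypothetical stand-ins for the MinIO retrieval, the extractor, and the classifier, and the Celery task decorator is omitted so the snippet runs without a broker.

```python
import asyncio


async def fetch_bytes(document_id: str) -> bytes:
    # Hypothetical stand-in for retrieving the stored object from MinIO.
    return b"invoice 2026-05 total 42.00"


def extract_text(data: bytes) -> str:
    # Hypothetical stand-in for a bytes-based text extractor.
    return data.decode("utf-8")


async def classify(text: str) -> str:
    # Hypothetical stand-in for the classifier; raises when nothing matches.
    if "invoice" not in text:
        raise ValueError("no matching topic")
    return "invoice"


async def _run(document_id: str) -> dict:
    data = await fetch_bytes(document_id)
    text = extract_text(data)
    result = {"document_id": document_id, "text": text, "topic": None}
    try:
        result["topic"] = await classify(text)
    except Exception:
        # Classification failure is non-fatal: the extracted text is kept.
        pass
    return result


def extract_and_classify(document_id: str) -> dict:
    # Sync entry point (as a Celery task body would be) that drives the
    # async pipeline to completion on a fresh event loop.
    return asyncio.run(_run(document_id))
```

The returned dict contains only strings and `None`, so it stays compatible with a JSON-only result serializer.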


"""
Celery application factory for DocuVault.

Kept deliberately minimal to avoid circular imports (Pitfall 7 from RESEARCH.md):
- DO NOT import from config (triggers pydantic-settings env-loading side effects)
- DO NOT import from main or any FastAPI router module
- Only os + celery imported here

REDIS_URL is read directly from os.environ so that this module can be imported
safely by the Celery worker process without pulling in the FastAPI application
machinery.
"""
import os

from celery import Celery

celery_app = Celery("docuvault")

# Broker + result backend: read REDIS_URL directly from env (not from config.settings)
_redis_url = os.environ.get("REDIS_URL", "redis://redis:6379/0")
celery_app.conf.broker_url = _redis_url
celery_app.conf.result_backend = _redis_url

# JSON-only serialization (safe default; avoids pickle deserialization risks)
celery_app.conf.task_serializer = "json"
celery_app.conf.result_serializer = "json"
celery_app.conf.accept_content = ["json"]

# Route document tasks to the dedicated `documents` queue
celery_app.conf.task_routes = {
    "tasks.document_tasks.*": {"queue": "documents"},
}

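# Autodiscover tasks under the `tasks/` package. By default Celery looks for a
# module named `tasks` inside each listed package, so related_name points it at
# tasks/document_tasks.py instead.
celery_app.autodiscover_tasks(["tasks"], related_name="document_tasks", force=True)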
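
The environment fallback and the glob-based route configured above can be checked in isolation with a small stdlib-only sketch. It only approximates the behaviour: Celery's own router is not invoked, `resolve_queue` and `resolve_redis_url` are hypothetical helper names, and `"celery"` is assumed as the fallback because it is Celery's default value for `task_default_queue`.

```python
import fnmatch
import os
from typing import Mapping, Optional

# Mirrors the task_routes setting in celery_app.py.
TASK_ROUTES = {"tasks.document_tasks.*": {"queue": "documents"}}
DEFAULT_QUEUE = "celery"  # Celery's default task_default_queue


def resolve_queue(task_name: str) -> str:
    # Approximates glob routing with fnmatch; not Celery's actual router.
    for pattern, route in TASK_ROUTES.items():
        if fnmatch.fnmatchcase(task_name, pattern):
            return route["queue"]
    return DEFAULT_QUEUE


def resolve_redis_url(env: Optional[Mapping[str, str]] = None) -> str:
    # Same lookup as celery_app.py: REDIS_URL from the environment,
    # falling back to the in-cluster Redis address.
    env = os.environ if env is None else env
    return env.get("REDIS_URL", "redis://redis:6379/0")
```

A worker meant to consume these tasks would need to listen on that queue, for example `celery -A celery_app worker -Q documents`, assuming the module is importable as `celery_app` from the worker's working directory.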