kite/backend/celery_app.py
curo1305 0d51d023ce feat(03-02): implement presigned upload flow, quota enforcement, cleanup task
- Replace POST /api/documents/upload with POST /api/documents/upload-url + /{id}/confirm
- upload-url: create pending Document row with user_id=None (Wave 2), return presigned PUT URL
- confirm: stat MinIO for authoritative size (T-03-05), atomic quota UPDATE (T-03-06, STORE-03)
- Confirm returns 413 with {used_bytes, limit_bytes, rejected_bytes} on quota exceeded (STORE-05)
- Wave 2 guard: skip quota UPDATE when doc.user_id is None (Plan 03-03 removes this)
- Add GET /api/auth/me/quota to api/auth.py (STORE-04)
- services/storage.py: remove save_upload (D-04); add GREATEST(0, used_bytes-delta) quota decrement to delete_document (STORE-06)
- tasks/document_tasks.py: add cleanup_abandoned_uploads Celery beat task (D-06)
- celery_app.py: add beat_schedule for cleanup-abandoned-uploads every 30 minutes
- tests/test_documents.py: replace legacy /upload tests with xfail; add real test logic for upload-url/confirm/get-quota
- tests/test_quota.py: implement real test logic with xfail for PostgreSQL-specific SQL
2026-05-23 14:32:12 +02:00
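The commit message describes a three-step upload lifecycle: upload-url creates a pending row, confirm charges the quota atomically, and a periodic task removes pending rows that were never confirmed. The following is a minimal sketch of that lifecycle using the standard-library `sqlite3` module, so it runs without PostgreSQL or MinIO. It is illustrative only: the table and column names, the `'pending'`/`'ready'` status values, the Unix-second timestamps, and the helpers `create_pending`, `confirm_upload`, `delete_document`, and `cleanup_abandoned_uploads` are hypothetical, simplified stand-ins named after the commit's steps, not DocuVault's actual code. SQLite's two-argument `max()` plays the role of PostgreSQL's `GREATEST`, and the `actual_size` argument stands in for the size reported by the MinIO stat call.

```python
import sqlite3
from typing import Optional

# Hypothetical schema; DocuVault's real models are not shown in this file.
SCHEMA = """
CREATE TABLE users (
    id INTEGER PRIMARY KEY,
    used_bytes INTEGER NOT NULL DEFAULT 0,
    limit_bytes INTEGER NOT NULL
);
CREATE TABLE documents (
    id INTEGER PRIMARY KEY,
    user_id INTEGER NOT NULL REFERENCES users(id),
    size_bytes INTEGER,
    status TEXT NOT NULL,        -- 'pending' until confirmed, then 'ready'
    created_at INTEGER NOT NULL  -- Unix seconds
);
"""


def create_pending(conn: sqlite3.Connection, user_id: int, now: int) -> int:
    """upload-url step (hypothetical): record a pending row before the client uploads."""
    with conn:
        cur = conn.execute(
            "INSERT INTO documents (user_id, status, created_at) VALUES (?, 'pending', ?)",
            (user_id, now),
        )
    return cur.lastrowid


def confirm_upload(conn: sqlite3.Connection, doc_id: int, actual_size: int) -> Optional[dict]:
    """confirm step (hypothetical): charge the quota in one conditional UPDATE.

    Returns None on success, or a 413-style body when the quota would be exceeded.
    """
    row = conn.execute(
        "SELECT user_id FROM documents WHERE id = ? AND status = 'pending'", (doc_id,)
    ).fetchone()
    if row is None:
        raise LookupError(f"no pending document {doc_id}")
    user_id = row[0]
    with conn:  # quota charge and status change commit together
        # The limit check sits in the WHERE clause, so two concurrent confirms
        # cannot both pass a separate read-then-write check.
        cur = conn.execute(
            "UPDATE users SET used_bytes = used_bytes + ? "
            "WHERE id = ? AND used_bytes + ? <= limit_bytes",
            (actual_size, user_id, actual_size),
        )
        if cur.rowcount == 1:
            conn.execute(
                "UPDATE documents SET status = 'ready', size_bytes = ? WHERE id = ?",
                (actual_size, doc_id),
            )
            return None
    # Zero rows updated: the charge was refused, so report the current usage.
    used, limit = conn.execute(
        "SELECT used_bytes, limit_bytes FROM users WHERE id = ?", (user_id,)
    ).fetchone()
    return {"used_bytes": used, "limit_bytes": limit, "rejected_bytes": actual_size}


def delete_document(conn: sqlite3.Connection, doc_id: int) -> None:
    """Release quota on delete; max(0, ...) mirrors PostgreSQL's GREATEST(0, ...)."""
    row = conn.execute(
        "SELECT user_id, size_bytes FROM documents WHERE id = ? AND status = 'ready'",
        (doc_id,),
    ).fetchone()
    if row is None:
        return
    user_id, size = row
    with conn:
        conn.execute(
            "UPDATE users SET used_bytes = max(0, used_bytes - ?) WHERE id = ?",
            (size, user_id),
        )
        conn.execute("DELETE FROM documents WHERE id = ?", (doc_id,))


def cleanup_abandoned_uploads(conn: sqlite3.Connection, now: int, max_age_seconds: int) -> int:
    """Delete pending rows older than max_age_seconds (the threshold is an assumption).

    Pending rows were never charged to the quota, so no decrement is needed.
    """
    with conn:
        cur = conn.execute(
            "DELETE FROM documents WHERE status = 'pending' AND created_at < ?",
            (now - max_age_seconds,),
        )
    return cur.rowcount
```

The design point worth noting is that the limit check lives in the UPDATE's `WHERE` clause: a row count of zero means the charge was refused, and there is no separate read that a concurrent confirm could race past. The cleanup helper only deletes database rows; a complete cleanup task would presumably also have to remove any objects already uploaded for those rows.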

"""
Celery application factory for DocuVault.

Kept deliberately minimal to avoid circular imports (Pitfall 7 from RESEARCH.md):
- DO NOT import from config (triggers pydantic-settings env-loading side effects)
- DO NOT import from main or any FastAPI router module
- Only the standard library (os, datetime) and celery are imported here

REDIS_URL is read directly from os.environ so that the Celery worker process can
import this module safely without pulling in the FastAPI application machinery.
"""
import os
from datetime import timedelta as _timedelta

from celery import Celery

celery_app = Celery("docuvault")

# Broker + result backend: read REDIS_URL directly from env (not from config.settings)
_redis_url = os.environ.get("REDIS_URL", "redis://redis:6379/0")
celery_app.conf.broker_url = _redis_url
celery_app.conf.result_backend = _redis_url

# JSON-only serialization (safe default; avoids pickle deserialization risks)
celery_app.conf.task_serializer = "json"
celery_app.conf.result_serializer = "json"
celery_app.conf.accept_content = ["json"]

# Route document tasks to the dedicated `documents` queue and email tasks
# to the `email` queue (Phase 2, D-03)
celery_app.conf.task_routes = {
    "tasks.document_tasks.*": {"queue": "documents"},
    "tasks.email_tasks.*": {"queue": "email"},
}

# Celery beat schedule: cleanup_abandoned_uploads runs every 30 minutes (D-06)
celery_app.conf.beat_schedule = {
    "cleanup-abandoned-uploads": {
        "task": "tasks.document_tasks.cleanup_abandoned_uploads",
        "schedule": _timedelta(minutes=30),
    },
}
celery_app.conf.timezone = "UTC"
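
# Autodiscover tasks. With the default related_name="tasks", Celery imports the
# `tasks` package and then looks for a `tasks.tasks` module, so
# tasks.document_tasks and tasks.email_tasks are registered only if
# tasks/__init__.py imports them (or they are listed in conf.imports / include).
celery_app.autodiscover_tasks(["tasks"], force=True)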
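Because `task_routes` is keyed by glob patterns, the queue a task lands on depends only on its dotted name. The sketch below approximates that lookup with `fnmatch.fnmatchcase` against a copy of the routing table from this module. It is not Celery's own router (which lives in `celery.app.routes` and also accepts regular expressions and router functions), and its matching may differ in edge cases. Unmatched names fall back to `celery`, Celery's default value for `task_default_queue`, assuming that setting is left unchanged. The helper `resolve_queue` and the task names other than `cleanup_abandoned_uploads` are hypothetical.

```python
from fnmatch import fnmatchcase

# Copy of the task_routes mapping configured in celery_app.py.
TASK_ROUTES = {
    "tasks.document_tasks.*": {"queue": "documents"},
    "tasks.email_tasks.*": {"queue": "email"},
}

# Celery's built-in default for task_default_queue.
DEFAULT_QUEUE = "celery"


def resolve_queue(task_name: str, routes: dict = TASK_ROUTES) -> str:
    """Return the queue for task_name, trying glob patterns in insertion order.

    Approximation only: fnmatch's `*` matches any sequence of characters,
    including dots, and the pattern must match the whole name.
    """
    for pattern, options in routes.items():
        if fnmatchcase(task_name, pattern):
            return options["queue"]
    return DEFAULT_QUEUE


if __name__ == "__main__":
    for name in (
        "tasks.document_tasks.cleanup_abandoned_uploads",
        "tasks.email_tasks.send_verification_email",  # hypothetical task name
        "tasks.reports.build_summary",                # hypothetical, unrouted
    ):
        print(f"{name} -> {resolve_queue(name)}")
```

Under this mapping the beat entry `tasks.document_tasks.cleanup_abandoned_uploads` resolves to the `documents` queue, so the cleanup only runs if some worker consumes that queue (for example, one started with `-Q documents`). The beat scheduler itself merely enqueues the task every 30 minutes.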