0d51d023ce
- Replace POST /api/documents/upload with POST /api/documents/upload-url + /{id}/confirm
- upload-url: create pending Document row with user_id=None (Wave 2), return presigned PUT URL
- confirm: stat MinIO for authoritative size (T-03-05), atomic quota UPDATE (T-03-06, STORE-03)
- Confirm returns 413 with {used_bytes, limit_bytes, rejected_bytes} on quota exceeded (STORE-05)
- Wave 2 guard: skip quota UPDATE when doc.user_id is None (Plan 03-03 removes this)
- Add GET /api/auth/me/quota to api/auth.py (STORE-04)
- services/storage.py: remove save_upload (D-04); add GREATEST(0, used_bytes-delta) quota decrement to delete_document (STORE-06)
- tasks/document_tasks.py: add cleanup_abandoned_uploads Celery beat task (D-06)
- celery_app.py: add beat_schedule for cleanup-abandoned-uploads every 30 minutes
- tests/test_documents.py: replace legacy /upload tests with xfail; add real test logic for upload-url/confirm/get-quota
- tests/test_quota.py: implement real test logic with xfail for PostgreSQL-specific SQL
48 lines
1.7 KiB
Python
48 lines
1.7 KiB
Python
"""
Celery application instance for DocuVault.

Kept deliberately minimal to avoid circular imports (Pitfall 7 from RESEARCH.md):
- DO NOT import from config (triggers pydantic-settings env-loading side effects)
- DO NOT import from main or any FastAPI router module
- Only standard-library modules (os, datetime) + celery are imported here

REDIS_URL is read directly from os.environ so that this module can be imported
safely by the Celery worker process without pulling in the FastAPI application
machinery.
"""

import os

from datetime import timedelta as _timedelta

from celery import Celery

celery_app = Celery("docuvault")

# One Redis instance serves as both message broker and result backend.
# REDIS_URL comes straight from the process environment (never from
# config.settings) so importing this module has no pydantic-settings side effects.
_redis_url = os.environ.get("REDIS_URL", "redis://redis:6379/0")

celery_app.conf.update(
    broker_url=_redis_url,
    result_backend=_redis_url,
    # Serialize tasks and results as JSON and accept nothing else on receipt,
    # so pickle payloads are never deserialized.
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
)

# Queue routing, keyed by glob patterns over task names: everything defined in
# tasks.document_tasks goes to the `documents` queue, everything in
# tasks.email_tasks goes to the `email` queue (Phase 2 — D-03).
celery_app.conf.update(
    task_routes={
        "tasks.document_tasks.*": {"queue": "documents"},
        "tasks.email_tasks.*": {"queue": "email"},
    },
)

# Periodic jobs dispatched by `celery beat`, with the scheduler clock in UTC.
# cleanup-abandoned-uploads (D-06) fires every 30 minutes; presumably it
# reclaims pending uploads that were never confirmed — see
# tasks.document_tasks.cleanup_abandoned_uploads to confirm.
celery_app.conf.update(
    beat_schedule={
        "cleanup-abandoned-uploads": {
            "task": "tasks.document_tasks.cleanup_abandoned_uploads",
            "schedule": _timedelta(minutes=30),
        },
    },
    timezone="UTC",
)

# Task registration.
#
# autodiscover_tasks(["tasks"]) does NOT scan every module in the `tasks/`
# package. For each listed package Celery imports the package itself and then
# looks for a `<package>.<related_name>` submodule. The default
# related_name="tasks" means it looks for `tasks.tasks`. Modules such as
# tasks.document_tasks are therefore registered only if tasks/__init__.py
# happens to import them.
#
# Listing the task modules in `include` makes the worker import them at
# startup. That guarantees the beat-scheduled
# "tasks.document_tasks.cleanup_abandoned_uploads" is a registered task on the
# worker. If tasks/__init__.py already imports these modules, this is a
# harmless no-op. The module list mirrors the task_routes patterns
# "tasks.document_tasks.*" / "tasks.email_tasks.*".
celery_app.conf.include = ["tasks.document_tasks", "tasks.email_tasks"]

# force=True runs discovery immediately at import time instead of lazily.
celery_app.autodiscover_tasks(["tasks"], force=True)