kite/backend/celery_app.py
curo1305 9fc820d893 feat(02-01): implement services/auth.py full auth service layer and email_tasks.py
- services/auth.py: Argon2 password hashing (pwdlib), constant-time verify (SEC-06)
- JWT create/decode for access tokens and password-reset tokens (typ claim validation, T-02-01)
- Refresh token lifecycle: create, rotate, revoke-all with family revocation (AUTH-07, RFC 9700)
- Family revocation enqueues send_security_alert_email.delay on token reuse (T-02-02)
- TOTP provisioning (pyotp) and verification with Redis replay prevention, valid_window=1 (AUTH-08)
- Backup code generation (8-char hex uppercase), storage (Argon2 hashed), constant-time verify (T-02-03)
- HIBP k-anonymity check via SHA-1 prefix (T-02-05), fail-open on network error (T-02-06)
- Admin bootstrap: idempotent, logs WARNING if env vars missing (D-04/D-05/D-06)
- services/email.py: SMTP send + dev stdout fallback (D-01/D-02)
- tasks/email_tasks.py: send_reset_email and send_security_alert_email Celery tasks
- celery_app.py: add email queue route for tasks.email_tasks.*
- TDD tests: 17 tests covering all auth primitives and family revocation
2026-05-22 19:23:42 +02:00
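The HIBP k-anonymity check and its fail-open behaviour listed in the commit message can be sketched as follows. This is a minimal illustration, not the code in services/auth.py: the names `hibp_split`, `is_pwned`, and the injected `fetch_range` callable are hypothetical, and the response format assumed here is one `SUFFIX:COUNT` entry per line, as returned by the public Pwned Passwords range endpoint.

```python
import hashlib


def hibp_split(password):
    """Split the uppercase SHA-1 hex digest into a 5-char prefix and the rest."""
    digest = hashlib.sha1(password.encode("utf-8")).hexdigest().upper()
    return digest[:5], digest[5:]


def is_pwned(password, fetch_range):
    """Return True if the password's hash suffix appears in the range response.

    fetch_range is a hypothetical callable: it takes the 5-char prefix and
    returns the response body as text. Only the prefix leaves the process.
    """
    prefix, suffix = hibp_split(password)
    try:
        body = fetch_range(prefix)
    except OSError:
        # Fail-open: a network error must not block the user (assumed policy,
        # matching the "fail-open on network error" note in the commit).
        return False
    for line in body.splitlines():
        candidate, _, count = line.partition(":")
        if candidate.strip().upper() == suffix:
            # Padded responses may contain entries with a count of 0.
            return count.strip() != "0"
    return False
```

Injecting `fetch_range` keeps the sketch testable without network access; a real caller would pass a function that performs the HTTP request.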

38 lines
1.3 KiB
Python

"""
Celery application factory for DocuVault.

Kept deliberately minimal to avoid circular imports (Pitfall 7 from RESEARCH.md):
- DO NOT import from config (triggers pydantic-settings env-loading side effects)
- DO NOT import from main or any FastAPI router module
- Only os + celery imported here

REDIS_URL is read directly from os.environ so that this module can be imported
safely by the Celery worker process without pulling in the FastAPI application
machinery.
"""
import os

from celery import Celery

celery_app = Celery("docuvault")

# Broker + result backend: read REDIS_URL directly from env (not from config.settings)
_redis_url = os.environ.get("REDIS_URL", "redis://redis:6379/0")
celery_app.conf.broker_url = _redis_url
celery_app.conf.result_backend = _redis_url

# JSON-only serialization (safe default; avoids pickle deserialization risks)
celery_app.conf.task_serializer = "json"
celery_app.conf.result_serializer = "json"
celery_app.conf.accept_content = ["json"]

# Route document tasks to the dedicated `documents` queue;
# email tasks to the `email` queue (Phase 2, D-03)
celery_app.conf.task_routes = {
    "tasks.document_tasks.*": {"queue": "documents"},
    "tasks.email_tasks.*": {"queue": "email"},
}

# Autodiscover tasks under the `tasks/` package
celery_app.autodiscover_tasks(["tasks"], force=True)
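
The glob-style route table above decides which queue a task lands on based on its dotted name. The sketch below approximates that lookup with the standard-library `fnmatch` module so the mapping can be checked without a broker. `resolve_queue` is a hypothetical helper, not part of Celery or of this repository, and the task names assume the modules are imported as `tasks.email_tasks` and `tasks.document_tasks`.

```python
import fnmatch

# Route table copied from celery_app.conf.task_routes.
TASK_ROUTES = {
    "tasks.document_tasks.*": {"queue": "documents"},
    "tasks.email_tasks.*": {"queue": "email"},
}

# Celery's default queue name, used when no route pattern matches.
DEFAULT_QUEUE = "celery"


def resolve_queue(task_name, routes=TASK_ROUTES):
    """Return the queue of the first glob pattern matching task_name."""
    for pattern, options in routes.items():
        # Case-sensitive glob match on the full dotted task name.
        if fnmatch.fnmatchcase(task_name, pattern):
            return options["queue"]
    return DEFAULT_QUEUE


if __name__ == "__main__":
    for name in (
        "tasks.email_tasks.send_reset_email",
        "tasks.email_tasks.send_security_alert_email",
        "tasks.other.cleanup",  # hypothetical task with no matching route
    ):
        print(name, "->", resolve_queue(name))
```

Under these assumptions, a worker has to consume the `email` queue (for example, one started with `-Q email`) for the two email tasks to run; a worker listening only on the default queue would never pick them up.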