Files
curo1305 f89f787656 feat(phase-4-06): Celery daily audit export task + beat schedule (D-17)
- Create backend/tasks/audit_tasks.py: audit_log_daily_export task
  queries yesterday's AuditLog rows, writes CSV, uploads to MinIO
  audit-logs bucket via put_object_raw(bucket='audit-logs', ...)
- All imports deferred inside _run_daily_export() to prevent circular imports
- celery_app.py: add crontab import, beat entry at midnight UTC,
  tasks.audit_tasks.* routed to documents queue
2026-05-25 18:50:50 +02:00

57 lines
2.0 KiB
Python

"""
Celery application factory for DocuVault.
Kept deliberately minimal to avoid circular imports (Pitfall 7 from RESEARCH.md):
- DO NOT import from config (triggers pydantic-settings env-loading side effects)
- DO NOT import from main or any FastAPI router module
- Only os + celery imported here
REDIS_URL is read directly from os.environ so that this module can be imported
safely by the Celery worker process without pulling in the FastAPI application
machinery.
"""
import os
from datetime import timedelta as _timedelta
from celery import Celery
from celery.schedules import crontab as _crontab
celery_app = Celery("docuvault")
# Broker + result backend — read REDIS_URL directly from env (not from config.settings)
_redis_url = os.environ.get("REDIS_URL", "redis://redis:6379/0")
celery_app.conf.broker_url = _redis_url
celery_app.conf.result_backend = _redis_url
# JSON-only serialization (safe default; avoids pickle deserialization risks)
celery_app.conf.task_serializer = "json"
celery_app.conf.result_serializer = "json"
celery_app.conf.accept_content = ["json"]
# Route document tasks to the dedicated `documents` queue;
# email tasks to the `email` queue (Phase 2 — D-03);
# audit tasks to the `documents` queue (D-17 — reuse documents worker)
celery_app.conf.task_routes = {
"tasks.document_tasks.*": {"queue": "documents"},
"tasks.email_tasks.*": {"queue": "email"},
"tasks.audit_tasks.*": {"queue": "documents"},
}
# Celery beat schedule:
# cleanup-abandoned-uploads — every 30 minutes (D-06)
# audit-log-daily-export — midnight UTC daily (D-17)
celery_app.conf.beat_schedule = {
"cleanup-abandoned-uploads": {
"task": "tasks.document_tasks.cleanup_abandoned_uploads",
"schedule": _timedelta(minutes=30),
},
"audit-log-daily-export": {
"task": "tasks.audit_tasks.audit_log_daily_export",
"schedule": _crontab(hour=0, minute=0),
},
}
celery_app.conf.timezone = "UTC"
# Autodiscover tasks under the `tasks/` package
celery_app.autodiscover_tasks(["tasks"], force=True)