f89f787656
- Create backend/tasks/audit_tasks.py: audit_log_daily_export task queries yesterday's AuditLog rows, writes CSV, uploads to MinIO audit-logs bucket via put_object_raw(bucket='audit-logs', ...) - All imports deferred inside _run_daily_export() to prevent circular imports - celery_app.py: add crontab import, beat entry at midnight UTC, tasks.audit_tasks.* routed to documents queue
57 lines
2.0 KiB
Python
57 lines
2.0 KiB
Python
"""
Celery application factory for DocuVault.

Kept deliberately minimal to avoid circular imports (Pitfall 7 from RESEARCH.md):
- DO NOT import from config (triggers pydantic-settings env-loading side effects)
- DO NOT import from main or any FastAPI router module
- Only the standard library (os, datetime) and celery are imported here

REDIS_URL is read directly from os.environ so that this module can be imported
safely by the Celery worker process without pulling in the FastAPI application
machinery.
"""
|
|
import os
|
|
from datetime import timedelta as _timedelta
|
|
|
|
from celery import Celery
|
|
from celery.schedules import crontab as _crontab
|
|
|
|
celery_app = Celery("docuvault")

# Broker + result backend — read REDIS_URL directly from env (not from
# config.settings, see module docstring).
# `or` instead of a .get() default: it also falls back when REDIS_URL is set
# but empty (e.g. `REDIS_URL=${REDIS_URL}` in a compose file with the host
# variable unset). Otherwise broker_url/result_backend would become "" and
# not point at Redis at all.
_redis_url = os.environ.get("REDIS_URL") or "redis://redis:6379/0"
celery_app.conf.broker_url = _redis_url
celery_app.conf.result_backend = _redis_url

# JSON-only serialization (safe default; avoids pickle deserialization risks).
# accept_content restricts what the worker will deserialize from the broker.
celery_app.conf.task_serializer = "json"
celery_app.conf.result_serializer = "json"
celery_app.conf.accept_content = ["json"]

# Queue routing, keyed by task-name glob:
#   tasks.document_tasks.*  -> `documents` queue
#   tasks.email_tasks.*     -> `email` queue (Phase 2 — D-03)
#   tasks.audit_tasks.*     -> `documents` queue (D-17 — reuse documents worker)
celery_app.conf.task_routes = {
    task_glob: {"queue": queue_name}
    for task_glob, queue_name in (
        ("tasks.document_tasks.*", "documents"),
        ("tasks.email_tasks.*", "email"),
        ("tasks.audit_tasks.*", "documents"),
    )
}

# The app's scheduling clock is UTC, so the crontab entry below fires at
# 00:00 UTC.
celery_app.conf.timezone = "UTC"

# Periodic jobs driven by celery beat:
#   cleanup-abandoned-uploads — every 30 minutes (D-06)
#   audit-log-daily-export    — once a day at midnight UTC (D-17)
celery_app.conf.beat_schedule = {
    "cleanup-abandoned-uploads": dict(
        task="tasks.document_tasks.cleanup_abandoned_uploads",
        schedule=_timedelta(minutes=30),
    ),
    "audit-log-daily-export": dict(
        task="tasks.audit_tasks.audit_log_daily_export",
        schedule=_crontab(minute=0, hour=0),
    ),
}

# Register every task module explicitly.
# With its default related_name="tasks", autodiscover_tasks(["tasks"]) imports
# the `tasks` package and looks for a `tasks.tasks` submodule. It does NOT
# import tasks.document_tasks, tasks.email_tasks or tasks.audit_tasks by itself.
# Listing them in `include` makes the worker import them at startup, so every
# task name referenced by task_routes / beat_schedule is registered. Beat then
# never enqueues a task the worker reports as unregistered.
# NOTE(review): keep this list in sync when adding new tasks/*_tasks.py modules.
celery_app.conf.include = [
    "tasks.document_tasks",
    "tasks.email_tasks",
    "tasks.audit_tasks",
]

# Still autodiscover the `tasks/` package; force=True runs discovery now
# instead of lazily. Anything registered via tasks/__init__.py keeps working.
celery_app.autodiscover_tasks(["tasks"], force=True)