feat(phase-4-06): Celery daily audit export task + beat schedule (D-17)
- Create backend/tasks/audit_tasks.py: audit_log_daily_export task queries yesterday's AuditLog rows, writes CSV, uploads to MinIO audit-logs bucket via put_object_raw(bucket='audit-logs', ...) - All imports deferred inside _run_daily_export() to prevent circular imports - celery_app.py: add crontab import, beat entry at midnight UTC, tasks.audit_tasks.* routed to documents queue
This commit is contained in:
+11
-2
@@ -14,6 +14,7 @@ import os
|
||||
from datetime import timedelta as _timedelta
|
||||
|
||||
from celery import Celery
|
||||
from celery.schedules import crontab as _crontab
|
||||
|
||||
celery_app = Celery("docuvault")
|
||||
|
||||
@@ -28,18 +29,26 @@ celery_app.conf.result_serializer = "json"
|
||||
celery_app.conf.accept_content = ["json"]
|
||||
|
||||
# Route document tasks to the dedicated `documents` queue;
|
||||
# email tasks to the `email` queue (Phase 2 — D-03)
|
||||
# email tasks to the `email` queue (Phase 2 — D-03);
|
||||
# audit tasks to the `documents` queue (D-17 — reuse documents worker)
|
||||
celery_app.conf.task_routes = {
|
||||
"tasks.document_tasks.*": {"queue": "documents"},
|
||||
"tasks.email_tasks.*": {"queue": "email"},
|
||||
"tasks.audit_tasks.*": {"queue": "documents"},
|
||||
}
|
||||
|
||||
# Celery beat schedule: cleanup_abandoned_uploads runs every 30 minutes (D-06)
|
||||
# Celery beat schedule:
|
||||
# cleanup-abandoned-uploads — every 30 minutes (D-06)
|
||||
# audit-log-daily-export — midnight UTC daily (D-17)
|
||||
celery_app.conf.beat_schedule = {
|
||||
"cleanup-abandoned-uploads": {
|
||||
"task": "tasks.document_tasks.cleanup_abandoned_uploads",
|
||||
"schedule": _timedelta(minutes=30),
|
||||
},
|
||||
"audit-log-daily-export": {
|
||||
"task": "tasks.audit_tasks.audit_log_daily_export",
|
||||
"schedule": _crontab(hour=0, minute=0),
|
||||
},
|
||||
}
|
||||
celery_app.conf.timezone = "UTC"
|
||||
|
||||
|
||||
@@ -0,0 +1,92 @@
|
||||
"""
|
||||
Celery tasks for audit log export in DocuVault.
|
||||
|
||||
audit_log_daily_export — scheduled by Celery beat at midnight UTC (D-17).
|
||||
|
||||
The task queries the audit_log table for the previous calendar day (UTC),
|
||||
writes a CSV using the same field whitelist as _audit_to_dict() in api/audit.py,
|
||||
and uploads it to the MinIO audit-logs bucket via put_object_raw().
|
||||
|
||||
CSV field whitelist: id, event_type, user_id, actor_id, resource_id,
|
||||
ip_address, metadata_, created_at — no filename, extracted_text, or
|
||||
document content (ADMIN-06, D-15).
|
||||
"""
|
||||
import asyncio
|
||||
|
||||
from celery_app import celery_app
|
||||
|
||||
|
||||
@celery_app.task(name="tasks.audit_tasks.audit_log_daily_export")
|
||||
def audit_log_daily_export() -> dict:
|
||||
"""Synchronous Celery entry-point — delegates to async _run_daily_export via asyncio.run."""
|
||||
return asyncio.run(_run_daily_export())
|
||||
|
||||
|
||||
async def _run_daily_export() -> dict:
|
||||
"""Async body of audit_log_daily_export.
|
||||
|
||||
Queries yesterday's audit_log rows, writes CSV, and uploads to MinIO
|
||||
audit-logs bucket. All imports are deferred inside this function body to
|
||||
avoid circular imports (same pattern as document_tasks._run).
|
||||
"""
|
||||
from datetime import date, datetime, timedelta, timezone
|
||||
import csv
|
||||
import io
|
||||
from db.session import AsyncSessionLocal
|
||||
from db.models import AuditLog
|
||||
from sqlalchemy import select
|
||||
from storage import get_storage_backend
|
||||
|
||||
yesterday = date.today() - timedelta(days=1)
|
||||
start = datetime(yesterday.year, yesterday.month, yesterday.day, tzinfo=timezone.utc)
|
||||
end = start + timedelta(days=1)
|
||||
|
||||
try:
|
||||
async with AsyncSessionLocal() as session:
|
||||
result = await session.execute(
|
||||
select(AuditLog)
|
||||
.where(AuditLog.created_at >= start, AuditLog.created_at < end)
|
||||
.order_by(AuditLog.created_at)
|
||||
)
|
||||
rows = result.scalars().all()
|
||||
|
||||
fields = [
|
||||
"id",
|
||||
"event_type",
|
||||
"user_id",
|
||||
"actor_id",
|
||||
"resource_id",
|
||||
"ip_address",
|
||||
"metadata_",
|
||||
"created_at",
|
||||
]
|
||||
output = io.StringIO()
|
||||
writer = csv.DictWriter(output, fieldnames=fields)
|
||||
writer.writeheader()
|
||||
for row in rows:
|
||||
writer.writerow({
|
||||
"id": row.id,
|
||||
"event_type": row.event_type,
|
||||
"user_id": str(row.user_id) if row.user_id else None,
|
||||
"actor_id": str(row.actor_id) if row.actor_id else None,
|
||||
"resource_id": str(row.resource_id) if row.resource_id else None,
|
||||
"ip_address": str(row.ip_address) if row.ip_address else None,
|
||||
"metadata_": row.metadata_,
|
||||
"created_at": row.created_at.isoformat(),
|
||||
})
|
||||
|
||||
csv_bytes = output.getvalue().encode("utf-8")
|
||||
key = f"audit-logs/{yesterday.isoformat()}.csv"
|
||||
await get_storage_backend().put_object_raw(
|
||||
bucket="audit-logs",
|
||||
key=key,
|
||||
data=io.BytesIO(csv_bytes),
|
||||
length=len(csv_bytes),
|
||||
content_type="text/csv",
|
||||
)
|
||||
return {"exported": len(rows), "key": key, "date": yesterday.isoformat()}
|
||||
|
||||
except Exception as e:
|
||||
import logging
|
||||
logging.getLogger(__name__).error("audit export failed: %s", e)
|
||||
return {"exported": 0, "error": str(e)}
|
||||
Reference in New Issue
Block a user