From f89f78765613c74ee9b85c9fa47cd022bb060748 Mon Sep 17 00:00:00 2001 From: curo1305 Date: Mon, 25 May 2026 18:50:50 +0200 Subject: [PATCH] feat(phase-4-06): Celery daily audit export task + beat schedule (D-17) - Create backend/tasks/audit_tasks.py: audit_log_daily_export task queries yesterday's AuditLog rows, writes CSV, uploads to MinIO audit-logs bucket via put_object_raw(bucket='audit-logs', ...) - All imports deferred inside _run_daily_export() to prevent circular imports - celery_app.py: add crontab import, beat entry at midnight UTC, tasks.audit_tasks.* routed to documents queue --- backend/celery_app.py | 13 ++++- backend/tasks/audit_tasks.py | 92 ++++++++++++++++++++++++++++++++++++ 2 files changed, 103 insertions(+), 2 deletions(-) create mode 100644 backend/tasks/audit_tasks.py diff --git a/backend/celery_app.py b/backend/celery_app.py index c83b1f3..a048a91 100644 --- a/backend/celery_app.py +++ b/backend/celery_app.py @@ -14,6 +14,7 @@ import os from datetime import timedelta as _timedelta from celery import Celery +from celery.schedules import crontab as _crontab celery_app = Celery("docuvault") @@ -28,18 +29,26 @@ celery_app.conf.result_serializer = "json" celery_app.conf.accept_content = ["json"] # Route document tasks to the dedicated `documents` queue; -# email tasks to the `email` queue (Phase 2 — D-03) +# email tasks to the `email` queue (Phase 2 — D-03); +# audit tasks to the `documents` queue (D-17 — reuse documents worker) celery_app.conf.task_routes = { "tasks.document_tasks.*": {"queue": "documents"}, "tasks.email_tasks.*": {"queue": "email"}, + "tasks.audit_tasks.*": {"queue": "documents"}, } -# Celery beat schedule: cleanup_abandoned_uploads runs every 30 minutes (D-06) +# Celery beat schedule: +# cleanup-abandoned-uploads — every 30 minutes (D-06) +# audit-log-daily-export — midnight UTC daily (D-17) celery_app.conf.beat_schedule = { "cleanup-abandoned-uploads": { "task": "tasks.document_tasks.cleanup_abandoned_uploads", "schedule": _timedelta(minutes=30), }, + "audit-log-daily-export": { + "task": "tasks.audit_tasks.audit_log_daily_export", + "schedule": _crontab(hour=0, minute=0), + }, } celery_app.conf.timezone = "UTC" diff --git a/backend/tasks/audit_tasks.py b/backend/tasks/audit_tasks.py new file mode 100644 index 0000000..b401f5f --- /dev/null +++ b/backend/tasks/audit_tasks.py @@ -0,0 +1,92 @@ +""" +Celery tasks for audit log export in DocuVault. + +audit_log_daily_export — scheduled by Celery beat at midnight UTC (D-17). + +The task queries the audit_log table for the previous calendar day (UTC), +writes a CSV using the same field whitelist as _audit_to_dict() in api/audit.py, +and uploads it to the MinIO audit-logs bucket via put_object_raw(). + +CSV field whitelist: id, event_type, user_id, actor_id, resource_id, +ip_address, metadata_, created_at — no filename, extracted_text, or +document content (ADMIN-06, D-15). +""" +import asyncio + +from celery_app import celery_app + + +@celery_app.task(name="tasks.audit_tasks.audit_log_daily_export") +def audit_log_daily_export() -> dict: + """Synchronous Celery entry-point — delegates to async _run_daily_export via asyncio.run.""" + return asyncio.run(_run_daily_export()) + + +async def _run_daily_export() -> dict: + """Async body of audit_log_daily_export. + + Queries yesterday's audit_log rows, writes CSV, and uploads to MinIO + audit-logs bucket. All imports are deferred inside this function body to + avoid circular imports (same pattern as document_tasks._run). + """ + from datetime import date, datetime, timedelta, timezone + import csv + import io + from db.session import AsyncSessionLocal + from db.models import AuditLog + from sqlalchemy import select + from storage import get_storage_backend + + yesterday = date.today() - timedelta(days=1) + start = datetime(yesterday.year, yesterday.month, yesterday.day, tzinfo=timezone.utc) + end = start + timedelta(days=1) + + try: + async with AsyncSessionLocal() as session: + result = await session.execute( + select(AuditLog) + .where(AuditLog.created_at >= start, AuditLog.created_at < end) + .order_by(AuditLog.created_at) + ) + rows = result.scalars().all() + + fields = [ + "id", + "event_type", + "user_id", + "actor_id", + "resource_id", + "ip_address", + "metadata_", + "created_at", + ] + output = io.StringIO() + writer = csv.DictWriter(output, fieldnames=fields) + writer.writeheader() + for row in rows: + writer.writerow({ + "id": row.id, + "event_type": row.event_type, + "user_id": str(row.user_id) if row.user_id else None, + "actor_id": str(row.actor_id) if row.actor_id else None, + "resource_id": str(row.resource_id) if row.resource_id else None, + "ip_address": str(row.ip_address) if row.ip_address else None, + "metadata_": row.metadata_, + "created_at": row.created_at.isoformat(), + }) + + csv_bytes = output.getvalue().encode("utf-8") + key = f"audit-logs/{yesterday.isoformat()}.csv" + await get_storage_backend().put_object_raw( + bucket="audit-logs", + key=key, + data=io.BytesIO(csv_bytes), + length=len(csv_bytes), + content_type="text/csv", + ) + return {"exported": len(rows), "key": key, "date": yesterday.isoformat()} + + except Exception as e: + import logging + logging.getLogger(__name__).error("audit export failed: %s", e) + return {"exported": 0, "error": str(e)}