""" Celery tasks for audit log export in DocuVault. audit_log_daily_export — scheduled by Celery beat at midnight UTC (D-17). The task queries the audit_log table for the previous calendar day (UTC), writes a CSV using the same field whitelist as _audit_to_dict() in api/audit.py, and uploads it to the MinIO audit-logs bucket via put_object_raw(). CSV field whitelist: id, event_type, user_id, actor_id, resource_id, ip_address, metadata_, created_at — no filename, extracted_text, or document content (ADMIN-06, D-15). """ import asyncio from celery_app import celery_app @celery_app.task(name="tasks.audit_tasks.audit_log_daily_export") def audit_log_daily_export() -> dict: """Synchronous Celery entry-point — delegates to async _run_daily_export via asyncio.run.""" return asyncio.run(_run_daily_export()) async def _run_daily_export() -> dict: """Async body of audit_log_daily_export. Queries yesterday's audit_log rows, writes CSV, and uploads to MinIO audit-logs bucket. All imports are deferred inside this function body to avoid circular imports (same pattern as document_tasks._run). """ from datetime import date, datetime, timedelta, timezone import csv import io from db.session import AsyncSessionLocal from db.models import AuditLog from sqlalchemy import select from storage import get_storage_backend yesterday = date.today() - timedelta(days=1) start = datetime(yesterday.year, yesterday.month, yesterday.day, tzinfo=timezone.utc) end = start + timedelta(days=1) try: async with AsyncSessionLocal() as session: result = await session.execute( select(AuditLog) .where(AuditLog.created_at >= start, AuditLog.created_at < end) .order_by(AuditLog.created_at) ) rows = result.scalars().all() fields = [ "id", "event_type", "user_id", "actor_id", "resource_id", "ip_address", "metadata_", "created_at", ] output = io.StringIO() writer = csv.DictWriter(output, fieldnames=fields) writer.writeheader() for row in rows: writer.writerow({ "id": row.id, "event_type": row.event_type, "user_id": str(row.user_id) if row.user_id else None, "actor_id": str(row.actor_id) if row.actor_id else None, "resource_id": str(row.resource_id) if row.resource_id else None, "ip_address": str(row.ip_address) if row.ip_address else None, "metadata_": row.metadata_, "created_at": row.created_at.isoformat(), }) csv_bytes = output.getvalue().encode("utf-8") key = f"audit-logs/{yesterday.isoformat()}.csv" await get_storage_backend().put_object_raw( bucket="audit-logs", key=key, data=io.BytesIO(csv_bytes), length=len(csv_bytes), content_type="text/csv", ) return {"exported": len(rows), "key": key, "date": yesterday.isoformat()} except Exception as e: import logging logging.getLogger(__name__).error("audit export failed: %s", e) return {"exported": 0, "error": str(e)}