f89f787656
- Create backend/tasks/audit_tasks.py: audit_log_daily_export task queries yesterday's AuditLog rows, writes CSV, uploads to MinIO audit-logs bucket via put_object_raw(bucket='audit-logs', ...) - All imports deferred inside _run_daily_export() to prevent circular imports - celery_app.py: add crontab import, beat entry at midnight UTC, tasks.audit_tasks.* routed to documents queue
93 lines
3.3 KiB
Python
93 lines
3.3 KiB
Python
"""
Celery tasks for audit log export in DocuVault.

audit_log_daily_export — scheduled by Celery beat at midnight UTC (D-17).

The task queries the audit_log table for the previous calendar day (UTC),
writes a CSV using the same field whitelist as _audit_to_dict() in api/audit.py,
and uploads it to the MinIO audit-logs bucket via put_object_raw().

CSV field whitelist: id, event_type, user_id, actor_id, resource_id,
ip_address, metadata_, created_at — no filename, extracted_text, or
document content (ADMIN-06, D-15).
"""
|
|
import asyncio
|
|
|
|
from celery_app import celery_app
|
|
|
|
|
|
@celery_app.task(name="tasks.audit_tasks.audit_log_daily_export")
def audit_log_daily_export() -> dict:
    """Celery task entry point for the daily audit-log export.

    Celery invokes this function synchronously, so the async implementation
    in ``_run_daily_export`` is driven to completion on a fresh event loop
    via ``asyncio.run``.

    Returns:
        The dict produced by ``_run_daily_export``.
    """
    # Build the coroutine first, then hand it to a new event loop.
    export_coro = _run_daily_export()
    return asyncio.run(export_coro)
|
|
|
|
|
|
async def _run_daily_export() -> dict:
    """Export the previous UTC calendar day's audit_log rows to MinIO as CSV.

    The export window is the half-open interval
    [00:00 UTC yesterday, 00:00 UTC today). "Yesterday" is derived from the
    current UTC time rather than the worker's local clock, so the chosen day
    always agrees with the UTC bounds used in the query, whatever time zone
    the worker host is configured with.

    All imports are deferred inside this function body to avoid circular
    imports.

    Returns:
        On success: ``{"exported": <row count>, "key": <object key>,
        "date": <ISO date of the exported day>}``.
        On failure: ``{"exported": 0, "error": <exception message>}``. The
        exception is logged with its traceback and is not re-raised.
    """
    from datetime import datetime, timedelta, timezone
    import csv
    import io
    import logging
    from db.session import AsyncSessionLocal
    from db.models import AuditLog
    from sqlalchemy import select
    from storage import get_storage_backend

    logger = logging.getLogger(__name__)

    # Use the UTC date, not date.today(): the latter follows the worker's
    # local time zone and can pick the wrong day when the host is not on UTC.
    today_utc = datetime.now(timezone.utc).date()
    yesterday = today_utc - timedelta(days=1)
    start = datetime(
        yesterday.year, yesterday.month, yesterday.day, tzinfo=timezone.utc
    )
    end = start + timedelta(days=1)

    def _opt_str(value):
        """Stringify an optional column value, keeping None as None."""
        return str(value) if value is not None else None

    try:
        async with AsyncSessionLocal() as session:
            result = await session.execute(
                select(AuditLog)
                .where(AuditLog.created_at >= start, AuditLog.created_at < end)
                .order_by(AuditLog.created_at)
            )
            rows = result.scalars().all()

        # Explicit whitelist: only these columns are ever written to the CSV.
        fields = [
            "id",
            "event_type",
            "user_id",
            "actor_id",
            "resource_id",
            "ip_address",
            "metadata_",
            "created_at",
        ]
        output = io.StringIO()
        writer = csv.DictWriter(output, fieldnames=fields)
        writer.writeheader()
        for row in rows:
            writer.writerow({
                "id": row.id,
                "event_type": row.event_type,
                "user_id": _opt_str(row.user_id),
                "actor_id": _opt_str(row.actor_id),
                "resource_id": _opt_str(row.resource_id),
                "ip_address": _opt_str(row.ip_address),
                # NOTE(review): written using the csv module's default str()
                # conversion of the value — confirm this is the format
                # downstream consumers expect.
                "metadata_": row.metadata_,
                "created_at": row.created_at.isoformat(),
            })

        csv_bytes = output.getvalue().encode("utf-8")
        key = f"audit-logs/{yesterday.isoformat()}.csv"
        await get_storage_backend().put_object_raw(
            bucket="audit-logs",
            key=key,
            data=io.BytesIO(csv_bytes),
            length=len(csv_bytes),
            content_type="text/csv",
        )
        return {"exported": len(rows), "key": key, "date": yesterday.isoformat()}

    except Exception as e:
        # Task boundary: report the failure in the result payload instead of
        # raising, but keep the full traceback in the log.
        logger.exception("audit export failed: %s", e)
        return {"exported": 0, "error": str(e)}
|