""" Admin audit log API endpoints for DocuVault. All handlers require get_current_admin (ADMIN-06, SEC-07) — regular users receive 403 Forbidden. Implements: GET /api/admin/audit-log — paginated, filtered audit log viewer GET /api/admin/audit-log/export — CSV streaming export with same filters Security invariants: - Both endpoints use Depends(get_current_admin) — verified by grep - _audit_to_dict() is a pure whitelist: no filename, extracted_text, password_hash, or credentials_enc can appear in responses (ADMIN-06, D-15) - CSV export uses the same _audit_to_dict() helper as the JSON viewer """ from __future__ import annotations import csv import io import uuid from datetime import datetime from typing import Optional from fastapi import APIRouter, Depends, Query from fastapi.responses import StreamingResponse from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession from db.models import AuditLog, User from deps.auth import get_current_admin from deps.db import get_db router = APIRouter(prefix="/api/admin", tags=["audit"]) # ── Safe response helper ────────────────────────────────────────────────────── def _audit_to_dict(entry: AuditLog) -> dict: """Safe audit log serializer — never includes filename, extracted_text, or document content (ADMIN-06, D-15). Whitelist: id, event_type, user_id, actor_id, resource_id, ip_address, metadata_, created_at. No other keys are possible. """ return { "id": entry.id, "event_type": entry.event_type, "user_id": str(entry.user_id) if entry.user_id else None, "actor_id": str(entry.actor_id) if entry.actor_id else None, "resource_id": str(entry.resource_id) if entry.resource_id else None, "ip_address": str(entry.ip_address) if entry.ip_address else None, "metadata_": entry.metadata_, "created_at": entry.created_at.isoformat(), } # ── Query builder helper ────────────────────────────────────────────────────── def _build_filtered_query( start: Optional[datetime], end: Optional[datetime], user_id: Optional[uuid.UUID], event_type: Optional[str], ): """Return a SQLAlchemy Select for AuditLog with the given filters applied. Shared by both the paginated viewer and the CSV export endpoints to ensure consistent filter semantics. """ q = select(AuditLog).order_by(AuditLog.created_at.desc()) if start is not None: q = q.where(AuditLog.created_at >= start) if end is not None: q = q.where(AuditLog.created_at <= end) if user_id is not None: q = q.where(AuditLog.user_id == user_id) if event_type is not None: q = q.where(AuditLog.event_type == event_type) return q # ── Endpoints ───────────────────────────────────────────────────────────────── @router.get("/audit-log") async def list_audit_log( start: Optional[datetime] = Query(default=None), end: Optional[datetime] = Query(default=None), user_id: Optional[uuid.UUID] = Query(default=None), event_type: Optional[str] = Query(default=None), page: int = Query(default=1, ge=1), per_page: int = Query(default=50, ge=1, le=500), session: AsyncSession = Depends(get_db), _admin: User = Depends(get_current_admin), ) -> dict: """Return paginated, filtered audit log entries (ADMIN-06). Response: { items: [...], total: int, page: int, per_page: int } Entries never contain filename, extracted_text, or document content (D-15). """ base_q = _build_filtered_query(start, end, user_id, event_type) # Total count — same filters, no limit/offset count_q = select(func.count()).select_from(base_q.subquery()) count_result = await session.execute(count_q) total = count_result.scalar_one() # Paginated rows paginated_q = base_q.limit(per_page).offset((page - 1) * per_page) result = await session.execute(paginated_q) entries = result.scalars().all() return { "items": [_audit_to_dict(e) for e in entries], "total": total, "page": page, "per_page": per_page, } @router.get("/audit-log/export") async def export_audit_log( start: Optional[datetime] = Query(default=None), end: Optional[datetime] = Query(default=None), user_id: Optional[uuid.UUID] = Query(default=None), event_type: Optional[str] = Query(default=None), format: str = Query(default="csv"), # noqa: A002 session: AsyncSession = Depends(get_db), _admin: User = Depends(get_current_admin), ) -> StreamingResponse: """Stream a CSV export of filtered audit log entries (ADMIN-06). Uses the same _audit_to_dict() whitelist as the JSON viewer — no filename, extracted_text, or document content appears in the export (D-15, T-04-06-02). Returns StreamingResponse with Content-Disposition: attachment; filename=audit-export.csv. """ q = _build_filtered_query(start, end, user_id, event_type) result = await session.execute(q) entries = result.scalars().all() fields = [ "id", "event_type", "user_id", "actor_id", "resource_id", "ip_address", "metadata_", "created_at", ] output = io.StringIO() writer = csv.DictWriter(output, fieldnames=fields) writer.writeheader() for entry in entries: writer.writerow(_audit_to_dict(entry)) return StreamingResponse( iter([output.getvalue()]), media_type="text/csv", headers={"Content-Disposition": "attachment; filename=audit-export.csv"}, )