""" Admin audit log API endpoints for DocuVault. All handlers require get_current_admin (ADMIN-06, SEC-07) — regular users receive 403 Forbidden. Implements: GET /api/admin/audit-log — paginated, filtered audit log viewer GET /api/admin/audit-log/export — CSV streaming export with same filters GET /api/admin/audit-log/daily-exports — list available Celery daily export files GET /api/admin/audit-log/daily-exports/{date} — stream a specific daily export CSV Security invariants: - All endpoints use Depends(get_current_admin) — verified by grep - _audit_to_dict() is a pure whitelist: no filename, extracted_text, password_hash, or credentials_enc can appear in responses (ADMIN-06, D-15) - CSV export uses the same _audit_to_dict_with_handles() helper as the JSON viewer - Date path parameter validated against YYYY-MM-DD regex before MinIO key construction — prevents path traversal (T-06.2-04-01, Pitfall 6) """ from __future__ import annotations import asyncio import csv import io import json import re import uuid from datetime import datetime from typing import Optional from fastapi import APIRouter, Depends, HTTPException, Query from fastapi.responses import StreamingResponse from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import aliased from db.models import AuditLog, User from deps.auth import get_current_admin from deps.db import get_db from storage import get_storage_backend from storage.minio_backend import MinIOBackend router = APIRouter(prefix="/api/admin", tags=["audit"]) # ── Safe response helpers ───────────────────────────────────────────────────── def _audit_to_dict(entry: AuditLog) -> dict: """Safe audit log serializer — never includes filename, extracted_text, or document content (ADMIN-06, D-15). Whitelist: id, event_type, user_id, actor_id, resource_id, ip_address, metadata_, created_at. No other keys are possible. """ return { "id": entry.id, "event_type": entry.event_type, "user_id": str(entry.user_id) if entry.user_id else None, "actor_id": str(entry.actor_id) if entry.actor_id else None, "resource_id": str(entry.resource_id) if entry.resource_id else None, "ip_address": str(entry.ip_address) if entry.ip_address else None, "metadata_": entry.metadata_, "created_at": entry.created_at.isoformat(), } def _audit_to_dict_with_handles( entry: AuditLog, user_handle: Optional[str], actor_handle: Optional[str], ) -> dict: """Extended audit log serializer that includes user_handle and actor_handle. Returns the same fields as _audit_to_dict() plus: - user_handle: str | None (the handle of the user who owns the entry) - actor_handle: str | None (the handle of the actor who performed the event) Used by both the JSON viewer and CSV export endpoints (Pitfall 7 — both endpoints must use the enriched function). """ return { "id": entry.id, "event_type": entry.event_type, "user_id": str(entry.user_id) if entry.user_id else None, "actor_id": str(entry.actor_id) if entry.actor_id else None, "user_handle": user_handle or None, "actor_handle": actor_handle or None, "resource_id": str(entry.resource_id) if entry.resource_id else None, "ip_address": str(entry.ip_address) if entry.ip_address else None, "metadata_": entry.metadata_, "created_at": entry.created_at.isoformat(), } # ── Query builder helpers ───────────────────────────────────────────────────── def _build_filtered_query( start: Optional[datetime], end: Optional[datetime], user_id: Optional[uuid.UUID], event_type: Optional[str], ): """Return a SQLAlchemy Select for AuditLog with the given filters applied. Shared by count queries in both the paginated viewer and the CSV export endpoints to ensure consistent filter semantics. NOTE: This function selects AuditLog only (no JOIN). It is used for COUNT queries to avoid the subquery ambiguity that arises with multi-column JOINs (Pitfall 4). Data queries use _build_filtered_query_with_handles() instead. """ q = select(AuditLog).order_by(AuditLog.created_at.desc()) if start is not None: q = q.where(AuditLog.created_at >= start) if end is not None: q = q.where(AuditLog.created_at <= end) if user_id is not None: q = q.where(AuditLog.user_id == user_id) if event_type is not None: q = q.where(AuditLog.event_type.like(f"{event_type}%")) return q def _build_filtered_query_with_handles( start: Optional[datetime], end: Optional[datetime], user_uuid: Optional[uuid.UUID], event_type: Optional[str], ): """Return a multi-column Select that joins User twice for handle enrichment. Yields (AuditLog, user_handle: str|None, actor_handle: str|None) tuples. Uses SQLAlchemy aliased() to join User twice without collision: - UserSubject: resolves user_id FK → handle - UserActor: resolves actor_id FK → handle outerjoin() ensures entries with NULL user_id or actor_id are still returned. """ UserSubject = aliased(User) UserActor = aliased(User) q = ( select( AuditLog, UserSubject.handle.label("user_handle"), UserActor.handle.label("actor_handle"), ) .outerjoin(UserSubject, UserSubject.id == AuditLog.user_id) .outerjoin(UserActor, UserActor.id == AuditLog.actor_id) .order_by(AuditLog.created_at.desc()) ) if start is not None: q = q.where(AuditLog.created_at >= start) if end is not None: q = q.where(AuditLog.created_at <= end) if user_uuid is not None: q = q.where(AuditLog.user_id == user_uuid) if event_type is not None: q = q.where(AuditLog.event_type.like(f"{event_type}%")) return q # ── Endpoints ───────────────────────────────────────────────────────────────── # IMPORTANT: daily-export routes are registered BEFORE /audit-log and # /audit-log/export so FastAPI matches the more specific paths first. @router.get("/audit-log/daily-exports") async def list_daily_exports( _admin: User = Depends(get_current_admin), ) -> dict: """List available Celery daily audit export files from MinIO (D-15). Returns: { items: [{ date: "YYYY-MM-DD", key: "audit-logs/YYYY-MM-DD.csv" }] } Items are sorted descending by date. Security: requires get_current_admin — regular users receive 403 (T-06.2-04-02). Event loop safety: list_objects() is synchronous; wrapped in asyncio.to_thread to avoid blocking the event loop (T-06.2-04-05). """ backend = get_storage_backend() if not isinstance(backend, MinIOBackend): return {"items": []} def _list() -> list: objects = backend._client.list_objects( "audit-logs", prefix="audit-logs/", recursive=False ) items = [] for obj in objects: name = obj.object_name or "" if name.endswith(".csv"): date_str = name.removeprefix("audit-logs/").removesuffix(".csv") items.append({"date": date_str, "key": name}) items.sort(key=lambda x: x["date"], reverse=True) return items items = await asyncio.to_thread(_list) return {"items": items} @router.get("/audit-log/daily-exports/{date}") async def download_daily_export( date: str, _admin: User = Depends(get_current_admin), ) -> StreamingResponse: """Stream a specific Celery daily audit export file from MinIO (D-16). The date path parameter is validated against YYYY-MM-DD regex before MinIO key construction to prevent path traversal (T-06.2-04-01, Pitfall 6). Returns: StreamingResponse with Content-Type: text/csv. Security: requires get_current_admin — regular users receive 403 (T-06.2-04-02). """ if not re.fullmatch(r"\d{4}-\d{2}-\d{2}", date): raise HTTPException(status_code=404, detail="Invalid date format") backend = get_storage_backend() if not isinstance(backend, MinIOBackend): raise HTTPException(status_code=404, detail="Export not found") key = f"audit-logs/{date}.csv" def _get() -> bytes: response = backend._client.get_object("audit-logs", key) try: return response.read() finally: response.close() response.release_conn() try: csv_bytes = await asyncio.to_thread(_get) except Exception: raise HTTPException(status_code=404, detail="Export not found") return StreamingResponse( iter([csv_bytes]), media_type="text/csv", headers={"Content-Disposition": f'attachment; filename="audit-{date}.csv"'}, ) @router.get("/audit-log") async def list_audit_log( start: Optional[datetime] = Query(default=None), end: Optional[datetime] = Query(default=None), user_handle: Optional[str] = Query(default=None), event_type: Optional[str] = Query(default=None), page: int = Query(default=1, ge=1), per_page: int = Query(default=50, ge=1, le=500), session: AsyncSession = Depends(get_db), _admin: User = Depends(get_current_admin), ) -> dict: """Return paginated, filtered audit log entries (ADMIN-06). Response: { items: [...], total: int, page: int, per_page: int } Each item includes user_handle and actor_handle alongside UUID fields (D-11). Entries never contain filename, extracted_text, or document content (D-15). user_handle filter: accepts a plain string handle and resolves to UUID internally. Returns empty results (not 422) for unknown handles (D-12). """ # Handle-to-UUID resolution (D-12, Pattern 4) user_uuid: Optional[uuid.UUID] = None if user_handle: handle_result = await session.execute( select(User.id).where(User.handle == user_handle) ) uid = handle_result.scalar_one_or_none() if uid is None: # No user with that handle — return empty results (D-12) return {"items": [], "total": 0, "page": page, "per_page": per_page} user_uuid = uid # Count query: use the plain _build_filtered_query (no JOIN) to avoid # COUNT ambiguity on multi-column subqueries (Pitfall 4) count_q = select(func.count(AuditLog.id)).where(True) if start is not None: count_q = count_q.where(AuditLog.created_at >= start) if end is not None: count_q = count_q.where(AuditLog.created_at <= end) if user_uuid is not None: count_q = count_q.where(AuditLog.user_id == user_uuid) if event_type is not None: count_q = count_q.where(AuditLog.event_type.like(f"{event_type}%")) count_result = await session.execute(count_q) total = count_result.scalar_one() # Data query: use enriched JOIN for handle fields data_q = _build_filtered_query_with_handles(start, end, user_uuid, event_type) data_q = data_q.limit(per_page).offset((page - 1) * per_page) result = await session.execute(data_q) rows = result.all() items = [] for row in rows: entry, user_handle_val, actor_handle_val = row[0], row[1], row[2] items.append(_audit_to_dict_with_handles(entry, user_handle_val, actor_handle_val)) return { "items": items, "total": total, "page": page, "per_page": per_page, } @router.get("/audit-log/export") async def export_audit_log( start: Optional[datetime] = Query(default=None), end: Optional[datetime] = Query(default=None), user_handle: Optional[str] = Query(default=None), event_type: Optional[str] = Query(default=None), format: str = Query(default="csv"), # noqa: A002 session: AsyncSession = Depends(get_db), _admin: User = Depends(get_current_admin), ) -> StreamingResponse: """Stream a CSV export of filtered audit log entries (ADMIN-06). Uses the same _audit_to_dict_with_handles() whitelist as the JSON viewer — includes user_handle and actor_handle; no filename, extracted_text, or document content appears in the export (D-15, T-04-06-02, Pitfall 7). Returns StreamingResponse with Content-Disposition: attachment; filename=audit-export.csv. user_handle filter: same handle-to-UUID resolution as the viewer (D-12). """ # Handle-to-UUID resolution (D-12) — same logic as list_audit_log user_uuid: Optional[uuid.UUID] = None if user_handle: handle_result = await session.execute( select(User.id).where(User.handle == user_handle) ) uid = handle_result.scalar_one_or_none() if uid is None: # Unknown handle — return empty CSV empty_output = io.StringIO() fields = [ "id", "event_type", "user_id", "actor_id", "user_handle", "actor_handle", "resource_id", "ip_address", "metadata_", "created_at", ] writer = csv.DictWriter(empty_output, fieldnames=fields) writer.writeheader() return StreamingResponse( iter([empty_output.getvalue()]), media_type="text/csv", headers={"Content-Disposition": "attachment; filename=audit-export.csv"}, ) user_uuid = uid # Data query with handle enrichment (Pitfall 7 — export must use enriched function) q = _build_filtered_query_with_handles(start, end, user_uuid, event_type) result = await session.execute(q) rows = result.all() fields = [ "id", "event_type", "user_id", "actor_id", "user_handle", "actor_handle", "resource_id", "ip_address", "metadata_", "created_at", ] output = io.StringIO() writer = csv.DictWriter(output, fieldnames=fields) writer.writeheader() for row in rows: entry, user_handle_val, actor_handle_val = row[0], row[1], row[2] record = _audit_to_dict_with_handles(entry, user_handle_val, actor_handle_val) record["metadata_"] = json.dumps(record["metadata_"]) if record["metadata_"] is not None else "" writer.writerow(record) return StreamingResponse( iter([output.getvalue()]), media_type="text/csv", headers={"Content-Disposition": "attachment; filename=audit-export.csv"}, )