Files
kite/backend/api/audit.py
T

384 lines
14 KiB
Python

"""
Admin audit log API endpoints for DocuVault.
All handlers require get_current_admin (ADMIN-06, SEC-07) — regular users
receive 403 Forbidden.
Implements:
GET /api/admin/audit-log — paginated, filtered audit log viewer
GET /api/admin/audit-log/export — CSV streaming export with same filters
GET /api/admin/audit-log/daily-exports — list available Celery daily export files
GET /api/admin/audit-log/daily-exports/{date} — stream a specific daily export CSV
Security invariants:
- All endpoints use Depends(get_current_admin) — verified by grep
- _audit_to_dict() is a pure whitelist: no filename, extracted_text,
password_hash, or credentials_enc can appear in responses (ADMIN-06, D-15)
- CSV export uses the same _audit_to_dict_with_handles() helper as the JSON viewer
- Date path parameter validated against YYYY-MM-DD regex before MinIO key
construction — prevents path traversal (T-06.2-04-01, Pitfall 6)
"""
from __future__ import annotations
import asyncio
import csv
import io
import json
import re
import uuid
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi.responses import StreamingResponse
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import aliased
from db.models import AuditLog, User
from deps.auth import get_current_admin
from deps.db import get_db
from storage import get_storage_backend
from storage.minio_backend import MinIOBackend
# Router for every admin audit endpoint, mounted under /api/admin. Each handler
# below also declares Depends(get_current_admin) itself; the router adds no auth.
router = APIRouter(prefix="/api/admin", tags=["audit"])
# ── Safe response helpers ─────────────────────────────────────────────────────
def _audit_to_dict(entry: AuditLog) -> dict:
    """Serialize one audit entry into a plain dict through a fixed whitelist.

    The only keys ever emitted, in this order, are: id, event_type, user_id,
    actor_id, resource_id, ip_address, metadata_, created_at. No filename,
    extracted_text, or document content can appear (ADMIN-06, D-15).

    user_id / actor_id / resource_id / ip_address are stringified when truthy
    and become None otherwise. created_at is rendered with isoformat(), so it
    must be populated on the entry.
    """
    def _str_or_none(value):
        return str(value) if value else None

    serialized = {"id": entry.id, "event_type": entry.event_type}
    # Whitelisted reference columns, serialized identically.
    for column in ("user_id", "actor_id", "resource_id", "ip_address"):
        serialized[column] = _str_or_none(getattr(entry, column))
    serialized["metadata_"] = entry.metadata_
    serialized["created_at"] = entry.created_at.isoformat()
    return serialized
def _audit_to_dict_with_handles(
    entry: AuditLog,
    user_handle: Optional[str],
    actor_handle: Optional[str],
) -> dict:
    """Serialize one audit entry plus the two resolved user handles.

    Emits the same whitelisted fields as _audit_to_dict(), with
    ``user_handle`` (handle of the user the entry is about) and
    ``actor_handle`` (handle of the user who performed the event) inserted
    right after ``actor_id``. Empty or missing handles are reported as None.

    Both the JSON viewer and the CSV export use this serializer so the two
    outputs carry the same columns (Pitfall 7).
    """
    def _str_or_none(value):
        return str(value) if value else None

    record = {
        "id": entry.id,
        "event_type": entry.event_type,
        "user_id": _str_or_none(entry.user_id),
        "actor_id": _str_or_none(entry.actor_id),
        "user_handle": user_handle if user_handle else None,
        "actor_handle": actor_handle if actor_handle else None,
        "resource_id": _str_or_none(entry.resource_id),
        "ip_address": _str_or_none(entry.ip_address),
        "metadata_": entry.metadata_,
        "created_at": entry.created_at.isoformat(),
    }
    return record
# ── Query builder helpers ─────────────────────────────────────────────────────
def _build_filtered_query(
    start: Optional[datetime],
    end: Optional[datetime],
    user_id: Optional[uuid.UUID],
    event_type: Optional[str],
):
    """Return a JOIN-free Select over AuditLog with the optional filters applied.

    Every filter is optional; passing None leaves that dimension unconstrained.
    Results are ordered newest-first by created_at.

    Filters (combined with AND):
        start / end: inclusive bounds on AuditLog.created_at.
        user_id: exact match on AuditLog.user_id.
        event_type: prefix match, SQL ``LIKE '<event_type>%'``.

    NOTE(review): event_type is placed into the LIKE pattern unescaped, so
    ``%`` and ``_`` in the argument act as wildcards (``_`` matches any single
    character). Any change must be made at every filter site together so
    counts and pages keep agreeing.

    Because it selects AuditLog alone (no JOIN), this is the safe basis for
    COUNT queries (Pitfall 4); _build_filtered_query_with_handles() is the
    variant that adds the handle columns.
    """
    criteria = []
    if start is not None:
        criteria.append(AuditLog.created_at >= start)
    if end is not None:
        criteria.append(AuditLog.created_at <= end)
    if user_id is not None:
        criteria.append(AuditLog.user_id == user_id)
    if event_type is not None:
        criteria.append(AuditLog.event_type.like(f"{event_type}%"))

    query = select(AuditLog).order_by(AuditLog.created_at.desc())
    return query.where(*criteria) if criteria else query
def _build_filtered_query_with_handles(
    start: Optional[datetime],
    end: Optional[datetime],
    user_uuid: Optional[uuid.UUID],
    event_type: Optional[str],
):
    """Return a Select that also resolves the subject and actor user handles.

    Executing it yields rows of (AuditLog, user_handle, actor_handle), ordered
    newest-first by created_at. User is joined twice through two aliased()
    copies so the joins do not collide:
      - the subject alias resolves AuditLog.user_id  -> handle ("user_handle")
      - the actor alias resolves AuditLog.actor_id  -> handle ("actor_handle")
    Both are LEFT OUTER joins, so entries with a NULL user_id or actor_id
    (or pointing at a missing user) are still returned with a None handle.

    The filters match _build_filtered_query(): inclusive created_at bounds,
    exact user_id match, and a ``LIKE '<event_type>%'`` prefix match.
    NOTE(review): ``%`` and ``_`` in event_type act as LIKE wildcards.
    """
    subject_user = aliased(User)
    acting_user = aliased(User)

    criteria = []
    if start is not None:
        criteria.append(AuditLog.created_at >= start)
    if end is not None:
        criteria.append(AuditLog.created_at <= end)
    if user_uuid is not None:
        criteria.append(AuditLog.user_id == user_uuid)
    if event_type is not None:
        criteria.append(AuditLog.event_type.like(f"{event_type}%"))

    query = (
        select(
            AuditLog,
            subject_user.handle.label("user_handle"),
            acting_user.handle.label("actor_handle"),
        )
        .outerjoin(subject_user, subject_user.id == AuditLog.user_id)
        .outerjoin(acting_user, acting_user.id == AuditLog.actor_id)
        .order_by(AuditLog.created_at.desc())
    )
    return query.where(*criteria) if criteria else query
# ── Endpoints ─────────────────────────────────────────────────────────────────
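# IMPORTANT: daily-export routes are registered BEFORE /audit-log and
# /audit-log/export. Routes are matched in registration order; the current
# paths do not overlap, but keeping the fixed paths first stops any future
# parameterized route (e.g. /audit-log/{entry_id}) from shadowing them.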
@router.get("/audit-log/daily-exports")
async def list_daily_exports(
    _admin: User = Depends(get_current_admin),
) -> dict:
    """List the Celery daily audit export files stored in MinIO (D-15).

    Returns ``{"items": [{"date": "YYYY-MM-DD", "key": "audit-logs/YYYY-MM-DD.csv"}]}``
    with items sorted by date, newest first. Every ``.csv`` object directly
    under the ``audit-logs/`` prefix of the ``audit-logs`` bucket is listed.
    When the configured storage backend is not MinIO, the list is empty.

    Security: requires get_current_admin — regular users receive 403 (T-06.2-04-02).
    Event loop safety: the MinIO listing call is synchronous, so it runs in a
    worker thread via asyncio.to_thread (T-06.2-04-05).
    """
    storage = get_storage_backend()
    if not isinstance(storage, MinIOBackend):
        return {"items": []}

    def _collect() -> list:
        listing = storage._client.list_objects(
            "audit-logs", prefix="audit-logs/", recursive=False
        )
        found = []
        for obj in listing:
            object_key = obj.object_name or ""
            if not object_key.endswith(".csv"):
                continue
            found.append(
                {
                    "date": object_key.removeprefix("audit-logs/").removesuffix(".csv"),
                    "key": object_key,
                }
            )
        return sorted(found, key=lambda item: item["date"], reverse=True)

    return {"items": await asyncio.to_thread(_collect)}
@router.get("/audit-log/daily-exports/{date}")
async def download_daily_export(
    date: str,
    _admin: User = Depends(get_current_admin),
) -> StreamingResponse:
    """Stream a specific Celery daily audit export file from MinIO (D-16).

    The ``date`` path parameter must be ASCII ``YYYY-MM-DD``; it is validated
    before the MinIO key is built, so no caller-controlled ``/`` or ``..``
    can reach the key (T-06.2-04-01, Pitfall 6).

    Returns:
        StreamingResponse with Content-Type ``text/csv`` and an attachment
        filename of ``audit-<date>.csv``.

    Raises:
        HTTPException(404): malformed date ("Invalid date format"), a non-MinIO
            storage backend, or any failure while fetching the object
            ("Export not found").

    Security: requires get_current_admin — regular users receive 403 (T-06.2-04-02).
    """
    # [0-9] rather than \d: for str patterns \d matches every Unicode decimal
    # digit, which would let non-ASCII digits through the YYYY-MM-DD check.
    if not re.fullmatch(r"[0-9]{4}-[0-9]{2}-[0-9]{2}", date):
        raise HTTPException(status_code=404, detail="Invalid date format")

    backend = get_storage_backend()
    if not isinstance(backend, MinIOBackend):
        raise HTTPException(status_code=404, detail="Export not found")

    key = f"audit-logs/{date}.csv"

    def _get() -> bytes:
        # Blocking MinIO call; executed in a worker thread below.
        response = backend._client.get_object("audit-logs", key)
        try:
            return response.read()
        finally:
            # Release the HTTP connection even if read() fails.
            response.close()
            response.release_conn()

    try:
        csv_bytes = await asyncio.to_thread(_get)
    except Exception as exc:
        # NOTE(review): every failure (missing object, but also network or auth
        # errors) is reported as 404. If outages should be visible, consider
        # mapping only "NoSuchKey"/"NoSuchBucket" S3 error codes to 404.
        raise HTTPException(status_code=404, detail="Export not found") from exc

    # The object is read fully into memory above; it is sent as a single chunk.
    return StreamingResponse(
        iter([csv_bytes]),
        media_type="text/csv",
        headers={"Content-Disposition": f'attachment; filename="audit-{date}.csv"'},
    )
@router.get("/audit-log")
async def list_audit_log(
    start: Optional[datetime] = Query(default=None),
    end: Optional[datetime] = Query(default=None),
    user_handle: Optional[str] = Query(default=None),
    event_type: Optional[str] = Query(default=None),
    page: int = Query(default=1, ge=1),
    per_page: int = Query(default=50, ge=1, le=500),
    session: AsyncSession = Depends(get_db),
    _admin: User = Depends(get_current_admin),
) -> dict:
    """Return paginated, filtered audit log entries (ADMIN-06).

    Response: { items: [...], total: int, page: int, per_page: int }
    Each item includes user_handle and actor_handle alongside UUID fields (D-11).
    Entries never contain filename, extracted_text, or document content (D-15).

    Filters (all optional, combined with AND):
        start / end: inclusive bounds on created_at.
        user_handle: a plain handle, resolved to the user's UUID internally.
            An unknown handle returns empty results (not 422) (D-12).
        event_type: prefix match on event_type.

    Pagination: ``page`` is 1-based and ``per_page`` is limited to 1..500 by
    validation. ``total`` counts every matching entry, independent of paging.
    """
    # Handle-to-UUID resolution (D-12, Pattern 4)
    user_uuid: Optional[uuid.UUID] = None
    if user_handle:
        handle_result = await session.execute(
            select(User.id).where(User.handle == user_handle)
        )
        user_uuid = handle_result.scalar_one_or_none()
        if user_uuid is None:
            # No user with that handle — return empty results (D-12)
            return {"items": [], "total": 0, "page": page, "per_page": per_page}

    # Total: count over the JOIN-free _build_filtered_query (Pitfall 4) rather
    # than re-implementing its filters here, so the filter definitions live in
    # the builders only. order_by(None) drops the builder's ORDER BY, which a
    # COUNT does not need.
    count_q = select(func.count()).select_from(
        _build_filtered_query(start, end, user_uuid, event_type)
        .order_by(None)
        .subquery()
    )
    total = (await session.execute(count_q)).scalar_one()

    # Page of rows from the enriched JOIN query (adds the handle columns).
    data_q = (
        _build_filtered_query_with_handles(start, end, user_uuid, event_type)
        .limit(per_page)
        .offset((page - 1) * per_page)
    )
    rows = (await session.execute(data_q)).all()
    items = [
        _audit_to_dict_with_handles(entry, subject_handle, actor_handle)
        for entry, subject_handle, actor_handle in rows
    ]

    return {
        "items": items,
        "total": total,
        "page": page,
        "per_page": per_page,
    }
@router.get("/audit-log/export")
async def export_audit_log(
    start: Optional[datetime] = Query(default=None),
    end: Optional[datetime] = Query(default=None),
    user_handle: Optional[str] = Query(default=None),
    event_type: Optional[str] = Query(default=None),
    format: str = Query(default="csv"),  # noqa: A002
    session: AsyncSession = Depends(get_db),
    _admin: User = Depends(get_current_admin),
) -> StreamingResponse:
    """Stream a CSV export of filtered audit log entries (ADMIN-06).

    Uses the same _audit_to_dict_with_handles() whitelist as the JSON viewer —
    includes user_handle and actor_handle; no filename, extracted_text, or
    document content appears in the export (D-15, T-04-06-02, Pitfall 7).

    Filters are the same as the viewer's (start/end, user_handle, event_type),
    with the same handle-to-UUID resolution (D-12). An unknown user_handle
    yields a header-only CSV. ``format`` is accepted for API compatibility but
    is not read; the output is always CSV.

    Returns:
        StreamingResponse (text/csv) with
        Content-Disposition: attachment; filename=audit-export.csv.
        The whole result set is rendered in memory and sent as one chunk.
    """
    # One column list for both the header-only and the populated export, so the
    # two responses cannot drift apart.
    fields = [
        "id",
        "event_type",
        "user_id",
        "actor_id",
        "user_handle",
        "actor_handle",
        "resource_id",
        "ip_address",
        "metadata_",
        "created_at",
    ]

    def _csv_response(rows) -> StreamingResponse:
        """Render (AuditLog, user_handle, actor_handle) rows as one CSV response."""
        output = io.StringIO()
        writer = csv.DictWriter(output, fieldnames=fields)
        writer.writeheader()
        for entry, user_handle_val, actor_handle_val in rows:
            record = _audit_to_dict_with_handles(entry, user_handle_val, actor_handle_val)
            # CSV cells are flat text: encode metadata as JSON, empty cell if absent.
            metadata = record["metadata_"]
            record["metadata_"] = json.dumps(metadata) if metadata is not None else ""
            writer.writerow(record)
        # NOTE(review): if handles or metadata can be user-controlled, cells
        # starting with =, +, -, or @ may be evaluated as formulas when the file
        # is opened in a spreadsheet (CSV injection) — consider escaping.
        return StreamingResponse(
            iter([output.getvalue()]),
            media_type="text/csv",
            headers={"Content-Disposition": "attachment; filename=audit-export.csv"},
        )

    # Handle-to-UUID resolution (D-12) — same logic as list_audit_log
    user_uuid: Optional[uuid.UUID] = None
    if user_handle:
        handle_result = await session.execute(
            select(User.id).where(User.handle == user_handle)
        )
        user_uuid = handle_result.scalar_one_or_none()
        if user_uuid is None:
            # Unknown handle — return a header-only CSV
            return _csv_response([])

    # Data query with handle enrichment (Pitfall 7 — export must use enriched function)
    q = _build_filtered_query_with_handles(start, end, user_uuid, event_type)
    result = await session.execute(q)
    return _csv_response(result.all())