384 lines
14 KiB
Python
384 lines
14 KiB
Python
"""
|
|
Admin audit log API endpoints for DocuVault.
|
|
|
|
All handlers require get_current_admin (ADMIN-06, SEC-07) — regular users
|
|
receive 403 Forbidden.
|
|
|
|
Implements:
|
|
GET /api/admin/audit-log — paginated, filtered audit log viewer
|
|
GET /api/admin/audit-log/export — CSV streaming export with same filters
|
|
GET /api/admin/audit-log/daily-exports — list available Celery daily export files
|
|
GET /api/admin/audit-log/daily-exports/{date} — stream a specific daily export CSV
|
|
|
|
Security invariants:
|
|
- All endpoints use Depends(get_current_admin) — verified by grep
|
|
- _audit_to_dict() is a pure whitelist: no filename, extracted_text,
|
|
password_hash, or credentials_enc can appear in responses (ADMIN-06, D-15)
|
|
- CSV export uses the same _audit_to_dict_with_handles() helper as the JSON viewer
|
|
- Date path parameter validated against YYYY-MM-DD regex before MinIO key
|
|
construction — prevents path traversal (T-06.2-04-01, Pitfall 6)
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import csv
|
|
import io
|
|
import json
|
|
import re
|
|
import uuid
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Query
|
|
from fastapi.responses import StreamingResponse
|
|
from sqlalchemy import func, select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy.orm import aliased
|
|
|
|
from db.models import AuditLog, User
|
|
from deps.auth import get_current_admin
|
|
from deps.db import get_db
|
|
from storage import get_storage_backend
|
|
from storage.minio_backend import MinIOBackend
|
|
|
|
# Every route declared on this router is served under the /api/admin prefix
# and tagged "audit".
router = APIRouter(prefix="/api/admin", tags=["audit"])
|
|
|
|
|
|
# ── Safe response helpers ─────────────────────────────────────────────────────
|
|
|
|
def _audit_to_dict(entry: AuditLog) -> dict:
    """Serialize one audit log row into a dict of whitelisted fields.

    The result always has exactly these keys, in this order: id, event_type,
    user_id, actor_id, resource_id, ip_address, metadata_, created_at. No
    other attribute of the row is copied (ADMIN-06, D-15).

    user_id, actor_id, resource_id and ip_address are converted with str()
    when truthy and reported as None otherwise. metadata_ is passed through
    unchanged, and created_at is rendered with isoformat().
    """

    def _text_or_none(value):
        # Falsy values (None, empty) collapse to None; anything else is str()'d.
        return str(value) if value else None

    payload = {"id": entry.id, "event_type": entry.event_type}
    for attr in ("user_id", "actor_id", "resource_id", "ip_address"):
        payload[attr] = _text_or_none(getattr(entry, attr))
    payload["metadata_"] = entry.metadata_
    payload["created_at"] = entry.created_at.isoformat()
    return payload
|
|
|
|
|
|
def _audit_to_dict_with_handles(
    entry: AuditLog,
    user_handle: Optional[str],
    actor_handle: Optional[str],
) -> dict:
    """Serialize one audit log row together with its resolved user handles.

    Emits the whitelisted row fields (id, event_type, user_id, actor_id,
    resource_id, ip_address, metadata_, created_at) plus:
      - user_handle: handle resolved for entry.user_id, or None
      - actor_handle: handle resolved for entry.actor_id, or None

    Key order is id, event_type, user_id, actor_id, user_handle,
    actor_handle, resource_id, ip_address, metadata_, created_at. Both the
    JSON viewer and the CSV export build their rows with this function
    (Pitfall 7).
    """
    # Identifier-like columns: str() when truthy, otherwise None.
    subject_id = str(entry.user_id) if entry.user_id else None
    acting_id = str(entry.actor_id) if entry.actor_id else None
    resource = str(entry.resource_id) if entry.resource_id else None
    address = str(entry.ip_address) if entry.ip_address else None

    # An empty handle is reported the same way as a missing one.
    subject_handle = user_handle if user_handle else None
    acting_handle = actor_handle if actor_handle else None

    return dict(
        id=entry.id,
        event_type=entry.event_type,
        user_id=subject_id,
        actor_id=acting_id,
        user_handle=subject_handle,
        actor_handle=acting_handle,
        resource_id=resource,
        ip_address=address,
        metadata_=entry.metadata_,
        created_at=entry.created_at.isoformat(),
    )
|
|
|
|
|
|
# ── Query builder helpers ─────────────────────────────────────────────────────
|
|
|
|
def _build_filtered_query(
    start: Optional[datetime],
    end: Optional[datetime],
    user_id: Optional[uuid.UUID],
    event_type: Optional[str],
):
    """Build a Select over AuditLog alone (no JOIN), newest entries first.

    Each argument that is not None adds one AND-ed condition:
      - start:      created_at >= start
      - end:        created_at <= end
      - user_id:    user_id == user_id
      - event_type: event_type LIKE "<event_type>%" (prefix match; the value
                    is interpolated as-is, so "%" and "_" inside it act as
                    LIKE wildcards)

    The statement selects the AuditLog entity only, which keeps it usable for
    COUNT-style queries without the multi-column ambiguity of the JOINed
    variant (Pitfall 4).
    """
    criteria = []
    if start is not None:
        criteria.append(AuditLog.created_at >= start)
    if end is not None:
        criteria.append(AuditLog.created_at <= end)
    if user_id is not None:
        criteria.append(AuditLog.user_id == user_id)
    if event_type is not None:
        criteria.append(AuditLog.event_type.like(f"{event_type}%"))

    stmt = select(AuditLog)
    for criterion in criteria:
        stmt = stmt.where(criterion)
    return stmt.order_by(AuditLog.created_at.desc())
|
|
|
|
|
|
def _build_filtered_query_with_handles(
    start: Optional[datetime],
    end: Optional[datetime],
    user_uuid: Optional[uuid.UUID],
    event_type: Optional[str],
):
    """Build a Select yielding (AuditLog, user_handle, actor_handle) rows.

    User is joined twice through separate aliases so both foreign keys can be
    resolved to a handle in a single statement:
      - one alias is matched on AuditLog.user_id  -> column "user_handle"
      - one alias is matched on AuditLog.actor_id -> column "actor_handle"

    Both joins are outer joins, so entries whose user_id or actor_id is NULL
    are still returned, with the corresponding handle NULL. Rows are ordered
    by created_at descending.

    Each non-None argument adds one AND-ed condition: created_at >= start,
    created_at <= end, user_id == user_uuid, and a prefix LIKE on event_type
    ("<event_type>%").
    """
    subject_user = aliased(User)
    acting_user = aliased(User)

    criteria = []
    if start is not None:
        criteria.append(AuditLog.created_at >= start)
    if end is not None:
        criteria.append(AuditLog.created_at <= end)
    if user_uuid is not None:
        criteria.append(AuditLog.user_id == user_uuid)
    if event_type is not None:
        criteria.append(AuditLog.event_type.like(f"{event_type}%"))

    stmt = select(
        AuditLog,
        subject_user.handle.label("user_handle"),
        acting_user.handle.label("actor_handle"),
    )
    stmt = stmt.outerjoin(subject_user, subject_user.id == AuditLog.user_id)
    stmt = stmt.outerjoin(acting_user, acting_user.id == AuditLog.actor_id)
    for criterion in criteria:
        stmt = stmt.where(criterion)
    return stmt.order_by(AuditLog.created_at.desc())
|
|
|
|
|
|
# ── Endpoints ─────────────────────────────────────────────────────────────────
|
|
# IMPORTANT: daily-export routes are registered BEFORE /audit-log and
|
|
# /audit-log/export so FastAPI matches the more specific paths first.
|
|
|
|
|
|
@router.get("/audit-log/daily-exports")
async def list_daily_exports(
    _admin: User = Depends(get_current_admin),
) -> dict:
    """List available Celery daily audit export files from MinIO (D-15).

    Returns: { items: [{ date: "YYYY-MM-DD", key: "audit-logs/YYYY-MM-DD.csv" }] }
    Items are sorted descending by date. When the configured storage backend
    is not a MinIOBackend the list is empty.

    Only objects whose key has exactly the form audit-logs/YYYY-MM-DD.csv are
    listed. Any other object under the prefix is skipped, so every returned
    "date" is a value the daily-export download route will accept.

    Security: guarded by Depends(get_current_admin) (T-06.2-04-02).
    Event loop safety: the MinIO listing runs inside asyncio.to_thread so the
    client call does not execute on the event loop (T-06.2-04-05).
    """
    backend = get_storage_backend()
    if not isinstance(backend, MinIOBackend):
        return {"items": []}

    # ASCII digits only; group 1 captures the date part of the object key.
    key_pattern = re.compile(r"audit-logs/([0-9]{4}-[0-9]{2}-[0-9]{2})\.csv")

    def _list() -> list:
        objects = backend._client.list_objects(
            "audit-logs", prefix="audit-logs/", recursive=False
        )
        items = []
        for obj in objects:
            match = key_pattern.fullmatch(obj.object_name or "")
            if match is None:
                # Not a daily export file (wrong name shape or extension).
                continue
            items.append({"date": match.group(1), "key": match.group(0)})
        # ISO dates sort chronologically as plain strings.
        items.sort(key=lambda item: item["date"], reverse=True)
        return items

    items = await asyncio.to_thread(_list)
    return {"items": items}
|
|
|
|
|
|
@router.get("/audit-log/daily-exports/{date}")
async def download_daily_export(
    date: str,
    _admin: User = Depends(get_current_admin),
) -> StreamingResponse:
    """Stream a specific Celery daily audit export file from MinIO (D-16).

    The date path parameter must be exactly YYYY-MM-DD (ASCII digits) before
    it is used to build the MinIO key, which prevents path traversal
    (T-06.2-04-01, Pitfall 6).

    Returns: StreamingResponse with Content-Type: text/csv and a
    Content-Disposition attachment filename of audit-<date>.csv. The object
    is read fully into memory before the response is built.

    Raises:
        HTTPException(404): the date is malformed, the storage backend is not
            a MinIOBackend, or fetching the object failed for any reason.

    Security: guarded by Depends(get_current_admin) (T-06.2-04-02).
    """
    # [0-9] instead of \d: in a str pattern \d also matches non-ASCII decimal
    # digits, which must not reach the object key or the response header.
    if not re.fullmatch(r"[0-9]{4}-[0-9]{2}-[0-9]{2}", date):
        raise HTTPException(status_code=404, detail="Invalid date format")

    backend = get_storage_backend()
    if not isinstance(backend, MinIOBackend):
        raise HTTPException(status_code=404, detail="Export not found")
    key = f"audit-logs/{date}.csv"

    def _get() -> bytes:
        response = backend._client.get_object("audit-logs", key)
        try:
            return response.read()
        finally:
            # Release the connection even if close() itself raises.
            try:
                response.close()
            finally:
                response.release_conn()

    try:
        # The MinIO client call runs in a worker thread, off the event loop.
        csv_bytes = await asyncio.to_thread(_get)
    except Exception as exc:
        # Every fetch failure is reported as 404; the cause stays chained for logs.
        raise HTTPException(status_code=404, detail="Export not found") from exc

    return StreamingResponse(
        iter([csv_bytes]),
        media_type="text/csv",
        headers={"Content-Disposition": f'attachment; filename="audit-{date}.csv"'},
    )
|
|
|
|
|
|
@router.get("/audit-log")
async def list_audit_log(
    start: Optional[datetime] = Query(default=None),
    end: Optional[datetime] = Query(default=None),
    user_handle: Optional[str] = Query(default=None),
    event_type: Optional[str] = Query(default=None),
    page: int = Query(default=1, ge=1),
    per_page: int = Query(default=50, ge=1, le=500),
    session: AsyncSession = Depends(get_db),
    _admin: User = Depends(get_current_admin),
) -> dict:
    """Return paginated, filtered audit log entries (ADMIN-06).

    Response: { items: [...], total: int, page: int, per_page: int }
    Each item is built by _audit_to_dict_with_handles(), so it carries
    user_handle and actor_handle alongside the UUID fields (D-11) and only
    whitelisted row fields (D-15).

    Filters: start / end bound created_at inclusively, event_type is a prefix
    match, and user_handle is resolved to a user id first. An unknown handle
    yields an empty page with total 0 rather than an error (D-12); an empty
    user_handle string is treated as "no filter".

    Pagination: page is 1-based (>= 1); per_page is between 1 and 500.
    """
    # Handle-to-UUID resolution (D-12, Pattern 4)
    user_uuid: Optional[uuid.UUID] = None
    if user_handle:
        handle_result = await session.execute(
            select(User.id).where(User.handle == user_handle)
        )
        user_uuid = handle_result.scalar_one_or_none()
        if user_uuid is None:
            # No user with that handle — return empty results (D-12)
            return {"items": [], "total": 0, "page": page, "per_page": per_page}

    # Count query: reuse the JOIN-free builder so the total is computed with
    # the same filter conditions as the data query below (Pitfall 4). The
    # builder's ORDER BY is dropped because it is meaningless for an aggregate.
    count_q = (
        _build_filtered_query(start, end, user_uuid, event_type)
        .with_only_columns(func.count(AuditLog.id))
        .order_by(None)
    )
    total = (await session.execute(count_q)).scalar_one()

    # Data query: enriched JOIN for the handle fields, limited to one page.
    data_q = (
        _build_filtered_query_with_handles(start, end, user_uuid, event_type)
        .limit(per_page)
        .offset((page - 1) * per_page)
    )
    result = await session.execute(data_q)

    # Each row is (AuditLog, user_handle, actor_handle).
    items = [
        _audit_to_dict_with_handles(entry, user_handle_val, actor_handle_val)
        for entry, user_handle_val, actor_handle_val in result.all()
    ]

    return {
        "items": items,
        "total": total,
        "page": page,
        "per_page": per_page,
    }
|
|
|
|
|
|
@router.get("/audit-log/export")
async def export_audit_log(
    start: Optional[datetime] = Query(default=None),
    end: Optional[datetime] = Query(default=None),
    user_handle: Optional[str] = Query(default=None),
    event_type: Optional[str] = Query(default=None),
    format: str = Query(default="csv"),  # noqa: A002
    session: AsyncSession = Depends(get_db),
    _admin: User = Depends(get_current_admin),
) -> StreamingResponse:
    """Export the filtered audit log as a CSV attachment (ADMIN-06).

    Rows are produced by _audit_to_dict_with_handles(), the same serializer
    the JSON viewer uses, so the export has user_handle and actor_handle
    columns and only whitelisted row fields (D-15, T-04-06-02, Pitfall 7).
    metadata_ is written as a JSON string, or as an empty cell when it is None.

    The response is text/csv with
    Content-Disposition: attachment; filename=audit-export.csv. All matching
    rows are fetched and rendered into one in-memory buffer before the
    response is returned.

    user_handle is resolved to a user id before filtering (D-12); an unknown
    handle produces a CSV containing only the header row. The format
    parameter is accepted but not consulted: the output is always CSV.
    """
    columns = [
        "id",
        "event_type",
        "user_id",
        "actor_id",
        "user_handle",
        "actor_handle",
        "resource_id",
        "ip_address",
        "metadata_",
        "created_at",
    ]
    buffer = io.StringIO()
    csv_writer = csv.DictWriter(buffer, fieldnames=columns)
    csv_writer.writeheader()

    def _as_download() -> StreamingResponse:
        # Wrap whatever has been written to the buffer so far as the attachment.
        return StreamingResponse(
            iter([buffer.getvalue()]),
            media_type="text/csv",
            headers={"Content-Disposition": "attachment; filename=audit-export.csv"},
        )

    # Handle-to-UUID resolution (D-12)
    subject_id: Optional[uuid.UUID] = None
    if user_handle:
        lookup = await session.execute(
            select(User.id).where(User.handle == user_handle)
        )
        subject_id = lookup.scalar_one_or_none()
        if subject_id is None:
            # Unknown handle — header-only CSV, no data query is issued.
            return _as_download()

    # Data query with handle enrichment (Pitfall 7)
    stmt = _build_filtered_query_with_handles(start, end, subject_id, event_type)
    fetched = await session.execute(stmt)

    for entry, subject_handle, acting_handle in fetched.all():
        record = _audit_to_dict_with_handles(entry, subject_handle, acting_handle)
        meta = record["metadata_"]
        record["metadata_"] = "" if meta is None else json.dumps(meta)
        csv_writer.writerow(record)

    return _as_download()
|