feat(phase-4-06): admin audit log viewer + CSV export (ADMIN-06)

- Create backend/api/audit.py: GET /api/admin/audit-log (paginated, filtered)
  and GET /api/admin/audit-log/export (streaming CSV)
- Both endpoints protected by Depends(get_current_admin) — regular users → 403
- _audit_to_dict() whitelist explicitly excludes filename, extracted_text,
  password_hash, credentials_enc (T-04-06-02, D-15)
- CSV export uses same helper as JSON viewer; Content-Disposition: attachment
- Register audit_router in backend/main.py
This commit is contained in:
curo1305
2026-05-25 18:48:02 +02:00
parent 8e6cb6e7d0
commit 364447d0bc
2 changed files with 166 additions and 0 deletions
+162
View File
@@ -0,0 +1,162 @@
"""
Admin audit log API endpoints for DocuVault.
All handlers require get_current_admin (ADMIN-06, SEC-07) — regular users
receive 403 Forbidden.
Implements:
GET /api/admin/audit-log — paginated, filtered audit log viewer
GET /api/admin/audit-log/export — CSV streaming export with same filters
Security invariants:
- Both endpoints use Depends(get_current_admin) — verified by grep
- _audit_to_dict() is a pure whitelist: no filename, extracted_text,
password_hash, or credentials_enc can appear in responses (ADMIN-06, D-15)
- CSV export uses the same _audit_to_dict() helper as the JSON viewer
"""
from __future__ import annotations
import csv
import io
import uuid
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, Depends, Query
from fastapi.responses import StreamingResponse
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from db.models import AuditLog, User
from deps.auth import get_current_admin
from deps.db import get_db
router = APIRouter(prefix="/api/admin", tags=["audit"])
# ── Safe response helper ──────────────────────────────────────────────────────
def _audit_to_dict(entry: AuditLog) -> dict:
"""Safe audit log serializer — never includes filename, extracted_text, or
document content (ADMIN-06, D-15).
Whitelist: id, event_type, user_id, actor_id, resource_id, ip_address,
metadata_, created_at. No other keys are possible.
"""
return {
"id": entry.id,
"event_type": entry.event_type,
"user_id": str(entry.user_id) if entry.user_id else None,
"actor_id": str(entry.actor_id) if entry.actor_id else None,
"resource_id": str(entry.resource_id) if entry.resource_id else None,
"ip_address": str(entry.ip_address) if entry.ip_address else None,
"metadata_": entry.metadata_,
"created_at": entry.created_at.isoformat(),
}
# ── Query builder helper ──────────────────────────────────────────────────────
def _build_filtered_query(
start: Optional[datetime],
end: Optional[datetime],
user_id: Optional[uuid.UUID],
event_type: Optional[str],
):
"""Return a SQLAlchemy Select for AuditLog with the given filters applied.
Shared by both the paginated viewer and the CSV export endpoints to ensure
consistent filter semantics.
"""
q = select(AuditLog).order_by(AuditLog.created_at.desc())
if start is not None:
q = q.where(AuditLog.created_at >= start)
if end is not None:
q = q.where(AuditLog.created_at <= end)
if user_id is not None:
q = q.where(AuditLog.user_id == user_id)
if event_type is not None:
q = q.where(AuditLog.event_type == event_type)
return q
# ── Endpoints ─────────────────────────────────────────────────────────────────
@router.get("/audit-log")
async def list_audit_log(
start: Optional[datetime] = Query(default=None),
end: Optional[datetime] = Query(default=None),
user_id: Optional[uuid.UUID] = Query(default=None),
event_type: Optional[str] = Query(default=None),
page: int = Query(default=1, ge=1),
per_page: int = Query(default=50, ge=1, le=500),
session: AsyncSession = Depends(get_db),
_admin: User = Depends(get_current_admin),
) -> dict:
"""Return paginated, filtered audit log entries (ADMIN-06).
Response: { items: [...], total: int, page: int, per_page: int }
Entries never contain filename, extracted_text, or document content (D-15).
"""
base_q = _build_filtered_query(start, end, user_id, event_type)
# Total count — same filters, no limit/offset
count_q = select(func.count()).select_from(base_q.subquery())
count_result = await session.execute(count_q)
total = count_result.scalar_one()
# Paginated rows
paginated_q = base_q.limit(per_page).offset((page - 1) * per_page)
result = await session.execute(paginated_q)
entries = result.scalars().all()
return {
"items": [_audit_to_dict(e) for e in entries],
"total": total,
"page": page,
"per_page": per_page,
}
@router.get("/audit-log/export")
async def export_audit_log(
start: Optional[datetime] = Query(default=None),
end: Optional[datetime] = Query(default=None),
user_id: Optional[uuid.UUID] = Query(default=None),
event_type: Optional[str] = Query(default=None),
format: str = Query(default="csv"), # noqa: A002
session: AsyncSession = Depends(get_db),
_admin: User = Depends(get_current_admin),
) -> StreamingResponse:
"""Stream a CSV export of filtered audit log entries (ADMIN-06).
Uses the same _audit_to_dict() whitelist as the JSON viewer — no filename,
extracted_text, or document content appears in the export (D-15, T-04-06-02).
Returns StreamingResponse with Content-Disposition: attachment; filename=audit-export.csv.
"""
q = _build_filtered_query(start, end, user_id, event_type)
result = await session.execute(q)
entries = result.scalars().all()
fields = [
"id",
"event_type",
"user_id",
"actor_id",
"resource_id",
"ip_address",
"metadata_",
"created_at",
]
output = io.StringIO()
writer = csv.DictWriter(output, fieldnames=fields)
writer.writeheader()
for entry in entries:
writer.writerow(_audit_to_dict(entry))
return StreamingResponse(
iter([output.getvalue()]),
media_type="text/csv",
headers={"Content-Disposition": "attachment; filename=audit-export.csv"},
)
+4
View File
@@ -187,3 +187,7 @@ app.include_router(document_move_router)
# Phase 4: shares router (SHARE-01..05) # Phase 4: shares router (SHARE-01..05)
from api.shares import router as shares_router # noqa: E402 from api.shares import router as shares_router # noqa: E402
app.include_router(shares_router) app.include_router(shares_router)
# Phase 4: audit log viewer + CSV export (ADMIN-06)
from api.audit import router as audit_router # noqa: E402
app.include_router(audit_router)