feat(phase-4-06): admin audit log viewer + CSV export (ADMIN-06)
- Create backend/api/audit.py: GET /api/admin/audit-log (paginated, filtered) and GET /api/admin/audit-log/export (streaming CSV) - Both endpoints protected by Depends(get_current_admin) — regular users → 403 - _audit_to_dict() whitelist explicitly excludes filename, extracted_text, password_hash, credentials_enc (T-04-06-02, D-15) - CSV export uses same helper as JSON viewer; Content-Disposition: attachment - Register audit_router in backend/main.py
This commit is contained in:
@@ -0,0 +1,162 @@
|
||||
"""
|
||||
Admin audit log API endpoints for DocuVault.
|
||||
|
||||
All handlers require get_current_admin (ADMIN-06, SEC-07) — regular users
|
||||
receive 403 Forbidden.
|
||||
|
||||
Implements:
|
||||
GET /api/admin/audit-log — paginated, filtered audit log viewer
|
||||
GET /api/admin/audit-log/export — CSV streaming export with same filters
|
||||
|
||||
Security invariants:
|
||||
- Both endpoints use Depends(get_current_admin) — verified by grep
|
||||
- _audit_to_dict() is a pure whitelist: no filename, extracted_text,
|
||||
password_hash, or credentials_enc can appear in responses (ADMIN-06, D-15)
|
||||
- CSV export uses the same _audit_to_dict() helper as the JSON viewer
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import csv
|
||||
import io
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
from typing import Optional
|
||||
|
||||
from fastapi import APIRouter, Depends, Query
|
||||
from fastapi.responses import StreamingResponse
|
||||
from sqlalchemy import func, select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from db.models import AuditLog, User
|
||||
from deps.auth import get_current_admin
|
||||
from deps.db import get_db
|
||||
|
||||
router = APIRouter(prefix="/api/admin", tags=["audit"])
|
||||
|
||||
|
||||
# ── Safe response helper ──────────────────────────────────────────────────────
|
||||
|
||||
def _audit_to_dict(entry: AuditLog) -> dict:
|
||||
"""Safe audit log serializer — never includes filename, extracted_text, or
|
||||
document content (ADMIN-06, D-15).
|
||||
|
||||
Whitelist: id, event_type, user_id, actor_id, resource_id, ip_address,
|
||||
metadata_, created_at. No other keys are possible.
|
||||
"""
|
||||
return {
|
||||
"id": entry.id,
|
||||
"event_type": entry.event_type,
|
||||
"user_id": str(entry.user_id) if entry.user_id else None,
|
||||
"actor_id": str(entry.actor_id) if entry.actor_id else None,
|
||||
"resource_id": str(entry.resource_id) if entry.resource_id else None,
|
||||
"ip_address": str(entry.ip_address) if entry.ip_address else None,
|
||||
"metadata_": entry.metadata_,
|
||||
"created_at": entry.created_at.isoformat(),
|
||||
}
|
||||
|
||||
|
||||
# ── Query builder helper ──────────────────────────────────────────────────────
|
||||
|
||||
def _build_filtered_query(
|
||||
start: Optional[datetime],
|
||||
end: Optional[datetime],
|
||||
user_id: Optional[uuid.UUID],
|
||||
event_type: Optional[str],
|
||||
):
|
||||
"""Return a SQLAlchemy Select for AuditLog with the given filters applied.
|
||||
|
||||
Shared by both the paginated viewer and the CSV export endpoints to ensure
|
||||
consistent filter semantics.
|
||||
"""
|
||||
q = select(AuditLog).order_by(AuditLog.created_at.desc())
|
||||
if start is not None:
|
||||
q = q.where(AuditLog.created_at >= start)
|
||||
if end is not None:
|
||||
q = q.where(AuditLog.created_at <= end)
|
||||
if user_id is not None:
|
||||
q = q.where(AuditLog.user_id == user_id)
|
||||
if event_type is not None:
|
||||
q = q.where(AuditLog.event_type == event_type)
|
||||
return q
|
||||
|
||||
|
||||
# ── Endpoints ─────────────────────────────────────────────────────────────────
|
||||
|
||||
@router.get("/audit-log")
|
||||
async def list_audit_log(
|
||||
start: Optional[datetime] = Query(default=None),
|
||||
end: Optional[datetime] = Query(default=None),
|
||||
user_id: Optional[uuid.UUID] = Query(default=None),
|
||||
event_type: Optional[str] = Query(default=None),
|
||||
page: int = Query(default=1, ge=1),
|
||||
per_page: int = Query(default=50, ge=1, le=500),
|
||||
session: AsyncSession = Depends(get_db),
|
||||
_admin: User = Depends(get_current_admin),
|
||||
) -> dict:
|
||||
"""Return paginated, filtered audit log entries (ADMIN-06).
|
||||
|
||||
Response: { items: [...], total: int, page: int, per_page: int }
|
||||
Entries never contain filename, extracted_text, or document content (D-15).
|
||||
"""
|
||||
base_q = _build_filtered_query(start, end, user_id, event_type)
|
||||
|
||||
# Total count — same filters, no limit/offset
|
||||
count_q = select(func.count()).select_from(base_q.subquery())
|
||||
count_result = await session.execute(count_q)
|
||||
total = count_result.scalar_one()
|
||||
|
||||
# Paginated rows
|
||||
paginated_q = base_q.limit(per_page).offset((page - 1) * per_page)
|
||||
result = await session.execute(paginated_q)
|
||||
entries = result.scalars().all()
|
||||
|
||||
return {
|
||||
"items": [_audit_to_dict(e) for e in entries],
|
||||
"total": total,
|
||||
"page": page,
|
||||
"per_page": per_page,
|
||||
}
|
||||
|
||||
|
||||
@router.get("/audit-log/export")
|
||||
async def export_audit_log(
|
||||
start: Optional[datetime] = Query(default=None),
|
||||
end: Optional[datetime] = Query(default=None),
|
||||
user_id: Optional[uuid.UUID] = Query(default=None),
|
||||
event_type: Optional[str] = Query(default=None),
|
||||
format: str = Query(default="csv"), # noqa: A002
|
||||
session: AsyncSession = Depends(get_db),
|
||||
_admin: User = Depends(get_current_admin),
|
||||
) -> StreamingResponse:
|
||||
"""Stream a CSV export of filtered audit log entries (ADMIN-06).
|
||||
|
||||
Uses the same _audit_to_dict() whitelist as the JSON viewer — no filename,
|
||||
extracted_text, or document content appears in the export (D-15, T-04-06-02).
|
||||
|
||||
Returns StreamingResponse with Content-Disposition: attachment; filename=audit-export.csv.
|
||||
"""
|
||||
q = _build_filtered_query(start, end, user_id, event_type)
|
||||
result = await session.execute(q)
|
||||
entries = result.scalars().all()
|
||||
|
||||
fields = [
|
||||
"id",
|
||||
"event_type",
|
||||
"user_id",
|
||||
"actor_id",
|
||||
"resource_id",
|
||||
"ip_address",
|
||||
"metadata_",
|
||||
"created_at",
|
||||
]
|
||||
output = io.StringIO()
|
||||
writer = csv.DictWriter(output, fieldnames=fields)
|
||||
writer.writeheader()
|
||||
for entry in entries:
|
||||
writer.writerow(_audit_to_dict(entry))
|
||||
|
||||
return StreamingResponse(
|
||||
iter([output.getvalue()]),
|
||||
media_type="text/csv",
|
||||
headers={"Content-Disposition": "attachment; filename=audit-export.csv"},
|
||||
)
|
||||
Reference in New Issue
Block a user