364447d0bc
- Create backend/api/audit.py: GET /api/admin/audit-log (paginated, filtered) and GET /api/admin/audit-log/export (streaming CSV) - Both endpoints protected by Depends(get_current_admin) — regular users → 403 - _audit_to_dict() whitelist explicitly excludes filename, extracted_text, password_hash, credentials_enc (T-04-06-02, D-15) - CSV export uses same helper as JSON viewer; Content-Disposition: attachment - Register audit_router in backend/main.py
163 lines
5.8 KiB
Python
163 lines
5.8 KiB
Python
"""
|
|
Admin audit log API endpoints for DocuVault.
|
|
|
|
All handlers require get_current_admin (ADMIN-06, SEC-07) — regular users
|
|
receive 403 Forbidden.
|
|
|
|
Implements:
|
|
GET /api/admin/audit-log — paginated, filtered audit log viewer
|
|
GET /api/admin/audit-log/export — CSV streaming export with same filters
|
|
|
|
Security invariants:
|
|
- Both endpoints use Depends(get_current_admin) — verified by grep
|
|
- _audit_to_dict() is a pure whitelist: no filename, extracted_text,
|
|
password_hash, or credentials_enc can appear in responses (ADMIN-06, D-15)
|
|
- CSV export uses the same _audit_to_dict() helper as the JSON viewer
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import csv
|
|
import io
|
|
import uuid
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
|
|
from fastapi import APIRouter, Depends, Query
|
|
from fastapi.responses import StreamingResponse
|
|
from sqlalchemy import func, select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from db.models import AuditLog, User
|
|
from deps.auth import get_current_admin
|
|
from deps.db import get_db
|
|
|
|
router = APIRouter(prefix="/api/admin", tags=["audit"])
|
|
|
|
|
|
# ── Safe response helper ──────────────────────────────────────────────────────
|
|
|
|
def _audit_to_dict(entry: AuditLog) -> dict:
|
|
"""Safe audit log serializer — never includes filename, extracted_text, or
|
|
document content (ADMIN-06, D-15).
|
|
|
|
Whitelist: id, event_type, user_id, actor_id, resource_id, ip_address,
|
|
metadata_, created_at. No other keys are possible.
|
|
"""
|
|
return {
|
|
"id": entry.id,
|
|
"event_type": entry.event_type,
|
|
"user_id": str(entry.user_id) if entry.user_id else None,
|
|
"actor_id": str(entry.actor_id) if entry.actor_id else None,
|
|
"resource_id": str(entry.resource_id) if entry.resource_id else None,
|
|
"ip_address": str(entry.ip_address) if entry.ip_address else None,
|
|
"metadata_": entry.metadata_,
|
|
"created_at": entry.created_at.isoformat(),
|
|
}
|
|
|
|
|
|
# ── Query builder helper ──────────────────────────────────────────────────────
|
|
|
|
def _build_filtered_query(
|
|
start: Optional[datetime],
|
|
end: Optional[datetime],
|
|
user_id: Optional[uuid.UUID],
|
|
event_type: Optional[str],
|
|
):
|
|
"""Return a SQLAlchemy Select for AuditLog with the given filters applied.
|
|
|
|
Shared by both the paginated viewer and the CSV export endpoints to ensure
|
|
consistent filter semantics.
|
|
"""
|
|
q = select(AuditLog).order_by(AuditLog.created_at.desc())
|
|
if start is not None:
|
|
q = q.where(AuditLog.created_at >= start)
|
|
if end is not None:
|
|
q = q.where(AuditLog.created_at <= end)
|
|
if user_id is not None:
|
|
q = q.where(AuditLog.user_id == user_id)
|
|
if event_type is not None:
|
|
q = q.where(AuditLog.event_type == event_type)
|
|
return q
|
|
|
|
|
|
# ── Endpoints ─────────────────────────────────────────────────────────────────
|
|
|
|
@router.get("/audit-log")
|
|
async def list_audit_log(
|
|
start: Optional[datetime] = Query(default=None),
|
|
end: Optional[datetime] = Query(default=None),
|
|
user_id: Optional[uuid.UUID] = Query(default=None),
|
|
event_type: Optional[str] = Query(default=None),
|
|
page: int = Query(default=1, ge=1),
|
|
per_page: int = Query(default=50, ge=1, le=500),
|
|
session: AsyncSession = Depends(get_db),
|
|
_admin: User = Depends(get_current_admin),
|
|
) -> dict:
|
|
"""Return paginated, filtered audit log entries (ADMIN-06).
|
|
|
|
Response: { items: [...], total: int, page: int, per_page: int }
|
|
Entries never contain filename, extracted_text, or document content (D-15).
|
|
"""
|
|
base_q = _build_filtered_query(start, end, user_id, event_type)
|
|
|
|
# Total count — same filters, no limit/offset
|
|
count_q = select(func.count()).select_from(base_q.subquery())
|
|
count_result = await session.execute(count_q)
|
|
total = count_result.scalar_one()
|
|
|
|
# Paginated rows
|
|
paginated_q = base_q.limit(per_page).offset((page - 1) * per_page)
|
|
result = await session.execute(paginated_q)
|
|
entries = result.scalars().all()
|
|
|
|
return {
|
|
"items": [_audit_to_dict(e) for e in entries],
|
|
"total": total,
|
|
"page": page,
|
|
"per_page": per_page,
|
|
}
|
|
|
|
|
|
@router.get("/audit-log/export")
|
|
async def export_audit_log(
|
|
start: Optional[datetime] = Query(default=None),
|
|
end: Optional[datetime] = Query(default=None),
|
|
user_id: Optional[uuid.UUID] = Query(default=None),
|
|
event_type: Optional[str] = Query(default=None),
|
|
format: str = Query(default="csv"), # noqa: A002
|
|
session: AsyncSession = Depends(get_db),
|
|
_admin: User = Depends(get_current_admin),
|
|
) -> StreamingResponse:
|
|
"""Stream a CSV export of filtered audit log entries (ADMIN-06).
|
|
|
|
Uses the same _audit_to_dict() whitelist as the JSON viewer — no filename,
|
|
extracted_text, or document content appears in the export (D-15, T-04-06-02).
|
|
|
|
Returns StreamingResponse with Content-Disposition: attachment; filename=audit-export.csv.
|
|
"""
|
|
q = _build_filtered_query(start, end, user_id, event_type)
|
|
result = await session.execute(q)
|
|
entries = result.scalars().all()
|
|
|
|
fields = [
|
|
"id",
|
|
"event_type",
|
|
"user_id",
|
|
"actor_id",
|
|
"resource_id",
|
|
"ip_address",
|
|
"metadata_",
|
|
"created_at",
|
|
]
|
|
output = io.StringIO()
|
|
writer = csv.DictWriter(output, fieldnames=fields)
|
|
writer.writeheader()
|
|
for entry in entries:
|
|
writer.writerow(_audit_to_dict(entry))
|
|
|
|
return StreamingResponse(
|
|
iter([output.getvalue()]),
|
|
media_type="text/csv",
|
|
headers={"Content-Disposition": "attachment; filename=audit-export.csv"},
|
|
)
|