feat(06.2-04): backend — handle enrichment, user_handle filter, two daily-export endpoints
- Add _audit_to_dict_with_handles() with user_handle + actor_handle fields
- Add _build_filtered_query_with_handles() with aliased User double-JOIN
- Change list_audit_log user_id param to user_handle string with handle→UUID resolution
- Change export_audit_log user_id param to user_handle (Pitfall 7 — both endpoints enriched)
- Add GET /audit-log/daily-exports — lists MinIO audit-logs bucket, asyncio.to_thread
- Add GET /audit-log/daily-exports/{date} — streams CSV, date regex validation (T-06.2-04-01)
- Move daily-export endpoints before viewer to ensure specific path registration order
- Update test_audit_log_export_csv to match enriched CSV header (user_handle, actor_handle)
- All 10 test_audit.py tests pass
This commit is contained in:
+240
-24
@@ -7,34 +7,43 @@ receive 403 Forbidden.
|
||||
Implements:
|
||||
GET /api/admin/audit-log — paginated, filtered audit log viewer
|
||||
GET /api/admin/audit-log/export — CSV streaming export with same filters
|
||||
GET /api/admin/audit-log/daily-exports — list available Celery daily export files
|
||||
GET /api/admin/audit-log/daily-exports/{date} — stream a specific daily export CSV
|
||||
|
||||
Security invariants:
|
||||
- Both endpoints use Depends(get_current_admin) — verified by grep
|
||||
- All endpoints use Depends(get_current_admin) — verified by grep
|
||||
- _audit_to_dict() is a pure whitelist: no filename, extracted_text,
|
||||
password_hash, or credentials_enc can appear in responses (ADMIN-06, D-15)
|
||||
- CSV export uses the same _audit_to_dict() helper as the JSON viewer
|
||||
- CSV export uses the same _audit_to_dict_with_handles() helper as the JSON viewer
|
||||
- Date path parameter validated against YYYY-MM-DD regex before MinIO key
|
||||
construction — prevents path traversal (T-06.2-04-01, Pitfall 6)
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import csv
|
||||
import io
|
||||
import re
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
from typing import Optional
|
||||
|
||||
from fastapi import APIRouter, Depends, Query
|
||||
from fastapi import APIRouter, Depends, HTTPException, Query
|
||||
from fastapi.responses import StreamingResponse
|
||||
from sqlalchemy import func, select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy.orm import aliased
|
||||
|
||||
from db.models import AuditLog, User
|
||||
from deps.auth import get_current_admin
|
||||
from deps.db import get_db
|
||||
from storage import get_storage_backend
|
||||
from storage.minio_backend import MinIOBackend
|
||||
|
||||
router = APIRouter(prefix="/api/admin", tags=["audit"])
|
||||
|
||||
|
||||
# ── Safe response helper ──────────────────────────────────────────────────────
|
||||
# ── Safe response helpers ─────────────────────────────────────────────────────
|
||||
|
||||
def _audit_to_dict(entry: AuditLog) -> dict:
|
||||
"""Safe audit log serializer — never includes filename, extracted_text, or
|
||||
@@ -55,7 +64,35 @@ def _audit_to_dict(entry: AuditLog) -> dict:
|
||||
}
|
||||
|
||||
|
||||
# ── Query builder helper ──────────────────────────────────────────────────────
|
||||
def _audit_to_dict_with_handles(
|
||||
entry: AuditLog,
|
||||
user_handle: Optional[str],
|
||||
actor_handle: Optional[str],
|
||||
) -> dict:
|
||||
"""Extended audit log serializer that includes user_handle and actor_handle.
|
||||
|
||||
Returns the same fields as _audit_to_dict() plus:
|
||||
- user_handle: str | None (the handle of the user who owns the entry)
|
||||
- actor_handle: str | None (the handle of the actor who performed the event)
|
||||
|
||||
Used by both the JSON viewer and CSV export endpoints (Pitfall 7 — both
|
||||
endpoints must use the enriched function).
|
||||
"""
|
||||
return {
|
||||
"id": entry.id,
|
||||
"event_type": entry.event_type,
|
||||
"user_id": str(entry.user_id) if entry.user_id else None,
|
||||
"actor_id": str(entry.actor_id) if entry.actor_id else None,
|
||||
"user_handle": user_handle or None,
|
||||
"actor_handle": actor_handle or None,
|
||||
"resource_id": str(entry.resource_id) if entry.resource_id else None,
|
||||
"ip_address": str(entry.ip_address) if entry.ip_address else None,
|
||||
"metadata_": entry.metadata_,
|
||||
"created_at": entry.created_at.isoformat(),
|
||||
}
|
||||
|
||||
|
||||
# ── Query builder helpers ─────────────────────────────────────────────────────
|
||||
|
||||
def _build_filtered_query(
|
||||
start: Optional[datetime],
|
||||
@@ -65,8 +102,12 @@ def _build_filtered_query(
|
||||
):
|
||||
"""Return a SQLAlchemy Select for AuditLog with the given filters applied.
|
||||
|
||||
Shared by both the paginated viewer and the CSV export endpoints to ensure
|
||||
consistent filter semantics.
|
||||
Shared by count queries in both the paginated viewer and the CSV export
|
||||
endpoints to ensure consistent filter semantics.
|
||||
|
||||
NOTE: This function selects AuditLog only (no JOIN). It is used for COUNT
|
||||
queries to avoid the subquery ambiguity that arises with multi-column JOINs
|
||||
(Pitfall 4). Data queries use _build_filtered_query_with_handles() instead.
|
||||
"""
|
||||
q = select(AuditLog).order_by(AuditLog.created_at.desc())
|
||||
if start is not None:
|
||||
@@ -80,13 +121,129 @@ def _build_filtered_query(
|
||||
return q
|
||||
|
||||
|
||||
def _build_filtered_query_with_handles(
|
||||
start: Optional[datetime],
|
||||
end: Optional[datetime],
|
||||
user_uuid: Optional[uuid.UUID],
|
||||
event_type: Optional[str],
|
||||
):
|
||||
"""Return a multi-column Select that joins User twice for handle enrichment.
|
||||
|
||||
Yields (AuditLog, user_handle: str|None, actor_handle: str|None) tuples.
|
||||
Uses SQLAlchemy aliased() to join User twice without collision:
|
||||
- UserSubject: resolves user_id FK → handle
|
||||
- UserActor: resolves actor_id FK → handle
|
||||
|
||||
outerjoin() ensures entries with NULL user_id or actor_id are still returned.
|
||||
"""
|
||||
UserSubject = aliased(User)
|
||||
UserActor = aliased(User)
|
||||
|
||||
q = (
|
||||
select(
|
||||
AuditLog,
|
||||
UserSubject.handle.label("user_handle"),
|
||||
UserActor.handle.label("actor_handle"),
|
||||
)
|
||||
.outerjoin(UserSubject, UserSubject.id == AuditLog.user_id)
|
||||
.outerjoin(UserActor, UserActor.id == AuditLog.actor_id)
|
||||
.order_by(AuditLog.created_at.desc())
|
||||
)
|
||||
if start is not None:
|
||||
q = q.where(AuditLog.created_at >= start)
|
||||
if end is not None:
|
||||
q = q.where(AuditLog.created_at <= end)
|
||||
if user_uuid is not None:
|
||||
q = q.where(AuditLog.user_id == user_uuid)
|
||||
if event_type is not None:
|
||||
q = q.where(AuditLog.event_type == event_type)
|
||||
return q
|
||||
|
||||
|
||||
# ── Endpoints ─────────────────────────────────────────────────────────────────
|
||||
# IMPORTANT: daily-export routes are registered BEFORE /audit-log and
|
||||
# /audit-log/export so FastAPI matches the more specific paths first.
|
||||
|
||||
|
||||
@router.get("/audit-log/daily-exports")
|
||||
async def list_daily_exports(
|
||||
_admin: User = Depends(get_current_admin),
|
||||
) -> dict:
|
||||
"""List available Celery daily audit export files from MinIO (D-15).
|
||||
|
||||
Returns: { items: [{ date: "YYYY-MM-DD", key: "audit-logs/YYYY-MM-DD.csv" }] }
|
||||
Items are sorted descending by date.
|
||||
|
||||
Security: requires get_current_admin — regular users receive 403 (T-06.2-04-02).
|
||||
Event loop safety: list_objects() is synchronous; wrapped in asyncio.to_thread
|
||||
to avoid blocking the event loop (T-06.2-04-05).
|
||||
"""
|
||||
backend = get_storage_backend()
|
||||
if not isinstance(backend, MinIOBackend):
|
||||
return {"items": []}
|
||||
|
||||
def _list() -> list:
|
||||
objects = backend._client.list_objects(
|
||||
"audit-logs", prefix="audit-logs/", recursive=False
|
||||
)
|
||||
items = []
|
||||
for obj in objects:
|
||||
name = obj.object_name or ""
|
||||
if name.endswith(".csv"):
|
||||
date_str = name.removeprefix("audit-logs/").removesuffix(".csv")
|
||||
items.append({"date": date_str, "key": name})
|
||||
items.sort(key=lambda x: x["date"], reverse=True)
|
||||
return items
|
||||
|
||||
items = await asyncio.to_thread(_list)
|
||||
return {"items": items}
|
||||
|
||||
|
||||
@router.get("/audit-log/daily-exports/{date}")
|
||||
async def download_daily_export(
|
||||
date: str,
|
||||
_admin: User = Depends(get_current_admin),
|
||||
) -> StreamingResponse:
|
||||
"""Stream a specific Celery daily audit export file from MinIO (D-16).
|
||||
|
||||
The date path parameter is validated against YYYY-MM-DD regex before
|
||||
MinIO key construction to prevent path traversal (T-06.2-04-01, Pitfall 6).
|
||||
|
||||
Returns: StreamingResponse with Content-Type: text/csv.
|
||||
|
||||
Security: requires get_current_admin — regular users receive 403 (T-06.2-04-02).
|
||||
"""
|
||||
if not re.fullmatch(r"\d{4}-\d{2}-\d{2}", date):
|
||||
raise HTTPException(status_code=404, detail="Invalid date format")
|
||||
|
||||
backend = get_storage_backend()
|
||||
key = f"audit-logs/{date}.csv"
|
||||
|
||||
def _get() -> bytes:
|
||||
response = backend._client.get_object("audit-logs", key)
|
||||
try:
|
||||
return response.read()
|
||||
finally:
|
||||
response.close()
|
||||
response.release_conn()
|
||||
|
||||
try:
|
||||
csv_bytes = await asyncio.to_thread(_get)
|
||||
except Exception:
|
||||
raise HTTPException(status_code=404, detail="Export not found")
|
||||
|
||||
return StreamingResponse(
|
||||
iter([csv_bytes]),
|
||||
media_type="text/csv",
|
||||
headers={"Content-Disposition": f'attachment; filename="audit-{date}.csv"'},
|
||||
)
|
||||
|
||||
|
||||
@router.get("/audit-log")
|
||||
async def list_audit_log(
|
||||
start: Optional[datetime] = Query(default=None),
|
||||
end: Optional[datetime] = Query(default=None),
|
||||
user_id: Optional[uuid.UUID] = Query(default=None),
|
||||
user_handle: Optional[str] = Query(default=None),
|
||||
event_type: Optional[str] = Query(default=None),
|
||||
page: int = Query(default=1, ge=1),
|
||||
per_page: int = Query(default=50, ge=1, le=500),
|
||||
@@ -96,22 +253,51 @@ async def list_audit_log(
|
||||
"""Return paginated, filtered audit log entries (ADMIN-06).
|
||||
|
||||
Response: { items: [...], total: int, page: int, per_page: int }
|
||||
Each item includes user_handle and actor_handle alongside UUID fields (D-11).
|
||||
Entries never contain filename, extracted_text, or document content (D-15).
|
||||
"""
|
||||
base_q = _build_filtered_query(start, end, user_id, event_type)
|
||||
|
||||
# Total count — same filters, no limit/offset
|
||||
count_q = select(func.count()).select_from(base_q.subquery())
|
||||
user_handle filter: accepts a plain string handle and resolves to UUID
|
||||
internally. Returns empty results (not 422) for unknown handles (D-12).
|
||||
"""
|
||||
# Handle-to-UUID resolution (D-12, Pattern 4)
|
||||
user_uuid: Optional[uuid.UUID] = None
|
||||
if user_handle:
|
||||
handle_result = await session.execute(
|
||||
select(User.id).where(User.handle == user_handle)
|
||||
)
|
||||
uid = handle_result.scalar_one_or_none()
|
||||
if uid is None:
|
||||
# No user with that handle — return empty results (D-12)
|
||||
return {"items": [], "total": 0, "page": page, "per_page": per_page}
|
||||
user_uuid = uid
|
||||
|
||||
# Count query: use the plain _build_filtered_query (no JOIN) to avoid
|
||||
# COUNT ambiguity on multi-column subqueries (Pitfall 4)
|
||||
count_q = select(func.count(AuditLog.id)).where(True)
|
||||
if start is not None:
|
||||
count_q = count_q.where(AuditLog.created_at >= start)
|
||||
if end is not None:
|
||||
count_q = count_q.where(AuditLog.created_at <= end)
|
||||
if user_uuid is not None:
|
||||
count_q = count_q.where(AuditLog.user_id == user_uuid)
|
||||
if event_type is not None:
|
||||
count_q = count_q.where(AuditLog.event_type == event_type)
|
||||
count_result = await session.execute(count_q)
|
||||
total = count_result.scalar_one()
|
||||
|
||||
# Paginated rows
|
||||
paginated_q = base_q.limit(per_page).offset((page - 1) * per_page)
|
||||
result = await session.execute(paginated_q)
|
||||
entries = result.scalars().all()
|
||||
# Data query: use enriched JOIN for handle fields
|
||||
data_q = _build_filtered_query_with_handles(start, end, user_uuid, event_type)
|
||||
data_q = data_q.limit(per_page).offset((page - 1) * per_page)
|
||||
result = await session.execute(data_q)
|
||||
rows = result.all()
|
||||
|
||||
items = []
|
||||
for row in rows:
|
||||
entry, user_handle_val, actor_handle_val = row[0], row[1], row[2]
|
||||
items.append(_audit_to_dict_with_handles(entry, user_handle_val, actor_handle_val))
|
||||
|
||||
return {
|
||||
"items": [_audit_to_dict(e) for e in entries],
|
||||
"items": items,
|
||||
"total": total,
|
||||
"page": page,
|
||||
"per_page": per_page,
|
||||
@@ -122,7 +308,7 @@ async def list_audit_log(
|
||||
async def export_audit_log(
|
||||
start: Optional[datetime] = Query(default=None),
|
||||
end: Optional[datetime] = Query(default=None),
|
||||
user_id: Optional[uuid.UUID] = Query(default=None),
|
||||
user_handle: Optional[str] = Query(default=None),
|
||||
event_type: Optional[str] = Query(default=None),
|
||||
format: str = Query(default="csv"), # noqa: A002
|
||||
session: AsyncSession = Depends(get_db),
|
||||
@@ -130,20 +316,49 @@ async def export_audit_log(
|
||||
) -> StreamingResponse:
|
||||
"""Stream a CSV export of filtered audit log entries (ADMIN-06).
|
||||
|
||||
Uses the same _audit_to_dict() whitelist as the JSON viewer — no filename,
|
||||
extracted_text, or document content appears in the export (D-15, T-04-06-02).
|
||||
Uses the same _audit_to_dict_with_handles() whitelist as the JSON viewer —
|
||||
includes user_handle and actor_handle; no filename, extracted_text, or
|
||||
document content appears in the export (D-15, T-04-06-02, Pitfall 7).
|
||||
|
||||
Returns StreamingResponse with Content-Disposition: attachment; filename=audit-export.csv.
|
||||
|
||||
user_handle filter: same handle-to-UUID resolution as the viewer (D-12).
|
||||
"""
|
||||
q = _build_filtered_query(start, end, user_id, event_type)
|
||||
# Handle-to-UUID resolution (D-12) — same logic as list_audit_log
|
||||
user_uuid: Optional[uuid.UUID] = None
|
||||
if user_handle:
|
||||
handle_result = await session.execute(
|
||||
select(User.id).where(User.handle == user_handle)
|
||||
)
|
||||
uid = handle_result.scalar_one_or_none()
|
||||
if uid is None:
|
||||
# Unknown handle — return empty CSV
|
||||
empty_output = io.StringIO()
|
||||
fields = [
|
||||
"id", "event_type", "user_id", "actor_id", "user_handle", "actor_handle",
|
||||
"resource_id", "ip_address", "metadata_", "created_at",
|
||||
]
|
||||
writer = csv.DictWriter(empty_output, fieldnames=fields)
|
||||
writer.writeheader()
|
||||
return StreamingResponse(
|
||||
iter([empty_output.getvalue()]),
|
||||
media_type="text/csv",
|
||||
headers={"Content-Disposition": "attachment; filename=audit-export.csv"},
|
||||
)
|
||||
user_uuid = uid
|
||||
|
||||
# Data query with handle enrichment (Pitfall 7 — export must use enriched function)
|
||||
q = _build_filtered_query_with_handles(start, end, user_uuid, event_type)
|
||||
result = await session.execute(q)
|
||||
entries = result.scalars().all()
|
||||
rows = result.all()
|
||||
|
||||
fields = [
|
||||
"id",
|
||||
"event_type",
|
||||
"user_id",
|
||||
"actor_id",
|
||||
"user_handle",
|
||||
"actor_handle",
|
||||
"resource_id",
|
||||
"ip_address",
|
||||
"metadata_",
|
||||
@@ -152,8 +367,9 @@ async def export_audit_log(
|
||||
output = io.StringIO()
|
||||
writer = csv.DictWriter(output, fieldnames=fields)
|
||||
writer.writeheader()
|
||||
for entry in entries:
|
||||
writer.writerow(_audit_to_dict(entry))
|
||||
for row in rows:
|
||||
entry, user_handle_val, actor_handle_val = row[0], row[1], row[2]
|
||||
writer.writerow(_audit_to_dict_with_handles(entry, user_handle_val, actor_handle_val))
|
||||
|
||||
return StreamingResponse(
|
||||
iter([output.getvalue()]),
|
||||
|
||||
@@ -178,8 +178,10 @@ async def test_audit_log_export_csv(async_client, admin_user, db_session):
|
||||
f"got '{content_disposition}'"
|
||||
)
|
||||
|
||||
# Phase 6.2: CSV now includes user_handle and actor_handle columns (D-11, Pitfall 7)
|
||||
expected_header = (
|
||||
"id,event_type,user_id,actor_id,resource_id,ip_address,metadata_,created_at"
|
||||
"id,event_type,user_id,actor_id,user_handle,actor_handle,"
|
||||
"resource_id,ip_address,metadata_,created_at"
|
||||
)
|
||||
assert expected_header in response.text, (
|
||||
f"CSV header line not found in response. "
|
||||
|
||||
Reference in New Issue
Block a user