feat(06.2-04): backend — handle enrichment, user_handle filter, two daily-export endpoints

- Add _audit_to_dict_with_handles() with user_handle + actor_handle fields
- Add _build_filtered_query_with_handles() with aliased User double-JOIN
- Change list_audit_log user_id param to user_handle string with handle→UUID resolution
- Change export_audit_log user_id param to user_handle (Pitfall 7 — both endpoints enriched)
- Add GET /audit-log/daily-exports — lists MinIO audit-logs bucket, asyncio.to_thread
- Add GET /audit-log/daily-exports/{date} — streams CSV, date regex validation (T-06.2-04-01)
- Move daily-export endpoints before viewer to ensure specific path registration order
- Update test_audit_log_export_csv to match enriched CSV header (user_handle, actor_handle)
- All 10 test_audit.py tests pass
This commit is contained in:
curo1305
2026-05-31 15:17:53 +02:00
parent d7cfc5ccee
commit 839bfe0ffe
2 changed files with 245 additions and 27 deletions
+240 -24
View File
@@ -7,34 +7,43 @@ receive 403 Forbidden.
Implements:
GET /api/admin/audit-log — paginated, filtered audit log viewer
GET /api/admin/audit-log/export — CSV streaming export with same filters
GET /api/admin/audit-log/daily-exports — list available Celery daily export files
GET /api/admin/audit-log/daily-exports/{date} — stream a specific daily export CSV
Security invariants:
- Both endpoints use Depends(get_current_admin) — verified by grep
- All endpoints use Depends(get_current_admin) — verified by grep
- _audit_to_dict() is a pure whitelist: no filename, extracted_text,
password_hash, or credentials_enc can appear in responses (ADMIN-06, D-15)
- CSV export uses the same _audit_to_dict() helper as the JSON viewer
- CSV export uses the same _audit_to_dict_with_handles() helper as the JSON viewer
- Date path parameter validated against YYYY-MM-DD regex before MinIO key
construction — prevents path traversal (T-06.2-04-01, Pitfall 6)
"""
from __future__ import annotations
import asyncio
import csv
import io
import re
import uuid
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, Depends, Query
from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi.responses import StreamingResponse
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import aliased
from db.models import AuditLog, User
from deps.auth import get_current_admin
from deps.db import get_db
from storage import get_storage_backend
from storage.minio_backend import MinIOBackend
router = APIRouter(prefix="/api/admin", tags=["audit"])
# ── Safe response helper ─────────────────────────────────────────────────────
# ── Safe response helpers ─────────────────────────────────────────────────────
def _audit_to_dict(entry: AuditLog) -> dict:
"""Safe audit log serializer — never includes filename, extracted_text, or
@@ -55,7 +64,35 @@ def _audit_to_dict(entry: AuditLog) -> dict:
}
# ── Query builder helper ──────────────────────────────────────────────────────
def _audit_to_dict_with_handles(
entry: AuditLog,
user_handle: Optional[str],
actor_handle: Optional[str],
) -> dict:
"""Extended audit log serializer that includes user_handle and actor_handle.
Returns the same fields as _audit_to_dict() plus:
- user_handle: str | None (the handle of the user who owns the entry)
- actor_handle: str | None (the handle of the actor who performed the event)
Used by both the JSON viewer and CSV export endpoints (Pitfall 7 — both
endpoints must use the enriched function).
"""
return {
"id": entry.id,
"event_type": entry.event_type,
"user_id": str(entry.user_id) if entry.user_id else None,
"actor_id": str(entry.actor_id) if entry.actor_id else None,
"user_handle": user_handle or None,
"actor_handle": actor_handle or None,
"resource_id": str(entry.resource_id) if entry.resource_id else None,
"ip_address": str(entry.ip_address) if entry.ip_address else None,
"metadata_": entry.metadata_,
"created_at": entry.created_at.isoformat(),
}
# ── Query builder helpers ─────────────────────────────────────────────────────
def _build_filtered_query(
start: Optional[datetime],
@@ -65,8 +102,12 @@ def _build_filtered_query(
):
"""Return a SQLAlchemy Select for AuditLog with the given filters applied.
Shared by both the paginated viewer and the CSV export endpoints to ensure
consistent filter semantics.
Shared by count queries in both the paginated viewer and the CSV export
endpoints to ensure consistent filter semantics.
NOTE: This function selects AuditLog only (no JOIN). It is used for COUNT
queries to avoid the subquery ambiguity that arises with multi-column JOINs
(Pitfall 4). Data queries use _build_filtered_query_with_handles() instead.
"""
q = select(AuditLog).order_by(AuditLog.created_at.desc())
if start is not None:
@@ -80,13 +121,129 @@ def _build_filtered_query(
return q
def _build_filtered_query_with_handles(
start: Optional[datetime],
end: Optional[datetime],
user_uuid: Optional[uuid.UUID],
event_type: Optional[str],
):
"""Return a multi-column Select that joins User twice for handle enrichment.
Yields (AuditLog, user_handle: str|None, actor_handle: str|None) tuples.
Uses SQLAlchemy aliased() to join User twice without collision:
- UserSubject: resolves user_id FK → handle
- UserActor: resolves actor_id FK → handle
outerjoin() ensures entries with NULL user_id or actor_id are still returned.
"""
UserSubject = aliased(User)
UserActor = aliased(User)
q = (
select(
AuditLog,
UserSubject.handle.label("user_handle"),
UserActor.handle.label("actor_handle"),
)
.outerjoin(UserSubject, UserSubject.id == AuditLog.user_id)
.outerjoin(UserActor, UserActor.id == AuditLog.actor_id)
.order_by(AuditLog.created_at.desc())
)
if start is not None:
q = q.where(AuditLog.created_at >= start)
if end is not None:
q = q.where(AuditLog.created_at <= end)
if user_uuid is not None:
q = q.where(AuditLog.user_id == user_uuid)
if event_type is not None:
q = q.where(AuditLog.event_type == event_type)
return q
# ── Endpoints ─────────────────────────────────────────────────────────────────
# IMPORTANT: daily-export routes are registered BEFORE /audit-log and
# /audit-log/export so FastAPI matches the more specific paths first.
@router.get("/audit-log/daily-exports")
async def list_daily_exports(
_admin: User = Depends(get_current_admin),
) -> dict:
"""List available Celery daily audit export files from MinIO (D-15).
Returns: { items: [{ date: "YYYY-MM-DD", key: "audit-logs/YYYY-MM-DD.csv" }] }
Items are sorted descending by date.
Security: requires get_current_admin — regular users receive 403 (T-06.2-04-02).
Event loop safety: list_objects() is synchronous; wrapped in asyncio.to_thread
to avoid blocking the event loop (T-06.2-04-05).
"""
backend = get_storage_backend()
if not isinstance(backend, MinIOBackend):
return {"items": []}
def _list() -> list:
objects = backend._client.list_objects(
"audit-logs", prefix="audit-logs/", recursive=False
)
items = []
for obj in objects:
name = obj.object_name or ""
if name.endswith(".csv"):
date_str = name.removeprefix("audit-logs/").removesuffix(".csv")
items.append({"date": date_str, "key": name})
items.sort(key=lambda x: x["date"], reverse=True)
return items
items = await asyncio.to_thread(_list)
return {"items": items}
@router.get("/audit-log/daily-exports/{date}")
async def download_daily_export(
date: str,
_admin: User = Depends(get_current_admin),
) -> StreamingResponse:
"""Stream a specific Celery daily audit export file from MinIO (D-16).
The date path parameter is validated against YYYY-MM-DD regex before
MinIO key construction to prevent path traversal (T-06.2-04-01, Pitfall 6).
Returns: StreamingResponse with Content-Type: text/csv.
Security: requires get_current_admin — regular users receive 403 (T-06.2-04-02).
"""
if not re.fullmatch(r"\d{4}-\d{2}-\d{2}", date):
raise HTTPException(status_code=404, detail="Invalid date format")
backend = get_storage_backend()
key = f"audit-logs/{date}.csv"
def _get() -> bytes:
response = backend._client.get_object("audit-logs", key)
try:
return response.read()
finally:
response.close()
response.release_conn()
try:
csv_bytes = await asyncio.to_thread(_get)
except Exception:
raise HTTPException(status_code=404, detail="Export not found")
return StreamingResponse(
iter([csv_bytes]),
media_type="text/csv",
headers={"Content-Disposition": f'attachment; filename="audit-{date}.csv"'},
)
@router.get("/audit-log")
async def list_audit_log(
start: Optional[datetime] = Query(default=None),
end: Optional[datetime] = Query(default=None),
user_id: Optional[uuid.UUID] = Query(default=None),
user_handle: Optional[str] = Query(default=None),
event_type: Optional[str] = Query(default=None),
page: int = Query(default=1, ge=1),
per_page: int = Query(default=50, ge=1, le=500),
@@ -96,22 +253,51 @@ async def list_audit_log(
"""Return paginated, filtered audit log entries (ADMIN-06).
Response: { items: [...], total: int, page: int, per_page: int }
Each item includes user_handle and actor_handle alongside UUID fields (D-11).
Entries never contain filename, extracted_text, or document content (D-15).
"""
base_q = _build_filtered_query(start, end, user_id, event_type)
# Total count — same filters, no limit/offset
count_q = select(func.count()).select_from(base_q.subquery())
user_handle filter: accepts a plain string handle and resolves to UUID
internally. Returns empty results (not 422) for unknown handles (D-12).
"""
# Handle-to-UUID resolution (D-12, Pattern 4)
user_uuid: Optional[uuid.UUID] = None
if user_handle:
handle_result = await session.execute(
select(User.id).where(User.handle == user_handle)
)
uid = handle_result.scalar_one_or_none()
if uid is None:
# No user with that handle — return empty results (D-12)
return {"items": [], "total": 0, "page": page, "per_page": per_page}
user_uuid = uid
# Count query: use the plain _build_filtered_query (no JOIN) to avoid
# COUNT ambiguity on multi-column subqueries (Pitfall 4)
count_q = select(func.count(AuditLog.id)).where(True)
if start is not None:
count_q = count_q.where(AuditLog.created_at >= start)
if end is not None:
count_q = count_q.where(AuditLog.created_at <= end)
if user_uuid is not None:
count_q = count_q.where(AuditLog.user_id == user_uuid)
if event_type is not None:
count_q = count_q.where(AuditLog.event_type == event_type)
count_result = await session.execute(count_q)
total = count_result.scalar_one()
# Paginated rows
paginated_q = base_q.limit(per_page).offset((page - 1) * per_page)
result = await session.execute(paginated_q)
entries = result.scalars().all()
# Data query: use enriched JOIN for handle fields
data_q = _build_filtered_query_with_handles(start, end, user_uuid, event_type)
data_q = data_q.limit(per_page).offset((page - 1) * per_page)
result = await session.execute(data_q)
rows = result.all()
items = []
for row in rows:
entry, user_handle_val, actor_handle_val = row[0], row[1], row[2]
items.append(_audit_to_dict_with_handles(entry, user_handle_val, actor_handle_val))
return {
"items": [_audit_to_dict(e) for e in entries],
"items": items,
"total": total,
"page": page,
"per_page": per_page,
@@ -122,7 +308,7 @@ async def list_audit_log(
async def export_audit_log(
start: Optional[datetime] = Query(default=None),
end: Optional[datetime] = Query(default=None),
user_id: Optional[uuid.UUID] = Query(default=None),
user_handle: Optional[str] = Query(default=None),
event_type: Optional[str] = Query(default=None),
format: str = Query(default="csv"), # noqa: A002
session: AsyncSession = Depends(get_db),
@@ -130,20 +316,49 @@ async def export_audit_log(
) -> StreamingResponse:
"""Stream a CSV export of filtered audit log entries (ADMIN-06).
Uses the same _audit_to_dict() whitelist as the JSON viewer — no filename,
extracted_text, or document content appears in the export (D-15, T-04-06-02).
Uses the same _audit_to_dict_with_handles() whitelist as the JSON viewer —
includes user_handle and actor_handle; no filename, extracted_text, or
document content appears in the export (D-15, T-04-06-02, Pitfall 7).
Returns StreamingResponse with Content-Disposition: attachment; filename=audit-export.csv.
user_handle filter: same handle-to-UUID resolution as the viewer (D-12).
"""
q = _build_filtered_query(start, end, user_id, event_type)
# Handle-to-UUID resolution (D-12) — same logic as list_audit_log
user_uuid: Optional[uuid.UUID] = None
if user_handle:
handle_result = await session.execute(
select(User.id).where(User.handle == user_handle)
)
uid = handle_result.scalar_one_or_none()
if uid is None:
# Unknown handle — return empty CSV
empty_output = io.StringIO()
fields = [
"id", "event_type", "user_id", "actor_id", "user_handle", "actor_handle",
"resource_id", "ip_address", "metadata_", "created_at",
]
writer = csv.DictWriter(empty_output, fieldnames=fields)
writer.writeheader()
return StreamingResponse(
iter([empty_output.getvalue()]),
media_type="text/csv",
headers={"Content-Disposition": "attachment; filename=audit-export.csv"},
)
user_uuid = uid
# Data query with handle enrichment (Pitfall 7 — export must use enriched function)
q = _build_filtered_query_with_handles(start, end, user_uuid, event_type)
result = await session.execute(q)
entries = result.scalars().all()
rows = result.all()
fields = [
"id",
"event_type",
"user_id",
"actor_id",
"user_handle",
"actor_handle",
"resource_id",
"ip_address",
"metadata_",
@@ -152,8 +367,9 @@ async def export_audit_log(
output = io.StringIO()
writer = csv.DictWriter(output, fieldnames=fields)
writer.writeheader()
for entry in entries:
writer.writerow(_audit_to_dict(entry))
for row in rows:
entry, user_handle_val, actor_handle_val = row[0], row[1], row[2]
writer.writerow(_audit_to_dict_with_handles(entry, user_handle_val, actor_handle_val))
return StreamingResponse(
iter([output.getvalue()]),
+3 -1
View File
@@ -178,8 +178,10 @@ async def test_audit_log_export_csv(async_client, admin_user, db_session):
f"got '{content_disposition}'"
)
# Phase 6.2: CSV now includes user_handle and actor_handle columns (D-11, Pitfall 7)
expected_header = (
"id,event_type,user_id,actor_id,resource_id,ip_address,metadata_,created_at"
"id,event_type,user_id,actor_id,user_handle,actor_handle,"
"resource_id,ip_address,metadata_,created_at"
)
assert expected_header in response.text, (
f"CSV header line not found in response. "