feat(06.2-04): backend — handle enrichment, user_handle filter, two daily-export endpoints
- Add _audit_to_dict_with_handles() with user_handle + actor_handle fields
- Add _build_filtered_query_with_handles() with aliased User double-JOIN
- Change list_audit_log user_id param to user_handle string with handle→UUID resolution
- Change export_audit_log user_id param to user_handle (Pitfall 7 — both endpoints enriched)
- Add GET /audit-log/daily-exports — lists MinIO audit-logs bucket, asyncio.to_thread
- Add GET /audit-log/daily-exports/{date} — streams CSV, date regex validation (T-06.2-04-01)
- Move daily-export endpoints before viewer to ensure specific path registration order
- Update test_audit_log_export_csv to match enriched CSV header (user_handle, actor_handle)
- All 10 test_audit.py tests pass
This commit is contained in:
+242
-26
@@ -5,36 +5,45 @@ All handlers require get_current_admin (ADMIN-06, SEC-07) — regular users
|
|||||||
receive 403 Forbidden.
|
receive 403 Forbidden.
|
||||||
|
|
||||||
Implements:
|
Implements:
|
||||||
GET /api/admin/audit-log — paginated, filtered audit log viewer
|
GET /api/admin/audit-log — paginated, filtered audit log viewer
|
||||||
GET /api/admin/audit-log/export — CSV streaming export with same filters
|
GET /api/admin/audit-log/export — CSV streaming export with same filters
|
||||||
|
GET /api/admin/audit-log/daily-exports — list available Celery daily export files
|
||||||
|
GET /api/admin/audit-log/daily-exports/{date} — stream a specific daily export CSV
|
||||||
|
|
||||||
Security invariants:
|
Security invariants:
|
||||||
- Both endpoints use Depends(get_current_admin) — verified by grep
|
- All endpoints use Depends(get_current_admin) — verified by grep
|
||||||
- _audit_to_dict() is a pure whitelist: no filename, extracted_text,
|
- _audit_to_dict() is a pure whitelist: no filename, extracted_text,
|
||||||
password_hash, or credentials_enc can appear in responses (ADMIN-06, D-15)
|
password_hash, or credentials_enc can appear in responses (ADMIN-06, D-15)
|
||||||
- CSV export uses the same _audit_to_dict() helper as the JSON viewer
|
- CSV export uses the same _audit_to_dict_with_handles() helper as the JSON viewer
|
||||||
|
- Date path parameter validated against YYYY-MM-DD regex before MinIO key
|
||||||
|
construction — prevents path traversal (T-06.2-04-01, Pitfall 6)
|
||||||
"""
|
"""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
import csv
|
import csv
|
||||||
import io
|
import io
|
||||||
|
import re
|
||||||
import uuid
|
import uuid
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
from fastapi import APIRouter, Depends, Query
|
from fastapi import APIRouter, Depends, HTTPException, Query
|
||||||
from fastapi.responses import StreamingResponse
|
from fastapi.responses import StreamingResponse
|
||||||
from sqlalchemy import func, select
|
from sqlalchemy import func, select
|
||||||
from sqlalchemy.ext.asyncio import AsyncSession
|
from sqlalchemy.ext.asyncio import AsyncSession
|
||||||
|
from sqlalchemy.orm import aliased
|
||||||
|
|
||||||
from db.models import AuditLog, User
|
from db.models import AuditLog, User
|
||||||
from deps.auth import get_current_admin
|
from deps.auth import get_current_admin
|
||||||
from deps.db import get_db
|
from deps.db import get_db
|
||||||
|
from storage import get_storage_backend
|
||||||
|
from storage.minio_backend import MinIOBackend
|
||||||
|
|
||||||
router = APIRouter(prefix="/api/admin", tags=["audit"])
|
router = APIRouter(prefix="/api/admin", tags=["audit"])
|
||||||
|
|
||||||
|
|
||||||
# ── Safe response helper ──────────────────────────────────────────────────────
|
# ── Safe response helpers ─────────────────────────────────────────────────────
|
||||||
|
|
||||||
def _audit_to_dict(entry: AuditLog) -> dict:
|
def _audit_to_dict(entry: AuditLog) -> dict:
|
||||||
"""Safe audit log serializer — never includes filename, extracted_text, or
|
"""Safe audit log serializer — never includes filename, extracted_text, or
|
||||||
@@ -55,7 +64,35 @@ def _audit_to_dict(entry: AuditLog) -> dict:
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
# ── Query builder helper ──────────────────────────────────────────────────────
|
def _audit_to_dict_with_handles(
    entry: AuditLog,
    user_handle: Optional[str],
    actor_handle: Optional[str],
) -> dict:
    """Serialize an audit entry together with its resolved user/actor handles.

    Produces the whitelisted entry fields plus two extra keys:
      - user_handle: handle of the user the entry belongs to, or None
      - actor_handle: handle of the actor who performed the event, or None

    Falsy handles (None or an empty string) are reported as None. The
    user_id, actor_id, resource_id and ip_address values are stringified when
    truthy and reported as None otherwise. Both the JSON viewer and the CSV
    export rely on this serializer (Pitfall 7).
    """

    def _as_text(value):
        # Stringify truthy values; collapse anything falsy to None.
        return str(value) if value else None

    serialized = {"id": entry.id, "event_type": entry.event_type}
    serialized["user_id"] = _as_text(entry.user_id)
    serialized["actor_id"] = _as_text(entry.actor_id)
    serialized["user_handle"] = user_handle if user_handle else None
    serialized["actor_handle"] = actor_handle if actor_handle else None
    serialized["resource_id"] = _as_text(entry.resource_id)
    serialized["ip_address"] = _as_text(entry.ip_address)
    serialized["metadata_"] = entry.metadata_
    serialized["created_at"] = entry.created_at.isoformat()
    return serialized
|
||||||
|
|
||||||
|
|
||||||
|
# ── Query builder helpers ─────────────────────────────────────────────────────
|
||||||
|
|
||||||
def _build_filtered_query(
|
def _build_filtered_query(
|
||||||
start: Optional[datetime],
|
start: Optional[datetime],
|
||||||
@@ -65,8 +102,12 @@ def _build_filtered_query(
|
|||||||
):
|
):
|
||||||
"""Return a SQLAlchemy Select for AuditLog with the given filters applied.
|
"""Return a SQLAlchemy Select for AuditLog with the given filters applied.
|
||||||
|
|
||||||
Shared by both the paginated viewer and the CSV export endpoints to ensure
|
Shared by count queries in both the paginated viewer and the CSV export
|
||||||
consistent filter semantics.
|
endpoints to ensure consistent filter semantics.
|
||||||
|
|
||||||
|
NOTE: This function selects AuditLog only (no JOIN). It is used for COUNT
|
||||||
|
queries to avoid the subquery ambiguity that arises with multi-column JOINs
|
||||||
|
(Pitfall 4). Data queries use _build_filtered_query_with_handles() instead.
|
||||||
"""
|
"""
|
||||||
q = select(AuditLog).order_by(AuditLog.created_at.desc())
|
q = select(AuditLog).order_by(AuditLog.created_at.desc())
|
||||||
if start is not None:
|
if start is not None:
|
||||||
@@ -80,13 +121,129 @@ def _build_filtered_query(
|
|||||||
return q
|
return q
|
||||||
|
|
||||||
|
|
||||||
|
def _build_filtered_query_with_handles(
    start: Optional[datetime],
    end: Optional[datetime],
    user_uuid: Optional[uuid.UUID],
    event_type: Optional[str],
):
    """Return a multi-column Select that joins User twice for handle enrichment.

    Yields (AuditLog, user_handle: str|None, actor_handle: str|None) tuples.
    Uses SQLAlchemy aliased() to join User twice without collision:
      - UserSubject: resolves user_id FK → handle
      - UserActor: resolves actor_id FK → handle

    outerjoin() ensures entries with NULL user_id or actor_id are still returned.

    Args:
        start: inclusive lower bound on created_at; None disables the filter.
        end: inclusive upper bound on created_at; None disables the filter.
        user_uuid: restrict to entries whose user_id equals this UUID; None
            disables the filter.
        event_type: restrict to entries with exactly this event_type; None
            disables the filter.

    Returns:
        A Select ordered newest-first, with AuditLog.id as a tiebreaker.
    """
    UserSubject = aliased(User)
    UserActor = aliased(User)

    q = (
        select(
            AuditLog,
            UserSubject.handle.label("user_handle"),
            UserActor.handle.label("actor_handle"),
        )
        .outerjoin(UserSubject, UserSubject.id == AuditLog.user_id)
        .outerjoin(UserActor, UserActor.id == AuditLog.actor_id)
        # created_at alone is not a total order: entries written in the same
        # instant could be returned in a different order on each request, so
        # LIMIT/OFFSET pagination could repeat or skip rows. The id tiebreaker
        # makes the ordering deterministic (assumes AuditLog.id is unique).
        .order_by(AuditLog.created_at.desc(), AuditLog.id.desc())
    )
    if start is not None:
        q = q.where(AuditLog.created_at >= start)
    if end is not None:
        q = q.where(AuditLog.created_at <= end)
    if user_uuid is not None:
        q = q.where(AuditLog.user_id == user_uuid)
    if event_type is not None:
        q = q.where(AuditLog.event_type == event_type)
    return q
|
||||||
|
|
||||||
|
|
||||||
# ── Endpoints ─────────────────────────────────────────────────────────────────
|
# ── Endpoints ─────────────────────────────────────────────────────────────────
|
||||||
|
# IMPORTANT: daily-export routes are registered BEFORE /audit-log and
|
||||||
|
# /audit-log/export so FastAPI matches the more specific paths first.
|
||||||
|
|
||||||
|
|
||||||
|
@router.get("/audit-log/daily-exports")
async def list_daily_exports(
    _admin: User = Depends(get_current_admin),
) -> dict:
    """List available Celery daily audit export files from MinIO (D-15).

    Returns: { items: [{ date: "YYYY-MM-DD", key: "audit-logs/YYYY-MM-DD.csv" }] }
    Items are sorted descending by date. Only keys of the form
    "audit-logs/YYYY-MM-DD.csv" are listed, so every returned date is one the
    download endpoint's date validation will accept. When the configured
    storage backend is not a MinIOBackend the list is empty.

    Security: requires get_current_admin — regular users receive 403 (T-06.2-04-02).
    Event loop safety: list_objects() is synchronous; wrapped in asyncio.to_thread
    to avoid blocking the event loop (T-06.2-04-05).
    """
    backend = get_storage_backend()
    if not isinstance(backend, MinIOBackend):
        return {"items": []}

    def _list() -> list:
        objects = backend._client.list_objects(
            "audit-logs", prefix="audit-logs/", recursive=False
        )
        items = []
        for obj in objects:
            name = obj.object_name or ""
            if not name.endswith(".csv"):
                continue
            date_str = name.removeprefix("audit-logs/").removesuffix(".csv")
            # Skip stray CSVs whose stem is not a plain ASCII YYYY-MM-DD date:
            # the download endpoint would reject them, so listing them would
            # only advertise links that cannot be fetched.
            if re.fullmatch(r"\d{4}-\d{2}-\d{2}", date_str, flags=re.ASCII):
                items.append({"date": date_str, "key": name})
        # ISO dates sort chronologically as plain strings.
        items.sort(key=lambda x: x["date"], reverse=True)
        return items

    items = await asyncio.to_thread(_list)
    return {"items": items}
|
||||||
|
|
||||||
|
|
||||||
|
@router.get("/audit-log/daily-exports/{date}")
async def download_daily_export(
    date: str,
    _admin: User = Depends(get_current_admin),
) -> StreamingResponse:
    """Stream a specific Celery daily audit export file from MinIO (D-16).

    The date path parameter is validated against an ASCII-only YYYY-MM-DD
    regex before MinIO key construction to prevent path traversal
    (T-06.2-04-01, Pitfall 6).

    Returns: StreamingResponse with Content-Type: text/csv. The object is read
    fully into memory in a worker thread and sent as a single chunk.

    Raises:
        HTTPException(404): the date is malformed, the storage backend is not
            a MinIOBackend, or fetching the object failed for any reason.

    Security: requires get_current_admin — regular users receive 403 (T-06.2-04-02).
    """
    # re.ASCII: without it, \d in a str pattern also matches non-ASCII Unicode
    # digits, which would let non-YYYY-MM-DD text into the object key.
    if not re.fullmatch(r"\d{4}-\d{2}-\d{2}", date, flags=re.ASCII):
        raise HTTPException(status_code=404, detail="Invalid date format")

    backend = get_storage_backend()
    if not isinstance(backend, MinIOBackend):
        # Mirrors list_daily_exports: without MinIO there are no exports.
        # Previously this case only produced a 404 because the AttributeError
        # on backend._client was swallowed by the broad except below.
        raise HTTPException(status_code=404, detail="Export not found")

    key = f"audit-logs/{date}.csv"

    def _get() -> bytes:
        response = backend._client.get_object("audit-logs", key)
        try:
            return response.read()
        finally:
            # Always hand the connection back, even if read() fails.
            response.close()
            response.release_conn()

    try:
        csv_bytes = await asyncio.to_thread(_get)
    except Exception as exc:
        # NOTE(review): every storage failure (missing key, but also outages
        # or auth errors) is reported as 404. Kept for compatibility; the
        # original error is chained so it remains visible in tracebacks.
        raise HTTPException(status_code=404, detail="Export not found") from exc

    return StreamingResponse(
        iter([csv_bytes]),
        media_type="text/csv",
        headers={"Content-Disposition": f'attachment; filename="audit-{date}.csv"'},
    )
|
||||||
|
|
||||||
|
|
||||||
@router.get("/audit-log")
|
@router.get("/audit-log")
|
||||||
async def list_audit_log(
|
async def list_audit_log(
|
||||||
start: Optional[datetime] = Query(default=None),
|
start: Optional[datetime] = Query(default=None),
|
||||||
end: Optional[datetime] = Query(default=None),
|
end: Optional[datetime] = Query(default=None),
|
||||||
user_id: Optional[uuid.UUID] = Query(default=None),
|
user_handle: Optional[str] = Query(default=None),
|
||||||
event_type: Optional[str] = Query(default=None),
|
event_type: Optional[str] = Query(default=None),
|
||||||
page: int = Query(default=1, ge=1),
|
page: int = Query(default=1, ge=1),
|
||||||
per_page: int = Query(default=50, ge=1, le=500),
|
per_page: int = Query(default=50, ge=1, le=500),
|
||||||
@@ -96,22 +253,51 @@ async def list_audit_log(
|
|||||||
"""Return paginated, filtered audit log entries (ADMIN-06).
|
"""Return paginated, filtered audit log entries (ADMIN-06).
|
||||||
|
|
||||||
Response: { items: [...], total: int, page: int, per_page: int }
|
Response: { items: [...], total: int, page: int, per_page: int }
|
||||||
|
Each item includes user_handle and actor_handle alongside UUID fields (D-11).
|
||||||
Entries never contain filename, extracted_text, or document content (D-15).
|
Entries never contain filename, extracted_text, or document content (D-15).
|
||||||
"""
|
|
||||||
base_q = _build_filtered_query(start, end, user_id, event_type)
|
|
||||||
|
|
||||||
# Total count — same filters, no limit/offset
|
user_handle filter: accepts a plain string handle and resolves to UUID
|
||||||
count_q = select(func.count()).select_from(base_q.subquery())
|
internally. Returns empty results (not 422) for unknown handles (D-12).
|
||||||
|
"""
|
||||||
|
# Handle-to-UUID resolution (D-12, Pattern 4)
|
||||||
|
user_uuid: Optional[uuid.UUID] = None
|
||||||
|
if user_handle:
|
||||||
|
handle_result = await session.execute(
|
||||||
|
select(User.id).where(User.handle == user_handle)
|
||||||
|
)
|
||||||
|
uid = handle_result.scalar_one_or_none()
|
||||||
|
if uid is None:
|
||||||
|
# No user with that handle — return empty results (D-12)
|
||||||
|
return {"items": [], "total": 0, "page": page, "per_page": per_page}
|
||||||
|
user_uuid = uid
|
||||||
|
|
||||||
|
# Count query: filter AuditLog directly (no JOIN), applying the same
# start/end/user/event_type filters inline, to avoid COUNT ambiguity on
# multi-column subqueries (Pitfall 4)
|
||||||
|
count_q = select(func.count(AuditLog.id)).where(True)
|
||||||
|
if start is not None:
|
||||||
|
count_q = count_q.where(AuditLog.created_at >= start)
|
||||||
|
if end is not None:
|
||||||
|
count_q = count_q.where(AuditLog.created_at <= end)
|
||||||
|
if user_uuid is not None:
|
||||||
|
count_q = count_q.where(AuditLog.user_id == user_uuid)
|
||||||
|
if event_type is not None:
|
||||||
|
count_q = count_q.where(AuditLog.event_type == event_type)
|
||||||
count_result = await session.execute(count_q)
|
count_result = await session.execute(count_q)
|
||||||
total = count_result.scalar_one()
|
total = count_result.scalar_one()
|
||||||
|
|
||||||
# Paginated rows
|
# Data query: use enriched JOIN for handle fields
|
||||||
paginated_q = base_q.limit(per_page).offset((page - 1) * per_page)
|
data_q = _build_filtered_query_with_handles(start, end, user_uuid, event_type)
|
||||||
result = await session.execute(paginated_q)
|
data_q = data_q.limit(per_page).offset((page - 1) * per_page)
|
||||||
entries = result.scalars().all()
|
result = await session.execute(data_q)
|
||||||
|
rows = result.all()
|
||||||
|
|
||||||
|
items = []
|
||||||
|
for row in rows:
|
||||||
|
entry, user_handle_val, actor_handle_val = row[0], row[1], row[2]
|
||||||
|
items.append(_audit_to_dict_with_handles(entry, user_handle_val, actor_handle_val))
|
||||||
|
|
||||||
return {
|
return {
|
||||||
"items": [_audit_to_dict(e) for e in entries],
|
"items": items,
|
||||||
"total": total,
|
"total": total,
|
||||||
"page": page,
|
"page": page,
|
||||||
"per_page": per_page,
|
"per_page": per_page,
|
||||||
@@ -122,7 +308,7 @@ async def list_audit_log(
|
|||||||
async def export_audit_log(
|
async def export_audit_log(
|
||||||
start: Optional[datetime] = Query(default=None),
|
start: Optional[datetime] = Query(default=None),
|
||||||
end: Optional[datetime] = Query(default=None),
|
end: Optional[datetime] = Query(default=None),
|
||||||
user_id: Optional[uuid.UUID] = Query(default=None),
|
user_handle: Optional[str] = Query(default=None),
|
||||||
event_type: Optional[str] = Query(default=None),
|
event_type: Optional[str] = Query(default=None),
|
||||||
format: str = Query(default="csv"), # noqa: A002
|
format: str = Query(default="csv"), # noqa: A002
|
||||||
session: AsyncSession = Depends(get_db),
|
session: AsyncSession = Depends(get_db),
|
||||||
@@ -130,20 +316,49 @@ async def export_audit_log(
|
|||||||
) -> StreamingResponse:
|
) -> StreamingResponse:
|
||||||
"""Stream a CSV export of filtered audit log entries (ADMIN-06).
|
"""Stream a CSV export of filtered audit log entries (ADMIN-06).
|
||||||
|
|
||||||
Uses the same _audit_to_dict() whitelist as the JSON viewer — no filename,
|
Uses the same _audit_to_dict_with_handles() whitelist as the JSON viewer —
|
||||||
extracted_text, or document content appears in the export (D-15, T-04-06-02).
|
includes user_handle and actor_handle; no filename, extracted_text, or
|
||||||
|
document content appears in the export (D-15, T-04-06-02, Pitfall 7).
|
||||||
|
|
||||||
Returns StreamingResponse with Content-Disposition: attachment; filename=audit-export.csv.
|
Returns StreamingResponse with Content-Disposition: attachment; filename=audit-export.csv.
|
||||||
|
|
||||||
|
user_handle filter: same handle-to-UUID resolution as the viewer (D-12).
|
||||||
"""
|
"""
|
||||||
q = _build_filtered_query(start, end, user_id, event_type)
|
# Handle-to-UUID resolution (D-12) — same logic as list_audit_log
|
||||||
|
user_uuid: Optional[uuid.UUID] = None
|
||||||
|
if user_handle:
|
||||||
|
handle_result = await session.execute(
|
||||||
|
select(User.id).where(User.handle == user_handle)
|
||||||
|
)
|
||||||
|
uid = handle_result.scalar_one_or_none()
|
||||||
|
if uid is None:
|
||||||
|
# Unknown handle — return empty CSV
|
||||||
|
empty_output = io.StringIO()
|
||||||
|
fields = [
|
||||||
|
"id", "event_type", "user_id", "actor_id", "user_handle", "actor_handle",
|
||||||
|
"resource_id", "ip_address", "metadata_", "created_at",
|
||||||
|
]
|
||||||
|
writer = csv.DictWriter(empty_output, fieldnames=fields)
|
||||||
|
writer.writeheader()
|
||||||
|
return StreamingResponse(
|
||||||
|
iter([empty_output.getvalue()]),
|
||||||
|
media_type="text/csv",
|
||||||
|
headers={"Content-Disposition": "attachment; filename=audit-export.csv"},
|
||||||
|
)
|
||||||
|
user_uuid = uid
|
||||||
|
|
||||||
|
# Data query with handle enrichment (Pitfall 7 — export must use enriched function)
|
||||||
|
q = _build_filtered_query_with_handles(start, end, user_uuid, event_type)
|
||||||
result = await session.execute(q)
|
result = await session.execute(q)
|
||||||
entries = result.scalars().all()
|
rows = result.all()
|
||||||
|
|
||||||
fields = [
|
fields = [
|
||||||
"id",
|
"id",
|
||||||
"event_type",
|
"event_type",
|
||||||
"user_id",
|
"user_id",
|
||||||
"actor_id",
|
"actor_id",
|
||||||
|
"user_handle",
|
||||||
|
"actor_handle",
|
||||||
"resource_id",
|
"resource_id",
|
||||||
"ip_address",
|
"ip_address",
|
||||||
"metadata_",
|
"metadata_",
|
||||||
@@ -152,8 +367,9 @@ async def export_audit_log(
|
|||||||
output = io.StringIO()
|
output = io.StringIO()
|
||||||
writer = csv.DictWriter(output, fieldnames=fields)
|
writer = csv.DictWriter(output, fieldnames=fields)
|
||||||
writer.writeheader()
|
writer.writeheader()
|
||||||
for entry in entries:
|
for row in rows:
|
||||||
writer.writerow(_audit_to_dict(entry))
|
entry, user_handle_val, actor_handle_val = row[0], row[1], row[2]
|
||||||
|
writer.writerow(_audit_to_dict_with_handles(entry, user_handle_val, actor_handle_val))
|
||||||
|
|
||||||
return StreamingResponse(
|
return StreamingResponse(
|
||||||
iter([output.getvalue()]),
|
iter([output.getvalue()]),
|
||||||
|
|||||||
@@ -178,8 +178,10 @@ async def test_audit_log_export_csv(async_client, admin_user, db_session):
|
|||||||
f"got '{content_disposition}'"
|
f"got '{content_disposition}'"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Phase 6.2: CSV now includes user_handle and actor_handle columns (D-11, Pitfall 7)
|
||||||
expected_header = (
|
expected_header = (
|
||||||
"id,event_type,user_id,actor_id,resource_id,ip_address,metadata_,created_at"
|
"id,event_type,user_id,actor_id,user_handle,actor_handle,"
|
||||||
|
"resource_id,ip_address,metadata_,created_at"
|
||||||
)
|
)
|
||||||
assert expected_header in response.text, (
|
assert expected_header in response.text, (
|
||||||
f"CSV header line not found in response. "
|
f"CSV header line not found in response. "
|
||||||
|
|||||||
Reference in New Issue
Block a user