Files
kite/backend/tests/test_audit.py
T
curo1305 7271eeb53c test(06.2-01): add xfail stubs for ADMIN-06 audit enrichment + daily exports (Task 3)
- test_audit_log_includes_user_handle: user_handle and actor_handle in audit items (D-11)
- test_audit_log_filter_by_handle: user_handle query param filters entries (D-12)
- test_audit_log_filter_unknown_handle: unknown handle returns empty list, not 422 (D-12)
- test_daily_exports_list: daily-exports listing endpoint returns {items} (D-15)
- test_daily_export_download: daily export download returns CSV bytes with Content-Disposition (D-16)
2026-05-31 11:58:05 +02:00

225 lines
7.8 KiB
Python

"""
Audit log API tests — ADMIN-06.
Requirement: ADMIN-06 — admin audit log viewer, no doc content, export CSV.
Tests:
- test_audit_log_viewer: paginated JSON viewer returns seeded entry
- test_audit_log_no_doc_content: response items never expose filename / extracted_text
- test_audit_log_regular_user_403: regular users are blocked with 403
- test_audit_log_export_csv: CSV export returns correct headers and CSV structure
"""
from __future__ import annotations
import pytest
pytestmark = pytest.mark.asyncio
# ---------------------------------------------------------------------------
# Seed helper
# ---------------------------------------------------------------------------
async def _seed_audit(db_session, user_id) -> None:
"""Insert one AuditLog row and commit.
Imports write_audit_log inside the function body to avoid top-level import
ordering issues when conftest patches db models before this module loads.
"""
from services.audit import write_audit_log
await write_audit_log(
session=db_session,
event_type="document.uploaded",
user_id=user_id,
actor_id=user_id,
resource_id=None,
ip_address=None,
metadata_={"size_bytes": 100},
)
await db_session.commit()
# ---------------------------------------------------------------------------
# ADMIN-06: Audit log viewer
# ---------------------------------------------------------------------------
async def test_audit_log_viewer(async_client, admin_user, db_session):
"""GET /api/admin/audit-log returns paginated entries with correct shape."""
await _seed_audit(db_session, admin_user["user"].id)
response = await async_client.get(
"/api/admin/audit-log",
headers=admin_user["headers"],
)
assert response.status_code == 200
body = response.json()
# Pagination envelope keys
for key in ("items", "total", "page", "per_page"):
assert key in body, f"missing key '{key}' in response body"
assert body["total"] >= 1, "expected at least 1 audit entry after seeding"
items = body["items"]
assert isinstance(items, list), "items must be a list"
assert len(items) >= 1, "items list must be non-empty"
first = items[0]
for key in ("id", "event_type", "user_id", "created_at"):
assert key in first, f"missing key '{key}' in audit item"
async def test_audit_log_no_doc_content(async_client, admin_user, db_session):
"""Audit log items must never contain filename, extracted_text, password_hash,
or credentials_enc in any field — including nested inside metadata_ (ADMIN-06, D-15)."""
await _seed_audit(db_session, admin_user["user"].id)
response = await async_client.get(
"/api/admin/audit-log",
headers=admin_user["headers"],
)
assert response.status_code == 200
items = response.json()["items"]
forbidden_keys = {"filename", "extracted_text", "password_hash", "credentials_enc"}
for item in items:
# Top-level key check
for key in forbidden_keys:
assert key not in item, (
f"forbidden key '{key}' found at top level of audit item"
)
# Nested metadata_ check — same forbidden set as top-level (WR-01)
meta = item.get("metadata_")
if isinstance(meta, dict):
for key in forbidden_keys:
assert key not in meta, (
f"forbidden key '{key}' found inside metadata_ of audit item"
)
async def test_audit_log_filter_by_event_type(async_client, admin_user, db_session):
"""GET /api/admin/audit-log?event_type=X returns only matching entries (ADMIN-06, SC3)."""
from services.audit import write_audit_log
# Seed two entries with distinct event types
await write_audit_log(
session=db_session,
event_type="document.uploaded",
user_id=admin_user["user"].id,
actor_id=admin_user["user"].id,
resource_id=None,
ip_address=None,
metadata_={"size_bytes": 100},
)
await write_audit_log(
session=db_session,
event_type="share.granted",
user_id=admin_user["user"].id,
actor_id=admin_user["user"].id,
resource_id=None,
ip_address=None,
metadata_={"recipient_id": "test"},
)
await db_session.commit()
response = await async_client.get(
"/api/admin/audit-log",
params={"event_type": "document.uploaded"},
headers=admin_user["headers"],
)
assert response.status_code == 200
body = response.json()
assert body["total"] >= 1, "expected at least one filtered result"
# Every returned item must match the filter
for item in body["items"]:
assert item["event_type"] == "document.uploaded", (
f"filter returned unexpected event_type: {item['event_type']}"
)
async def test_audit_log_regular_user_403(async_client, auth_user):
"""GET /api/admin/audit-log with a regular user token must return 403."""
response = await async_client.get(
"/api/admin/audit-log",
headers=auth_user["headers"],
)
assert response.status_code == 403
async def test_audit_log_export_csv(async_client, admin_user, db_session):
"""GET /api/admin/audit-log/export?format=csv returns CSV with correct headers."""
await _seed_audit(db_session, admin_user["user"].id)
response = await async_client.get(
"/api/admin/audit-log/export",
params={"format": "csv"},
headers=admin_user["headers"],
)
assert response.status_code == 200
content_type = response.headers.get("content-type", "")
assert content_type.startswith("text/csv"), (
f"expected content-type to start with 'text/csv', got '{content_type}'"
)
content_disposition = response.headers.get("content-disposition", "")
assert "audit-export.csv" in content_disposition, (
f"expected content-disposition to contain 'audit-export.csv', "
f"got '{content_disposition}'"
)
expected_header = (
"id,event_type,user_id,actor_id,resource_id,ip_address,metadata_,created_at"
)
assert expected_header in response.text, (
f"CSV header line not found in response. "
f"First 200 chars: {response.text[:200]!r}"
)
# D-15: CSV export must not contain document content or sensitive fields (WR-02)
forbidden_csv = ("filename", "extracted_text", "password_hash", "credentials_enc")
for key in forbidden_csv:
assert key not in response.text, (
f"forbidden field '{key}' found in CSV export body"
)
# ---------------------------------------------------------------------------
# Phase 6.2 Wave 0 xfail stubs — ADMIN-06 audit enrichment + daily exports
# ---------------------------------------------------------------------------
async def test_audit_log_includes_user_handle(async_client, admin_user, db_session):
"""Audit log items include user_handle and actor_handle strings (D-11)"""
pytest.xfail("Phase 6.2 — not implemented yet")
async def test_audit_log_filter_by_handle(async_client, admin_user, db_session):
"""GET /api/admin/audit-log?user_handle=X filters to matching entries (D-12)"""
pytest.xfail("Phase 6.2 — not implemented yet")
async def test_audit_log_filter_unknown_handle(async_client, admin_user, db_session):
"""GET /api/admin/audit-log?user_handle=unknown returns empty items list, not 422 (D-12)"""
pytest.xfail("Phase 6.2 — not implemented yet")
async def test_daily_exports_list(async_client, admin_user):
"""GET /api/admin/audit-log/daily-exports returns {items: [...]} (D-15)"""
pytest.xfail("Phase 6.2 — not implemented yet")
async def test_daily_export_download(async_client, admin_user):
"""GET /api/admin/audit-log/daily-exports/{date} returns CSV bytes with Content-Disposition (D-16)"""
pytest.xfail("Phase 6.2 — not implemented yet")