""" Audit log API tests — ADMIN-06. Requirement: ADMIN-06 — admin audit log viewer, no doc content, export CSV. Tests: - test_audit_log_viewer: paginated JSON viewer returns seeded entry - test_audit_log_no_doc_content: response items never expose filename / extracted_text - test_audit_log_regular_user_403: regular users are blocked with 403 - test_audit_log_export_csv: CSV export returns correct headers and CSV structure """ from __future__ import annotations import pytest pytestmark = pytest.mark.asyncio # --------------------------------------------------------------------------- # Seed helper # --------------------------------------------------------------------------- async def _seed_audit(db_session, user_id) -> None: """Insert one AuditLog row and commit. Imports write_audit_log inside the function body to avoid top-level import ordering issues when conftest patches db models before this module loads. """ from services.audit import write_audit_log await write_audit_log( session=db_session, event_type="document.uploaded", user_id=user_id, actor_id=user_id, resource_id=None, ip_address=None, metadata_={"size_bytes": 100}, ) await db_session.commit() # --------------------------------------------------------------------------- # ADMIN-06: Audit log viewer # --------------------------------------------------------------------------- async def test_audit_log_viewer(async_client, admin_user, db_session): """GET /api/admin/audit-log returns paginated entries with correct shape.""" await _seed_audit(db_session, admin_user["user"].id) response = await async_client.get( "/api/admin/audit-log", headers=admin_user["headers"], ) assert response.status_code == 200 body = response.json() # Pagination envelope keys for key in ("items", "total", "page", "per_page"): assert key in body, f"missing key '{key}' in response body" assert body["total"] >= 1, "expected at least 1 audit entry after seeding" items = body["items"] assert isinstance(items, list), "items must be a list" assert len(items) >= 1, "items list must be non-empty" first = items[0] for key in ("id", "event_type", "user_id", "created_at"): assert key in first, f"missing key '{key}' in audit item" async def test_audit_log_no_doc_content(async_client, admin_user, db_session): """Audit log items must never contain filename, extracted_text, password_hash, or credentials_enc in any field — including nested inside metadata_ (ADMIN-06, D-15).""" await _seed_audit(db_session, admin_user["user"].id) response = await async_client.get( "/api/admin/audit-log", headers=admin_user["headers"], ) assert response.status_code == 200 items = response.json()["items"] forbidden_keys = {"filename", "extracted_text", "password_hash", "credentials_enc"} for item in items: # Top-level key check for key in forbidden_keys: assert key not in item, ( f"forbidden key '{key}' found at top level of audit item" ) # Nested metadata_ check — same forbidden set as top-level (WR-01) meta = item.get("metadata_") if isinstance(meta, dict): for key in forbidden_keys: assert key not in meta, ( f"forbidden key '{key}' found inside metadata_ of audit item" ) async def test_audit_log_filter_by_event_type(async_client, admin_user, db_session): """GET /api/admin/audit-log?event_type=X returns only matching entries (ADMIN-06, SC3).""" from services.audit import write_audit_log # Seed two entries with distinct event types await write_audit_log( session=db_session, event_type="document.uploaded", user_id=admin_user["user"].id, actor_id=admin_user["user"].id, resource_id=None, ip_address=None, metadata_={"size_bytes": 100}, ) await write_audit_log( session=db_session, event_type="share.granted", user_id=admin_user["user"].id, actor_id=admin_user["user"].id, resource_id=None, ip_address=None, metadata_={"recipient_id": "test"}, ) await db_session.commit() response = await async_client.get( "/api/admin/audit-log", params={"event_type": "document.uploaded"}, headers=admin_user["headers"], ) assert response.status_code == 200 body = response.json() assert body["total"] >= 1, "expected at least one filtered result" # Every returned item must match the filter for item in body["items"]: assert item["event_type"] == "document.uploaded", ( f"filter returned unexpected event_type: {item['event_type']}" ) async def test_audit_log_regular_user_403(async_client, auth_user): """GET /api/admin/audit-log with a regular user token must return 403.""" response = await async_client.get( "/api/admin/audit-log", headers=auth_user["headers"], ) assert response.status_code == 403 async def test_audit_log_export_csv(async_client, admin_user, db_session): """GET /api/admin/audit-log/export?format=csv returns CSV with correct headers.""" await _seed_audit(db_session, admin_user["user"].id) response = await async_client.get( "/api/admin/audit-log/export", params={"format": "csv"}, headers=admin_user["headers"], ) assert response.status_code == 200 content_type = response.headers.get("content-type", "") assert content_type.startswith("text/csv"), ( f"expected content-type to start with 'text/csv', got '{content_type}'" ) content_disposition = response.headers.get("content-disposition", "") assert "audit-export.csv" in content_disposition, ( f"expected content-disposition to contain 'audit-export.csv', " f"got '{content_disposition}'" ) # Phase 6.2: CSV now includes user_handle and actor_handle columns (D-11, Pitfall 7) expected_header = ( "id,event_type,user_id,actor_id,user_handle,actor_handle," "resource_id,ip_address,metadata_,created_at" ) assert expected_header in response.text, ( f"CSV header line not found in response. " f"First 200 chars: {response.text[:200]!r}" ) # D-15: CSV export must not contain document content or sensitive fields (WR-02) forbidden_csv = ("filename", "extracted_text", "password_hash", "credentials_enc") for key in forbidden_csv: assert key not in response.text, ( f"forbidden field '{key}' found in CSV export body" ) # --------------------------------------------------------------------------- # Phase 6.2 — ADMIN-06 audit enrichment + daily exports (promoted stubs) # --------------------------------------------------------------------------- async def test_audit_log_includes_user_handle(async_client, admin_user, db_session): """Audit log items include user_handle and actor_handle strings (D-11)""" await _seed_audit(db_session, admin_user["user"].id) response = await async_client.get( "/api/admin/audit-log", headers=admin_user["headers"], ) assert response.status_code == 200 body = response.json() items = body["items"] assert len(items) >= 1, "expected at least one seeded audit entry" first = items[0] assert "user_handle" in first, "missing key 'user_handle' in audit item" assert "actor_handle" in first, "missing key 'actor_handle' in audit item" # The seeded entry was created for admin_user — handle must match assert first["user_handle"] == admin_user["user"].handle, ( f"expected user_handle={admin_user['user'].handle!r}, got {first['user_handle']!r}" ) async def test_audit_log_filter_by_handle(async_client, admin_user, db_session, second_auth_user): """GET /api/admin/audit-log?user_handle=X filters to matching entries (D-12)""" # Seed one entry for admin_user and one for second_auth_user await _seed_audit(db_session, admin_user["user"].id) await _seed_audit(db_session, second_auth_user["user"].id) response = await async_client.get( "/api/admin/audit-log", params={"user_handle": admin_user["user"].handle}, headers=admin_user["headers"], ) assert response.status_code == 200 body = response.json() items = body["items"] assert len(items) >= 1, "expected at least one filtered entry for admin_user" for item in items: assert item["user_handle"] == admin_user["user"].handle, ( f"filter returned entry for wrong user: {item['user_handle']!r}" ) # Second user's entry must not appear second_handle = second_auth_user["user"].handle assert not any(item["user_handle"] == second_handle for item in items), ( f"second user's entry appeared in filtered results" ) async def test_audit_log_filter_unknown_handle(async_client, admin_user, db_session): """GET /api/admin/audit-log?user_handle=unknown returns empty items list, not 422 (D-12)""" response = await async_client.get( "/api/admin/audit-log", params={"user_handle": "definitely_does_not_exist"}, headers=admin_user["headers"], ) assert response.status_code == 200, ( f"expected 200 for unknown handle, got {response.status_code}: {response.text[:200]}" ) body = response.json() assert body["items"] == [], f"expected empty items list, got {body['items']}" assert body["total"] == 0, f"expected total=0, got {body['total']}" async def test_daily_exports_list(async_client, admin_user): """GET /api/admin/audit-log/daily-exports returns {items: [...]} (D-15)""" from unittest.mock import MagicMock, patch # Create fake MinIO objects fake_obj1 = MagicMock() fake_obj1.object_name = "audit-logs/2026-05-30.csv" fake_obj1.is_dir = False fake_obj2 = MagicMock() fake_obj2.object_name = "audit-logs/2026-05-29.csv" fake_obj2.is_dir = False mock_client = MagicMock() mock_client.list_objects.return_value = iter([fake_obj1, fake_obj2]) mock_backend = MagicMock() mock_backend._client = mock_client from storage.minio_backend import MinIOBackend with patch("api.audit.get_storage_backend", return_value=mock_backend), \ patch("api.audit.MinIOBackend", MinIOBackend): response = await async_client.get( "/api/admin/audit-log/daily-exports", headers=admin_user["headers"], ) assert response.status_code == 200 body = response.json() assert "items" in body, f"expected 'items' key in response, got: {body}" items = body["items"] assert isinstance(items, list) # Items must be sorted descending by date if len(items) >= 2: dates = [item["date"] for item in items] assert dates == sorted(dates, reverse=True), ( f"expected dates sorted descending, got {dates}" ) async def test_daily_export_download(async_client, admin_user): """GET /api/admin/audit-log/daily-exports/{date} returns CSV bytes with Content-Disposition (D-16)""" from unittest.mock import MagicMock, patch from storage.minio_backend import MinIOBackend fake_csv = b"id,event_type,user_id\n1,document.uploaded,abc\n" mock_response = MagicMock() mock_response.read.return_value = fake_csv mock_response.close.return_value = None mock_response.release_conn.return_value = None mock_client = MagicMock() mock_client.get_object.return_value = mock_response mock_backend = MagicMock(spec=MinIOBackend) mock_backend._client = mock_client with patch("api.audit.get_storage_backend", return_value=mock_backend): response = await async_client.get( "/api/admin/audit-log/daily-exports/2026-05-30", headers=admin_user["headers"], ) assert response.status_code == 200 content_type = response.headers.get("content-type", "") assert "text/csv" in content_type, ( f"expected content-type text/csv, got {content_type!r}" ) content_disposition = response.headers.get("content-disposition", "") assert "2026-05-30" in content_disposition, ( f"expected '2026-05-30' in Content-Disposition, got {content_disposition!r}" ) # Invalid date must return 404 with patch("api.audit.get_storage_backend", return_value=mock_backend): bad_response = await async_client.get( "/api/admin/audit-log/daily-exports/invalid-date", headers=admin_user["headers"], ) assert bad_response.status_code == 404, ( f"expected 404 for invalid date, got {bad_response.status_code}" )