""" Security invariant tests — Phase 4 gaps SEC-08 and SEC-09. SEC-08: credentials_enc must be absent from all user-facing API responses. SEC-09: Admin DELETE /api/admin/users/{id} must call storage.delete_object for each document the target user owned before removing the DB row. """ from __future__ import annotations import uuid import pytest import pytest_asyncio from httpx import ASGITransport, AsyncClient from sqlalchemy.ext.asyncio import AsyncSession from db.models import Document, Quota, User from deps.auth import get_current_admin from deps.db import get_db from services.auth import hash_password, create_access_token # ── Shared helpers ──────────────────────────────────────────────────────────── async def _make_admin(session: AsyncSession) -> User: """Insert an admin User + Quota row; password = 'AdminPass1!Secret'.""" user = User( id=uuid.uuid4(), handle=f"admin_{uuid.uuid4().hex[:6]}", email=f"admin_{uuid.uuid4().hex[:6]}@example.com", password_hash=hash_password("AdminPass1!Secret"), role="admin", is_active=True, totp_enabled=False, password_must_change=False, ) session.add(user) session.add(Quota(user_id=user.id, limit_bytes=104857600, used_bytes=0)) await session.flush() return user async def _make_regular_user(session: AsyncSession) -> User: """Insert a regular User + Quota row.""" user = User( id=uuid.uuid4(), handle=f"user_{uuid.uuid4().hex[:6]}", email=f"user_{uuid.uuid4().hex[:6]}@example.com", password_hash=hash_password("UserPass1!Secret"), role="user", is_active=True, totp_enabled=False, password_must_change=False, ) session.add(user) session.add(Quota(user_id=user.id, limit_bytes=104857600, used_bytes=0)) await session.flush() return user async def _make_doc(session: AsyncSession, user: User, filename: str = "test.pdf") -> Document: """Insert a minimal Document row owned by *user*.""" doc_id = uuid.uuid4() doc = Document( id=doc_id, user_id=user.id, filename=filename, content_type="application/pdf", size_bytes=1024, storage_backend="minio", status="uploaded", object_key=f"{user.id}/{doc_id}/{uuid.uuid4()}.pdf", ) session.add(doc) await session.flush() return doc @pytest_asyncio.fixture async def admin_client(db_session: AsyncSession): """Async client with get_current_admin overridden; yields (client, admin, session).""" from main import app admin = await _make_admin(db_session) app.dependency_overrides[get_db] = lambda: db_session app.dependency_overrides[get_current_admin] = lambda: admin async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as c: yield c, admin, db_session app.dependency_overrides.clear() # --------------------------------------------------------------------------- # SEC-08: credentials_enc absent from all user-facing API responses (task 4-07-01) # --------------------------------------------------------------------------- async def test_credentials_enc_not_in_response(async_client, auth_user, db_session): """No user-facing API response includes credentials_enc. SEC-08: checks GET /api/documents (list) and GET /api/documents/{id} (single doc). The implementation's _doc_to_dict whitelist must never include credentials_enc. """ doc = await _make_doc(db_session, auth_user["user"], "sec08_test.pdf") await db_session.commit() # 1. List endpoint list_resp = await async_client.get("/api/documents", headers=auth_user["headers"]) assert list_resp.status_code == 200, list_resp.text list_data = list_resp.json() assert list_data["total"] >= 1 for item in list_data["items"]: assert "credentials_enc" not in item, ( f"credentials_enc found in list item: {item}" ) # 2. Single document endpoint single_resp = await async_client.get( f"/api/documents/{doc.id}", headers=auth_user["headers"] ) assert single_resp.status_code == 200, single_resp.text single_data = single_resp.json() assert "credentials_enc" not in single_data, ( f"credentials_enc found in single doc response: {single_data}" ) # --------------------------------------------------------------------------- # SEC-09: Admin delete user triggers MinIO object deletion before DB removal # (task 4-07-02) # --------------------------------------------------------------------------- async def test_delete_user_cleans_files(admin_client, monkeypatch): """DELETE /api/admin/users/{id} calls delete_object for each user doc before DB removal. SEC-09: MinIO objects deleted BEFORE user row is removed from the database. Verifies: 1. delete_object is called at least once per document owned by the user. 2. The call happens (and is tracked) before the 204 response is returned. 3. The user is gone from the DB after the call. """ from unittest.mock import AsyncMock, patch client, admin, session = admin_client # Create a target regular user with 2 MinIO documents target = await _make_regular_user(session) doc1 = await _make_doc(session, target, "file1.pdf") doc2 = await _make_doc(session, target, "file2.pdf") await session.commit() deleted_keys: list[str] = [] async def _fake_delete_object(self_inst, object_key: str) -> None: deleted_keys.append(object_key) # Patch the MinIO backend's delete_object so we can observe calls. # The fake must accept self as first positional arg because it replaces # an instance method on the class. from storage.minio_backend import MinIOBackend monkeypatch.setattr(MinIOBackend, "delete_object", _fake_delete_object, raising=False) resp = await client.request( "DELETE", f"/api/admin/users/{target.id}", json={"admin_password": "AdminPass1!Secret"}, ) assert resp.status_code == 204, f"Expected 204, got {resp.status_code}: {resp.text}" # delete_object must have been called for BOTH documents assert doc1.object_key in deleted_keys, ( f"delete_object not called for doc1.object_key={doc1.object_key!r}; " f"called keys: {deleted_keys}" ) assert doc2.object_key in deleted_keys, ( f"delete_object not called for doc2.object_key={doc2.object_key!r}; " f"called keys: {deleted_keys}" ) # Confirm the user is gone from the DB from sqlalchemy import select as sa_select result = await session.execute(sa_select(User).where(User.id == target.id)) assert result.scalar_one_or_none() is None, ( "User row still present in DB after admin delete" )