From cbad9acac10ae3926c2ad9fbe356384757eb089d Mon Sep 17 00:00:00 2001 From: curo1305 Date: Fri, 22 May 2026 19:59:16 +0200 Subject: [PATCH] =?UTF-8?q?test(02-04):=20RED=20phase=20=E2=80=94=20admin?= =?UTF-8?q?=20API=20test=20suite=20(11=20tests,=20expect=20fail=20until=20?= =?UTF-8?q?admin.py=20exists)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/tests/test_admin_api.py | 342 ++++++++++++++++++++++++++++++++ 1 file changed, 342 insertions(+) create mode 100644 backend/tests/test_admin_api.py diff --git a/backend/tests/test_admin_api.py b/backend/tests/test_admin_api.py new file mode 100644 index 0000000..111c5eb --- /dev/null +++ b/backend/tests/test_admin_api.py @@ -0,0 +1,342 @@ +""" +Admin API tests for DocuVault — TDD RED phase. + +Tests cover: + - GET /api/admin/users (list users, requires admin) + - POST /api/admin/users (create user, password_must_change=True, weak pw 422) + - PATCH /api/admin/users/{id}/status (deactivate/reactivate) + - POST /api/admin/users/{id}/password-reset (sends email, no impersonation) + - GET /api/admin/users/{id}/quota (quota details) + - PATCH /api/admin/users/{id}/quota (quota adjust, warning when limit User: + """Insert an admin User + Quota row and return the ORM object.""" + user = User( + id=uuid.uuid4(), + handle=f"admin_{uuid.uuid4().hex[:6]}", + email=f"admin_{uuid.uuid4().hex[:6]}@example.com", + password_hash=hash_password("AdminPass1!Secret"), + role="admin", + is_active=True, + totp_enabled=False, + password_must_change=False, + ) + session.add(user) + quota = Quota(user_id=user.id, limit_bytes=104857600, used_bytes=0) + session.add(quota) + await session.flush() + return user + + +async def make_regular_user(session: AsyncSession) -> User: + """Insert a regular User + Quota row and return the ORM object.""" + user = User( + id=uuid.uuid4(), + handle=f"user_{uuid.uuid4().hex[:6]}", + email=f"user_{uuid.uuid4().hex[:6]}@example.com", + password_hash=hash_password("UserPass1!Secret"), + role="user", + is_active=True, + totp_enabled=False, + password_must_change=False, + ) + session.add(user) + quota = Quota(user_id=user.id, limit_bytes=104857600, used_bytes=0) + session.add(quota) + await session.flush() + return user + + +@pytest_asyncio.fixture +async def admin_client(db_session: AsyncSession): + """Async client with get_current_admin overridden to an admin user.""" + from main import app + + admin = await make_admin_user(db_session) + app.dependency_overrides[get_db] = lambda: db_session + app.dependency_overrides[get_current_admin] = lambda: admin + + async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as c: + yield c, admin, db_session + + app.dependency_overrides.clear() + + +# ── Tests ───────────────────────────────────────────────────────────────────── + + +@pytest.mark.asyncio +async def test_list_users_requires_admin(async_client: AsyncClient): + """GET /api/admin/users without admin dep → 403 (or 401 for missing token).""" + # async_client does NOT override get_current_admin, so the real dep fires. + # Without a Bearer token the HTTPBearer dependency returns 403. + resp = await async_client.get("/api/admin/users") + assert resp.status_code in {401, 403} + + +@pytest.mark.asyncio +async def test_list_users_as_admin(admin_client): + """GET /api/admin/users with admin override → 200 with 'items' key.""" + client, _admin, _session = admin_client + resp = await client.get("/api/admin/users") + assert resp.status_code == 200 + data = resp.json() + assert "items" in data + + +@pytest.mark.asyncio +async def test_create_user_as_admin(admin_client): + """POST /api/admin/users with valid body → 201; response has id/email; no password_hash.""" + client, _admin, _session = admin_client + body = { + "handle": "newuser_abc", + "email": "newuser_abc@example.com", + "password": "ValidP@ss1word!", + "role": "user", + } + resp = await client.post("/api/admin/users", json=body) + assert resp.status_code == 201 + data = resp.json() + assert "id" in data + assert "email" in data + assert "password_hash" not in data + + +@pytest.mark.asyncio +async def test_create_user_sets_password_must_change(admin_client): + """POST /api/admin/users → 201; DB confirms user.password_must_change is True.""" + client, _admin, session = admin_client + body = { + "handle": "mustchange_user", + "email": "mustchange@example.com", + "password": "MustChange1@Secret", + "role": "user", + } + resp = await client.post("/api/admin/users", json=body) + assert resp.status_code == 201 + user_id = uuid.UUID(resp.json()["id"]) + user = await session.get(User, user_id) + assert user is not None + assert user.password_must_change is True + + +@pytest.mark.asyncio +async def test_create_user_weak_password(admin_client): + """POST /api/admin/users with weak password → 422.""" + client, _admin, _session = admin_client + body = { + "handle": "weakpwuser", + "email": "weakpw@example.com", + "password": "short", + "role": "user", + } + resp = await client.post("/api/admin/users", json=body) + assert resp.status_code == 422 + + +@pytest.mark.asyncio +async def test_create_user_duplicate_email(admin_client): + """POST /api/admin/users with existing email → 409.""" + client, admin, session = admin_client + body = { + "handle": "dupuser_x1", + "email": admin.email, # already exists + "password": "ValidP@ss1word!", + "role": "user", + } + resp = await client.post("/api/admin/users", json=body) + assert resp.status_code == 409 + + +@pytest.mark.asyncio +async def test_deactivate_user(admin_client): + """PATCH /api/admin/users/{id}/status { is_active: false } → 200; user.is_active=False.""" + client, _admin, session = admin_client + target = await make_regular_user(session) + resp = await client.patch( + f"/api/admin/users/{target.id}/status", + json={"is_active": False}, + ) + assert resp.status_code == 200 + data = resp.json() + assert data["is_active"] is False + + +@pytest.mark.asyncio +async def test_reactivate_user(admin_client): + """PATCH /api/admin/users/{id}/status { is_active: true } → 200; user.is_active=True.""" + client, _admin, session = admin_client + target = await make_regular_user(session) + target.is_active = False + session.add(target) + await session.flush() + + resp = await client.patch( + f"/api/admin/users/{target.id}/status", + json={"is_active": True}, + ) + assert resp.status_code == 200 + assert resp.json()["is_active"] is True + + +@pytest.mark.asyncio +async def test_cannot_deactivate_only_admin(admin_client): + """PATCH /api/admin/users/{id}/status to deactivate the only admin → 400.""" + client, admin, _session = admin_client + resp = await client.patch( + f"/api/admin/users/{admin.id}/status", + json={"is_active": False}, + ) + assert resp.status_code == 400 + + +@pytest.mark.asyncio +async def test_password_reset_initiates_email(admin_client): + """POST /api/admin/users/{id}/password-reset → 202; no token returned.""" + client, _admin, session = admin_client + target = await make_regular_user(session) + + resp = await client.post(f"/api/admin/users/{target.id}/password-reset") + assert resp.status_code == 202 + data = resp.json() + # No token, no impersonation — just a message + assert "message" in data + assert "access_token" not in data + assert "token" not in data + + +@pytest.mark.asyncio +async def test_get_quota(admin_client): + """GET /api/admin/users/{id}/quota → 200 with expected quota fields.""" + client, _admin, session = admin_client + target = await make_regular_user(session) + + resp = await client.get(f"/api/admin/users/{target.id}/quota") + assert resp.status_code == 200 + data = resp.json() + assert "limit_bytes" in data + assert "used_bytes" in data + assert "limit_mb" in data + assert "used_mb" in data + + +@pytest.mark.asyncio +async def test_update_quota(admin_client): + """PATCH /api/admin/users/{id}/quota { limit_bytes: 52428800 } → 200 with new limit.""" + client, _admin, session = admin_client + target = await make_regular_user(session) + + resp = await client.patch( + f"/api/admin/users/{target.id}/quota", + json={"limit_bytes": 52428800}, + ) + assert resp.status_code == 200 + data = resp.json() + assert data["limit_bytes"] == 52428800 + + +@pytest.mark.asyncio +async def test_quota_below_usage_warning(admin_client): + """PATCH quota where new limit < used_bytes → 200 with warning=True.""" + client, _admin, session = admin_client + target = await make_regular_user(session) + + # Set used_bytes above the new limit we'll set + from sqlalchemy import update as sa_update + from db.models import Quota as QuotaModel + await session.execute( + sa_update(QuotaModel) + .where(QuotaModel.user_id == target.id) + .values(used_bytes=80000000) + ) + await session.flush() + + resp = await client.patch( + f"/api/admin/users/{target.id}/quota", + json={"limit_bytes": 10000000}, # less than 80 MB used + ) + assert resp.status_code == 200 + data = resp.json() + assert data["warning"] is True + + +@pytest.mark.asyncio +async def test_quota_zero_limit_invalid(admin_client): + """PATCH /api/admin/users/{id}/quota { limit_bytes: 0 } → 422.""" + client, _admin, session = admin_client + target = await make_regular_user(session) + + resp = await client.patch( + f"/api/admin/users/{target.id}/quota", + json={"limit_bytes": 0}, + ) + assert resp.status_code == 422 + + +@pytest.mark.asyncio +async def test_update_ai_config(admin_client): + """PATCH /api/admin/users/{id}/ai-config { ai_provider, ai_model } → 200.""" + client, _admin, session = admin_client + target = await make_regular_user(session) + + resp = await client.patch( + f"/api/admin/users/{target.id}/ai-config", + json={"ai_provider": "openai", "ai_model": "gpt-4o"}, + ) + assert resp.status_code == 200 + data = resp.json() + assert data["ai_provider"] == "openai" + assert data["ai_model"] == "gpt-4o" + + +@pytest.mark.asyncio +async def test_admin_impersonation_not_found(async_client: AsyncClient): + """GET /api/admin/users/impersonate → 404 or 422 (route does not exist).""" + # No admin override — just verifying the route doesn't exist at all. + resp = await async_client.get("/api/admin/users/impersonate") + # 404 = no route; 422 = id parse failed (impersonate is not a UUID) → acceptable + # 401/403 = route exists but blocked by auth → NOT acceptable (route should not exist) + # We accept 404 or 422 only. + assert resp.status_code in {404, 422} + + +@pytest.mark.asyncio +async def test_admin_login_as_not_found(async_client: AsyncClient): + """GET /api/admin/login-as → 404 or 405 (route does not exist).""" + resp = await async_client.get("/api/admin/login-as") + assert resp.status_code in {404, 405} + + +@pytest.mark.asyncio +async def test_admin_response_no_password_hash(admin_client): + """GET /api/admin/users response items must not contain 'password_hash'.""" + client, _admin, session = admin_client + await make_regular_user(session) + + resp = await client.get("/api/admin/users") + assert resp.status_code == 200 + data = resp.json() + assert len(data["items"]) > 0 + for item in data["items"]: + assert "password_hash" not in item + assert "credentials_enc" not in item