""" Admin API tests for DocuVault — TDD RED phase. Tests cover: - GET /api/admin/users (list users, requires admin) - POST /api/admin/users (create user, password_must_change=True, weak pw 422) - PATCH /api/admin/users/{id}/status (deactivate/reactivate) - POST /api/admin/users/{id}/password-reset (sends email, no impersonation) - GET /api/admin/users/{id}/quota (quota details) - PATCH /api/admin/users/{id}/quota (quota adjust, warning when limit User: """Insert an admin User + Quota row and return the ORM object.""" user = User( id=uuid.uuid4(), handle=f"admin_{uuid.uuid4().hex[:6]}", email=f"admin_{uuid.uuid4().hex[:6]}@example.com", password_hash=hash_password("AdminPass1!Secret"), role="admin", is_active=True, totp_enabled=False, password_must_change=False, ) session.add(user) quota = Quota(user_id=user.id, limit_bytes=104857600, used_bytes=0) session.add(quota) await session.flush() return user async def make_regular_user(session: AsyncSession) -> User: """Insert a regular User + Quota row and return the ORM object.""" user = User( id=uuid.uuid4(), handle=f"user_{uuid.uuid4().hex[:6]}", email=f"user_{uuid.uuid4().hex[:6]}@example.com", password_hash=hash_password("UserPass1!Secret"), role="user", is_active=True, totp_enabled=False, password_must_change=False, ) session.add(user) quota = Quota(user_id=user.id, limit_bytes=104857600, used_bytes=0) session.add(quota) await session.flush() return user @pytest_asyncio.fixture async def admin_client(db_session: AsyncSession): """Async client with get_current_admin overridden to an admin user.""" from main import app admin = await make_admin_user(db_session) app.dependency_overrides[get_db] = lambda: db_session app.dependency_overrides[get_current_admin] = lambda: admin async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as c: yield c, admin, db_session app.dependency_overrides.clear() # ── Tests ───────────────────────────────────────────────────────────────────── @pytest.mark.asyncio async def test_list_users_requires_admin(async_client: AsyncClient): """GET /api/admin/users without admin dep → 403 (or 401 for missing token).""" # async_client does NOT override get_current_admin, so the real dep fires. # Without a Bearer token the HTTPBearer dependency returns 403. resp = await async_client.get("/api/admin/users") assert resp.status_code in {401, 403} @pytest.mark.asyncio async def test_list_users_as_admin(admin_client): """GET /api/admin/users with admin override → 200 with 'items' key.""" client, _admin, _session = admin_client resp = await client.get("/api/admin/users") assert resp.status_code == 200 data = resp.json() assert "items" in data @pytest.mark.asyncio async def test_create_user_as_admin(admin_client): """POST /api/admin/users with valid body → 201; response has id/email; no password_hash.""" client, _admin, _session = admin_client body = { "handle": "newuser_abc", "email": "newuser_abc@example.com", "password": "ValidP@ss1word!", "role": "user", } resp = await client.post("/api/admin/users", json=body) assert resp.status_code == 201 data = resp.json() assert "id" in data assert "email" in data assert "password_hash" not in data @pytest.mark.asyncio async def test_create_user_sets_password_must_change(admin_client): """POST /api/admin/users → 201; DB confirms user.password_must_change is True.""" client, _admin, session = admin_client body = { "handle": "mustchange_user", "email": "mustchange@example.com", "password": "MustChange1@Secret", "role": "user", } resp = await client.post("/api/admin/users", json=body) assert resp.status_code == 201 user_id = uuid.UUID(resp.json()["id"]) user = await session.get(User, user_id) assert user is not None assert user.password_must_change is True @pytest.mark.asyncio async def test_create_user_weak_password(admin_client): """POST /api/admin/users with weak password → 422.""" client, _admin, _session = admin_client body = { "handle": "weakpwuser", "email": "weakpw@example.com", "password": "short", "role": "user", } resp = await client.post("/api/admin/users", json=body) assert resp.status_code == 422 @pytest.mark.asyncio async def test_create_user_duplicate_email(admin_client): """POST /api/admin/users with existing email → 409.""" client, admin, session = admin_client body = { "handle": "dupuser_x1", "email": admin.email, # already exists "password": "ValidP@ss1word!", "role": "user", } resp = await client.post("/api/admin/users", json=body) assert resp.status_code == 409 @pytest.mark.asyncio async def test_deactivate_user(admin_client): """PATCH /api/admin/users/{id}/status { is_active: false } → 200; user.is_active=False.""" client, _admin, session = admin_client target = await make_regular_user(session) resp = await client.patch( f"/api/admin/users/{target.id}/status", json={"is_active": False}, ) assert resp.status_code == 200 data = resp.json() assert data["is_active"] is False @pytest.mark.asyncio async def test_reactivate_user(admin_client): """PATCH /api/admin/users/{id}/status { is_active: true } → 200; user.is_active=True.""" client, _admin, session = admin_client target = await make_regular_user(session) target.is_active = False session.add(target) await session.flush() resp = await client.patch( f"/api/admin/users/{target.id}/status", json={"is_active": True}, ) assert resp.status_code == 200 assert resp.json()["is_active"] is True @pytest.mark.asyncio async def test_cannot_deactivate_only_admin(admin_client): """PATCH /api/admin/users/{id}/status to deactivate the only admin → 400.""" client, admin, _session = admin_client resp = await client.patch( f"/api/admin/users/{admin.id}/status", json={"is_active": False}, ) assert resp.status_code == 400 @pytest.mark.asyncio async def test_password_reset_initiates_email(admin_client): """POST /api/admin/users/{id}/password-reset → 202; no token returned.""" from unittest.mock import patch client, _admin, session = admin_client target = await make_regular_user(session) # Mock Celery task to avoid Redis connection in unit tests with patch("tasks.email_tasks.send_reset_email.delay") as mock_delay: resp = await client.post(f"/api/admin/users/{target.id}/password-reset") # Verify email was dispatched (no return of token to caller) mock_delay.assert_called_once() call_args = mock_delay.call_args[0] assert call_args[0] == target.email # sent to the right address assert "token=" in call_args[1] # reset link contains token assert resp.status_code == 202 data = resp.json() # No token, no impersonation — just a message assert "message" in data assert "access_token" not in data assert "token" not in data @pytest.mark.asyncio async def test_get_quota(admin_client): """GET /api/admin/users/{id}/quota → 200 with expected quota fields.""" client, _admin, session = admin_client target = await make_regular_user(session) resp = await client.get(f"/api/admin/users/{target.id}/quota") assert resp.status_code == 200 data = resp.json() assert "limit_bytes" in data assert "used_bytes" in data assert "limit_mb" in data assert "used_mb" in data @pytest.mark.asyncio async def test_update_quota(admin_client): """PATCH /api/admin/users/{id}/quota { limit_bytes: 52428800 } → 200 with new limit.""" client, _admin, session = admin_client target = await make_regular_user(session) resp = await client.patch( f"/api/admin/users/{target.id}/quota", json={"limit_bytes": 52428800}, ) assert resp.status_code == 200 data = resp.json() assert data["limit_bytes"] == 52428800 @pytest.mark.asyncio async def test_quota_below_usage_warning(admin_client): """PATCH quota where new limit < used_bytes → 200 with warning=True.""" client, _admin, session = admin_client target = await make_regular_user(session) # Set used_bytes above the new limit we'll set from sqlalchemy import update as sa_update from db.models import Quota as QuotaModel await session.execute( sa_update(QuotaModel) .where(QuotaModel.user_id == target.id) .values(used_bytes=80000000) ) await session.flush() resp = await client.patch( f"/api/admin/users/{target.id}/quota", json={"limit_bytes": 10000000}, # less than 80 MB used ) assert resp.status_code == 200 data = resp.json() assert data["warning"] is True @pytest.mark.asyncio async def test_quota_zero_limit_invalid(admin_client): """PATCH /api/admin/users/{id}/quota { limit_bytes: 0 } → 422.""" client, _admin, session = admin_client target = await make_regular_user(session) resp = await client.patch( f"/api/admin/users/{target.id}/quota", json={"limit_bytes": 0}, ) assert resp.status_code == 422 @pytest.mark.asyncio async def test_update_ai_config(admin_client): """PATCH /api/admin/users/{id}/ai-config { ai_provider, ai_model } → 200.""" client, _admin, session = admin_client target = await make_regular_user(session) resp = await client.patch( f"/api/admin/users/{target.id}/ai-config", json={"ai_provider": "openai", "ai_model": "gpt-4o"}, ) assert resp.status_code == 200 data = resp.json() assert data["ai_provider"] == "openai" assert data["ai_model"] == "gpt-4o" @pytest.mark.asyncio async def test_admin_impersonation_not_found(async_client: AsyncClient): """GET /api/admin/users/impersonate → 404, 422, or 405 (no GET impersonation route). Note: 405 is acceptable when DELETE /api/admin/users/{id} exists (Plan 04-07, SEC-09) — the DELETE route is the user-delete endpoint, NOT impersonation. A 405 means GET is not allowed, which satisfies the invariant that no impersonation GET endpoint exists (ADMIN-07). """ # No admin override — just verifying the route doesn't exist at all. resp = await async_client.get("/api/admin/users/impersonate") # 404 = no route; 422 = id parse failed (impersonate is not a UUID) → acceptable # 405 = Method Not Allowed (DELETE /users/{id} exists but no GET handler) → also acceptable # 401/403 = route exists but blocked by auth → NOT acceptable (route should not exist) assert resp.status_code in {404, 405, 422} @pytest.mark.asyncio async def test_admin_login_as_not_found(async_client: AsyncClient): """GET /api/admin/login-as → 404 or 405 (route does not exist).""" resp = await async_client.get("/api/admin/login-as") assert resp.status_code in {404, 405} @pytest.mark.asyncio async def test_admin_response_no_password_hash(admin_client): """GET /api/admin/users response items must not contain 'password_hash'.""" client, _admin, session = admin_client await make_regular_user(session) resp = await client.get("/api/admin/users") assert resp.status_code == 200 data = resp.json() assert len(data["items"]) > 0 for item in data["items"]: assert "password_hash" not in item assert "credentials_enc" not in item