Files
kite/backend/tests/test_admin_api.py
T

343 lines
12 KiB
Python

"""
Admin API tests for DocuVault — TDD RED phase.
Tests cover:
- GET /api/admin/users (list users, requires admin)
- POST /api/admin/users (create user, password_must_change=True, weak pw 422)
- PATCH /api/admin/users/{id}/status (deactivate/reactivate)
- POST /api/admin/users/{id}/password-reset (sends email, no impersonation)
- GET /api/admin/users/{id}/quota (quota details)
- PATCH /api/admin/users/{id}/quota (quota adjust, warning when limit<used)
- PATCH /api/admin/users/{id}/ai-config (AI provider/model assignment)
- ADMIN-07: no impersonation route
- SEC-07: no password_hash in any response
"""
from __future__ import annotations
import uuid
import pytest
import pytest_asyncio
from httpx import ASGITransport, AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession
from db.models import Quota, User
from deps.auth import get_current_admin
from deps.db import get_db
from services.auth import hash_password
# ── Helpers ───────────────────────────────────────────────────────────────────
async def make_admin_user(session: AsyncSession) -> User:
"""Insert an admin User + Quota row and return the ORM object."""
user = User(
id=uuid.uuid4(),
handle=f"admin_{uuid.uuid4().hex[:6]}",
email=f"admin_{uuid.uuid4().hex[:6]}@example.com",
password_hash=hash_password("AdminPass1!Secret"),
role="admin",
is_active=True,
totp_enabled=False,
password_must_change=False,
)
session.add(user)
quota = Quota(user_id=user.id, limit_bytes=104857600, used_bytes=0)
session.add(quota)
await session.flush()
return user
async def make_regular_user(session: AsyncSession) -> User:
"""Insert a regular User + Quota row and return the ORM object."""
user = User(
id=uuid.uuid4(),
handle=f"user_{uuid.uuid4().hex[:6]}",
email=f"user_{uuid.uuid4().hex[:6]}@example.com",
password_hash=hash_password("UserPass1!Secret"),
role="user",
is_active=True,
totp_enabled=False,
password_must_change=False,
)
session.add(user)
quota = Quota(user_id=user.id, limit_bytes=104857600, used_bytes=0)
session.add(quota)
await session.flush()
return user
@pytest_asyncio.fixture
async def admin_client(db_session: AsyncSession):
"""Async client with get_current_admin overridden to an admin user."""
from main import app
admin = await make_admin_user(db_session)
app.dependency_overrides[get_db] = lambda: db_session
app.dependency_overrides[get_current_admin] = lambda: admin
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as c:
yield c, admin, db_session
app.dependency_overrides.clear()
# ── Tests ─────────────────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_list_users_requires_admin(async_client: AsyncClient):
"""GET /api/admin/users without admin dep → 403 (or 401 for missing token)."""
# async_client does NOT override get_current_admin, so the real dep fires.
# Without a Bearer token the HTTPBearer dependency returns 403.
resp = await async_client.get("/api/admin/users")
assert resp.status_code in {401, 403}
@pytest.mark.asyncio
async def test_list_users_as_admin(admin_client):
"""GET /api/admin/users with admin override → 200 with 'items' key."""
client, _admin, _session = admin_client
resp = await client.get("/api/admin/users")
assert resp.status_code == 200
data = resp.json()
assert "items" in data
@pytest.mark.asyncio
async def test_create_user_as_admin(admin_client):
"""POST /api/admin/users with valid body → 201; response has id/email; no password_hash."""
client, _admin, _session = admin_client
body = {
"handle": "newuser_abc",
"email": "newuser_abc@example.com",
"password": "ValidP@ss1word!",
"role": "user",
}
resp = await client.post("/api/admin/users", json=body)
assert resp.status_code == 201
data = resp.json()
assert "id" in data
assert "email" in data
assert "password_hash" not in data
@pytest.mark.asyncio
async def test_create_user_sets_password_must_change(admin_client):
"""POST /api/admin/users → 201; DB confirms user.password_must_change is True."""
client, _admin, session = admin_client
body = {
"handle": "mustchange_user",
"email": "mustchange@example.com",
"password": "MustChange1@Secret",
"role": "user",
}
resp = await client.post("/api/admin/users", json=body)
assert resp.status_code == 201
user_id = uuid.UUID(resp.json()["id"])
user = await session.get(User, user_id)
assert user is not None
assert user.password_must_change is True
@pytest.mark.asyncio
async def test_create_user_weak_password(admin_client):
"""POST /api/admin/users with weak password → 422."""
client, _admin, _session = admin_client
body = {
"handle": "weakpwuser",
"email": "weakpw@example.com",
"password": "short",
"role": "user",
}
resp = await client.post("/api/admin/users", json=body)
assert resp.status_code == 422
@pytest.mark.asyncio
async def test_create_user_duplicate_email(admin_client):
"""POST /api/admin/users with existing email → 409."""
client, admin, session = admin_client
body = {
"handle": "dupuser_x1",
"email": admin.email, # already exists
"password": "ValidP@ss1word!",
"role": "user",
}
resp = await client.post("/api/admin/users", json=body)
assert resp.status_code == 409
@pytest.mark.asyncio
async def test_deactivate_user(admin_client):
"""PATCH /api/admin/users/{id}/status { is_active: false } → 200; user.is_active=False."""
client, _admin, session = admin_client
target = await make_regular_user(session)
resp = await client.patch(
f"/api/admin/users/{target.id}/status",
json={"is_active": False},
)
assert resp.status_code == 200
data = resp.json()
assert data["is_active"] is False
@pytest.mark.asyncio
async def test_reactivate_user(admin_client):
"""PATCH /api/admin/users/{id}/status { is_active: true } → 200; user.is_active=True."""
client, _admin, session = admin_client
target = await make_regular_user(session)
target.is_active = False
session.add(target)
await session.flush()
resp = await client.patch(
f"/api/admin/users/{target.id}/status",
json={"is_active": True},
)
assert resp.status_code == 200
assert resp.json()["is_active"] is True
@pytest.mark.asyncio
async def test_cannot_deactivate_only_admin(admin_client):
"""PATCH /api/admin/users/{id}/status to deactivate the only admin → 400."""
client, admin, _session = admin_client
resp = await client.patch(
f"/api/admin/users/{admin.id}/status",
json={"is_active": False},
)
assert resp.status_code == 400
@pytest.mark.asyncio
async def test_password_reset_initiates_email(admin_client):
"""POST /api/admin/users/{id}/password-reset → 202; no token returned."""
client, _admin, session = admin_client
target = await make_regular_user(session)
resp = await client.post(f"/api/admin/users/{target.id}/password-reset")
assert resp.status_code == 202
data = resp.json()
# No token, no impersonation — just a message
assert "message" in data
assert "access_token" not in data
assert "token" not in data
@pytest.mark.asyncio
async def test_get_quota(admin_client):
"""GET /api/admin/users/{id}/quota → 200 with expected quota fields."""
client, _admin, session = admin_client
target = await make_regular_user(session)
resp = await client.get(f"/api/admin/users/{target.id}/quota")
assert resp.status_code == 200
data = resp.json()
assert "limit_bytes" in data
assert "used_bytes" in data
assert "limit_mb" in data
assert "used_mb" in data
@pytest.mark.asyncio
async def test_update_quota(admin_client):
"""PATCH /api/admin/users/{id}/quota { limit_bytes: 52428800 } → 200 with new limit."""
client, _admin, session = admin_client
target = await make_regular_user(session)
resp = await client.patch(
f"/api/admin/users/{target.id}/quota",
json={"limit_bytes": 52428800},
)
assert resp.status_code == 200
data = resp.json()
assert data["limit_bytes"] == 52428800
@pytest.mark.asyncio
async def test_quota_below_usage_warning(admin_client):
"""PATCH quota where new limit < used_bytes → 200 with warning=True."""
client, _admin, session = admin_client
target = await make_regular_user(session)
# Set used_bytes above the new limit we'll set
from sqlalchemy import update as sa_update
from db.models import Quota as QuotaModel
await session.execute(
sa_update(QuotaModel)
.where(QuotaModel.user_id == target.id)
.values(used_bytes=80000000)
)
await session.flush()
resp = await client.patch(
f"/api/admin/users/{target.id}/quota",
json={"limit_bytes": 10000000}, # less than 80 MB used
)
assert resp.status_code == 200
data = resp.json()
assert data["warning"] is True
@pytest.mark.asyncio
async def test_quota_zero_limit_invalid(admin_client):
"""PATCH /api/admin/users/{id}/quota { limit_bytes: 0 } → 422."""
client, _admin, session = admin_client
target = await make_regular_user(session)
resp = await client.patch(
f"/api/admin/users/{target.id}/quota",
json={"limit_bytes": 0},
)
assert resp.status_code == 422
@pytest.mark.asyncio
async def test_update_ai_config(admin_client):
"""PATCH /api/admin/users/{id}/ai-config { ai_provider, ai_model } → 200."""
client, _admin, session = admin_client
target = await make_regular_user(session)
resp = await client.patch(
f"/api/admin/users/{target.id}/ai-config",
json={"ai_provider": "openai", "ai_model": "gpt-4o"},
)
assert resp.status_code == 200
data = resp.json()
assert data["ai_provider"] == "openai"
assert data["ai_model"] == "gpt-4o"
@pytest.mark.asyncio
async def test_admin_impersonation_not_found(async_client: AsyncClient):
"""GET /api/admin/users/impersonate → 404 or 422 (route does not exist)."""
# No admin override — just verifying the route doesn't exist at all.
resp = await async_client.get("/api/admin/users/impersonate")
# 404 = no route; 422 = id parse failed (impersonate is not a UUID) → acceptable
# 401/403 = route exists but blocked by auth → NOT acceptable (route should not exist)
# We accept 404 or 422 only.
assert resp.status_code in {404, 422}
@pytest.mark.asyncio
async def test_admin_login_as_not_found(async_client: AsyncClient):
"""GET /api/admin/login-as → 404 or 405 (route does not exist)."""
resp = await async_client.get("/api/admin/login-as")
assert resp.status_code in {404, 405}
@pytest.mark.asyncio
async def test_admin_response_no_password_hash(admin_client):
"""GET /api/admin/users response items must not contain 'password_hash'."""
client, _admin, session = admin_client
await make_regular_user(session)
resp = await client.get("/api/admin/users")
assert resp.status_code == 200
data = resp.json()
assert len(data["items"]) > 0
for item in data["items"]:
assert "password_hash" not in item
assert "credentials_enc" not in item