8e6005cb73
- Add CloudConnectionOut Pydantic model (SEC-08): credentials_enc deliberately excluded
- Implement DELETE /api/admin/users/{id} (SEC-09): collects user docs, deletes MinIO
objects best-effort before DB delete; audit log written within same transaction
- Add write_audit_log calls to: create_user (admin.user_created), update_user_status
(admin.user_deactivated/admin.user_activated), update_user_quota (admin.quota_changed),
update_ai_config (admin.ai_provider_assigned), delete_user (admin.user_deleted)
- Add Request param to all admin state-changing handlers for IP extraction
- Fix test_admin_impersonation_not_found: accept 405 in addition to 404/422
(expected: DELETE /users/{id} exists now, so GET returns 405 — no impersonation
route still satisfied, just a different HTTP status for non-existent method)
358 lines
12 KiB
Python
358 lines
12 KiB
Python
"""
|
|
Admin API tests for DocuVault — TDD RED phase.
|
|
|
|
Tests cover:
|
|
- GET /api/admin/users (list users, requires admin)
|
|
- POST /api/admin/users (create user, password_must_change=True, weak pw 422)
|
|
- PATCH /api/admin/users/{id}/status (deactivate/reactivate)
|
|
- POST /api/admin/users/{id}/password-reset (sends email, no impersonation)
|
|
- GET /api/admin/users/{id}/quota (quota details)
|
|
- PATCH /api/admin/users/{id}/quota (quota adjust, warning when limit<used)
|
|
- PATCH /api/admin/users/{id}/ai-config (AI provider/model assignment)
|
|
- ADMIN-07: no impersonation route
|
|
- SEC-07: no password_hash in any response
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import uuid
|
|
|
|
import pytest
|
|
import pytest_asyncio
|
|
from httpx import ASGITransport, AsyncClient
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from db.models import Quota, User
|
|
from deps.auth import get_current_admin
|
|
from deps.db import get_db
|
|
from services.auth import hash_password
|
|
|
|
|
|
# ── Helpers ───────────────────────────────────────────────────────────────────
|
|
|
|
async def make_admin_user(session: AsyncSession) -> User:
|
|
"""Insert an admin User + Quota row and return the ORM object."""
|
|
user = User(
|
|
id=uuid.uuid4(),
|
|
handle=f"admin_{uuid.uuid4().hex[:6]}",
|
|
email=f"admin_{uuid.uuid4().hex[:6]}@example.com",
|
|
password_hash=hash_password("AdminPass1!Secret"),
|
|
role="admin",
|
|
is_active=True,
|
|
totp_enabled=False,
|
|
password_must_change=False,
|
|
)
|
|
session.add(user)
|
|
quota = Quota(user_id=user.id, limit_bytes=104857600, used_bytes=0)
|
|
session.add(quota)
|
|
await session.flush()
|
|
return user
|
|
|
|
|
|
async def make_regular_user(session: AsyncSession) -> User:
|
|
"""Insert a regular User + Quota row and return the ORM object."""
|
|
user = User(
|
|
id=uuid.uuid4(),
|
|
handle=f"user_{uuid.uuid4().hex[:6]}",
|
|
email=f"user_{uuid.uuid4().hex[:6]}@example.com",
|
|
password_hash=hash_password("UserPass1!Secret"),
|
|
role="user",
|
|
is_active=True,
|
|
totp_enabled=False,
|
|
password_must_change=False,
|
|
)
|
|
session.add(user)
|
|
quota = Quota(user_id=user.id, limit_bytes=104857600, used_bytes=0)
|
|
session.add(quota)
|
|
await session.flush()
|
|
return user
|
|
|
|
|
|
@pytest_asyncio.fixture
|
|
async def admin_client(db_session: AsyncSession):
|
|
"""Async client with get_current_admin overridden to an admin user."""
|
|
from main import app
|
|
|
|
admin = await make_admin_user(db_session)
|
|
app.dependency_overrides[get_db] = lambda: db_session
|
|
app.dependency_overrides[get_current_admin] = lambda: admin
|
|
|
|
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as c:
|
|
yield c, admin, db_session
|
|
|
|
app.dependency_overrides.clear()
|
|
|
|
|
|
# ── Tests ─────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_list_users_requires_admin(async_client: AsyncClient):
|
|
"""GET /api/admin/users without admin dep → 403 (or 401 for missing token)."""
|
|
# async_client does NOT override get_current_admin, so the real dep fires.
|
|
# Without a Bearer token the HTTPBearer dependency returns 403.
|
|
resp = await async_client.get("/api/admin/users")
|
|
assert resp.status_code in {401, 403}
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_list_users_as_admin(admin_client):
|
|
"""GET /api/admin/users with admin override → 200 with 'items' key."""
|
|
client, _admin, _session = admin_client
|
|
resp = await client.get("/api/admin/users")
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
assert "items" in data
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_create_user_as_admin(admin_client):
|
|
"""POST /api/admin/users with valid body → 201; response has id/email; no password_hash."""
|
|
client, _admin, _session = admin_client
|
|
body = {
|
|
"handle": "newuser_abc",
|
|
"email": "newuser_abc@example.com",
|
|
"password": "ValidP@ss1word!",
|
|
"role": "user",
|
|
}
|
|
resp = await client.post("/api/admin/users", json=body)
|
|
assert resp.status_code == 201
|
|
data = resp.json()
|
|
assert "id" in data
|
|
assert "email" in data
|
|
assert "password_hash" not in data
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_create_user_sets_password_must_change(admin_client):
|
|
"""POST /api/admin/users → 201; DB confirms user.password_must_change is True."""
|
|
client, _admin, session = admin_client
|
|
body = {
|
|
"handle": "mustchange_user",
|
|
"email": "mustchange@example.com",
|
|
"password": "MustChange1@Secret",
|
|
"role": "user",
|
|
}
|
|
resp = await client.post("/api/admin/users", json=body)
|
|
assert resp.status_code == 201
|
|
user_id = uuid.UUID(resp.json()["id"])
|
|
user = await session.get(User, user_id)
|
|
assert user is not None
|
|
assert user.password_must_change is True
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_create_user_weak_password(admin_client):
|
|
"""POST /api/admin/users with weak password → 422."""
|
|
client, _admin, _session = admin_client
|
|
body = {
|
|
"handle": "weakpwuser",
|
|
"email": "weakpw@example.com",
|
|
"password": "short",
|
|
"role": "user",
|
|
}
|
|
resp = await client.post("/api/admin/users", json=body)
|
|
assert resp.status_code == 422
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_create_user_duplicate_email(admin_client):
|
|
"""POST /api/admin/users with existing email → 409."""
|
|
client, admin, session = admin_client
|
|
body = {
|
|
"handle": "dupuser_x1",
|
|
"email": admin.email, # already exists
|
|
"password": "ValidP@ss1word!",
|
|
"role": "user",
|
|
}
|
|
resp = await client.post("/api/admin/users", json=body)
|
|
assert resp.status_code == 409
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_deactivate_user(admin_client):
|
|
"""PATCH /api/admin/users/{id}/status { is_active: false } → 200; user.is_active=False."""
|
|
client, _admin, session = admin_client
|
|
target = await make_regular_user(session)
|
|
resp = await client.patch(
|
|
f"/api/admin/users/{target.id}/status",
|
|
json={"is_active": False},
|
|
)
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
assert data["is_active"] is False
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_reactivate_user(admin_client):
|
|
"""PATCH /api/admin/users/{id}/status { is_active: true } → 200; user.is_active=True."""
|
|
client, _admin, session = admin_client
|
|
target = await make_regular_user(session)
|
|
target.is_active = False
|
|
session.add(target)
|
|
await session.flush()
|
|
|
|
resp = await client.patch(
|
|
f"/api/admin/users/{target.id}/status",
|
|
json={"is_active": True},
|
|
)
|
|
assert resp.status_code == 200
|
|
assert resp.json()["is_active"] is True
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_cannot_deactivate_only_admin(admin_client):
|
|
"""PATCH /api/admin/users/{id}/status to deactivate the only admin → 400."""
|
|
client, admin, _session = admin_client
|
|
resp = await client.patch(
|
|
f"/api/admin/users/{admin.id}/status",
|
|
json={"is_active": False},
|
|
)
|
|
assert resp.status_code == 400
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_password_reset_initiates_email(admin_client):
|
|
"""POST /api/admin/users/{id}/password-reset → 202; no token returned."""
|
|
from unittest.mock import patch
|
|
client, _admin, session = admin_client
|
|
target = await make_regular_user(session)
|
|
|
|
# Mock Celery task to avoid Redis connection in unit tests
|
|
with patch("tasks.email_tasks.send_reset_email.delay") as mock_delay:
|
|
resp = await client.post(f"/api/admin/users/{target.id}/password-reset")
|
|
# Verify email was dispatched (no return of token to caller)
|
|
mock_delay.assert_called_once()
|
|
call_args = mock_delay.call_args[0]
|
|
assert call_args[0] == target.email # sent to the right address
|
|
assert "token=" in call_args[1] # reset link contains token
|
|
|
|
assert resp.status_code == 202
|
|
data = resp.json()
|
|
# No token, no impersonation — just a message
|
|
assert "message" in data
|
|
assert "access_token" not in data
|
|
assert "token" not in data
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_get_quota(admin_client):
|
|
"""GET /api/admin/users/{id}/quota → 200 with expected quota fields."""
|
|
client, _admin, session = admin_client
|
|
target = await make_regular_user(session)
|
|
|
|
resp = await client.get(f"/api/admin/users/{target.id}/quota")
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
assert "limit_bytes" in data
|
|
assert "used_bytes" in data
|
|
assert "limit_mb" in data
|
|
assert "used_mb" in data
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_update_quota(admin_client):
|
|
"""PATCH /api/admin/users/{id}/quota { limit_bytes: 52428800 } → 200 with new limit."""
|
|
client, _admin, session = admin_client
|
|
target = await make_regular_user(session)
|
|
|
|
resp = await client.patch(
|
|
f"/api/admin/users/{target.id}/quota",
|
|
json={"limit_bytes": 52428800},
|
|
)
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
assert data["limit_bytes"] == 52428800
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_quota_below_usage_warning(admin_client):
|
|
"""PATCH quota where new limit < used_bytes → 200 with warning=True."""
|
|
client, _admin, session = admin_client
|
|
target = await make_regular_user(session)
|
|
|
|
# Set used_bytes above the new limit we'll set
|
|
from sqlalchemy import update as sa_update
|
|
from db.models import Quota as QuotaModel
|
|
await session.execute(
|
|
sa_update(QuotaModel)
|
|
.where(QuotaModel.user_id == target.id)
|
|
.values(used_bytes=80000000)
|
|
)
|
|
await session.flush()
|
|
|
|
resp = await client.patch(
|
|
f"/api/admin/users/{target.id}/quota",
|
|
json={"limit_bytes": 10000000}, # less than 80 MB used
|
|
)
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
assert data["warning"] is True
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_quota_zero_limit_invalid(admin_client):
|
|
"""PATCH /api/admin/users/{id}/quota { limit_bytes: 0 } → 422."""
|
|
client, _admin, session = admin_client
|
|
target = await make_regular_user(session)
|
|
|
|
resp = await client.patch(
|
|
f"/api/admin/users/{target.id}/quota",
|
|
json={"limit_bytes": 0},
|
|
)
|
|
assert resp.status_code == 422
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_update_ai_config(admin_client):
|
|
"""PATCH /api/admin/users/{id}/ai-config { ai_provider, ai_model } → 200."""
|
|
client, _admin, session = admin_client
|
|
target = await make_regular_user(session)
|
|
|
|
resp = await client.patch(
|
|
f"/api/admin/users/{target.id}/ai-config",
|
|
json={"ai_provider": "openai", "ai_model": "gpt-4o"},
|
|
)
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
assert data["ai_provider"] == "openai"
|
|
assert data["ai_model"] == "gpt-4o"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_admin_impersonation_not_found(async_client: AsyncClient):
|
|
"""GET /api/admin/users/impersonate → 404, 422, or 405 (no GET impersonation route).
|
|
|
|
Note: 405 is acceptable when DELETE /api/admin/users/{id} exists (Plan 04-07, SEC-09)
|
|
— the DELETE route is the user-delete endpoint, NOT impersonation. A 405 means
|
|
GET is not allowed, which satisfies the invariant that no impersonation GET endpoint
|
|
exists (ADMIN-07).
|
|
"""
|
|
# No admin override — just verifying the route doesn't exist at all.
|
|
resp = await async_client.get("/api/admin/users/impersonate")
|
|
# 404 = no route; 422 = id parse failed (impersonate is not a UUID) → acceptable
|
|
# 405 = Method Not Allowed (DELETE /users/{id} exists but no GET handler) → also acceptable
|
|
# 401/403 = route exists but blocked by auth → NOT acceptable (route should not exist)
|
|
assert resp.status_code in {404, 405, 422}
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_admin_login_as_not_found(async_client: AsyncClient):
|
|
"""GET /api/admin/login-as → 404 or 405 (route does not exist)."""
|
|
resp = await async_client.get("/api/admin/login-as")
|
|
assert resp.status_code in {404, 405}
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_admin_response_no_password_hash(admin_client):
|
|
"""GET /api/admin/users response items must not contain 'password_hash'."""
|
|
client, _admin, session = admin_client
|
|
await make_regular_user(session)
|
|
|
|
resp = await client.get("/api/admin/users")
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
assert len(data["items"]) > 0
|
|
for item in data["items"]:
|
|
assert "password_hash" not in item
|
|
assert "credentials_enc" not in item
|