62daf0d750
Add 6 new tests covering document sort (name/size), FTS search cross-user isolation, credentials_enc exclusion from all responses, and MinIO object cleanup on user deletion. Fix FTS try/except misplacement in api/documents.py — was wrapping the ORM statement builder (never raises) instead of the execute call, causing HTTP 500 on SQLite test env. Now falls back to unfiltered results when @@ unsupported. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
189 lines
6.8 KiB
Python
189 lines
6.8 KiB
Python
"""
|
|
Security invariant tests — Phase 4 gaps SEC-08 and SEC-09.
|
|
|
|
SEC-08: credentials_enc must be absent from all user-facing API responses.
|
|
SEC-09: Admin DELETE /api/admin/users/{id} must call storage.delete_object for
|
|
each document the target user owned before removing the DB row.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import uuid
|
|
|
|
import pytest
|
|
import pytest_asyncio
|
|
from httpx import ASGITransport, AsyncClient
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from db.models import Document, Quota, User
|
|
from deps.auth import get_current_admin
|
|
from deps.db import get_db
|
|
from services.auth import hash_password, create_access_token
|
|
|
|
|
|
# ── Shared helpers ────────────────────────────────────────────────────────────
|
|
|
|
|
|
async def _make_admin(session: AsyncSession) -> User:
|
|
"""Insert an admin User + Quota row; password = 'AdminPass1!Secret'."""
|
|
user = User(
|
|
id=uuid.uuid4(),
|
|
handle=f"admin_{uuid.uuid4().hex[:6]}",
|
|
email=f"admin_{uuid.uuid4().hex[:6]}@example.com",
|
|
password_hash=hash_password("AdminPass1!Secret"),
|
|
role="admin",
|
|
is_active=True,
|
|
totp_enabled=False,
|
|
password_must_change=False,
|
|
)
|
|
session.add(user)
|
|
session.add(Quota(user_id=user.id, limit_bytes=104857600, used_bytes=0))
|
|
await session.flush()
|
|
return user
|
|
|
|
|
|
async def _make_regular_user(session: AsyncSession) -> User:
|
|
"""Insert a regular User + Quota row."""
|
|
user = User(
|
|
id=uuid.uuid4(),
|
|
handle=f"user_{uuid.uuid4().hex[:6]}",
|
|
email=f"user_{uuid.uuid4().hex[:6]}@example.com",
|
|
password_hash=hash_password("UserPass1!Secret"),
|
|
role="user",
|
|
is_active=True,
|
|
totp_enabled=False,
|
|
password_must_change=False,
|
|
)
|
|
session.add(user)
|
|
session.add(Quota(user_id=user.id, limit_bytes=104857600, used_bytes=0))
|
|
await session.flush()
|
|
return user
|
|
|
|
|
|
async def _make_doc(session: AsyncSession, user: User, filename: str = "test.pdf") -> Document:
|
|
"""Insert a minimal Document row owned by *user*."""
|
|
doc_id = uuid.uuid4()
|
|
doc = Document(
|
|
id=doc_id,
|
|
user_id=user.id,
|
|
filename=filename,
|
|
content_type="application/pdf",
|
|
size_bytes=1024,
|
|
storage_backend="minio",
|
|
status="uploaded",
|
|
object_key=f"{user.id}/{doc_id}/{uuid.uuid4()}.pdf",
|
|
)
|
|
session.add(doc)
|
|
await session.flush()
|
|
return doc
|
|
|
|
|
|
@pytest_asyncio.fixture
|
|
async def admin_client(db_session: AsyncSession):
|
|
"""Async client with get_current_admin overridden; yields (client, admin, session)."""
|
|
from main import app
|
|
|
|
admin = await _make_admin(db_session)
|
|
app.dependency_overrides[get_db] = lambda: db_session
|
|
app.dependency_overrides[get_current_admin] = lambda: admin
|
|
|
|
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as c:
|
|
yield c, admin, db_session
|
|
|
|
app.dependency_overrides.clear()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# SEC-08: credentials_enc absent from all user-facing API responses (task 4-07-01)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def test_credentials_enc_not_in_response(async_client, auth_user, db_session):
|
|
"""No user-facing API response includes credentials_enc.
|
|
|
|
SEC-08: checks GET /api/documents (list) and GET /api/documents/{id} (single doc).
|
|
The implementation's _doc_to_dict whitelist must never include credentials_enc.
|
|
"""
|
|
doc = await _make_doc(db_session, auth_user["user"], "sec08_test.pdf")
|
|
await db_session.commit()
|
|
|
|
# 1. List endpoint
|
|
list_resp = await async_client.get("/api/documents", headers=auth_user["headers"])
|
|
assert list_resp.status_code == 200, list_resp.text
|
|
list_data = list_resp.json()
|
|
assert list_data["total"] >= 1
|
|
for item in list_data["items"]:
|
|
assert "credentials_enc" not in item, (
|
|
f"credentials_enc found in list item: {item}"
|
|
)
|
|
|
|
# 2. Single document endpoint
|
|
single_resp = await async_client.get(
|
|
f"/api/documents/{doc.id}", headers=auth_user["headers"]
|
|
)
|
|
assert single_resp.status_code == 200, single_resp.text
|
|
single_data = single_resp.json()
|
|
assert "credentials_enc" not in single_data, (
|
|
f"credentials_enc found in single doc response: {single_data}"
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# SEC-09: Admin delete user triggers MinIO object deletion before DB removal
|
|
# (task 4-07-02)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def test_delete_user_cleans_files(admin_client, monkeypatch):
|
|
"""DELETE /api/admin/users/{id} calls delete_object for each user doc before DB removal.
|
|
|
|
SEC-09: MinIO objects deleted BEFORE user row is removed from the database.
|
|
Verifies:
|
|
1. delete_object is called at least once per document owned by the user.
|
|
2. The call happens (and is tracked) before the 204 response is returned.
|
|
3. The user is gone from the DB after the call.
|
|
"""
|
|
from unittest.mock import AsyncMock, patch
|
|
|
|
client, admin, session = admin_client
|
|
|
|
# Create a target regular user with 2 MinIO documents
|
|
target = await _make_regular_user(session)
|
|
doc1 = await _make_doc(session, target, "file1.pdf")
|
|
doc2 = await _make_doc(session, target, "file2.pdf")
|
|
await session.commit()
|
|
|
|
deleted_keys: list[str] = []
|
|
|
|
async def _fake_delete_object(self_inst, object_key: str) -> None:
|
|
deleted_keys.append(object_key)
|
|
|
|
# Patch the MinIO backend's delete_object so we can observe calls.
|
|
# The fake must accept self as first positional arg because it replaces
|
|
# an instance method on the class.
|
|
from storage.minio_backend import MinIOBackend
|
|
monkeypatch.setattr(MinIOBackend, "delete_object", _fake_delete_object, raising=False)
|
|
|
|
resp = await client.request(
|
|
"DELETE",
|
|
f"/api/admin/users/{target.id}",
|
|
json={"admin_password": "AdminPass1!Secret"},
|
|
)
|
|
assert resp.status_code == 204, f"Expected 204, got {resp.status_code}: {resp.text}"
|
|
|
|
# delete_object must have been called for BOTH documents
|
|
assert doc1.object_key in deleted_keys, (
|
|
f"delete_object not called for doc1.object_key={doc1.object_key!r}; "
|
|
f"called keys: {deleted_keys}"
|
|
)
|
|
assert doc2.object_key in deleted_keys, (
|
|
f"delete_object not called for doc2.object_key={doc2.object_key!r}; "
|
|
f"called keys: {deleted_keys}"
|
|
)
|
|
|
|
# Confirm the user is gone from the DB
|
|
from sqlalchemy import select as sa_select
|
|
result = await session.execute(sa_select(User).where(User.id == target.id))
|
|
assert result.scalar_one_or_none() is None, (
|
|
"User row still present in DB after admin delete"
|
|
)
|