Files
kite/backend/tests/conftest.py
T
curo1305 21ec9cb4c3 test(03-01): add Wave 0 xfail stubs and shared fixtures for Phase 3
- Add auth_user, admin_user, mock_minio_presigned, mock_minio_stat fixtures to conftest.py
- Create test_quota.py with 4 xfail stubs (STORE-03, STORE-05, STORE-06, SC2 race)
- Append test_migration_0003 to test_alembic.py (full pre-seed + post-migration assertions)
- Append 3 classifier xfail stubs (DOC-03, DOC-05, D-15)
- Append 6 document xfail stubs (D-05, STORE-04, SEC-04, D-16)
- Append 4 topic xfail stubs (DOC-04, D-09, D-17)
- Append test_settings_endpoint_removed stub (D-12)
- All 19 new test IDs collect cleanly with xfail(strict=False)
2026-05-23 13:42:37 +02:00

305 lines
10 KiB
Python

"""
pytest configuration for DocuVault backend tests.
Plan 05 cutover: all sync flat-file fixtures (isolated_data_dir, sync client)
removed. Tests use async fixtures only.
Service availability detection:
- INTEGRATION=1 env var: assume live Docker services are available
- Default (no INTEGRATION): use in-memory SQLite + skip tests requiring real
PostgreSQL/MinIO/Redis
SQLite compatibility note:
The ORM models use PostgreSQL-specific types (UUID, INET, JSONB). SQLite does
not understand these. The db_session fixture patches them before creating
tables so the in-memory engine can build the schema successfully.
"""
from __future__ import annotations
import os
import socket
import pytest
import pytest_asyncio
from httpx import ASGITransport, AsyncClient
from sqlalchemy import String, Text
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.pool import StaticPool
# ── Service availability ──────────────────────────────────────────────────────
def _port_open(host: str, port: int, timeout: float = 1.0) -> bool:
"""Return True if the given TCP port is reachable."""
try:
with socket.create_connection((host, port), timeout=timeout):
return True
except OSError:
return False
@pytest.fixture(scope="session")
def live_services_available():
    """Session-wide flag telling tests whether live backing services can be used.

    Returns True immediately when the ``INTEGRATION`` environment variable is
    exactly ``"1"``. Otherwise every required port on localhost is probed with
    ``_port_open`` and the result is True only if all of them accept a TCP
    connection.
    """
    if os.environ.get("INTEGRATION") == "1":
        return True
    # Presumably PostgreSQL (5432), MinIO (9000) and Redis (6379), per the
    # module docstring — the probe itself only checks TCP reachability.
    required_ports = (5432, 9000, 6379)
    # all() stops at the first closed port, just like a chained ``and``.
    return all(_port_open("localhost", port) for port in required_ports)
# ── Core async fixtures ───────────────────────────────────────────────────────
def _patch_pg_types_for_sqlite():
    """Best-effort patch of PostgreSQL-only column types for SQLite schemas.

    Only ``INET`` and ``JSONB`` are touched. Each class gets a permissive
    ``__class_getitem__`` (so ``INET[...]`` / ``JSONB[...]`` just builds an
    instance), and its ``impl`` attribute is pointed at ``String`` (INET) or
    ``Text`` (JSONB). ``UUID`` is *not* patched by this helper.

    A ``_sqlite_patched`` marker set on each class makes the ``impl`` swap
    happen at most once per process. The patch mutates the shared SQLAlchemy
    classes and is never undone.

    NOTE(review): nothing in the visible part of this file calls this helper;
    ``db_session`` installs its own type-compiler shims instead. Confirm
    whether this function is still needed.
    """
    try:
        from sqlalchemy.dialects.postgresql import INET, JSONB

        for pg_type, sqlite_impl in ((INET, String), (JSONB, Text)):
            # Allow subscripted use of the type without raising.
            pg_type.__class_getitem__ = classmethod(lambda cls, item: cls())
            if not hasattr(pg_type, "_sqlite_patched"):
                pg_type.impl = sqlite_impl
                pg_type._sqlite_patched = True
    except (ImportError, AttributeError, TypeError):
        # Deliberately best-effort: a missing dialect or an unpatchable class
        # is ignored here; schema creation will then surface a CompileError.
        pass
@pytest_asyncio.fixture
async def db_session():
    """Yield an ``AsyncSession`` bound to a fresh in-memory SQLite database.

    The SQLite type compiler has no ``visit_INET`` / ``visit_JSONB`` methods,
    so ``Base.metadata.create_all`` cannot render those PostgreSQL column
    types. For the lifetime of the fixture both methods are installed on
    ``SQLiteTypeCompiler`` and render the column as ``TEXT``.

    Teardown always disposes the engine (when one was created) and then puts
    ``SQLiteTypeCompiler`` back exactly as it was found, even if engine
    creation, schema creation or engine disposal fails.

    Yields:
        An ``AsyncSession`` created with ``expire_on_commit=False``.
    """
    from sqlalchemy.dialects.sqlite.base import SQLiteTypeCompiler

    from db.models import Base

    def _render_as_text(self, type_, **kw):
        return "TEXT"

    shim_names = ("visit_INET", "visit_JSONB")
    missing = object()
    # Snapshot only what is defined directly on the class, so restoring never
    # pins an inherited method onto SQLiteTypeCompiler itself.
    saved = {
        name: SQLiteTypeCompiler.__dict__.get(name, missing) for name in shim_names
    }

    engine = None
    try:
        for name in shim_names:
            setattr(SQLiteTypeCompiler, name, _render_as_text)

        # UUID(as_uuid=True) renders as CHAR(32) in SQLite — already handled by
        # SQLAlchemy's built-in UUID type mapping — no patch needed.
        # StaticPool keeps a single connection so every session sees the same
        # in-memory database.
        engine = create_async_engine(
            "sqlite+aiosqlite:///:memory:",
            connect_args={"check_same_thread": False},
            poolclass=StaticPool,
        )
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)

        session_factory = async_sessionmaker(engine, expire_on_commit=False)
        async with session_factory() as session:
            yield session
    finally:
        try:
            if engine is not None:
                await engine.dispose()
        finally:
            # Restore compiler methods to leave no side effects on other tests.
            for name, original in saved.items():
                if original is not missing:
                    setattr(SQLiteTypeCompiler, name, original)
                elif name in SQLiteTypeCompiler.__dict__:
                    delattr(SQLiteTypeCompiler, name)
@pytest_asyncio.fixture
async def async_client(db_session: AsyncSession):
    """Yield an ``httpx.AsyncClient`` wired to the app over ASGI.

    The app's ``get_db`` dependency is overridden so that request handlers
    receive the ``db_session`` fixture's session instead of a real database
    session. Requests go through ``ASGITransport`` with base URL
    ``http://test``; no network socket is opened.

    On teardown all dependency overrides on the app are cleared. This happens
    in a ``finally`` so overrides cannot leak into later tests when closing
    the client raises.
    """
    from deps.db import get_db
    from main import app

    app.dependency_overrides[get_db] = lambda: db_session
    try:
        async with AsyncClient(
            transport=ASGITransport(app=app), base_url="http://test"
        ) as client:
            yield client
    finally:
        app.dependency_overrides.clear()
# ── File fixtures ─────────────────────────────────────────────────────────────
@pytest.fixture
def sample_txt(tmp_path):
    """Write a small plain-text document into ``tmp_path`` and return its path.

    The file is named ``sample.txt`` and is written as UTF-8 explicitly, so
    the bytes on disk do not depend on the platform's locale encoding.
    """
    txt_path = tmp_path / "sample.txt"
    txt_path.write_text(
        "This is a test document about invoices and finance.",
        encoding="utf-8",
    )
    return txt_path
@pytest.fixture
def sample_pdf(tmp_path):
    """Create a minimal one-page PDF in ``tmp_path`` and return its path.

    The PDF is built with PyMuPDF (``fitz``): a new empty document, one page,
    and a single line of text inserted at point (50, 50). The result is saved
    as ``sample.pdf``.
    """
    import fitz

    pdf_path = tmp_path / "sample.pdf"
    doc = fitz.open()
    try:
        page = doc.new_page()
        page.insert_text(
            (50, 50), "Test PDF document about contracts and legal matters."
        )
        doc.save(str(pdf_path))
    finally:
        # Release the document even when building or saving it fails.
        doc.close()
    return pdf_path
# ── Phase 3 shared fixtures ───────────────────────────────────────────────────
# These fixtures are used by test_quota.py, test_documents.py, test_topics.py,
# and test_classifier.py in Plans 03-02 through 03-04.
# Default quota given to fixture-created users: 100 MB.
_TEST_QUOTA_LIMIT_BYTES = 100 * 1024 * 1024


async def _create_user_context(db_session: AsyncSession, *, role: str, handle_prefix: str) -> dict:
    """Persist a User plus its Quota row and build the auth context dict.

    Shared implementation behind the ``auth_user`` and ``admin_user`` fixtures.

    Args:
        db_session: Session used to add and commit the new rows.
        role: Value stored in ``User.role`` and passed to
            ``create_access_token``.
        handle_prefix: Prefix for the generated handle and e-mail address; the
            first 8 hex characters of the new user's UUID are appended to it.

    Returns:
        A dict with keys ``user`` (the User ORM instance), ``token`` (the
        access token string) and ``headers`` (an ``Authorization: Bearer``
        header dict carrying that token).
    """
    import uuid as _uuid

    from db.models import Quota, User
    from services.auth import create_access_token, hash_password

    user_id = _uuid.uuid4()
    unique_name = f"{handle_prefix}_{user_id.hex[:8]}"
    user = User(
        id=user_id,
        handle=unique_name,
        email=f"{unique_name}@example.com",
        password_hash=hash_password("Testpassword123!"),
        role=role,
        is_active=True,
        password_must_change=False,
    )
    quota = Quota(
        user_id=user_id,
        limit_bytes=_TEST_QUOTA_LIMIT_BYTES,
        used_bytes=0,
    )
    db_session.add(user)
    db_session.add(quota)
    await db_session.commit()

    token = create_access_token(str(user_id), role)
    return {
        "user": user,
        "token": token,
        "headers": {"Authorization": f"Bearer {token}"},
    }


@pytest_asyncio.fixture
async def auth_user(db_session: AsyncSession):
    """Create a regular user with a Quota row and return auth context.

    Returns dict with keys:
    - user: User ORM instance
    - token: access token issued by services.auth.create_access_token
    - headers: {"Authorization": "Bearer <token>"}

    The user has role "user", a handle/e-mail starting with ``testuser_`` and
    a 100 MB quota with 0 bytes used. The token is intended to be accepted by
    get_current_user in downstream endpoint tests.
    """
    return await _create_user_context(db_session, role="user", handle_prefix="testuser")


@pytest_asyncio.fixture
async def admin_user(db_session: AsyncSession):
    """Create an admin user with a Quota row and return auth context.

    Returns the same dict shape as auth_user (``user``, ``token``,
    ``headers``) but with role="admin" and a handle/e-mail starting with
    ``adminuser_``.
    """
    return await _create_user_context(db_session, role="admin", handle_prefix="adminuser")
@pytest.fixture
def mock_minio_presigned(monkeypatch):
    """Replace ``MinIOBackend.generate_presigned_put_url`` with an ``AsyncMock``.

    The mock resolves to a fixed fake presigned URL. ``raising=False`` lets
    the patch install even when the class does not define the method yet
    (per the original note, it is added in Plan 03-02). If
    ``storage.minio_backend`` cannot be imported at all, nothing is patched
    and the fixture still yields the mock.

    Yields:
        The ``AsyncMock``, so tests can assert call counts and arguments.
    """
    from unittest.mock import AsyncMock

    presign_mock = AsyncMock(
        return_value="http://localhost:9000/docuvault/test-presigned-url"
    )
    try:
        from storage.minio_backend import MinIOBackend
    except ImportError:
        # storage module not yet available — patch is best-effort
        pass
    else:
        monkeypatch.setattr(
            MinIOBackend, "generate_presigned_put_url", presign_mock, raising=False
        )
    yield presign_mock
@pytest.fixture
def mock_minio_stat(monkeypatch):
    """Replace ``MinIOBackend.stat_object`` with an ``AsyncMock`` returning 1024.

    ``raising=False`` lets the patch install even when the class does not
    define the method yet (per the original note, it is added in Plan 03-02).
    If ``storage.minio_backend`` cannot be imported at all, nothing is patched
    and the fixture still yields the mock.

    Yields:
        The ``AsyncMock`` for per-test customization, e.g.:
            mock_minio_stat.return_value = 50_000_000
    """
    from unittest.mock import AsyncMock

    stat_mock = AsyncMock(return_value=1024)
    try:
        from storage.minio_backend import MinIOBackend
    except ImportError:
        # storage module not yet available — patch is best-effort
        pass
    else:
        monkeypatch.setattr(MinIOBackend, "stat_object", stat_mock, raising=False)
    yield stat_mock