Files
curo1305 b7df9719c2 feat(6.1-01): add second_auth_user fixture to conftest.py
- Add @pytest_asyncio.fixture second_auth_user with handle prefix 'user2_'
- Creates User + Quota row following the same pattern as auth_user
- Returns {user, token, headers} dict shape for use in sharing tests
2026-05-30 23:09:39 +02:00

448 lines
15 KiB
Python

"""
pytest configuration for DocuVault backend tests.
Plan 05 cutover: all sync flat-file fixtures (isolated_data_dir, sync client)
removed. Tests use async fixtures only.
Service availability detection:
- INTEGRATION=1 env var: assume live Docker services are available
- Default (no INTEGRATION): use in-memory SQLite + skip tests requiring real
PostgreSQL/MinIO/Redis
SQLite compatibility note:
The ORM models use PostgreSQL-specific types (UUID, INET, JSONB). SQLite does
not understand these. The db_session fixture patches them before creating
tables so the in-memory engine can build the schema successfully.
"""
from __future__ import annotations
import os
import socket
import pytest
import pytest_asyncio
from httpx import ASGITransport, AsyncClient
from sqlalchemy import String, Text
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.pool import StaticPool
# ── Service availability ──────────────────────────────────────────────────────
def _port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Probe a TCP endpoint and report whether a connection could be made.

    Args:
        host: Hostname or IP address to connect to.
        port: TCP port number.
        timeout: Seconds to wait for the connection attempt.

    Returns:
        True if a connection was established (it is closed again right
        away); False if an OSError was raised while connecting or closing.
    """
    try:
        probe = socket.create_connection((host, port), timeout=timeout)
        probe.close()
    except OSError:
        return False
    return True
@pytest.fixture(scope="session")
def live_services_available():
    """Session-scoped flag telling tests whether live backing services exist.

    Returns True immediately when the INTEGRATION environment variable is
    exactly "1". Otherwise returns True only if localhost accepts TCP
    connections on ports 5432, 9000 and 6379 (presumably PostgreSQL, MinIO
    and Redis, going by the module docstring); probing stops at the first
    port that is not reachable.
    """
    if os.environ.get("INTEGRATION") == "1":
        return True
    required_ports = (5432, 9000, 6379)
    return all(_port_open("localhost", port) for port in required_ports)
# ── Core async fixtures ───────────────────────────────────────────────────────
def _patch_pg_types_for_sqlite():
    """Best-effort monkey-patch of PostgreSQL column types for SQLite tests.

    Two adjustments are applied to the ``INET`` and ``JSONB`` classes from
    ``sqlalchemy.dialects.postgresql`` (the ORM models are not modified):

    * ``__class_getitem__`` is replaced so that subscripting the class
      ignores the subscript and returns a plain instance.
    * ``impl`` is pointed at ``String`` (INET) or ``Text`` (JSONB). This is
      done once per class; a ``_sqlite_patched`` marker attribute prevents
      the assignment from being repeated.

    Every exception is swallowed on purpose.
    NOTE(review): the original author's expectation is that a failed patch
    surfaces later as a CompileError when the schema is created — confirm.
    """
    try:
        # PG_UUID is imported but not patched here; the import is kept as-is.
        from sqlalchemy.dialects.postgresql import UUID as PG_UUID, INET, JSONB

        # Make `INET[...]` / `JSONB[...]` evaluate to a bare instance.
        for pg_type in (INET, JSONB):
            pg_type.__class_getitem__ = classmethod(lambda cls, item: cls())

        # Swap the underlying implementation type, at most once per class.
        for pg_type, fallback_impl in ((INET, String), (JSONB, Text)):
            if hasattr(pg_type, "_sqlite_patched"):
                continue
            pg_type.impl = fallback_impl
            pg_type._sqlite_patched = True
    except Exception:
        pass  # deliberate best-effort: never fail at patch time
@pytest_asyncio.fixture
async def db_session():
    """Yield an AsyncSession bound to a fresh in-memory SQLite database.

    While the fixture is active, ``SQLiteTypeCompiler`` carries temporary
    ``visit_INET`` / ``visit_JSONB`` methods that render those
    PostgreSQL-only column types as ``TEXT`` so that
    ``Base.metadata.create_all`` can emit the schema for SQLite.  The
    compiler class is put back into its previous state on teardown, also
    when engine creation, schema creation, or engine disposal fails.

    Yields:
        AsyncSession: a session created with ``expire_on_commit=False``.
    """
    from sqlalchemy.dialects.sqlite.base import SQLiteTypeCompiler

    from db.models import Base

    def _render_as_text(self, type_, **kw):
        # Shared visitor: any patched type is emitted as a TEXT column.
        return "TEXT"

    shim_names = ("visit_INET", "visit_JSONB")
    # Snapshot only what is defined directly on the class, so teardown can
    # either restore that exact object or remove the shim entirely.
    not_defined = object()
    previous = {
        name: vars(SQLiteTypeCompiler).get(name, not_defined) for name in shim_names
    }

    # NOTE(review): UUID(as_uuid=True) is expected to be handled by
    # SQLAlchemy's built-in UUID type on SQLite, so it gets no shim here.
    engine = None
    try:
        for name in shim_names:
            setattr(SQLiteTypeCompiler, name, _render_as_text)

        # StaticPool keeps a single connection, which is what lets every
        # session see the same ":memory:" database.
        engine = create_async_engine(
            "sqlite+aiosqlite:///:memory:",
            connect_args={"check_same_thread": False},
            poolclass=StaticPool,
        )
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)

        session_factory = async_sessionmaker(engine, expire_on_commit=False)
        async with session_factory() as session:
            yield session
    finally:
        try:
            if engine is not None:
                await engine.dispose()
        finally:
            # Always undo the monkey-patches, even if dispose() raised, so
            # no other test observes the shims.
            for name, original in previous.items():
                if original is not_defined:
                    try:
                        delattr(SQLiteTypeCompiler, name)
                    except AttributeError:
                        pass
                else:
                    setattr(SQLiteTypeCompiler, name, original)
@pytest_asyncio.fixture
async def async_client(db_session: AsyncSession):
    """Yield an httpx AsyncClient wired directly to the ASGI app.

    The app's ``get_db`` dependency is overridden to hand out the
    ``db_session`` fixture's session, so requests made through the client
    hit the in-memory test database.

    Yields:
        AsyncClient: client using ``ASGITransport`` with base URL
        ``http://test``.

    On teardown all entries in ``app.dependency_overrides`` are cleared.
    The clear runs in a ``finally`` so the override cannot leak into later
    tests if opening or closing the client raises.
    """
    from deps.db import get_db
    from main import app

    app.dependency_overrides[get_db] = lambda: db_session
    try:
        async with AsyncClient(
            transport=ASGITransport(app=app), base_url="http://test"
        ) as client:
            yield client
    finally:
        app.dependency_overrides.clear()
# ── File fixtures ─────────────────────────────────────────────────────────────
@pytest.fixture
def sample_txt(tmp_path):
    """Write a one-sentence text file into ``tmp_path`` and return its path."""
    txt_path = tmp_path.joinpath("sample.txt")
    txt_path.write_text("This is a test document about invoices and finance.")
    return txt_path
@pytest.fixture
def sample_pdf(tmp_path):
    """Create a single-page PDF with one line of text and return its path.

    The document is built with PyMuPDF (``fitz``) and saved as
    ``sample.pdf`` inside ``tmp_path``.  The document handle is closed in a
    ``finally`` so it is released even if writing the text or saving fails.
    """
    import fitz

    pdf_path = tmp_path / "sample.pdf"
    doc = fitz.open()
    try:
        page = doc.new_page()
        page.insert_text((50, 50), "Test PDF document about contracts and legal matters.")
        doc.save(str(pdf_path))
    finally:
        doc.close()
    return pdf_path
# ── Phase 3 shared fixtures ───────────────────────────────────────────────────
# These fixtures are used by test_quota.py, test_documents.py, test_topics.py,
# and test_classifier.py in Plans 03-02 through 03-04.
async def _create_user_with_token(
    db_session: AsyncSession,
    *,
    handle_prefix: str,
    role: str = "user",
    limit_bytes: int = 100 * 1024 * 1024,  # 100 MB
) -> dict:
    """Persist a User plus its Quota row and build an auth context for it.

    Shared implementation behind the auth_user, second_auth_user and
    admin_user fixtures, which differ only in handle prefix and role.

    Args:
        db_session: Session the rows are added to; it is committed here.
        handle_prefix: Prefix for the generated handle and e-mail address.
            The first 8 hex digits of the new user id are appended after an
            underscore so that every call yields a distinct handle.
        role: Value stored in ``User.role`` and passed to
            ``create_access_token``.
        limit_bytes: Quota limit for the user; ``used_bytes`` starts at 0.

    Returns:
        dict with keys:
            - user: User ORM instance
            - token: access token from services.auth.create_access_token
            - headers: {"Authorization": "Bearer <token>"}
    """
    import uuid as _uuid

    from db.models import User, Quota
    from services.auth import hash_password, create_access_token

    user_id = _uuid.uuid4()
    unique_name = f"{handle_prefix}_{user_id.hex[:8]}"
    user = User(
        id=user_id,
        handle=unique_name,
        email=f"{unique_name}@example.com",
        password_hash=hash_password("Testpassword123!"),
        role=role,
        is_active=True,
        password_must_change=False,
    )
    quota = Quota(
        user_id=user_id,
        limit_bytes=limit_bytes,
        used_bytes=0,
    )
    db_session.add(user)
    db_session.add(quota)
    await db_session.commit()

    token = create_access_token(str(user_id), role)
    return {
        "user": user,
        "token": token,
        "headers": {"Authorization": f"Bearer {token}"},
    }


@pytest_asyncio.fixture
async def auth_user(db_session: AsyncSession):
    """Create a regular user with a Quota row and return auth context.

    Returns dict with keys:
        - user: User ORM instance (role "user", handle prefix "testuser_")
        - token: access token issued via services.auth.create_access_token
        - headers: {"Authorization": "Bearer <token>"}
    """
    return await _create_user_with_token(
        db_session, handle_prefix="testuser", role="user"
    )


@pytest_asyncio.fixture
async def second_auth_user(db_session: AsyncSession):
    """Create a second regular user with a Quota row and return auth context.

    Same dict shape as auth_user, but the handle prefix is "user2_" so a
    test can request both fixtures (e.g. a sharer and a recipient) without
    the two handles colliding.
    """
    return await _create_user_with_token(
        db_session, handle_prefix="user2", role="user"
    )


@pytest_asyncio.fixture
async def admin_user(db_session: AsyncSession):
    """Create an admin user with a Quota row and return auth context.

    Same dict shape as auth_user, but with role="admin" (both on the User
    row and in the issued token) and handle prefix "adminuser_".
    """
    return await _create_user_with_token(
        db_session, handle_prefix="adminuser", role="admin"
    )
@pytest.fixture
def mock_minio_presigned(monkeypatch):
    """Replace MinIOBackend.generate_presigned_put_url with an AsyncMock.

    The mock resolves to a fixed fake presigned URL.  ``raising=False`` lets
    monkeypatch install the attribute even when the class does not define
    it.  If ``storage.minio_backend`` cannot be imported the patch is
    skipped, and the (unattached) mock is still yielded.

    Yields:
        AsyncMock: the mock, so tests can assert call counts and arguments.
    """
    from unittest.mock import AsyncMock

    presign_mock = AsyncMock(
        return_value="http://localhost:9000/docuvault/test-presigned-url"
    )
    try:
        from storage.minio_backend import MinIOBackend
    except ImportError:
        pass  # storage module unavailable — patching is best-effort
    else:
        monkeypatch.setattr(
            MinIOBackend, "generate_presigned_put_url", presign_mock, raising=False
        )
    yield presign_mock
@pytest.fixture
def mock_minio_stat(monkeypatch):
    """Replace MinIOBackend.stat_object with an AsyncMock returning 1024.

    ``raising=False`` lets monkeypatch install the attribute even when the
    class does not define it.  If ``storage.minio_backend`` cannot be
    imported the patch is skipped, and the (unattached) mock is still
    yielded.

    Yields:
        AsyncMock: the mock; tests may customize it, e.g.
            mock_minio_stat.return_value = 50_000_000
    """
    from unittest.mock import AsyncMock

    stat_mock = AsyncMock(return_value=1024)
    try:
        from storage.minio_backend import MinIOBackend
    except ImportError:
        pass  # storage module unavailable — patching is best-effort
    else:
        monkeypatch.setattr(MinIOBackend, "stat_object", stat_mock, raising=False)
    yield stat_mock
# ── Phase 5 cloud storage fixtures ───────────────────────────────────────────
@pytest.fixture
def mock_google_drive_creds():
    """Provide a fake Google Drive OAuth credential dict for unit tests.

    All values are placeholders.  The expiry is set to the end of 2099,
    presumably so that code under test never takes a token-refresh path.
    NOTE(review): the key set is meant to resemble stored Google OAuth
    credentials — verify against what the cloud layer actually reads.
    """
    return dict(
        access_token="ya29.test_access",
        refresh_token="1//test_refresh",
        expiry="2099-12-31T23:59:59",
        token_uri="https://oauth2.googleapis.com/token",
        client_id="test_client_id",
        client_secret="test_client_secret",
    )
@pytest.fixture
def mock_onedrive_creds():
    """Provide a fake OneDrive credential dict for unit tests.

    Contains placeholder access/refresh tokens and an ``expires_at`` value
    at the end of 2099.
    NOTE(review): described by the original author as a simplified subset
    of an MSAL token response as needed by cloud_utils — confirm the keys
    against that module.
    """
    fake_creds = dict(
        access_token="test_ms_access",
        refresh_token="test_ms_refresh",
        expires_at="2099-12-31T23:59:59",
    )
    return fake_creds
@pytest.fixture
def mock_webdav_client():
    """Provide a MagicMock standing in for a WebDAV client object.

    The ``upload_to``, ``download_from``, ``list`` and ``check`` attributes
    are each replaced by their own MagicMock that returns None, so tests can
    assert on calls without any network access.
    """
    from unittest.mock import MagicMock

    client = MagicMock()
    for method_name in ("upload_to", "download_from", "list", "check"):
        setattr(client, method_name, MagicMock(return_value=None))
    return client
@pytest_asyncio.fixture
async def cloud_connection_factory(db_session: AsyncSession):
    """Return an async factory that inserts CloudConnection rows.

    The fixture itself only builds and returns the factory; the session to
    write to is passed explicitly on each call.

    Usage::

        async def test_something(cloud_connection_factory, db_session, auth_user):
            conn = await cloud_connection_factory(
                db_session, auth_user["user"].id, provider="google_drive"
            )
            assert conn.status == "ACTIVE"

    Factory parameters
    ------------------
    session : AsyncSession
        Session the new row is added to (typically db_session).  The factory
        flushes the session but does not commit.
    user_id : uuid.UUID | str
        Owner of the connection; anything that is not already a UUID is
        converted with ``uuid.UUID(str(user_id))``.
    provider : str
        Provider slug, default "google_drive".
    status : str
        Connection status string, default "ACTIVE".
    display_name : str | None
        Human-readable label; a falsy value falls back to
        "<provider> account".
    credentials_enc : str
        Credential blob stored on the row; defaults to a placeholder.

    The factory returns the flushed CloudConnection instance, which gets a
    freshly generated UUID as its id.
    """
    import uuid as _uuid

    from db.models import CloudConnection

    async def _factory(
        session: AsyncSession,
        user_id,
        provider: str = "google_drive",
        status: str = "ACTIVE",
        display_name: str | None = None,
        credentials_enc: str = "fake_encrypted_creds",
    ) -> CloudConnection:
        # Accept either a UUID or anything whose str() parses as one.
        owner_id = user_id
        if not isinstance(owner_id, _uuid.UUID):
            owner_id = _uuid.UUID(str(owner_id))

        label = display_name if display_name else f"{provider} account"

        row = CloudConnection(
            id=_uuid.uuid4(),
            user_id=owner_id,
            provider=provider,
            display_name=label,
            credentials_enc=credentials_enc,
            status=status,
        )
        session.add(row)
        await session.flush()
        return row

    return _factory