""" pytest configuration for DocuVault backend tests. Plan 05 cutover: all sync flat-file fixtures (isolated_data_dir, sync client) removed. Tests use async fixtures only. Service availability detection: - INTEGRATION=1 env var: assume live Docker services are available - Default (no INTEGRATION): use in-memory SQLite + skip tests requiring real PostgreSQL/MinIO/Redis SQLite compatibility note: The ORM models use PostgreSQL-specific types (UUID, INET, JSONB). SQLite does not understand these. The db_session fixture patches them before creating tables so the in-memory engine can build the schema successfully. """ from __future__ import annotations import os import socket import pytest import pytest_asyncio from httpx import ASGITransport, AsyncClient from sqlalchemy import String, Text from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine from sqlalchemy.pool import StaticPool # ── Service availability ────────────────────────────────────────────────────── def _port_open(host: str, port: int, timeout: float = 1.0) -> bool: """Return True if the given TCP port is reachable.""" try: with socket.create_connection((host, port), timeout=timeout): return True except OSError: return False @pytest.fixture(scope="session") def live_services_available(): """True when Docker Compose services are reachable (or INTEGRATION=1 is set).""" if os.environ.get("INTEGRATION") == "1": return True return ( _port_open("localhost", 5432) and _port_open("localhost", 9000) and _port_open("localhost", 6379) ) # ── Core async fixtures ─────────────────────────────────────────────────────── def _patch_pg_types_for_sqlite(): """Patch PostgreSQL-specific column types so SQLite can create the schema. SQLite does not know about INET, UUID (as_uuid=True), or JSONB. We replace them with Text/String equivalents for the in-memory test engine. This is done by monkey-patching the dialect-type mapping rather than modifying the models. 
""" try: from sqlalchemy.dialects.postgresql import UUID as PG_UUID, INET, JSONB # Override compile methods so SQLite renders them as TEXT for pg_type in (INET, JSONB): pg_type.__class_getitem__ = classmethod(lambda cls, item: cls()) # Patch impl so SQLite uses String if not hasattr(INET, "_sqlite_patched"): INET.impl = String INET._sqlite_patched = True if not hasattr(JSONB, "_sqlite_patched"): JSONB.impl = Text JSONB._sqlite_patched = True except Exception: pass # If patching fails, the fixture will raise a CompileError naturally @pytest_asyncio.fixture async def db_session(): """In-memory async SQLite session for unit tests. PostgreSQL-specific column types are overridden to Text/String so that Base.metadata.create_all works against the SQLite dialect. """ from sqlalchemy.dialects.sqlite.base import SQLiteTypeCompiler from sqlalchemy.dialects.postgresql import INET, JSONB from db.models import Base # ── Type compatibility shims ────────────────────────────────────────────── # PostgreSQL-specific types (INET, JSONB) are unknown to the SQLite dialect. # Temporarily add visit_* methods that render them as TEXT so that # Base.metadata.create_all can build the schema in SQLite. _orig_visit_INET = getattr(SQLiteTypeCompiler, "visit_INET", None) _orig_visit_JSONB = getattr(SQLiteTypeCompiler, "visit_JSONB", None) def _visit_inet(self, type_, **kw): return "TEXT" def _visit_jsonb(self, type_, **kw): return "TEXT" SQLiteTypeCompiler.visit_INET = _visit_inet # type: ignore[attr-defined] SQLiteTypeCompiler.visit_JSONB = _visit_jsonb # type: ignore[attr-defined] # UUID(as_uuid=True) renders as CHAR(32) in SQLite — already handled by # SQLAlchemy's built-in UUID type mapping — no patch needed. 
engine = create_async_engine( "sqlite+aiosqlite:///:memory:", connect_args={"check_same_thread": False}, poolclass=StaticPool, ) try: async with engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) AsyncTestSession = async_sessionmaker(engine, expire_on_commit=False) async with AsyncTestSession() as session: yield session finally: await engine.dispose() # Restore compiler methods to leave no side effects on other tests if _orig_visit_INET is not None: SQLiteTypeCompiler.visit_INET = _orig_visit_INET # type: ignore else: try: del SQLiteTypeCompiler.visit_INET # type: ignore except AttributeError: pass if _orig_visit_JSONB is not None: SQLiteTypeCompiler.visit_JSONB = _orig_visit_JSONB # type: ignore else: try: del SQLiteTypeCompiler.visit_JSONB # type: ignore except AttributeError: pass @pytest_asyncio.fixture async def async_client(db_session: AsyncSession): """Async HTTP test client with the DB dependency overridden to use in-memory SQLite.""" from deps.db import get_db from main import app app.dependency_overrides[get_db] = lambda: db_session async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as c: yield c app.dependency_overrides.clear() # ── File fixtures ───────────────────────────────────────────────────────────── @pytest.fixture def sample_txt(tmp_path): p = tmp_path / "sample.txt" p.write_text("This is a test document about invoices and finance.") return p @pytest.fixture def sample_pdf(tmp_path): """Create a minimal valid PDF for testing.""" import fitz doc = fitz.open() page = doc.new_page() page.insert_text((50, 50), "Test PDF document about contracts and legal matters.") pdf_path = tmp_path / "sample.pdf" doc.save(str(pdf_path)) doc.close() return pdf_path # ── Phase 3 shared fixtures ─────────────────────────────────────────────────── # These fixtures are used by test_quota.py, test_documents.py, test_topics.py, # and test_classifier.py in Plans 03-02 through 03-04. 
# Shared values for the user fixtures below.
_TEST_USER_PASSWORD = "Testpassword123!"
_TEST_QUOTA_LIMIT_BYTES = 100 * 1024 * 1024  # 100 MB


async def _create_user_context(
    db_session: AsyncSession,
    *,
    handle_prefix: str,
    role: str,
) -> dict:
    """Persist a User plus Quota row and return the auth context dict.

    Args:
        db_session: Session the rows are added to; it is committed here.
        handle_prefix: Prefix for the generated handle and e-mail local part.
            The first 8 hex characters of the new user's UUID are appended.
        role: Value stored in ``User.role`` and passed to
            ``create_access_token``.

    Returns:
        dict with keys ``user`` (User ORM instance), ``token`` (value returned
        by ``create_access_token``) and ``headers``
        (``{"Authorization": "Bearer <token>"}``).
    """
    import uuid as _uuid

    from db.models import Quota, User
    from services.auth import create_access_token, hash_password

    user_id = _uuid.uuid4()
    short_id = user_id.hex[:8]

    user = User(
        id=user_id,
        handle=f"{handle_prefix}{short_id}",
        email=f"{handle_prefix}{short_id}@example.com",
        password_hash=hash_password(_TEST_USER_PASSWORD),
        role=role,
        is_active=True,
        password_must_change=False,
    )
    quota = Quota(
        user_id=user_id,
        limit_bytes=_TEST_QUOTA_LIMIT_BYTES,
        used_bytes=0,
    )
    db_session.add(user)
    db_session.add(quota)
    await db_session.commit()

    token = create_access_token(str(user_id), role)
    return {
        "user": user,
        "token": token,
        "headers": {"Authorization": f"Bearer {token}"},
    }


def _patch_minio_method(monkeypatch, method_name: str, mock) -> None:
    """Best-effort: replace ``MinIOBackend.<method_name>`` with ``mock``.

    Does nothing when ``storage.minio_backend`` cannot be imported.
    ``raising=False`` lets the patch install even when the attribute does not
    exist on the class.
    """
    try:
        from storage.minio_backend import MinIOBackend
    except ImportError:
        return  # storage module not yet available — patch is best-effort
    monkeypatch.setattr(MinIOBackend, method_name, mock, raising=False)


@pytest_asyncio.fixture
async def auth_user(db_session: AsyncSession):
    """Create a regular user with a Quota row and return auth context.

    Returns dict with keys:
      - user: User ORM instance
      - token: signed JWT access token
      - headers: {"Authorization": "Bearer <token>"}

    The token is issued via services.auth.create_access_token and is intended
    to be accepted by get_current_user in downstream endpoint tests.
    """
    return await _create_user_context(db_session, handle_prefix="testuser_", role="user")


@pytest_asyncio.fixture
async def second_auth_user(db_session: AsyncSession):
    """Create a second regular user with a Quota row and return auth context.

    Returns the same dict shape as auth_user but with a distinct handle prefix
    ("user2_") so sharing tests can have a sharer and a recipient in the same
    test without handle collisions.
    """
    return await _create_user_context(db_session, handle_prefix="user2_", role="user")


@pytest_asyncio.fixture
async def admin_user(db_session: AsyncSession):
    """Create an admin user with a Quota row and return auth context.

    Returns the same dict shape as auth_user but with role="admin".
    """
    return await _create_user_context(db_session, handle_prefix="adminuser_", role="admin")


@pytest.fixture
def mock_minio_presigned(monkeypatch):
    """Patch MinIOBackend.generate_presigned_put_url with an AsyncMock.

    The patch is installed with raising=False, so it also works when the
    attribute does not exist on MinIOBackend; if the storage module cannot be
    imported the patch is skipped.

    Yields the AsyncMock so tests can assert call counts and args.
    """
    from unittest.mock import AsyncMock

    mock = AsyncMock(return_value="http://localhost:9000/docuvault/test-presigned-url")
    _patch_minio_method(monkeypatch, "generate_presigned_put_url", mock)
    yield mock


@pytest.fixture
def mock_minio_stat(monkeypatch):
    """Patch MinIOBackend.stat_object with an AsyncMock returning 1024 bytes.

    The patch is installed with raising=False, so it also works when the
    attribute does not exist on MinIOBackend; if the storage module cannot be
    imported the patch is skipped.

    Yields the AsyncMock for per-test customization:
        mock_minio_stat.return_value = 50_000_000
    """
    from unittest.mock import AsyncMock

    mock = AsyncMock(return_value=1024)
    _patch_minio_method(monkeypatch, "stat_object", mock)
    yield mock


# ── Phase 5 cloud storage fixtures ───────────────────────────────────────────

@pytest.fixture
def mock_google_drive_creds():
    """Return a fake Google Drive OAuth credential dict for unit tests.

    Shape mirrors what google-auth-oauthlib stores in credentials.to_json().
    Expiry is far in the future so tests never hit a token-refresh branch.
    """
    return {
        "access_token": "ya29.test_access",
        "refresh_token": "1//test_refresh",
        "expiry": "2099-12-31T23:59:59",
        "token_uri": "https://oauth2.googleapis.com/token",
        "client_id": "test_client_id",
        "client_secret": "test_client_secret",
    }


@pytest.fixture
def mock_onedrive_creds():
    """Return a fake OneDrive MSAL credential dict for unit tests.

    Shape mirrors what msal.PublicClientApplication.acquire_token_by_auth_code_flow
    returns (simplified subset required by cloud_utils).
    """
    return {
        "access_token": "test_ms_access",
        "refresh_token": "test_ms_refresh",
        "expires_at": "2099-12-31T23:59:59",
    }


@pytest.fixture
def mock_webdav_client():
    """Return a MagicMock simulating a webdavclient3 Client instance.

    The methods ``upload_to``, ``download_from``, ``list`` and ``check`` are
    pre-wired to return None so tests can assert call counts without making
    real network connections.
    """
    from unittest.mock import MagicMock

    client = MagicMock()
    client.upload_to = MagicMock(return_value=None)
    client.download_from = MagicMock(return_value=None)
    client.list = MagicMock(return_value=None)
    client.check = MagicMock(return_value=None)
    return client


@pytest_asyncio.fixture
async def cloud_connection_factory(db_session: AsyncSession):
    """Factory fixture that creates CloudConnection ORM rows in the test db_session.

    The returned coroutine function adds the row to the given session and
    flushes it (no commit).

    Usage::

        async def test_something(cloud_connection_factory, db_session, auth_user):
            conn = await cloud_connection_factory(
                db_session, auth_user["user"].id, provider="google_drive"
            )
            assert conn.status == "ACTIVE"

    Parameters
    ----------
    session : AsyncSession
        The async SQLAlchemy session to use (typically db_session).
    user_id : uuid.UUID | str
        Owner of the cloud connection. Non-UUID values are converted with
        ``uuid.UUID(str(user_id))``.
    provider : str
        Provider slug, e.g. "google_drive", "onedrive", "webdav".
    status : str
        Connection status string, default "ACTIVE".
    display_name : str | None
        Human-readable label; a falsy value defaults to "<provider> account".
    credentials_enc : str
        Encrypted credential blob (use a placeholder in unit tests).
    """
    import uuid as _uuid

    from db.models import CloudConnection

    async def _factory(
        session: AsyncSession,
        user_id,
        provider: str = "google_drive",
        status: str = "ACTIVE",
        display_name: str | None = None,
        credentials_enc: str = "fake_encrypted_creds",
    ) -> CloudConnection:
        owner_id = user_id if isinstance(user_id, _uuid.UUID) else _uuid.UUID(str(user_id))
        conn = CloudConnection(
            id=_uuid.uuid4(),
            user_id=owner_id,
            provider=provider,
            display_name=display_name or f"{provider} account",
            credentials_enc=credentials_enc,
            status=status,
        )
        session.add(conn)
        # Flush (not commit) so the row is sent to the DB within the
        # session's current transaction.
        await session.flush()
        return conn

    return _factory