curo1305 6fed5ba531 docs(01): create phase 1 plan — 5 plans in 4 waves
Research, pattern mapping, and verification complete.
Walking Skeleton mode active (MVP Phase 1).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-22 08:49:36 +02:00

Phase 1: Infrastructure Foundation - Pattern Map

Mapped: 2026-05-21
Files analyzed: 14 new/modified files
Analogs found: 12 / 14


File Classification

| New/Modified File | Role | Data Flow | Closest Analog | Match Quality |
|---|---|---|---|---|
| docker-compose.yml | config | request-response | docker-compose.yml (current) | exact (extend in-place) |
| docker/postgres/initdb.d/01-init-users.sql | config | batch | none in codebase | no analog |
| backend/db/session.py | config | CRUD | backend/config.py (module-level setup pattern) | partial |
| backend/db/models.py | model | CRUD | none in codebase | no analog (schema from RESEARCH.md) |
| backend/deps/db.py | utility | CRUD | backend/config.py (module-level constants pattern) | partial |
| backend/config.py | config | request-response | backend/config.py (current) | exact (extend in-place) |
| backend/main.py | config | request-response | backend/main.py (current) | exact (extend in-place) |
| backend/storage/base.py | utility | request-response | backend/ai/base.py | exact role-match |
| backend/storage/__init__.py | utility | request-response | backend/ai/__init__.py | exact role-match |
| backend/storage/minio_backend.py | service | file-I/O | backend/ai/openai_provider.py | role-match (ABC impl) |
| backend/services/storage.py | service | CRUD | backend/services/storage.py (current) | exact (replace in-place) |
| backend/celery_app.py | config | event-driven | none in codebase | no analog |
| backend/tasks/document_tasks.py | service | event-driven | backend/services/classifier.py | role-match (orchestration) |
| backend/api/documents.py | controller | request-response | backend/api/documents.py (current) | exact (update in-place) |
| backend/api/topics.py | controller | request-response | backend/api/topics.py (current) | exact (update in-place) |
| backend/requirements.txt | config | - | backend/requirements.txt (current) | exact (extend in-place) |
| .env.example | config | - | .env.example (current) | exact (extend in-place) |
| backend/tests/conftest.py | test | CRUD | backend/tests/conftest.py (current) | exact (update in-place) |
| backend/tests/test_health.py | test | request-response | backend/tests/test_health.py (current) | exact (update in-place) |
| backend/tests/test_documents.py | test | CRUD | backend/tests/test_documents.py (current) | exact (update in-place) |
| backend/tests/test_storage.py | test | file-I/O | none in codebase | no analog (new) |
| backend/alembic.ini | config | - | none in codebase | no analog |
| backend/migrations/env.py | config | batch | none in codebase | no analog (pattern from RESEARCH.md) |
| backend/migrations/versions/0001_initial_schema.py | migration | batch | none in codebase | no analog (schema from RESEARCH.md) |

Pattern Assignments

docker-compose.yml (config, request-response)

Analog: docker-compose.yml (current, lines 1-26)

Existing service block pattern (lines 1-26 of current docker-compose.yml):

services:
  backend:
    build: ./backend
    ports:
      - "8000:8000"
    volumes:
      - ./backend/data:/app/data
      - ./backend:/app
    environment:
      - DATA_DIR=/app/data
      - PYTHONDONTWRITEBYTECODE=1
    extra_hosts:
      - "host.docker.internal:host-gateway"
    command: uvicorn main:app --host 0.0.0.0 --port 8000 --reload

  frontend:
    build: ./frontend
    ports:
      - "5173:5173"
    volumes:
      - ./frontend/src:/app/src
      - ./frontend/index.html:/app/index.html
    depends_on:
      - backend
    command: npm run dev -- --host 0.0.0.0

New services to add (copy the structure from RESEARCH.md Pattern 6, lines 512-567):

  postgres:
    image: postgres:17-alpine
    environment:
      POSTGRES_DB: docuvault
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
    volumes:
      - postgres_data:/var/lib/postgresql/data
      - ./docker/postgres/initdb.d:/docker-entrypoint-initdb.d:ro
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres -d docuvault"]
      interval: 10s
      timeout: 5s
      retries: 5
      start_period: 10s

  minio:
    image: minio/minio:latest
    command: server /data --console-address ":9001"
    environment:
      MINIO_ROOT_USER: ${MINIO_ROOT_USER}
      MINIO_ROOT_PASSWORD: ${MINIO_ROOT_PASSWORD}
    ports:
      - "9000:9000"
      - "9001:9001"
    volumes:
      - minio_data:/data
    healthcheck:
      test: ["CMD", "mc", "ready", "local"]
      interval: 10s
      timeout: 5s
      retries: 5
      start_period: 15s

  redis:
    image: redis:7-alpine
    command: redis-server --requirepass ${REDIS_PASSWORD}
    healthcheck:
      test: ["CMD", "redis-cli", "-a", "${REDIS_PASSWORD}", "ping"]
      interval: 10s
      timeout: 3s
      retries: 5

  celery-worker:
    build: ./backend
    command: celery -A celery_app worker --loglevel=info -Q documents
    environment:
      - DATABASE_URL=${DATABASE_URL}
      - REDIS_URL=${REDIS_URL}
      - MINIO_ENDPOINT=${MINIO_ENDPOINT}
      - MINIO_ACCESS_KEY=${MINIO_ACCESS_KEY}
      - MINIO_SECRET_KEY=${MINIO_SECRET_KEY}
      - MINIO_BUCKET=${MINIO_BUCKET}
    depends_on:
      postgres:
        condition: service_healthy
      redis:
        condition: service_healthy
      minio:
        condition: service_healthy

backend service update — add depends_on conditions:

  backend:
    ...
    environment:
      - DATABASE_URL=${DATABASE_URL}
      - DATABASE_MIGRATE_URL=${DATABASE_MIGRATE_URL}
      - MINIO_ENDPOINT=${MINIO_ENDPOINT}
      - MINIO_ACCESS_KEY=${MINIO_ACCESS_KEY}
      - MINIO_SECRET_KEY=${MINIO_SECRET_KEY}
      - MINIO_BUCKET=${MINIO_BUCKET}
      - REDIS_URL=${REDIS_URL}
      - PYTHONDONTWRITEBYTECODE=1
    depends_on:
      postgres:
        condition: service_healthy
      minio:
        condition: service_healthy
      redis:
        condition: service_healthy

Remove the volumes: entry for ./backend/data:/app/data — flat-file storage is deleted (D-04).

Add named volumes block at end of file:

volumes:
  postgres_data:
  minio_data:

backend/config.py (config, request-response)

Analog: backend/config.py (current, lines 1-52)

Existing pattern (lines 1-10; module-level constants, NOT Pydantic Settings):

import json
import os
from pathlib import Path

DATA_DIR = Path(os.environ.get("DATA_DIR", "/app/data"))
UPLOADS_DIR = DATA_DIR / "uploads"
METADATA_DIR = DATA_DIR / "metadata"
TOPICS_FILE = DATA_DIR / "topics.json"
SETTINGS_FILE = DATA_DIR / "settings.json"

Replace entirely with Pydantic Settings (per RESEARCH.md Code Examples, lines 914-937). The existing config.py does not use pydantic-settings; Phase 1 introduces it. The pattern to follow is the RESEARCH.md example, not the current file. Keep the DEFAULT_SYSTEM_PROMPT and DEFAULT_SETTINGS constants for backward compatibility during the transition; remove ensure_data_dirs() and all path constants once services/storage.py is replaced.

New pattern:

# backend/config.py
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    # Legacy — keep during transition, remove after storage.py rewrite
    data_dir: str = "/app/data"

    # Phase 1 additions
    database_url: str = "postgresql+psycopg://docuvault_app:changeme@postgres/docuvault"
    database_migrate_url: str = "postgresql+psycopg://docuvault_migrate:changeme@postgres/docuvault"
    minio_endpoint: str = "minio:9000"
    minio_access_key: str = "docuvault_app"
    minio_secret_key: str = "changeme"
    minio_bucket: str = "docuvault"
    redis_url: str = "redis://:changeme@redis:6379/0"
    secret_key: str = "CHANGEME"  # documented for Phase 2; not read in Phase 1

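    # pydantic-settings v2 style config. extra="ignore" lets .env carry variables
    # (POSTGRES_PASSWORD, MINIO_ROOT_USER, REDIS_PASSWORD, ...) that are not fields
    # on this class; without it, unknown keys in the dotenv file fail validation.
    model_config = {
        "env_file": ".env",
        "env_file_encoding": "utf-8",
        "extra": "ignore",
    }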

settings = Settings()

Note: pydantic-settings is already in requirements.txt (line 4). No new dependency needed.


backend/main.py (config, request-response)

Analog: backend/main.py (current, lines 1-34)

Existing lifespan pattern (lines 10-14):

from contextlib import asynccontextmanager
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    ensure_data_dirs()
    yield

Extend lifespan — replace ensure_data_dirs() call with engine setup and MinIO bucket init. Copy the asynccontextmanager + yield structure exactly:

from contextlib import asynccontextmanager
import asyncio
from fastapi import FastAPI
from minio import Minio
from db.session import engine
from config import settings

@asynccontextmanager
async def lifespan(app: FastAPI):
    # MinIO bucket initialization
    minio_client = Minio(
        settings.minio_endpoint,
        access_key=settings.minio_access_key,
        secret_key=settings.minio_secret_key,
        secure=False,
    )
    exists = await asyncio.to_thread(minio_client.bucket_exists, settings.minio_bucket)
    if not exists:
        await asyncio.to_thread(minio_client.make_bucket, settings.minio_bucket)
    app.state.minio = minio_client
    yield
    # Shutdown: close all pooled connections
    await engine.dispose()

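Extend the /health endpoint: keep the existing @app.get("/health") route, add a request: Request parameter so the handler can reach app.state.minio, and extend the body. The snippet also needs Request (from fastapi), text (from sqlalchemy), and AsyncSessionLocal (from db.session) imported: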

@app.get("/health")
async def health(request: Request):
    checks = {}
    # PostgreSQL probe
    try:
        async with AsyncSessionLocal() as session:
            await session.execute(text("SELECT 1"))
        checks["postgres"] = "ok"
    except Exception as e:
        checks["postgres"] = f"error: {e}"

    # MinIO probe
    try:
        ok = await asyncio.to_thread(request.app.state.minio.bucket_exists, settings.minio_bucket)
        checks["minio"] = "ok" if ok else "bucket missing"
    except Exception as e:
        checks["minio"] = f"error: {e}"

    overall = "ok" if all(v == "ok" for v in checks.values()) else "degraded"
    return {"status": overall, "checks": checks}
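
The aggregation rule at the end of the handler can be exercised without any services running. The following is a minimal sketch, assuming a hypothetical helper named summarize_checks that is not part of the codebase; the handler above inlines the same logic.

```python
def summarize_checks(checks: dict[str, str]) -> dict:
    """Collapse per-service probe results into the /health response shape.

    Hypothetical helper for illustration only.
    """
    # Any value other than the literal "ok" marks the whole service as degraded.
    overall = "ok" if all(v == "ok" for v in checks.values()) else "degraded"
    return {"status": overall, "checks": checks}


healthy = summarize_checks({"postgres": "ok", "minio": "ok"})
degraded = summarize_checks({"postgres": "ok", "minio": "bucket missing"})
```

Note that an empty checks dict would report "ok", because all() over no values is true; the handler always fills both probes, so that case should not arise there.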

backend/db/session.py (config, CRUD)

Analog: None exact. Closest structural analog is backend/config.py (module-level initialization pattern at lines 1-10).

Pattern from RESEARCH.md Pattern 1 (lines 240-266):

# backend/db/session.py
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from config import settings

engine = create_async_engine(
    settings.database_url,      # postgresql+psycopg://docuvault_app:...@postgres/docuvault
    pool_pre_ping=True,         # detect stale connections before use
    echo=False,
)

AsyncSessionLocal = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,     # prevent MissingGreenlet errors after commit
)

Key rule: expire_on_commit=False is mandatory — see RESEARCH.md Pitfall 1.


backend/deps/db.py (utility, CRUD)

Analog: None exact. The dependency injection yield pattern mirrors how backend/tests/conftest.py yields fixtures (lines 13-43).

Pattern from RESEARCH.md Pattern 1 (lines 258-266):

# backend/deps/db.py
from db.session import AsyncSessionLocal

async def get_db():
    async with AsyncSessionLocal() as session:
        try:
            yield session
        finally:
            await session.close()

Use as a FastAPI dependency: session: AsyncSession = Depends(get_db).


backend/db/models.py (model, CRUD)

Analog: None in codebase. The full schema is specified in RESEARCH.md Code Examples (lines 769-908).

Import block to copy:

import uuid
from datetime import datetime, timezone
from sqlalchemy import (
    Boolean, BigInteger, ForeignKey, Index, String, Text,
    TIMESTAMP, UniqueConstraint, Integer
)
from sqlalchemy.dialects.postgresql import UUID, INET, JSONB
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
from sqlalchemy.sql import func

Base class pattern:

class Base(DeclarativeBase):
    pass

Critical D-03: Document.user_id must be nullable=True in Phase 1:

user_id: Mapped[uuid.UUID | None] = mapped_column(
    UUID(as_uuid=True), ForeignKey("users.id", ondelete="CASCADE"), nullable=True
)

Use the full schema from RESEARCH.md lines 788-908 verbatim; it was designed to be implementation-ready.


backend/storage/base.py (utility, request-response)

Analog: backend/ai/base.py (lines 1-33), exact structural match.

ABC pattern from backend/ai/base.py (lines 1-33):

from abc import ABC, abstractmethod
from dataclasses import dataclass, field

class AIProvider(ABC):
    @abstractmethod
    async def classify(self, ...) -> ClassificationResult: ...

    @abstractmethod
    async def health_check(self) -> bool: ...

Apply the same structure for StorageBackend. The health_check() abstract method is already present in ai/base.py (line 31); mirror it exactly in StorageBackend. Method signatures from RESEARCH.md Pattern 8 (lines 617-640):

# backend/storage/base.py
from abc import ABC, abstractmethod

class StorageBackend(ABC):
    @abstractmethod
    async def put_object(
        self, user_id: str, document_id: str,
        file_bytes: bytes, extension: str, content_type: str,
    ) -> str:
        """Store object; return the object_key used."""

    @abstractmethod
    async def get_object(self, object_key: str) -> bytes:
        """Retrieve object bytes by key."""

    @abstractmethod
    async def delete_object(self, object_key: str) -> None:
        """Delete object by key."""

    @abstractmethod
    async def presigned_get_url(self, object_key: str, expires_minutes: int = 60) -> str:
        """Return a time-limited download URL."""

    @abstractmethod
    async def health_check(self) -> bool:
        """Return True if backend is reachable."""

backend/storage/__init__.py (utility, request-response)

Analog: backend/ai/__init__.py (lines 1-36), exact structural match.

Factory pattern from backend/ai/__init__.py (lines 1-10 and 8-36):

from ai.base import AIProvider, ClassificationResult
from ai.anthropic_provider import AnthropicProvider
# ... more imports

def get_provider(settings: dict) -> AIProvider:
    active = settings.get("active_provider", "lmstudio")
    match active:
        case "anthropic":
            return AnthropicProvider(...)
        case _:
            raise ValueError(f"Unknown AI provider: {active}")

Apply same factory pattern for storage. Phase 1 has only one backend (MinIO), so the match can be omitted initially, but the factory function signature is mandatory:

# backend/storage/__init__.py
from config import settings
from storage.minio_backend import MinIOBackend
from storage.base import StorageBackend

def get_storage_backend() -> StorageBackend:
    return MinIOBackend(
        endpoint=settings.minio_endpoint,
        access_key=settings.minio_access_key,
        secret_key=settings.minio_secret_key,
        bucket=settings.minio_bucket,
        secure=False,
    )
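
Because the factory returns the StorageBackend interface rather than a concrete class, a test double can stand in for MinIO. The sketch below repeats the interface so it runs on its own, and adds a hypothetical InMemoryBackend (not part of the plan) plus an incomplete subclass to show that the ABC rejects partial implementations.

```python
import asyncio
import uuid
from abc import ABC, abstractmethod


class StorageBackend(ABC):
    """Same interface as backend/storage/base.py, repeated so the sketch is self-contained."""

    @abstractmethod
    async def put_object(self, user_id: str, document_id: str,
                         file_bytes: bytes, extension: str, content_type: str) -> str: ...

    @abstractmethod
    async def get_object(self, object_key: str) -> bytes: ...

    @abstractmethod
    async def delete_object(self, object_key: str) -> None: ...

    @abstractmethod
    async def presigned_get_url(self, object_key: str, expires_minutes: int = 60) -> str: ...

    @abstractmethod
    async def health_check(self) -> bool: ...


class InMemoryBackend(StorageBackend):
    """Hypothetical test double: keeps objects in a dict instead of MinIO."""

    def __init__(self) -> None:
        self._objects: dict[str, bytes] = {}

    async def put_object(self, user_id, document_id, file_bytes, extension, content_type) -> str:
        key = f"{user_id}/{document_id}/{uuid.uuid4()}{extension}"
        self._objects[key] = file_bytes
        return key

    async def get_object(self, object_key: str) -> bytes:
        return self._objects[object_key]

    async def delete_object(self, object_key: str) -> None:
        self._objects.pop(object_key, None)

    async def presigned_get_url(self, object_key: str, expires_minutes: int = 60) -> str:
        # Not a real URL scheme; just enough for tests to assert on.
        return f"memory://{object_key}?expires={expires_minutes}m"

    async def health_check(self) -> bool:
        return True


class IncompleteBackend(StorageBackend):
    """Implements only one abstract method, so instantiation raises TypeError."""

    async def health_check(self) -> bool:
        return True


async def round_trip() -> bytes:
    backend = InMemoryBackend()
    key = await backend.put_object("u1", "d1", b"hello", ".txt", "text/plain")
    return await backend.get_object(key)
```

A test could override get_storage_backend to return such a double; how that override is wired in is left open here.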

backend/storage/minio_backend.py (service, file-I/O)

Analog: backend/ai/openai_provider.py (lines 1-104), same ABC-implementation pattern.

ABC implementation pattern from backend/ai/openai_provider.py (lines 9-70):

class OpenAIProvider(AIProvider):
    def __init__(self, api_key: str, model: str = "gpt-4o", base_url: str | None = None):
        self._api_key = api_key
        self._model = model
        self._base_url = base_url

    def _client(self) -> AsyncOpenAI:
        return AsyncOpenAI(api_key=self._api_key or "placeholder", base_url=self._base_url)

    async def health_check(self) -> bool:
        try:
            await self._client().chat.completions.create(...)
            return True
        except Exception:
            return False

Copy this structure: __init__ stores config, a private _client gives access to the SDK (a factory method in OpenAIProvider, a stored instance in MinIOBackend), every method is async def, and health_check wraps its call in try/except and returns a bool.

Key difference from AI providers: the MinIO SDK is synchronous, so all calls must be wrapped in asyncio.to_thread(). Copy the wrapping pattern from RESEARCH.md Pattern 3 (lines 349-403):

import asyncio
import io
import uuid

from minio import Minio

from storage.base import StorageBackend

class MinIOBackend(StorageBackend):
    def __init__(self, endpoint, access_key, secret_key, bucket, secure=False):
        self._client = Minio(endpoint=endpoint, access_key=access_key,
                             secret_key=secret_key, secure=secure)
        self._bucket = bucket

    async def put_object(self, user_id, document_id, file_bytes, extension, content_type) -> str:
        object_key = f"{user_id}/{document_id}/{uuid.uuid4()}{extension}"
        data = io.BytesIO(file_bytes)   # BytesIO() constructor sets pointer at 0 — no seek(0) needed
        await asyncio.to_thread(
            self._client.put_object,
            self._bucket, object_key, data, length=len(file_bytes), content_type=content_type,
        )
        return object_key

    async def health_check(self) -> bool:
        try:
            return await asyncio.to_thread(self._client.bucket_exists, self._bucket)
        except Exception:
            return False
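
The excerpt above covers put_object and health_check only. As a rough sketch of how the remaining interface methods might follow the same asyncio.to_thread() wrapping, the block below uses stand-in classes (FakeSyncClient, FakeResponse, SketchBackend are all hypothetical) so it runs without a MinIO server. The client method names get_object, remove_object, and presigned_get_object mirror minio-py; the fake bodies are invented for illustration.

```python
import asyncio
from datetime import timedelta


class FakeResponse:
    """Stand-in for the HTTP response a synchronous get_object call returns."""

    def __init__(self, data: bytes) -> None:
        self._data = data

    def read(self) -> bytes:
        return self._data

    def close(self) -> None:
        pass

    def release_conn(self) -> None:
        pass


class FakeSyncClient:
    """Stand-in for the synchronous SDK client, backed by a dict."""

    def __init__(self) -> None:
        self.store: dict[tuple[str, str], bytes] = {}

    def get_object(self, bucket: str, key: str) -> FakeResponse:
        return FakeResponse(self.store[(bucket, key)])

    def remove_object(self, bucket: str, key: str) -> None:
        self.store.pop((bucket, key), None)

    def presigned_get_object(self, bucket: str, key: str, expires: timedelta) -> str:
        return f"http://minio:9000/{bucket}/{key}?expires={int(expires.total_seconds())}"


class SketchBackend:
    def __init__(self, client, bucket: str) -> None:
        self._client = client
        self._bucket = bucket

    async def get_object(self, object_key: str) -> bytes:
        def _read() -> bytes:
            # Fetch, read, and release all happen inside the worker thread,
            # so no blocking I/O touches the event loop.
            response = self._client.get_object(self._bucket, object_key)
            try:
                return response.read()
            finally:
                response.close()
                response.release_conn()

        return await asyncio.to_thread(_read)

    async def delete_object(self, object_key: str) -> None:
        await asyncio.to_thread(self._client.remove_object, self._bucket, object_key)

    async def presigned_get_url(self, object_key: str, expires_minutes: int = 60) -> str:
        return await asyncio.to_thread(
            self._client.presigned_get_object,
            self._bucket, object_key, expires=timedelta(minutes=expires_minutes),
        )


async def demo() -> tuple[bytes, str, bool]:
    client = FakeSyncClient()
    client.store[("docuvault", "u/d/k.pdf")] = b"%PDF"
    backend = SketchBackend(client, "docuvault")
    data = await backend.get_object("u/d/k.pdf")
    url = await backend.presigned_get_url("u/d/k.pdf", expires_minutes=5)
    await backend.delete_object("u/d/k.pdf")
    return data, url, ("docuvault", "u/d/k.pdf") in client.store
```

Wrapping the whole fetch-and-read sequence in one inner function keeps the response object from crossing threads; RESEARCH.md Pattern 3 remains the reference for the actual implementation.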

backend/services/storage.py (service, CRUD)

Analog: backend/services/storage.py (current, lines 1-188), replace entirely.

Current pattern shows the data-access interface that api/documents.py depends on (lines 18-95). The new implementation must preserve the same function signatures where possible to minimize changes in api/documents.py. The new storage.py is a thin orchestrator: it calls db/session.py for ORM operations and storage/minio_backend.py for object storage.

New async signatures to match existing callers in api/documents.py (lines 32-57):

# Old (sync):      storage.save_upload(content, file.filename, mime)
# New (async):     await storage.save_upload(content, file.filename, mime)

# Old (sync):      storage.save_metadata(meta)
# New (async):     await storage.save_metadata(meta)  — or merged into save_upload

# Old (sync):      storage.list_metadata(topic=topic)
# New (async):     await storage.list_metadata(topic=topic)

# Old (sync):      storage.get_metadata(doc_id)
# New (async):     await storage.get_metadata(doc_id)

# Old (sync):      storage.delete_document(doc_id)
# New (async):     await storage.delete_document(doc_id)

Session injection pattern: new storage.py functions accept an AsyncSession parameter (injected by FastAPI via Depends(get_db)) rather than creating their own session. This mirrors how the classifier calls storage functions with state passed in.

Error handling from current storage.py (lines 34-38: return None for not-found, not exceptions):

def get_metadata(doc_id: str) -> dict | None:
    path = METADATA_DIR / f"{doc_id}.json"
    if not path.exists():
        return None
    return json.loads(path.read_text())

Keep the same None-on-not-found contract in the async ORM version so api/documents.py if meta is None: raise HTTPException(404, ...) checks continue to work unchanged.


backend/celery_app.py (config, event-driven)

Analog: None in codebase.

Pattern from RESEARCH.md Pattern 5 (lines 462-475):

# backend/celery_app.py
import os
from celery import Celery

celery_app = Celery("docuvault")
celery_app.conf.broker_url = os.environ.get("REDIS_URL", "redis://redis:6379/0")
celery_app.conf.result_backend = os.environ.get("REDIS_URL", "redis://redis:6379/0")
celery_app.conf.task_serializer = "json"
celery_app.conf.result_serializer = "json"
celery_app.conf.accept_content = ["json"]
celery_app.conf.task_routes = {
    "tasks.document_tasks.*": {"queue": "documents"},
}

Critical: Use os.environ.get() directly here, NOT from config import settings. config.py imports pydantic-settings, which may trigger FastAPI-related imports. Keep celery_app.py minimal to avoid Pitfall 7 (circular imports with the FastAPI app).


backend/tasks/document_tasks.py (service, event-driven)

Analog: backend/services/classifier.py (lines 1-59), same orchestration pattern (load metadata, load settings, call services, persist results).

Orchestration pattern from backend/services/classifier.py (lines 11-46):

async def classify_document(doc_id: str, topic_names: list[str] | None = None) -> list[str]:
    meta = storage.get_metadata(doc_id)
    if meta is None:
        raise ValueError(f"Document {doc_id} not found")

    settings = storage.load_settings()
    provider = get_provider(settings)
    text = meta.get("extracted_text", "")
    result = await provider.classify(text[:MAX_AI_CHARS], topic_names, system_prompt)
    # ... persist results
    storage.update_document_topics(doc_id, final_topics)
    return final_topics

Apply same orchestration structure for the Celery task, with three critical differences:

  1. Task function must be def, not async def (Celery workers have no asyncio event loop)
  2. Import services directly — never import from main.py or any router module
  3. Use asyncio.run() to call async service functions if unavoidable

# backend/tasks/document_tasks.py
from celery_app import celery_app

@celery_app.task(name="tasks.document_tasks.extract_and_classify")
def extract_and_classify(document_id: str) -> dict:
    import asyncio
    from services import extractor, classifier
    # ... call services, persist results
    return {"document_id": document_id, "status": "classified"}
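
The third point can be sketched as follows. This is an assumption-laden illustration rather than the planned task body: the private coroutine _extract_and_classify is hypothetical, the Celery decorator is left out so the block runs without Celery installed, and asyncio.sleep(0) stands in for the awaited service calls.

```python
import asyncio


async def _extract_and_classify(document_id: str) -> dict:
    """Async pipeline; the sleep stands in for awaited service calls."""
    await asyncio.sleep(0)
    return {"document_id": document_id, "status": "classified"}


# In the real module this function would carry the @celery_app.task(...) decorator.
def extract_and_classify(document_id: str) -> dict:
    # Plain def: the worker calls this synchronously. asyncio.run() creates a
    # fresh event loop, runs the coroutine to completion, and closes the loop.
    return asyncio.run(_extract_and_classify(document_id))


result = extract_and_classify("doc-123")
```

One thing to watch: every asyncio.run() call uses a new event loop, so objects bound to an earlier loop (pooled async database connections, for example) may not be reusable across task invocations.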

Replace in api/documents.py (lines 49-56):

# Old:
if auto_classify:
    topics = await classifier.classify_document(saved["id"])
# New:
from tasks.document_tasks import extract_and_classify
extract_and_classify.delay(str(saved_doc.id))

backend/api/documents.py (controller, request-response)

Analog: backend/api/documents.py (current, lines 1-102), update in-place.

Existing route structure to preserve (lines 21-58):

  • @router.post("/upload") — keep signature (file: UploadFile, auto_classify: bool)
  • @router.get("") — keep pagination params (topic, page, per_page)
  • @router.get("/{doc_id}") — keep path param
  • @router.delete("/{doc_id}") — keep path param
  • @router.post("/{doc_id}/classify") — keep path param + body

Session injection change, current imports (lines 1-4):

from services import storage, extractor, classifier

New — add session dependency:

from fastapi import APIRouter, UploadFile, File, Form, HTTPException, Query, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from deps.db import get_db
from services import storage, extractor
from tasks.document_tasks import extract_and_classify

Add session parameter to route handlers:

@router.post("/upload")
async def upload_document(
    file: UploadFile = File(...),
    auto_classify: bool = Form(True),
    session: AsyncSession = Depends(get_db),   # NEW
):

Error handling pattern (lines 50-56, keep unchanged):

try:
    topics = await classifier.classify_document(saved["id"])
    meta["topics"] = topics
except Exception as e:
    meta["classification_error"] = str(e)  # classification failure is non-fatal

HTTP error pattern (lines 75-77, keep unchanged):

if meta is None:
    raise HTTPException(404, "Document not found")

backend/api/topics.py (controller, request-response)

Analog: backend/api/topics.py (current, lines 1-73), update in-place.

Existing Pydantic model pattern (lines 8-19):

class TopicCreate(BaseModel):
    name: str
    description: str = ""
    color: str = "#6366f1"

class TopicUpdate(BaseModel):
    name: str | None = None
    description: str | None = None
    color: str | None = None

Keep these models unchanged — they match the PostgreSQL topics table columns.

Storage call pattern (lines 26-30):

@router.get("")
async def list_topics():
    topics = storage.load_topics()
    counts = storage.topic_doc_counts()

Update to inject session: AsyncSession = Depends(get_db) and call async ORM queries instead of flat-file storage functions. Response shape must remain identical ({"topics": [...]} with doc_count appended per topic).
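
To make the required response shape concrete, here is a small sketch. The helper name with_doc_counts and the assumption that counts are keyed by topic name are both hypothetical; the current storage.topic_doc_counts() may key its result differently.

```python
def with_doc_counts(topics: list[dict], counts: dict[str, int]) -> dict:
    """Build {"topics": [...]} with doc_count added to each topic.

    Hypothetical helper; `counts` is assumed to map topic name to document count.
    """
    return {
        # Topics without any documents get an explicit count of 0.
        "topics": [{**t, "doc_count": counts.get(t["name"], 0)} for t in topics]
    }


response = with_doc_counts(
    [
        {"name": "Invoices", "description": "", "color": "#6366f1"},
        {"name": "Contracts", "description": "", "color": "#6366f1"},
    ],
    {"Invoices": 3},
)
```

Whatever the ORM query looks like, asserting on this shape in the topic tests would guard the frontend contract during the port.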


backend/requirements.txt (config)

Analog: backend/requirements.txt (current, lines 1-16)

Current file (lines 1-16):

fastapi>=0.111
uvicorn[standard]>=0.29
python-multipart
pydantic-settings>=2.2
anthropic>=0.26
openai>=1.30
PyMuPDF>=1.24
python-docx>=1.1
pytesseract>=0.3
Pillow>=10.3
filelock>=3.14     # REMOVE — replaced by PostgreSQL transactions
aiofiles>=23.2
httpx>=0.27
pytest>=8.2
pytest-asyncio>=0.23

Additions (append to file):

sqlalchemy[asyncio]>=2.0
psycopg[binary]>=3.3
alembic>=1.13
minio>=7.2
celery[redis]>=5.4
redis>=7.0

Remove: filelock>=3.14 — no longer needed once services/storage.py is replaced (RESEARCH.md line 952).


.env.example (config)

Analog: .env.example (current, lines 1-6)

Current file (lines 1-6):

# Copy to .env and fill in as needed.
ANTHROPIC_API_KEY=
OPENAI_API_KEY=

Extend with all Phase 1 vars (D-11, D-13, D-15, D-16). Keep existing vars at top. Pattern: group by service, comment each variable:

# ── PostgreSQL ───────────────────────────────────────────────────────────────
# App user (restricted: SELECT/INSERT/UPDATE/DELETE only — used by FastAPI + Celery)
DATABASE_URL=postgresql+psycopg://docuvault_app:changeme@postgres:5432/docuvault
# Migration user (DDL privileges — used ONLY by Alembic, never by the app at runtime)
DATABASE_MIGRATE_URL=postgresql+psycopg://docuvault_migrate:changeme@postgres:5432/docuvault
# Superuser password for the postgres init container (used only by initdb.d scripts)
POSTGRES_PASSWORD=changeme

# ── MinIO ────────────────────────────────────────────────────────────────────
MINIO_ROOT_USER=minioadmin
MINIO_ROOT_PASSWORD=changeme
MINIO_ENDPOINT=minio:9000
# App-level access key (minimal permissions: read/write on docuvault bucket only)
MINIO_ACCESS_KEY=docuvault_app
MINIO_SECRET_KEY=changeme
MINIO_BUCKET=docuvault

# ── Redis ────────────────────────────────────────────────────────────────────
REDIS_PASSWORD=changeme
REDIS_URL=redis://:changeme@redis:6379/0

# ── Security (Phase 2) ───────────────────────────────────────────────────────
# Not read by the app in Phase 1. Documented here for Phase 2 JWT + HKDF use.
SECRET_KEY=CHANGEME-replace-with-64-char-random-hex

backend/tests/conftest.py (test, CRUD)

Analog: backend/tests/conftest.py (current, lines 1-71), update in-place.

Current fixture pattern (lines 13-43):

@pytest.fixture(autouse=True)
def isolated_data_dir(monkeypatch, tmp_path):
    """Each test gets its own clean data directory."""
    data_dir = tmp_path / "data"
    ...
    monkeypatch.setenv("DATA_DIR", str(data_dir))
    import config
    monkeypatch.setattr(config, "DATA_DIR", data_dir)
    ...
    yield data_dir

New async session fixture — replace isolated_data_dir with an async SQLite in-memory engine for unit tests, and keep a separate fixture for integration tests using the real Docker database. Copy the yield + teardown structure exactly:

import pytest
import pytest_asyncio
from httpx import AsyncClient, ASGITransport
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from sqlalchemy.pool import StaticPool
from db.models import Base
from deps.db import get_db
from main import app

@pytest_asyncio.fixture
async def db_session():
    """In-memory async SQLite session for unit tests."""
    engine = create_async_engine(
        "sqlite+aiosqlite:///:memory:",
        connect_args={"check_same_thread": False},
        poolclass=StaticPool,
    )
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

    AsyncTestSession = async_sessionmaker(engine, expire_on_commit=False)
    async with AsyncTestSession() as session:
        yield session

    await engine.dispose()

@pytest_asyncio.fixture
async def client(db_session):
    """Async test client with DB dependency overridden."""
    app.dependency_overrides[get_db] = lambda: db_session
    async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as c:
        yield c
    app.dependency_overrides.clear()

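Note: aiosqlite must be added to requirements.txt for tests. SQLite cannot compile PostgreSQL-only column types such as INET and JSONB, so if db/models.py uses them (it imports both), Base.metadata.create_all will fail on the in-memory engine. In that case, point the tests at the real PostgreSQL test database via the DATABASE_URL env var, as in the integration tests.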


backend/tests/test_health.py (test, request-response)

Analog: backend/tests/test_health.py (current, lines 1-5), update in-place.

Current test (lines 1-5):

def test_health(client):
    resp = client.get("/health")
    assert resp.status_code == 200
    assert resp.json() == {"status": "ok"}

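Extended pattern: carry the existing test over (shown here as test_health_ok) and add a second test with assertions for the richer response shape. Use the async/await style required by pytest-asyncio; in its default strict mode, async tests also need the asyncio marker. Because /health opens AsyncSessionLocal and reads app.state.minio directly, the "ok" assertions need reachable PostgreSQL and MinIO and a lifespan that has run; httpx's ASGITransport does not trigger lifespan events on its own.

import pytest

pytestmark = pytest.mark.asyncio  # not needed if asyncio_mode = auto is configured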

async def test_health_ok(client):
    resp = await client.get("/health")
    assert resp.status_code == 200
    data = resp.json()
    assert data["status"] == "ok"

async def test_health_checks_postgres_and_minio(client):
    resp = await client.get("/health")
    data = resp.json()
    assert "checks" in data
    assert "postgres" in data["checks"]
    assert "minio" in data["checks"]
    assert data["checks"]["postgres"] == "ok"
    assert data["checks"]["minio"] == "ok"

backend/tests/test_documents.py (test, CRUD)

Analog: backend/tests/test_documents.py (current, lines 1-108), port to async.

Current sync pattern (lines 1-14):

def test_upload_txt_no_classify(client, sample_txt):
    with open(sample_txt, "rb") as f:
        resp = client.post(
            "/api/documents/upload",
            files={"file": ("sample.txt", f, "text/plain")},
            data={"auto_classify": "false"},
        )
    assert resp.status_code == 200

Port to async — change def to async def and client.post to await client.post:

async def test_upload_txt_no_classify(client, sample_txt):
    with open(sample_txt, "rb") as f:
        resp = await client.post(
            "/api/documents/upload",
            files={"file": ("sample.txt", f, "text/plain")},
            data={"auto_classify": "false"},
        )
    assert resp.status_code == 200
    data = resp.json()
    assert data["original_name"] == "sample.txt"

Keep all assertion logic from the current file; only the def → async def and client.verb() → await client.verb() changes are needed. Add new tests for STORE-01 and STORE-02 requirements.


backend/tests/test_storage.py (test, file-I/O)

Analog: None in codebase — new file.

Pattern from RESEARCH.md Validation section (lines 1022-1028) and the MinIO key schema (D-06):

import re
import uuid
from unittest.mock import MagicMock, patch

import pytest

from storage.minio_backend import MinIOBackend

pytestmark = pytest.mark.asyncio  # not needed if asyncio_mode = auto is configured

UUID_RE = r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"
KEY_RE = re.compile(rf"^{UUID_RE}/{UUID_RE}/{UUID_RE}\.[a-z0-9]+$")


async def _put_pdf() -> str:
    """Call put_object against a mocked Minio client and return the generated key."""
    with patch("storage.minio_backend.Minio", return_value=MagicMock()):
        backend = MinIOBackend("minio:9000", "access", "secret", "docuvault")
    return await backend.put_object(
        str(uuid.uuid4()), str(uuid.uuid4()), b"%PDF-1.7", ".pdf", "application/pdf"
    )


async def test_object_key_schema():
    """STORE-02: MinIO object key must match {user_id}/{document_id}/{uuid4}{ext}."""
    assert KEY_RE.match(await _put_pdf())


async def test_filename_not_in_object_key():
    """STORE-02: Human-readable filename must NOT appear in the MinIO object key."""
    original_name = "invoice_Q3_2025.pdf"
    # put_object never receives the original filename, so the key cannot contain it
    assert original_name not in await _put_pdf()

docker/postgres/initdb.d/01-init-users.sql (config, batch)

Analog: None in codebase.

Pattern from RESEARCH.md Pattern 7 (lines 581-599):

-- docker/postgres/initdb.d/01-init-users.sql
-- Runs as the POSTGRES_USER superuser on first container start only.

-- Migration user: DDL privileges (CREATE TABLE, ALTER TABLE, CREATE INDEX)
CREATE USER docuvault_migrate WITH PASSWORD 'PLACEHOLDER_MIGRATE_PASSWORD';
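GRANT ALL PRIVILEGES ON DATABASE docuvault TO docuvault_migrate;
-- PostgreSQL 15+ no longer grants CREATE on schema public to every role,
-- so the migration user needs it explicitly to create tables there.
GRANT USAGE, CREATE ON SCHEMA public TO docuvault_migrate;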

-- App user: runtime DML only (SELECT, INSERT, UPDATE, DELETE)
CREATE USER docuvault_app WITH PASSWORD 'PLACEHOLDER_APP_PASSWORD';
GRANT CONNECT ON DATABASE docuvault TO docuvault_app;

Important: Passwords here are Docker init-time placeholders. A plain .sql init script runs once, on an empty data volume, and is not passed through environment-variable substitution, so the passwords must be hardcoded and kept in sync with the credentials used in DATABASE_URL and DATABASE_MIGRATE_URL in .env.

The ALTER DEFAULT PRIVILEGES grant (for future tables created by Alembic) must be run inside the first Alembic migration (0001_initial_schema.py) using op.execute(), not in this init script. See RESEARCH.md Pattern 7 (lines 601-603) and Pitfall 4.
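
A rough sketch of what that step might look like in 0001_initial_schema.py follows. The helper default_privilege_statements is hypothetical, and the exact privilege list is an assumption to be checked against RESEARCH.md Pattern 7; the statements themselves are standard PostgreSQL syntax, and op.execute() is the Alembic call named above.

```python
APP_ROLE = "docuvault_app"  # role created in 01-init-users.sql


def default_privilege_statements(role: str = APP_ROLE, schema: str = "public") -> list[str]:
    """SQL that gives the app role DML rights on objects the migration role creates later."""
    return [
        f"ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} "
        f"GRANT SELECT, INSERT, UPDATE, DELETE ON TABLES TO {role}",
        f"ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} "
        f"GRANT USAGE, SELECT ON SEQUENCES TO {role}",
    ]


def upgrade() -> None:
    # Imported here so the helper above can be used without Alembic installed.
    from alembic import op

    # Default privileges only affect objects created afterwards, so these
    # statements run before any table is created.
    for statement in default_privilege_statements():
        op.execute(statement)
    # ... op.create_table(...) calls for the initial schema follow here
```

Without a FOR ROLE clause, ALTER DEFAULT PRIVILEGES applies to objects created by the role running the statement, which is the migration user when Alembic connects through DATABASE_MIGRATE_URL.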


backend/alembic.ini and backend/migrations/env.py (config, batch)

Analog: None in codebase.

alembic.ini key section (from RESEARCH.md Pattern 2, lines 328-334):

[alembic]
script_location = migrations
sqlalchemy.url = %(DATABASE_MIGRATE_URL)s

migrations/env.py async pattern (from RESEARCH.md Pattern 2, lines 300-327):

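import asyncio
import os

from alembic import context
from sqlalchemy import pool
from sqlalchemy.ext.asyncio import async_engine_from_config

from db.models import Base  # noqa: F401 - must import to register all models

config = context.config  # Alembic Config object backed by alembic.ini

# %(DATABASE_MIGRATE_URL)s in alembic.ini is ConfigParser interpolation: it reads
# ini options, not the process environment. One way to supply the value is to
# inject it here (assumes DATABASE_MIGRATE_URL is set in the environment).
config.set_section_option(
    config.config_ini_section, "DATABASE_MIGRATE_URL", os.environ["DATABASE_MIGRATE_URL"]
)

target_metadata = Base.metadata

def do_run_migrations(connection):
    context.configure(connection=connection, target_metadata=target_metadata)
    with context.begin_transaction():
        context.run_migrations()

async def run_async_migrations():
    # NullPool: migrations are a one-shot process, so no connection pooling.
    connectable = async_engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )
    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)
    await connectable.dispose()

def run_migrations_online():
    asyncio.run(run_async_migrations())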

Generate the base file with alembic init -t async migrations; the generated env.py already has this structure, plus an offline-mode path. Then add the from db.models import Base import and set target_metadata = Base.metadata.


Shared Patterns

Async/Await Convention

Source: backend/main.py (lines 10-13), backend/api/documents.py (lines 21-58)
Apply to: All new db/, deps/, storage/, services/, tasks/ modules, all test files

All new code is async def. Synchronous SDK calls (MinIO) use asyncio.to_thread(). Celery task functions are the only exception: they must be plain def (see RESEARCH.md Pitfall: Celery tasks are synchronous).
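
A minimal sketch of this convention follows. FakeBlockingClient is a hypothetical stand-in for a synchronous SDK client such as the MinIO one, and process_document stands in for a Celery task body with the task decorator omitted; neither name comes from the codebase.

```python
import asyncio
import time


class FakeBlockingClient:
    """Hypothetical stand-in for a synchronous SDK client."""

    def put_object(self, key: str, data: bytes) -> int:
        time.sleep(0.01)  # simulates blocking network I/O
        return len(data)


async def upload(client: FakeBlockingClient, key: str, data: bytes) -> int:
    # Run the blocking call in a worker thread so the event loop stays responsive.
    return await asyncio.to_thread(client.put_object, key, data)


def process_document(key: str) -> str:
    # Celery-style task body: a plain def with no await.
    return f"processed {key}"
```

Calling client.put_object directly inside upload would also work, but it would block every other coroutine on the loop for the duration of the call.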

None-on-not-found Contract

Source: backend/services/storage.py (lines 34-38)
Apply to: backend/services/storage.py (rewritten), backend/db/ query helpers

def get_metadata(doc_id: str) -> dict | None:
    ...
    if not path.exists():
        return None

Async ORM equivalent:

async def get_document(session: AsyncSession, doc_id: uuid.UUID) -> Document | None:
    return await session.get(Document, doc_id)

Return None for not-found; let the API layer raise HTTPException(404). Never raise exceptions from the service layer for expected missing-resource conditions.

HTTP Error Pattern

Source: backend/api/documents.py (lines 74-77), backend/api/topics.py (lines 57-59)
Apply to: All API route handlers

if meta is None:
    raise HTTPException(404, "Document not found")

Use bare string messages (no detail= keyword) — consistent with existing code.

Classification Failure Non-Fatal Pattern

Source: backend/api/documents.py (lines 50-56)
Apply to: backend/api/documents.py (updated upload handler)

try:
    topics = await classifier.classify_document(saved["id"])
    meta["topics"] = topics
except Exception as e:
    meta["classification_error"] = str(e)  # classification failure is non-fatal

Document upload succeeds even if classification fails. Celery task failure equivalent: task enters FAILURE state but the document row remains with status="pending".
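
The behaviour can be exercised in isolation with a classifier that always fails. This is a sketch under assumptions: classify_document here is a hypothetical failing stub, not the real classifier, and upload is reduced to the metadata handling.

```python
import asyncio


async def classify_document(doc_id: str) -> list[str]:
    # Hypothetical classifier stub that always fails, to exercise the error path.
    raise RuntimeError("provider unavailable")


async def upload(doc_id: str) -> dict:
    meta = {"id": doc_id, "status": "pending"}
    try:
        meta["topics"] = await classify_document(doc_id)
    except Exception as exc:
        # Record the error and carry on: the upload itself has already succeeded.
        meta["classification_error"] = str(exc)
    return meta


result = asyncio.run(upload("doc-1"))
```

Here result still describes a stored document with status "pending", with the failure recorded under classification_error rather than raised to the caller.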

ABC + Factory Pattern

Source: backend/ai/base.py + backend/ai/__init__.py (lines 1-36)
Apply to: backend/storage/base.py + backend/storage/__init__.py

This is the project's established pattern for pluggable backends. Follow it exactly: separate base.py (ABC), __init__.py (factory function get_X_backend()), concrete implementations in separate modules.
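
The layout described above can be sketched in a single file for illustration. The method names on StorageBackend, the InMemoryBackend class, and the string-keyed registry are assumptions made for this sketch; the real interface should mirror backend/ai/base.py.

```python
from abc import ABC, abstractmethod


class StorageBackend(ABC):
    """Would live in storage/base.py: the interface every backend implements."""

    @abstractmethod
    async def put_object(self, key: str, data: bytes) -> str: ...

    @abstractmethod
    async def get_object(self, key: str) -> bytes: ...


class InMemoryBackend(StorageBackend):
    """Hypothetical concrete backend; real ones live in their own modules."""

    def __init__(self) -> None:
        self._objects: dict[str, bytes] = {}

    async def put_object(self, key: str, data: bytes) -> str:
        self._objects[key] = data
        return key

    async def get_object(self, key: str) -> bytes:
        return self._objects[key]


# Would live in storage/__init__.py: maps a configured name to an implementation.
_BACKENDS: dict[str, type[StorageBackend]] = {"memory": InMemoryBackend}


def get_storage_backend(name: str = "memory") -> StorageBackend:
    try:
        return _BACKENDS[name]()
    except KeyError:
        raise ValueError(f"Unknown storage backend: {name!r}") from None
```

Callers depend only on StorageBackend and the factory, so adding a backend means adding one module and one registry entry.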


No Analog Found

Files with no close match in the codebase (planner should use RESEARCH.md patterns instead):

| File | Role | Data Flow | Reason |
|------|------|-----------|--------|
| docker/postgres/initdb.d/01-init-users.sql | config | batch | No SQL scripts exist in codebase; use RESEARCH.md Pattern 7 |
| backend/celery_app.py | config | event-driven | No task queue code exists; use RESEARCH.md Pattern 5 |
| backend/alembic.ini | config | batch | No Alembic config exists; generate with alembic init -t async |
| backend/migrations/env.py | config | batch | No migrations exist; use alembic init -t async output + RESEARCH.md Pattern 2 |
| backend/migrations/versions/0001_initial_schema.py | migration | batch | No migrations exist; use full schema from RESEARCH.md Code Examples (lines 769-908) |
| backend/tests/test_storage.py | test | file-I/O | No object storage tests exist; new file per RESEARCH.md Validation section |

Metadata

Analog search scope: backend/ (all .py files), docker-compose.yml, .env.example, backend/requirements.txt, backend/Dockerfile
Files scanned: 25
Pattern extraction date: 2026-05-21