Phase 1: Infrastructure Foundation - Pattern Map
Mapped: 2026-05-21
Files analyzed: 14 new/modified files
Analogs found: 12 / 14
File Classification
| New/Modified File | Role | Data Flow | Closest Analog | Match Quality |
|---|---|---|---|---|
| docker-compose.yml | config | request-response | docker-compose.yml (current) | exact, extend in-place |
| docker/postgres/initdb.d/01-init-users.sql | config | batch | none in codebase | no analog |
| backend/db/session.py | config | CRUD | backend/config.py (module-level setup pattern) | partial |
| backend/db/models.py | model | CRUD | none in codebase | no analog (schema from RESEARCH.md) |
| backend/deps/db.py | utility | CRUD | backend/config.py (module-level constants pattern) | partial |
| backend/config.py | config | request-response | backend/config.py (current) | exact, extend in-place |
| backend/main.py | config | request-response | backend/main.py (current) | exact, extend in-place |
| backend/storage/base.py | utility | request-response | backend/ai/base.py | exact role-match |
| backend/storage/__init__.py | utility | request-response | backend/ai/__init__.py | exact role-match |
| backend/storage/minio_backend.py | service | file-I/O | backend/ai/openai_provider.py | role-match (ABC impl) |
| backend/services/storage.py | service | CRUD | backend/services/storage.py (current) | exact, replace in-place |
| backend/celery_app.py | config | event-driven | none in codebase | no analog |
| backend/tasks/document_tasks.py | service | event-driven | backend/services/classifier.py | role-match (orchestration) |
| backend/api/documents.py | controller | request-response | backend/api/documents.py (current) | exact, update in-place |
| backend/api/topics.py | controller | request-response | backend/api/topics.py (current) | exact, update in-place |
| backend/requirements.txt | config | n/a | backend/requirements.txt (current) | exact, extend in-place |
| .env.example | config | n/a | .env.example (current) | exact, extend in-place |
| backend/tests/conftest.py | test | CRUD | backend/tests/conftest.py (current) | exact, update in-place |
| backend/tests/test_health.py | test | request-response | backend/tests/test_health.py (current) | exact, update in-place |
| backend/tests/test_documents.py | test | CRUD | backend/tests/test_documents.py (current) | exact, update in-place |
| backend/tests/test_storage.py | test | file-I/O | none in codebase | no analog (new) |
| backend/alembic.ini | config | n/a | none in codebase | no analog |
| backend/migrations/env.py | config | batch | none in codebase | no analog (pattern from RESEARCH.md) |
| backend/migrations/versions/0001_initial_schema.py | migration | batch | none in codebase | no analog (schema from RESEARCH.md) |
Pattern Assignments
docker-compose.yml (config, request-response)
Analog: docker-compose.yml (current, lines 1–26)
Existing service block pattern (lines 1–26 of current docker-compose.yml):
services:
  backend:
    build: ./backend
    ports:
      - "8000:8000"
    volumes:
      - ./backend/data:/app/data
      - ./backend:/app
    environment:
      - DATA_DIR=/app/data
      - PYTHONDONTWRITEBYTECODE=1
    extra_hosts:
      - "host.docker.internal:host-gateway"
    command: uvicorn main:app --host 0.0.0.0 --port 8000 --reload
  frontend:
    build: ./frontend
    ports:
      - "5173:5173"
    volumes:
      - ./frontend/src:/app/src
      - ./frontend/index.html:/app/index.html
    depends_on:
      - backend
    command: npm run dev -- --host 0.0.0.0
New services to add — copy structure from RESEARCH.md Pattern 6 (lines 512–567):
  postgres:
    image: postgres:17-alpine
    environment:
      POSTGRES_DB: docuvault
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
    volumes:
      - postgres_data:/var/lib/postgresql/data
      - ./docker/postgres/initdb.d:/docker-entrypoint-initdb.d:ro
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres -d docuvault"]
      interval: 10s
      timeout: 5s
      retries: 5
      start_period: 10s
  minio:
    image: minio/minio:latest
    command: server /data --console-address ":9001"
    environment:
      MINIO_ROOT_USER: ${MINIO_ROOT_USER}
      MINIO_ROOT_PASSWORD: ${MINIO_ROOT_PASSWORD}
    ports:
      - "9000:9000"
      - "9001:9001"
    volumes:
      - minio_data:/data
    healthcheck:
      test: ["CMD", "mc", "ready", "local"]
      interval: 10s
      timeout: 5s
      retries: 5
      start_period: 15s
  redis:
    image: redis:7-alpine
    command: redis-server --requirepass ${REDIS_PASSWORD}
    healthcheck:
      test: ["CMD", "redis-cli", "-a", "${REDIS_PASSWORD}", "ping"]
      interval: 10s
      timeout: 3s
      retries: 5
  celery-worker:
    build: ./backend
    command: celery -A celery_app worker --loglevel=info -Q documents
    environment:
      - DATABASE_URL=${DATABASE_URL}
      - REDIS_URL=${REDIS_URL}
      - MINIO_ENDPOINT=${MINIO_ENDPOINT}
      - MINIO_ACCESS_KEY=${MINIO_ACCESS_KEY}
      - MINIO_SECRET_KEY=${MINIO_SECRET_KEY}
      - MINIO_BUCKET=${MINIO_BUCKET}
    depends_on:
      postgres:
        condition: service_healthy
      redis:
        condition: service_healthy
      minio:
        condition: service_healthy
backend service update — add depends_on conditions:
  backend:
    ...
    environment:
      - DATABASE_URL=${DATABASE_URL}
      - DATABASE_MIGRATE_URL=${DATABASE_MIGRATE_URL}
      - MINIO_ENDPOINT=${MINIO_ENDPOINT}
      - MINIO_ACCESS_KEY=${MINIO_ACCESS_KEY}
      - MINIO_SECRET_KEY=${MINIO_SECRET_KEY}
      - MINIO_BUCKET=${MINIO_BUCKET}
      - REDIS_URL=${REDIS_URL}
      - PYTHONDONTWRITEBYTECODE=1
    depends_on:
      postgres:
        condition: service_healthy
      minio:
        condition: service_healthy
      redis:
        condition: service_healthy
Remove the volumes: entry for ./backend/data:/app/data — flat-file storage is deleted (D-04).
Add named volumes block at end of file:
volumes:
  postgres_data:
  minio_data:
backend/config.py (config, request-response)
Analog: backend/config.py (current, lines 1–52)
Existing pattern (lines 1–10 — module-level constants, NOT Pydantic Settings):
import json
import os
from pathlib import Path
DATA_DIR = Path(os.environ.get("DATA_DIR", "/app/data"))
UPLOADS_DIR = DATA_DIR / "uploads"
METADATA_DIR = DATA_DIR / "metadata"
TOPICS_FILE = DATA_DIR / "topics.json"
SETTINGS_FILE = DATA_DIR / "settings.json"
Replace entirely with Pydantic Settings (per RESEARCH.md Code Examples, lines 914–937).
The existing config.py does not use pydantic-settings — Phase 1 introduces it. The pattern to follow is the RESEARCH.md example, not the current file. Keep the DEFAULT_SYSTEM_PROMPT and DEFAULT_SETTINGS constants for backward compatibility during the transition; remove ensure_data_dirs() and all path constants once services/storage.py is replaced.
New pattern:
# backend/config.py
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    # pydantic-settings v2 style configuration (replaces the v1 inner Config class).
    # extra="ignore" lets .env carry variables this class does not declare
    # (e.g. POSTGRES_PASSWORD, MINIO_ROOT_USER) without a validation error.
    model_config = SettingsConfigDict(
        env_file=".env",
        env_file_encoding="utf-8",
        extra="ignore",
    )

    # Legacy: keep during transition, remove after storage.py rewrite
    data_dir: str = "/app/data"
    # Phase 1 additions
    database_url: str = "postgresql+psycopg://docuvault_app:changeme@postgres/docuvault"
    database_migrate_url: str = "postgresql+psycopg://docuvault_migrate:changeme@postgres/docuvault"
    minio_endpoint: str = "minio:9000"
    minio_access_key: str = "docuvault_app"
    minio_secret_key: str = "changeme"
    minio_bucket: str = "docuvault"
    redis_url: str = "redis://:changeme@redis:6379/0"
    secret_key: str = "CHANGEME"  # documented for Phase 2; not read in Phase 1


settings = Settings()
Note: pydantic-settings is already in requirements.txt (line 4). No new dependency needed.
backend/main.py (config, request-response)
Analog: backend/main.py (current, lines 1–34)
Existing lifespan pattern (lines 10–14):
from contextlib import asynccontextmanager
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    ensure_data_dirs()
    yield
Extend lifespan — replace ensure_data_dirs() call with engine setup and MinIO bucket init. Copy the asynccontextmanager + yield structure exactly:
from contextlib import asynccontextmanager
import asyncio
from fastapi import FastAPI
from minio import Minio
from db.session import engine
from config import settings

@asynccontextmanager
async def lifespan(app: FastAPI):
    # MinIO bucket initialization
    minio_client = Minio(
        settings.minio_endpoint,
        access_key=settings.minio_access_key,
        secret_key=settings.minio_secret_key,
        secure=False,
    )
    exists = await asyncio.to_thread(minio_client.bucket_exists, settings.minio_bucket)
    if not exists:
        await asyncio.to_thread(minio_client.make_bucket, settings.minio_bucket)
    app.state.minio = minio_client
    yield
    # Shutdown: close all pooled connections
    await engine.dispose()
Extend /health endpoint: keep the existing @app.get("/health") route and the async def health name, add a request: Request parameter so the handler can reach app.state.minio, and extend the body:
# Additional imports needed by the handler below
from fastapi import Request
from sqlalchemy import text
from db.session import AsyncSessionLocal

@app.get("/health")
async def health(request: Request):
    checks = {}
    # PostgreSQL probe
    try:
        async with AsyncSessionLocal() as session:
            await session.execute(text("SELECT 1"))
        checks["postgres"] = "ok"
    except Exception as e:
        checks["postgres"] = f"error: {e}"
    # MinIO probe (the SDK is synchronous, so run it in a worker thread)
    try:
        ok = await asyncio.to_thread(request.app.state.minio.bucket_exists, settings.minio_bucket)
        checks["minio"] = "ok" if ok else "bucket missing"
    except Exception as e:
        checks["minio"] = f"error: {e}"
    # Any non-"ok" probe downgrades the overall status
    overall = "ok" if all(v == "ok" for v in checks.values()) else "degraded"
    return {"status": overall, "checks": checks}
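The aggregation rule in the handler above (every probe must report "ok", otherwise the overall status is "degraded") can be exercised without PostgreSQL or MinIO. The following is a minimal standard-library sketch; run_probes, healthy_probe, and failing_probe are hypothetical names used only for illustration and are not part of the codebase.

```python
import asyncio
from typing import Awaitable, Callable

Probe = Callable[[], Awaitable[None]]


async def run_probes(probes: dict[str, Probe]) -> dict:
    """Run each probe; a probe that raises is recorded as an error string."""
    checks: dict[str, str] = {}
    for name, probe in probes.items():
        try:
            await probe()
            checks[name] = "ok"
        except Exception as exc:  # a failing dependency must not crash /health
            checks[name] = f"error: {exc}"
    overall = "ok" if all(v == "ok" for v in checks.values()) else "degraded"
    return {"status": overall, "checks": checks}


async def healthy_probe() -> None:
    """Hypothetical probe that succeeds."""
    return None


async def failing_probe() -> None:
    """Hypothetical probe that simulates an unreachable service."""
    raise ConnectionError("connection refused")


result = asyncio.run(run_probes({"postgres": healthy_probe, "minio": failing_probe}))
```

One failing probe is enough to downgrade the response while the endpoint itself still returns normally, which is the behaviour the extended /health body relies on.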
backend/db/session.py (config, CRUD)
Analog: None exact. Closest structural analog is backend/config.py (module-level initialization pattern at lines 1–10).
Pattern from RESEARCH.md Pattern 1 (lines 240–266):
# backend/db/session.py
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from config import settings

engine = create_async_engine(
    settings.database_url,  # postgresql+psycopg://docuvault_app:...@postgres/docuvault
    pool_pre_ping=True,     # detect stale connections before use
    echo=False,
)

AsyncSessionLocal = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,  # prevent MissingGreenlet errors after commit
)
Key rule: expire_on_commit=False is mandatory — see RESEARCH.md Pitfall 1.
backend/deps/db.py (utility, CRUD)
Analog: None exact. The dependency injection yield pattern mirrors how backend/tests/conftest.py yields fixtures (lines 13–43).
Pattern from RESEARCH.md Pattern 1 (lines 258–266):
# backend/deps/db.py
from db.session import AsyncSessionLocal

async def get_db():
    async with AsyncSessionLocal() as session:
        try:
            yield session
        finally:
            await session.close()
Use as a FastAPI dependency: session: AsyncSession = Depends(get_db).
backend/db/models.py (model, CRUD)
Analog: None in codebase. The full schema is specified in RESEARCH.md Code Examples (lines 769–908).
Import block to copy:
import uuid
from datetime import datetime, timezone
from sqlalchemy import (
    Boolean, BigInteger, ForeignKey, Index, String, Text,
    TIMESTAMP, UniqueConstraint, Integer
)
from sqlalchemy.dialects.postgresql import UUID, INET, JSONB
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
from sqlalchemy.sql import func
Base class pattern:
class Base(DeclarativeBase):
    pass
Critical D-03: Document.user_id must be nullable=True in Phase 1:
user_id: Mapped[uuid.UUID | None] = mapped_column(
    UUID(as_uuid=True), ForeignKey("users.id", ondelete="CASCADE"), nullable=True
)
Use the full schema from RESEARCH.md lines 788–908 verbatim — it was designed to be implementation-ready.
backend/storage/base.py (utility, request-response)
Analog: backend/ai/base.py (lines 1–33) — exact structural match.
ABC pattern from backend/ai/base.py (lines 1–33):
from abc import ABC, abstractmethod
from dataclasses import dataclass, field

class AIProvider(ABC):
    @abstractmethod
    async def classify(self, ...) -> ClassificationResult: ...

    @abstractmethod
    async def health_check(self) -> bool: ...
Apply same structure for StorageBackend. The health_check() abstract method is already present in ai/base.py (line 31) — mirror it exactly in StorageBackend. Method signatures from RESEARCH.md Pattern 8 (lines 617–640):
# backend/storage/base.py
from abc import ABC, abstractmethod

class StorageBackend(ABC):
    @abstractmethod
    async def put_object(
        self, user_id: str, document_id: str,
        file_bytes: bytes, extension: str, content_type: str,
    ) -> str:
        """Store object; return the object_key used."""

    @abstractmethod
    async def get_object(self, object_key: str) -> bytes:
        """Retrieve object bytes by key."""

    @abstractmethod
    async def delete_object(self, object_key: str) -> None:
        """Delete object by key."""

    @abstractmethod
    async def presigned_get_url(self, object_key: str, expires_minutes: int = 60) -> str:
        """Return a time-limited download URL."""

    @abstractmethod
    async def health_check(self) -> bool:
        """Return True if backend is reachable."""
backend/storage/__init__.py (utility, request-response)
Analog: backend/ai/__init__.py (lines 1–36) — exact structural match.
Factory pattern from backend/ai/__init__.py (lines 1–10 and 8–36):
from ai.base import AIProvider, ClassificationResult
from ai.anthropic_provider import AnthropicProvider
# ... more imports

def get_provider(settings: dict) -> AIProvider:
    active = settings.get("active_provider", "lmstudio")
    match active:
        case "anthropic":
            return AnthropicProvider(...)
        case _:
            raise ValueError(f"Unknown AI provider: {active}")
Apply same factory pattern for storage. Phase 1 has only one backend (MinIO), so the match can be omitted initially, but the factory function signature is mandatory:
# backend/storage/__init__.py
from config import settings
from storage.minio_backend import MinIOBackend
from storage.base import StorageBackend

def get_storage_backend() -> StorageBackend:
    return MinIOBackend(
        endpoint=settings.minio_endpoint,
        access_key=settings.minio_access_key,
        secret_key=settings.minio_secret_key,
        bucket=settings.minio_bucket,
        secure=False,
    )
backend/storage/minio_backend.py (service, file-I/O)
Analog: backend/ai/openai_provider.py (lines 1–104) — same ABC-implementation pattern.
ABC implementation pattern from backend/ai/openai_provider.py (lines 9–70):
class OpenAIProvider(AIProvider):
    def __init__(self, api_key: str, model: str = "gpt-4o", base_url: str | None = None):
        self._api_key = api_key
        self._model = model
        self._base_url = base_url

    def _client(self) -> AsyncOpenAI:
        return AsyncOpenAI(api_key=self._api_key or "placeholder", base_url=self._base_url)

    async def health_check(self) -> bool:
        try:
            await self._client().chat.completions.create(...)
            return True
        except Exception:
            return False
Copy this structure: __init__ stores config, a private _client holds the SDK instance (a factory method in the analog, a plain attribute in MinIOBackend), every public method is async def, and health_check wraps its call in try/except and returns a bool.
Key difference from AI providers: MinIO SDK is synchronous — all calls must be wrapped in asyncio.to_thread(). Copy the wrapping pattern from RESEARCH.md Pattern 3 (lines 349–403):
import asyncio
import io
import uuid

from minio import Minio

from storage.base import StorageBackend


class MinIOBackend(StorageBackend):
    def __init__(self, endpoint, access_key, secret_key, bucket, secure=False):
        self._client = Minio(endpoint=endpoint, access_key=access_key,
                             secret_key=secret_key, secure=secure)
        self._bucket = bucket

    async def put_object(self, user_id, document_id, file_bytes, extension, content_type) -> str:
        # extension is expected to include the leading dot, e.g. ".pdf"
        object_key = f"{user_id}/{document_id}/{uuid.uuid4()}{extension}"
        data = io.BytesIO(file_bytes)  # a new BytesIO starts at position 0; no seek(0) needed
        # The MinIO SDK is blocking, so run the call in a worker thread
        await asyncio.to_thread(
            self._client.put_object,
            self._bucket, object_key, data, length=len(file_bytes), content_type=content_type,
        )
        return object_key

    # get_object, delete_object, and presigned_get_url (required by the ABC)
    # wrap the SDK in asyncio.to_thread() the same way; omitted from this excerpt.

    async def health_check(self) -> bool:
        try:
            return await asyncio.to_thread(self._client.bucket_exists, self._bucket)
        except Exception:
            return False
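The wrapping rule above (a blocking SDK call runs in a worker thread so the event loop stays free) can be tried in isolation. This is a minimal sketch, assuming a hypothetical FakeSyncClient in place of the real Minio client; AsyncWrapper is likewise an illustrative name, not the project's class.

```python
import asyncio
import threading
import time


class FakeSyncClient:
    """Hypothetical stand-in for a blocking SDK client such as Minio."""

    def __init__(self) -> None:
        self.called_on = ""

    def bucket_exists(self, bucket: str) -> bool:
        time.sleep(0.05)  # simulate blocking network I/O
        self.called_on = threading.current_thread().name
        return bucket == "docuvault"


class AsyncWrapper:
    """Async facade: every SDK call is pushed to a worker thread."""

    def __init__(self, client: FakeSyncClient, bucket: str) -> None:
        self._client = client
        self._bucket = bucket

    async def health_check(self) -> bool:
        try:
            return await asyncio.to_thread(self._client.bucket_exists, self._bucket)
        except Exception:
            return False


async def main() -> tuple[bool, str]:
    client = FakeSyncClient()
    ok = await AsyncWrapper(client, "docuvault").health_check()
    return ok, client.called_on


ok, worker_thread = asyncio.run(main())
```

The recorded thread name differs from the main thread's, which shows that the blocking call did not run on the thread driving the event loop.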
backend/services/storage.py (service, CRUD)
Analog: backend/services/storage.py (current, lines 1–188) — replace entirely.
Current pattern shows the data-access interface that api/documents.py depends on (lines 18–95). The new implementation must preserve the same function signatures where possible to minimize changes in api/documents.py. The new storage.py is a thin orchestrator: it calls db/session.py for ORM operations and storage/minio_backend.py for object storage.
New async signatures to match existing callers in api/documents.py (lines 32–57):
# Old (sync): storage.save_upload(content, file.filename, mime)
# New (async): await storage.save_upload(content, file.filename, mime)
# Old (sync): storage.save_metadata(meta)
# New (async): await storage.save_metadata(meta) — or merged into save_upload
# Old (sync): storage.list_metadata(topic=topic)
# New (async): await storage.list_metadata(topic=topic)
# Old (sync): storage.get_metadata(doc_id)
# New (async): await storage.get_metadata(doc_id)
# Old (sync): storage.delete_document(doc_id)
# New (async): await storage.delete_document(doc_id)
Session injection pattern: new storage.py functions accept an AsyncSession parameter (injected by FastAPI via Depends(get_db)) rather than creating their own session. This mirrors how the classifier calls storage functions with state passed in.
Error handling from current storage.py (lines 34–38 — return None for not-found, not exceptions):
def get_metadata(doc_id: str) -> dict | None:
    path = METADATA_DIR / f"{doc_id}.json"
    if not path.exists():
        return None
    return json.loads(path.read_text())
Keep the same None-on-not-found contract in the async ORM version so api/documents.py if meta is None: raise HTTPException(404, ...) checks continue to work unchanged.
backend/celery_app.py (config, event-driven)
Analog: None in codebase.
Pattern from RESEARCH.md Pattern 5 (lines 462–475):
# backend/celery_app.py
import os
from celery import Celery
celery_app = Celery("docuvault")
celery_app.conf.broker_url = os.environ.get("REDIS_URL", "redis://redis:6379/0")
celery_app.conf.result_backend = os.environ.get("REDIS_URL", "redis://redis:6379/0")
celery_app.conf.task_serializer = "json"
celery_app.conf.result_serializer = "json"
celery_app.conf.accept_content = ["json"]
celery_app.conf.task_routes = {
    "tasks.document_tasks.*": {"queue": "documents"},
}
Critical: Use os.environ.get() directly here, NOT from config import settings. config.py imports pydantic-settings, which may trigger FastAPI-related imports. Keep celery_app.py minimal to avoid Pitfall 7 (circular imports with the FastAPI app).
backend/tasks/document_tasks.py (service, event-driven)
Analog: backend/services/classifier.py (lines 1–59) — same orchestration pattern (load metadata, load settings, call services, persist results).
Orchestration pattern from backend/services/classifier.py (lines 11–46):
async def classify_document(doc_id: str, topic_names: list[str] | None = None) -> list[str]:
    meta = storage.get_metadata(doc_id)
    if meta is None:
        raise ValueError(f"Document {doc_id} not found")
    settings = storage.load_settings()
    provider = get_provider(settings)
    text = meta.get("extracted_text", "")
    result = await provider.classify(text[:MAX_AI_CHARS], topic_names, system_prompt)
    # ... persist results
    storage.update_document_topics(doc_id, final_topics)
    return final_topics
Apply same orchestration structure for the Celery task, with three critical differences:
- Task function must be def, not async def (Celery workers have no asyncio event loop)
- Import services directly; never import from main.py or any router module
- Use asyncio.run() to call async service functions if unavoidable
# backend/tasks/document_tasks.py
from celery_app import celery_app

@celery_app.task(name="tasks.document_tasks.extract_and_classify")
def extract_and_classify(document_id: str) -> dict:
    import asyncio
    from services import extractor, classifier
    # ... call services, persist results
    return {"document_id": document_id, "status": "classified"}
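The sync-to-async bridge that a plain def task needs can be sketched with the standard library alone. In the example below, classify_text is a hypothetical async service standing in for the real classifier, and extract_and_classify_sync is an illustrative task body without the Celery decorator.

```python
import asyncio


async def classify_text(text: str) -> list[str]:
    """Hypothetical async service call (stands in for the classifier)."""
    await asyncio.sleep(0)  # yield to the loop, as real I/O would
    return ["invoice"] if "invoice" in text.lower() else []


def extract_and_classify_sync(document_id: str, text: str) -> dict:
    """Plain def, as a Celery task body must be; bridges to async code."""
    # asyncio.run() creates a fresh event loop, runs the coroutine, closes the loop.
    topics = asyncio.run(classify_text(text))
    return {"document_id": document_id, "status": "classified", "topics": topics}


outcome = extract_and_classify_sync("doc-1", "Invoice Q3")
```

Because each asyncio.run() call starts and closes its own loop, objects bound to a loop (for example pooled async connections) are best created inside the coroutine rather than shared across task invocations.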
Replace in api/documents.py (lines 49–56):
# Old:
if auto_classify:
    topics = await classifier.classify_document(saved["id"])
# New:
from tasks.document_tasks import extract_and_classify
extract_and_classify.delay(str(saved_doc.id))
backend/api/documents.py (controller, request-response)
Analog: backend/api/documents.py (current, lines 1–102) — update in-place.
Existing route structure to preserve (lines 21–58):
- @router.post("/upload"): keep signature (file: UploadFile, auto_classify: bool)
- @router.get(""): keep pagination params (topic, page, per_page)
- @router.get("/{doc_id}"): keep path param
- @router.delete("/{doc_id}"): keep path param
- @router.post("/{doc_id}/classify"): keep path param + body
Session injection change — current (lines 1–4):
from services import storage, extractor, classifier
New — add session dependency:
from fastapi import APIRouter, UploadFile, File, Form, HTTPException, Query, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from deps.db import get_db
from services import storage, extractor
from tasks.document_tasks import extract_and_classify
Add session parameter to route handlers:
@router.post("/upload")
async def upload_document(
    file: UploadFile = File(...),
    auto_classify: bool = Form(True),
    session: AsyncSession = Depends(get_db),  # NEW
):
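Error handling pattern (lines 50–56): preserve the non-fatal semantics. The inline call shown here is what the Celery dispatch replaces, so a classification failure must still never fail the upload: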
try:
    topics = await classifier.classify_document(saved["id"])
    meta["topics"] = topics
except Exception as e:
    meta["classification_error"] = str(e)  # classification failure is non-fatal
HTTP error pattern (lines 75–77 — keep unchanged):
if meta is None:
    raise HTTPException(404, "Document not found")
backend/api/topics.py (controller, request-response)
Analog: backend/api/topics.py (current, lines 1–73) — update in-place.
Existing Pydantic model pattern (lines 8–19):
class TopicCreate(BaseModel):
    name: str
    description: str = ""
    color: str = "#6366f1"

class TopicUpdate(BaseModel):
    name: str | None = None
    description: str | None = None
    color: str | None = None
Keep these models unchanged — they match the PostgreSQL topics table columns.
Storage call pattern (lines 26–30):
@router.get("")
async def list_topics():
    topics = storage.load_topics()
    counts = storage.topic_doc_counts()
Update to inject session: AsyncSession = Depends(get_db) and call async ORM queries instead of flat-file storage functions. Response shape must remain identical ({"topics": [...]} with doc_count appended per topic).
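The response shape to preserve can be pinned down with a small helper. This is a sketch under an explicit assumption: the counts mapping is keyed by topic name, which the source does not state. with_doc_counts is a hypothetical name.

```python
def with_doc_counts(topics: list[dict], counts: dict[str, int]) -> dict:
    """Build the {"topics": [...]} payload with doc_count appended per topic.

    Assumption: counts is keyed by topic name; topics without documents get 0.
    """
    return {
        "topics": [
            {**topic, "doc_count": counts.get(topic["name"], 0)}
            for topic in topics
        ]
    }


payload = with_doc_counts(
    [
        {"name": "Invoices", "description": "", "color": "#6366f1"},
        {"name": "Contracts", "description": "", "color": "#6366f1"},
    ],
    {"Invoices": 3},
)
```

Whatever query replaces the flat-file functions, comparing its output against a payload built this way is one way to confirm that the frontend-facing shape has not changed.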
backend/requirements.txt (config)
Analog: backend/requirements.txt (current, lines 1–16)
Current file (lines 1–16):
fastapi>=0.111
uvicorn[standard]>=0.29
python-multipart
pydantic-settings>=2.2
anthropic>=0.26
openai>=1.30
PyMuPDF>=1.24
python-docx>=1.1
pytesseract>=0.3
Pillow>=10.3
filelock>=3.14 # REMOVE — replaced by PostgreSQL transactions
aiofiles>=23.2
httpx>=0.27
pytest>=8.2
pytest-asyncio>=0.23
Additions (append to file):
sqlalchemy[asyncio]>=2.0
psycopg[binary]>=3.3
alembic>=1.13
minio>=7.2
celery[redis]>=5.4
redis>=7.0
Remove: filelock>=3.14 — no longer needed once services/storage.py is replaced (RESEARCH.md line 952).
.env.example (config)
Analog: .env.example (current, lines 1–6)
Current file (lines 1–6):
# Copy to .env and fill in as needed.
ANTHROPIC_API_KEY=
OPENAI_API_KEY=
Extend with all Phase 1 vars (D-11, D-13, D-15, D-16). Keep existing vars at top. Pattern: group by service, comment each variable:
# ── PostgreSQL ───────────────────────────────────────────────────────────────
# App user (restricted: SELECT/INSERT/UPDATE/DELETE only — used by FastAPI + Celery)
DATABASE_URL=postgresql+psycopg://docuvault_app:changeme@postgres:5432/docuvault
# Migration user (DDL privileges — used ONLY by Alembic, never by the app at runtime)
DATABASE_MIGRATE_URL=postgresql+psycopg://docuvault_migrate:changeme@postgres:5432/docuvault
# Superuser password for the postgres init container (used only by initdb.d scripts)
POSTGRES_PASSWORD=changeme
# ── MinIO ────────────────────────────────────────────────────────────────────
MINIO_ROOT_USER=minioadmin
MINIO_ROOT_PASSWORD=changeme
MINIO_ENDPOINT=minio:9000
# App-level access key (minimal permissions: read/write on docuvault bucket only)
MINIO_ACCESS_KEY=docuvault_app
MINIO_SECRET_KEY=changeme
MINIO_BUCKET=docuvault
# ── Redis ────────────────────────────────────────────────────────────────────
REDIS_PASSWORD=changeme
REDIS_URL=redis://:changeme@redis:6379/0
# ── Security (Phase 2) ───────────────────────────────────────────────────────
# Not read by the app in Phase 1. Documented here for Phase 2 JWT + HKDF use.
SECRET_KEY=CHANGEME-replace-with-64-char-random-hex
backend/tests/conftest.py (test, CRUD)
Analog: backend/tests/conftest.py (current, lines 1–71) — update in-place.
Current fixture pattern (lines 13–43):
@pytest.fixture(autouse=True)
def isolated_data_dir(monkeypatch, tmp_path):
    """Each test gets its own clean data directory."""
    data_dir = tmp_path / "data"
    ...
    monkeypatch.setenv("DATA_DIR", str(data_dir))
    import config
    monkeypatch.setattr(config, "DATA_DIR", data_dir)
    ...
    yield data_dir
New async session fixture — replace isolated_data_dir with an async SQLite in-memory engine for unit tests, and keep a separate fixture for integration tests using the real Docker database. Copy the yield + teardown structure exactly:
import pytest
import pytest_asyncio
from httpx import AsyncClient, ASGITransport
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from sqlalchemy.pool import StaticPool

from db.models import Base
from deps.db import get_db
from main import app


@pytest_asyncio.fixture
async def db_session():
    """In-memory async SQLite session for unit tests."""
    engine = create_async_engine(
        "sqlite+aiosqlite:///:memory:",
        connect_args={"check_same_thread": False},
        poolclass=StaticPool,
    )
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    AsyncTestSession = async_sessionmaker(engine, expire_on_commit=False)
    async with AsyncTestSession() as session:
        yield session
    await engine.dispose()


@pytest_asyncio.fixture
async def client(db_session):
    """Async test client with DB dependency overridden."""
    app.dependency_overrides[get_db] = lambda: db_session
    async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as c:
        yield c
    app.dependency_overrides.clear()
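Note: aiosqlite must be added to requirements.txt for tests. Because db/models.py imports PostgreSQL-specific column types (INET, JSONB), Base.metadata.create_all may fail to compile on SQLite if the schema uses them; in that case, point the fixture at a real PostgreSQL test database via the DATABASE_URL env var instead.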
backend/tests/test_health.py (test, request-response)
Analog: backend/tests/test_health.py (current, lines 1–5) — update in-place.
Current test (lines 1–5):
def test_health(client):
    resp = client.get("/health")
    assert resp.status_code == 200
    assert resp.json() == {"status": "ok"}
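Extended pattern: keep the existing test function name and add assertions for the richer response shape, using the async/await style required by pytest-asyncio. Note that httpx's ASGITransport does not run lifespan events, so app.state.minio is only populated if the fixture triggers startup itself, and the "ok" assertions need reachable PostgreSQL and MinIO services (or mocked probes):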
import pytest

# Required in pytest-asyncio strict mode; drop if asyncio_mode = auto is configured
pytestmark = pytest.mark.asyncio


async def test_health(client):
    resp = await client.get("/health")
    assert resp.status_code == 200
    data = resp.json()
    assert data["status"] == "ok"


async def test_health_checks_postgres_and_minio(client):
    resp = await client.get("/health")
    data = resp.json()
    assert "checks" in data
    assert "postgres" in data["checks"]
    assert "minio" in data["checks"]
    assert data["checks"]["postgres"] == "ok"
    assert data["checks"]["minio"] == "ok"
backend/tests/test_documents.py (test, CRUD)
Analog: backend/tests/test_documents.py (current, lines 1–108) — port to async.
Current sync pattern (lines 1–14):
def test_upload_txt_no_classify(client, sample_txt):
    with open(sample_txt, "rb") as f:
        resp = client.post(
            "/api/documents/upload",
            files={"file": ("sample.txt", f, "text/plain")},
            data={"auto_classify": "false"},
        )
    assert resp.status_code == 200
Port to async — change def to async def and client.post to await client.post:
async def test_upload_txt_no_classify(client, sample_txt):
    with open(sample_txt, "rb") as f:
        resp = await client.post(
            "/api/documents/upload",
            files={"file": ("sample.txt", f, "text/plain")},
            data={"auto_classify": "false"},
        )
    assert resp.status_code == 200
    data = resp.json()
    assert data["original_name"] == "sample.txt"
Keep all assertion logic from the current file — only the def→async def and client.verb()→await client.verb() changes are needed. Add new tests for STORE-01 and STORE-02 requirements.
backend/tests/test_storage.py (test, file-I/O)
Analog: None in codebase — new file.
Pattern from RESEARCH.md Validation section (lines 1022–1028) and the MinIO key schema (D-06):
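import re
import uuid
from unittest.mock import MagicMock, patch

import pytest

# Required in pytest-asyncio strict mode; drop if asyncio_mode = auto is configured
pytestmark = pytest.mark.asyncio

KEY_PATTERN = re.compile(
    r"^[0-9a-f-]{36}/[0-9a-f-]{36}/"
    r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\.[a-z]+$"
)


async def _put_with_mock_client(extension: str = ".pdf") -> str:
    """Call MinIOBackend.put_object with the Minio SDK client mocked out.

    Assumes storage/minio_backend.py imports Minio at module level.
    """
    from storage.minio_backend import MinIOBackend
    with patch("storage.minio_backend.Minio", return_value=MagicMock()):
        backend = MinIOBackend("minio:9000", "key", "secret", "docuvault")
    # user_id and document_id must be UUIDs for the key to match the schema
    return await backend.put_object(
        str(uuid.uuid4()), str(uuid.uuid4()), b"data", extension, "application/pdf"
    )


async def test_object_key_schema():
    """STORE-02: MinIO object key must match {user_id}/{document_id}/{uuid4}{ext}."""
    key = await _put_with_mock_client()
    assert KEY_PATTERN.match(key)


async def test_filename_not_in_object_key():
    """STORE-02: Human-readable filename must NOT appear in the MinIO object key."""
    original_name = "invoice_Q3_2025.pdf"
    generated_key = await _put_with_mock_client()
    assert original_name not in generated_key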
docker/postgres/initdb.d/01-init-users.sql (config, batch)
Analog: None in codebase.
Pattern from RESEARCH.md Pattern 7 (lines 581–599):
-- docker/postgres/initdb.d/01-init-users.sql
-- Runs as the POSTGRES_USER superuser on first container start only.
-- Migration user: DDL privileges (CREATE TABLE, ALTER TABLE, CREATE INDEX)
CREATE USER docuvault_migrate WITH PASSWORD 'PLACEHOLDER_MIGRATE_PASSWORD';
GRANT ALL PRIVILEGES ON DATABASE docuvault TO docuvault_migrate;
-- PostgreSQL 15+ no longer lets ordinary roles create objects in schema public,
-- so the migration user needs an explicit schema-level grant.
GRANT USAGE, CREATE ON SCHEMA public TO docuvault_migrate;
-- App user: runtime DML only (SELECT, INSERT, UPDATE, DELETE)
CREATE USER docuvault_app WITH PASSWORD 'PLACEHOLDER_APP_PASSWORD';
GRANT CONNECT ON DATABASE docuvault TO docuvault_app;
Important: The passwords in this script are placeholders. A plain .sql file in docker-entrypoint-initdb.d cannot read environment variables, so the real passwords must be hardcoded here and kept in sync with the credentials embedded in DATABASE_URL and DATABASE_MIGRATE_URL in .env. The script runs only once, when the data volume is empty.
The ALTER DEFAULT PRIVILEGES grant (for future tables created by Alembic) must be run inside the first Alembic migration (0001_initial_schema.py) using op.execute(), not in this init script — see RESEARCH.md Pattern 7 (lines 601–603) and Pitfall 4.
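As a sketch of what that first migration might pass to op.execute(), the helper below builds the two statements as plain strings. The role names come from the init script above; default_privilege_statements is a hypothetical helper, and the sequence grant is an assumption that only matters if the schema uses serial or identity columns.

```python
APP_ROLE = "docuvault_app"
MIGRATE_ROLE = "docuvault_migrate"


def default_privilege_statements(schema: str = "public") -> list[str]:
    """SQL granting the app role DML on objects the migrate role creates later."""
    prefix = f"ALTER DEFAULT PRIVILEGES FOR ROLE {MIGRATE_ROLE} IN SCHEMA {schema} "
    return [
        prefix + f"GRANT SELECT, INSERT, UPDATE, DELETE ON TABLES TO {APP_ROLE}",
        # Assumption: only needed when tables rely on sequences
        prefix + f"GRANT USAGE, SELECT ON SEQUENCES TO {APP_ROLE}",
    ]


# Inside upgrade() in 0001_initial_schema.py this could be applied as:
#     for stmt in default_privilege_statements():
#         op.execute(stmt)
statements = default_privilege_statements()
```

ALTER DEFAULT PRIVILEGES only affects objects created after it runs, so these statements belong at the top of upgrade(), before any table is created.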
backend/alembic.ini and backend/migrations/env.py (config, batch)
Analog: None in codebase.
alembic.ini key section (from RESEARCH.md Pattern 2, lines 328–334):
[alembic]
script_location = migrations
sqlalchemy.url = %(DATABASE_MIGRATE_URL)s
migrations/env.py async pattern (from RESEARCH.md Pattern 2, lines 300–327):
import asyncio
import os

from alembic import context
from sqlalchemy import pool
from sqlalchemy.ext.asyncio import async_engine_from_config

from db.models import Base  # noqa: F401 (must import to register all models)

config = context.config
# configparser does not expand environment variables, so override the
# sqlalchemy.url placeholder from alembic.ini with the real migration URL.
config.set_main_option("sqlalchemy.url", os.environ["DATABASE_MIGRATE_URL"])

target_metadata = Base.metadata


def do_run_migrations(connection):
    context.configure(connection=connection, target_metadata=target_metadata)
    with context.begin_transaction():
        context.run_migrations()


async def run_async_migrations():
    connectable = async_engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )
    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)
    await connectable.dispose()


def run_migrations_online():
    asyncio.run(run_async_migrations())
Generate the base file with alembic init -t async migrations — it produces this exact structure. Then add the from db.models import Base import and set target_metadata = Base.metadata.
Shared Patterns
Async/Await Convention
Source: backend/main.py (lines 10–13), backend/api/documents.py (lines 21–58)
Apply to: All new db/, deps/, storage/, services/, tasks/ modules, all test files
All new code is async def. Synchronous SDK calls (MinIO) use asyncio.to_thread(). Celery task functions are the only exception: they must be plain def (see RESEARCH.md Pitfall: Celery tasks are synchronous).
None-on-not-found Contract
Source: backend/services/storage.py (lines 34–38)
Apply to: backend/services/storage.py (rewritten), backend/db/ query helpers
def get_metadata(doc_id: str) -> dict | None:
    ...
    if not path.exists():
        return None
Async ORM equivalent:
async def get_document(session: AsyncSession, doc_id: uuid.UUID) -> Document | None:
    return await session.get(Document, doc_id)
Return None for not-found; let the API layer raise HTTPException(404). Never raise exceptions from the service layer for expected missing-resource conditions.
HTTP Error Pattern
Source: backend/api/documents.py (lines 74–77), backend/api/topics.py (lines 57–59)
Apply to: All API route handlers
if meta is None:
    raise HTTPException(404, "Document not found")
Use bare string messages (no detail= keyword) — consistent with existing code.
Classification Failure Non-Fatal Pattern
Source: backend/api/documents.py (lines 50–56)
Apply to: backend/api/documents.py (updated upload handler)
try:
    topics = await classifier.classify_document(saved["id"])
    meta["topics"] = topics
except Exception as e:
    meta["classification_error"] = str(e)  # classification failure is non-fatal
Document upload succeeds even if classification fails. Celery task failure equivalent: task enters FAILURE state but the document row remains with status="pending".
ABC + Factory Pattern
Source: backend/ai/base.py + backend/ai/__init__.py (lines 1–36)
Apply to: backend/storage/base.py + backend/storage/__init__.py
This is the project's established pattern for pluggable backends. Follow it exactly: separate base.py (ABC), __init__.py (factory function get_X_backend()), concrete implementations in separate modules.
No Analog Found
Files with no close match in the codebase (planner should use RESEARCH.md patterns instead):
| File | Role | Data Flow | Reason |
|---|---|---|---|
| docker/postgres/initdb.d/01-init-users.sql | config | batch | No SQL scripts exist in codebase; use RESEARCH.md Pattern 7 |
| backend/celery_app.py | config | event-driven | No task queue code exists; use RESEARCH.md Pattern 5 |
| backend/alembic.ini | config | batch | No Alembic config exists; generate with alembic init -t async |
| backend/migrations/env.py | config | batch | No migrations exist; use alembic init -t async output + RESEARCH.md Pattern 2 |
| backend/migrations/versions/0001_initial_schema.py | migration | batch | No migrations exist; use full schema from RESEARCH.md Code Examples (lines 769–908) |
| backend/tests/test_storage.py | test | file-I/O | No object storage tests exist; new file per RESEARCH.md Validation section |
Metadata
Analog search scope: backend/ (all .py files), docker-compose.yml, .env.example, backend/requirements.txt, backend/Dockerfile
Files scanned: 25
Pattern extraction date: 2026-05-21