Research, pattern mapping, and verification complete. Walking Skeleton mode active (MVP Phase 1). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Phase 1: Infrastructure Foundation - Research
Researched: 2026-05-21
Domain: PostgreSQL + MinIO + Redis + Celery wired into FastAPI via Docker Compose; Alembic async migrations; storage service rewrite
Confidence: HIGH
<user_constraints>
User Constraints (from CONTEXT.md)
Locked Decisions
Schema Scope
- D-01: Phase 1 initial Alembic migration creates the full v1 skeleton, with all tables: users, refresh_tokens, quotas, documents, topics, folders, shares, audit_log, cloud_connections. Subsequent phases add data and constraints, not new tables.
- D-02: groups table stub included in Phase 1 migration (v2 feature; empty table, correct columns and FKs).
- D-03: documents.user_id is nullable in Phase 1 (no auth system yet). Phase 2 migration adds the NOT NULL constraint after the user/auth system is live.
- D-04: Existing data/ directory contents (flat-file JSON metadata + uploaded files) are deleted in Phase 1. Test data only; no migration script needed.
App Wiring
- D-05: Phase 1 switches the storage service layer to PostgreSQL + MinIO. backend/services/storage.py is rewritten to use async SQLAlchemy + MinIO SDK. The app does not continue using the filesystem after Phase 1.
- D-06: Single MinIO bucket named docuvault. Object keys follow {user_id}/{document_id}/{uuid4()}{ext} (STORE-02). Human-readable filenames are stored in the documents.filename DB column only, never in the MinIO key.
- D-07: backend/main.py /health endpoint extended to check PostgreSQL + MinIO connectivity (not just {"status": "ok"}). Health checks gate docker compose up readiness.
Background Worker
- D-08: Background task queue: Celery + Redis (STORE-08). FastAPI BackgroundTasks replaced.
- D-09: Redis service added to docker-compose.yml in Phase 1. Redis doubles as the rate-limiting store for Phase 2 auth endpoints, so no second Redis is needed later.
- D-10: A celery-worker service is added to docker-compose.yml. Celery broker and result backend both point to the same Redis instance via REDIS_URL.
Env / Secrets Strategy
- D-11: .env gitignored + .env.example committed. docker-compose.yml reads vars via ${VAR_NAME}. .env.example has safe placeholder values and comments explaining each variable.
- D-12: Production secrets stored outside the project directory at /etc/docuvault/env (chmod 600, owned by the service user, not root). docker-compose.yml references it via env_file:. Documented in deployment notes.
- D-13: Two PostgreSQL DSNs: DATABASE_URL (restricted app user docuvault_app, SELECT/INSERT/UPDATE/DELETE only; no DDL) and DATABASE_MIGRATE_URL (migration user docuvault_migrate, DDL privileges; used only by Alembic).
- D-14: PostgreSQL init script in docker/postgres/initdb.d/ provisions both users on first container start. The app never connects as the PostgreSQL superuser.
- D-15: MinIO vars: MINIO_ENDPOINT, MINIO_ROOT_USER, MINIO_ROOT_PASSWORD (init only), MINIO_BUCKET (value: docuvault), MINIO_ACCESS_KEY, MINIO_SECRET_KEY (separate app-level access key pair with minimal bucket permissions).
- D-16: Additional vars in Phase 1 .env.example: REDIS_URL, SECRET_KEY (documented now for Phase 2 JWT + HKDF use; app does not read it in Phase 1).
Claude's Discretion
None — user made explicit choices for all areas.
Deferred Ideas (OUT OF SCOPE)
None. Discussion stayed within phase scope.
</user_constraints>
<phase_requirements>
Phase Requirements
| ID | Description | Research Support |
|---|---|---|
| STORE-01 | Platform storage layer migrated from flat-file JSON + local filesystem to PostgreSQL (metadata) + MinIO (objects) | SQLAlchemy 2.0 async ORM + MinIO SDK patterns documented; service rewrite approach confirmed |
| STORE-02 | Each user's MinIO objects use {user_id}/{document_id}/{uuid4()}{ext} keys; human-readable filenames stored in DB only | MinIO put_object() API confirmed; key schema enforced in model/service layer |
| STORE-07 | Backend is stateless — no per-instance file locks; multiple instances can run behind a load balancer | PostgreSQL atomic UPDATE + Celery + Redis replaces filelock pattern; verified |
</phase_requirements>
Summary
Phase 1 replaces the entire flat-file persistence layer (JSON metadata + local filesystem uploads) with PostgreSQL (via SQLAlchemy 2.0 async ORM) + MinIO (via the official Python SDK) wired into Docker Compose. Redis and a Celery worker are added alongside as the background task queue that replaces FastAPI BackgroundTasks, delivering the statelessness required by STORE-07. All infrastructure services are health-checked and ordered via depends_on conditions so docker compose up can be treated as the single operational command. Alembic manages the schema using the async migration template with a two-DSN strategy (restricted app user + DDL migration user). The walking skeleton requirement is satisfied by: the full v1 schema applied via Alembic, one real document upload persisted to PostgreSQL and MinIO through the rewritten storage service, and the /health endpoint returning live connectivity checks for PostgreSQL and MinIO (D-07).
The existing single-user document upload → text extraction → AI classification workflow continues to work end-to-end after Phase 1. The Vue frontend requires no changes. All API routes and response shapes are preserved.
Primary recommendation: Wire infrastructure with Docker Compose health checks first; apply Alembic migration second; rewrite services/storage.py third; replace BackgroundTasks with Celery tasks last. This ordering allows each layer to be verified before the next is built.
Architectural Responsibility Map
| Capability | Primary Tier | Secondary Tier | Rationale |
|---|---|---|---|
| Document metadata persistence | Database / Storage (PostgreSQL) | API / Backend | All metadata is authored and read server-side; no client involvement |
| Binary file storage | Database / Storage (MinIO) | API / Backend | Object store owns bytes; backend generates keys and proxies operations |
| Background text extraction + classification | Background Worker (Celery) | API / Backend | CPU-intensive, deferred; must not block HTTP event loop |
| Health checking | API / Backend | Docker Compose | FastAPI /health probes PostgreSQL + MinIO; Compose waits on it |
| Schema migrations | Database / Storage (Alembic + PostgreSQL) | — | DDL-only responsibility; executed before app starts |
| Object key namespacing | API / Backend (service layer) | — | Key construction is a code concern, not a storage concern |
| Service ordering / startup sequencing | Docker Compose | — | depends_on: condition: service_healthy enforces boot order |
| Connection pooling | API / Backend (SQLAlchemy pool) | Database / Storage | App holds pool; PostgreSQL is the pooled resource |
| Task queue / broker | Background Worker (Redis / Celery) | API / Backend | Broker is Redis; workers are separate Docker Compose services |
Standard Stack
Core
| Library | Version | Purpose | Why Standard |
|---|---|---|---|
| sqlalchemy[asyncio] | >=2.0.49 | ORM + async engine + connection pool | Industry standard for Python async PostgreSQL; create_async_engine + async_sessionmaker pattern is the canonical FastAPI integration |
| psycopg[binary] | >=3.3.4 | PostgreSQL async driver | psycopg v3 (psycopg) is SQLAlchemy 2.0's preferred async dialect; [binary] provides pre-built wheels with no system dependency on libpq headers |
| alembic | >=1.18.4 | Database migrations | The only maintained migration tool for SQLAlchemy; provides async template (alembic init -t async) |
| minio | >=7.2.20 | MinIO / S3 object storage SDK | Official MinIO Python SDK; stable API for put_object, get_object, bucket_exists, presigned_get_object |
| celery[redis] | >=5.6.3 | Background task queue + Redis transport | Battle-tested distributed task queue; [redis] extra installs redis client; replaces per-instance BackgroundTasks |
| redis | >=7.4.0 | Redis Python client (Celery dependency + Phase 2 rate limiting) | Official Redis client; installed transitively by celery[redis] but worth pinning for Phase 2 rate limiting use |
Supporting
| Library | Version | Purpose | When to Use |
|---|---|---|---|
| pydantic-settings | >=2.2 | Env var configuration (already in project) | Extended with new DATABASE_URL, MINIO_*, REDIS_URL vars |
| anyio | >=4.13.0 | Async testing utilities | Required by httpx for async test transport in pytest |
| httpx | >=0.28.1 | Async HTTP client for integration tests | Needed to replace TestClient (sync) with AsyncClient for async route testing |
| pytest-asyncio | >=1.3.0 | Async test runner integration | Already in project as >=0.23; upgrade to >=1.3.0 for asyncio_mode = auto support in new async tests |
Alternatives Considered
| Instead of | Could Use | Tradeoff |
|---|---|---|
| psycopg[binary] | asyncpg | asyncpg is faster in benchmarks but requires a separate sync driver (psycopg2) for Alembic. psycopg v3 works for both sync (Alembic) and async (FastAPI) with the same URL, so no driver switching is needed |
| celery[redis] | pgqueuer / pg_boss | pgqueuer uses PostgreSQL as the queue (no Redis required). However, the user explicitly selected Celery + Redis. Redis is also needed in Phase 2 for rate limiting, so Redis is justified regardless |
| minio Python SDK (sync, wrapped in asyncio.to_thread) | aiobotocore | MinIO SDK is the official client with full API coverage including MinIO-specific features. aiobotocore is AWS-oriented and less tested with MinIO-specific APIs. to_thread() wrapping is the correct async pattern for the sync SDK |
Installation (backend/requirements.txt additions):
sqlalchemy[asyncio]>=2.0
psycopg[binary]>=3.3
alembic>=1.13
minio>=7.2
celery[redis]>=5.4
redis>=7.0
httpx>=0.27
pytest-asyncio>=0.23
Note: psycopg[binary] is specified with bracket extras in requirements.txt. The binary extra installs a self-contained wheel — no system libpq-dev package required in the Docker image, simplifying the Dockerfile.
Package Legitimacy Audit
All packages verified on PyPI registry via pip3 index versions and slopcheck install (v0.6.1, run 2026-05-21).
| Package | Registry | Age | Downloads | Source Repo | slopcheck | Disposition |
|---|---|---|---|---|---|---|
| sqlalchemy | PyPI | ~20 yrs | Very high (millions/wk) | github.com/sqlalchemy/sqlalchemy | OK | Approved |
| psycopg | PyPI | ~4 yrs (v3) | High | github.com/psycopg/psycopg | OK | Approved |
| alembic | PyPI | ~12 yrs | Very high | github.com/sqlalchemy/alembic | OK | Approved |
| minio | PyPI | ~8 yrs | High | github.com/minio/minio-py | OK | Approved |
| celery | PyPI | ~15 yrs | Very high (millions/wk) | github.com/celery/celery | OK | Approved |
| redis | PyPI | ~12 yrs | Very high | github.com/redis/redis-py | OK | Approved |
Packages removed due to slopcheck [SLOP] verdict: none
Packages flagged as suspicious [SUS]: none
Note: psycopg[binary] is specified with extras syntax in requirements.txt; the installable wheel is psycopg-binary on PyPI, which also passed registry verification (version 3.3.4 confirmed). [VERIFIED: PyPI registry + slopcheck OK]
Architecture Patterns
System Architecture Diagram
Browser (Vue 3 SPA — unchanged in Phase 1)
│ HTTP/JSON + multipart (same API contract)
▼
FastAPI (port 8000) — lifespan creates async engine, disposes on shutdown
│
├── api/documents.py ─── calls ──► services/storage.py (REWRITTEN)
│ │
│ ├─► db/session.py (AsyncSession)
│ │ │
│ │ ▼
│ │ PostgreSQL (port 5432)
│ │ [docuvault_app user, restricted]
│ │
│ └─► storage/minio_backend.py
│ │
│ ▼
│ MinIO (port 9000)
│ [bucket: docuvault]
│ [app-level access key]
│
├── /health ─── probes ──► PostgreSQL + MinIO connectivity
│
└── celery_app.py ─── enqueues tasks ──► Redis (port 6379)
│
Celery Worker (separate container)
├── task: extract_and_classify()
│ ├─► services/extractor.py
│ └─► services/classifier.py
└── consumes from Redis queue
Alembic (run once at deploy time, not part of app startup)
│ uses DATABASE_MIGRATE_URL (docuvault_migrate user, DDL privileges)
└─► PostgreSQL — applies full v1 schema
Recommended Project Structure
backend/
├── main.py # FastAPI app; extend lifespan for engine/dispose
├── config.py # pydantic-settings: extend with new env vars
├── celery_app.py # Celery app instance (broker from REDIS_URL)
├── db/
│ ├── __init__.py
│ ├── session.py # async engine + async_sessionmaker
│ └── models.py # all SQLAlchemy ORM models (full v1 schema)
├── deps/
│ └── db.py # get_db() — yields AsyncSession
├── services/
│ ├── storage.py # REPLACED: async SQLAlchemy + MinIO SDK
│ ├── extractor.py # unchanged
│ └── classifier.py # update to accept session; dispatch via Celery
├── storage/ # NEW: StorageBackend ABC + MinIO implementation
│ ├── __init__.py # get_storage_backend() factory
│ ├── base.py # StorageBackend ABC (mirrors ai/base.py)
│ └── minio_backend.py # MinIO implementation
├── tasks/
│ └── document_tasks.py # Celery task definitions (extract_and_classify)
├── migrations/ # Alembic migration directory
│ ├── env.py # async env.py with two-DSN strategy
│ ├── script.py.mako
│ └── versions/
│ └── 0001_initial_schema.py
├── alembic.ini # sqlalchemy.url = DATABASE_MIGRATE_URL
├── api/
│ ├── documents.py # update to use async storage service
│ ├── topics.py # unchanged (topics still in DB after migration)
│ └── settings.py # unchanged
└── tests/
├── conftest.py # UPDATE: add async engine + session fixtures
├── test_health.py # UPDATE: test PostgreSQL + MinIO health probes
├── test_documents.py # UPDATE: adapt for async storage layer
└── test_storage.py # NEW: unit tests for MinIO object key schema
Pattern 1: SQLAlchemy 2.0 Async Engine + Session Factory (FastAPI Lifespan)
What: Create engine once at startup, share it application-wide via app.state. Session factory (async_sessionmaker) yields per-request sessions via a FastAPI dependency.
When to use: Any database access in FastAPI route handlers or services.
Example:
# db/session.py
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from config import settings

engine = create_async_engine(
    settings.database_url,   # postgresql+psycopg://docuvault_app:...@postgres/docuvault
    pool_pre_ping=True,      # detect stale connections before use
    echo=False,
)
AsyncSessionLocal = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,  # prevent lazy-load errors after commit
)

# deps/db.py
from db.session import AsyncSessionLocal

async def get_db():
    # The async context manager closes the session when the request finishes.
    async with AsyncSessionLocal() as session:
        yield session

# main.py: lifespan
from contextlib import asynccontextmanager
from fastapi import FastAPI
from db.session import engine

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: engine creates pool on first connection
    yield
    # Shutdown: close all pooled connections
    await engine.dispose()

app = FastAPI(lifespan=lifespan)
Source: [CITED: docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html]
Key detail — URL format for psycopg v3:
postgresql+psycopg://user:password@host:port/dbname
The same postgresql+psycopg:// prefix works for both create_engine() (Alembic) and create_async_engine() (FastAPI). SQLAlchemy selects the sync or async dialect variant automatically. [CITED: docs.sqlalchemy.org/en/20/dialects/postgresql.html]
Key detail: expire_on_commit=False. After session.commit(), SQLAlchemy marks all loaded objects as expired, so the next attribute access triggers an implicit SELECT. Under asyncio that implicit lazy load runs outside an awaited call and raises MissingGreenlet. Setting expire_on_commit=False keeps already-loaded attribute values usable after commit. [CITED: docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html]
Pattern 2: Alembic Async Configuration with Two DSNs
What: Alembic's async template (alembic init -t async) generates env.py that uses async_engine_from_config and asyncio.run(). The DATABASE_MIGRATE_URL DSN (DDL privileges) is used only by Alembic; the app uses DATABASE_URL (restricted). This separates migration risk from runtime risk.
When to use: Every alembic upgrade head call. Never used by FastAPI directly.
Example:
# migrations/env.py (key section: async online migrations)
import asyncio
import os

from alembic import context
from sqlalchemy import pool
from sqlalchemy.ext.asyncio import async_engine_from_config

from db.models import Base  # import all models so metadata is populated

config = context.config
# alembic.ini is parsed with configparser, which does not expand environment
# variables, so the migration DSN is injected here. A literal '%' is doubled
# because the value passes through configparser interpolation.
config.set_main_option(
    "sqlalchemy.url", os.environ["DATABASE_MIGRATE_URL"].replace("%", "%%")
)
target_metadata = Base.metadata

def do_run_migrations(connection):
    context.configure(connection=connection, target_metadata=target_metadata)
    with context.begin_transaction():
        context.run_migrations()

async def run_async_migrations():
    connectable = async_engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,  # migrations use per-run connection, not pool
    )
    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)
    await connectable.dispose()

def run_migrations_online():
    asyncio.run(run_async_migrations())

# alembic.ini
[alembic]
script_location = migrations
# sqlalchemy.url is set at run time by migrations/env.py from DATABASE_MIGRATE_URL

Two-DSN in practice: migrations/env.py reads DATABASE_MIGRATE_URL from the environment and sets sqlalchemy.url at run time; alembic.ini carries no DSN, because its %(name)s interpolation only resolves keys defined in the ini file, not environment variables. FastAPI's db/session.py reads DATABASE_URL. Both are set in .env. The Docker Compose backend service has both env vars; the celery-worker service has DATABASE_URL only (workers need no DDL).
Source: [CITED: alembic.sqlalchemy.org/en/latest/cookbook.html#using-asyncio-with-alembic] + [CITED: github.com/sqlalchemy/alembic/blob/main/alembic/templates/async/env.py]
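If the migration DSN is handed to Alembic from env.py via config.set_main_option(), the value passes through configparser interpolation, so a literal % (for example from a URL-encoded password) has to be doubled first. The following is a minimal standard-library sketch of that escaping step; the helper name migrate_url_for_alembic and the sample DSN are hypothetical.

```python
import configparser
from collections.abc import Mapping


def migrate_url_for_alembic(env: Mapping[str, str]) -> str:
    """Return DATABASE_MIGRATE_URL escaped for configparser interpolation.

    Hypothetical helper: a literal '%' must be written as '%%' before the
    value is stored in a ConfigParser-backed configuration.
    """
    url = env["DATABASE_MIGRATE_URL"]  # KeyError if the DSN is not configured
    return url.replace("%", "%%")


# Round-trip through a ConfigParser to show the escaped value reads back intact.
env = {
    "DATABASE_MIGRATE_URL": "postgresql+psycopg://docuvault_migrate:p%40ss@postgres/docuvault"
}
parser = configparser.ConfigParser()
parser.add_section("alembic")
parser.set("alembic", "sqlalchemy.url", migrate_url_for_alembic(env))
restored = parser.get("alembic", "sqlalchemy.url")
```

In env.py this would be used as config.set_main_option("sqlalchemy.url", migrate_url_for_alembic(os.environ)), before the engine is built.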
Pattern 3: MinIO SDK Sync-in-Async via asyncio.to_thread()
What: The MinIO Python SDK is synchronous. In an async FastAPI context, blocking I/O blocks the event loop. Wrap MinIO SDK calls in asyncio.to_thread() to offload to a thread pool without blocking.
When to use: All MinIO operations (put_object, get_object, bucket_exists, presigned_get_object) called from async def handlers or services.
Example:
# storage/minio_backend.py
import asyncio
import io
import uuid
from datetime import timedelta

from minio import Minio

from storage.base import StorageBackend

class MinIOBackend(StorageBackend):
    def __init__(self, endpoint: str, access_key: str, secret_key: str,
                 bucket: str, secure: bool = False):
        self._client = Minio(
            endpoint=endpoint,
            access_key=access_key,
            secret_key=secret_key,
            secure=secure,  # False for Docker internal network (HTTP)
        )
        self._bucket = bucket

    async def put_object(
        self,
        user_id: str,
        document_id: str,
        file_bytes: bytes,
        extension: str,
        content_type: str,
    ) -> str:
        object_key = f"{user_id}/{document_id}/{uuid.uuid4()}{extension}"
        data = io.BytesIO(file_bytes)
        await asyncio.to_thread(
            self._client.put_object,
            bucket_name=self._bucket,
            object_name=object_key,
            data=data,
            length=len(file_bytes),
            content_type=content_type,
        )
        return object_key

    async def get_object(self, object_key: str) -> bytes:
        # Required by the StorageBackend ABC; reads the whole object into memory.
        def _read() -> bytes:
            response = self._client.get_object(
                bucket_name=self._bucket, object_name=object_key
            )
            try:
                return response.read()
            finally:
                response.close()
                response.release_conn()
        return await asyncio.to_thread(_read)

    async def delete_object(self, object_key: str) -> None:
        # Required by the StorageBackend ABC.
        await asyncio.to_thread(
            self._client.remove_object,
            bucket_name=self._bucket,
            object_name=object_key,
        )

    async def presigned_get_url(self, object_key: str, expires_minutes: int = 60) -> str:
        return await asyncio.to_thread(
            self._client.presigned_get_object,
            bucket_name=self._bucket,
            object_name=object_key,
            expires=timedelta(minutes=expires_minutes),
        )

    async def health_check(self) -> bool:
        try:
            return await asyncio.to_thread(
                self._client.bucket_exists, bucket_name=self._bucket
            )
        except Exception:
            return False
MinIO put_object signature (confirmed):
client.put_object(
bucket_name: str,
object_name: str, # the object key
data: io.RawIOBase, # io.BytesIO is accepted
length: int, # -1 with part_size for unknown-length streams
content_type: str = "application/octet-stream",
)
Note on length=-1: For unknown-length streams, set length=-1 and part_size=10*1024*1024. For in-memory io.BytesIO, always pass length=len(bytes) — this avoids a multipart upload when not needed.
Source: [CITED: github.com/minio/minio-py/blob/master/docs/API.md]
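The key schema from D-06 can be isolated in a small pure function so that it is unit-testable without a MinIO connection. This is a sketch under assumptions: the function name build_object_key and the extension allow-rule (short ASCII alphanumeric suffixes only) are illustrative choices, not part of the locked decisions.

```python
import uuid
from pathlib import PurePosixPath


def build_object_key(user_id: str, document_id: str, filename: str) -> str:
    """Build a {user_id}/{document_id}/{uuid4()}{ext} object key.

    Only the extension of the uploaded filename is used; the human-readable
    name itself never reaches the key.
    """
    ext = PurePosixPath(filename).suffix.lower()
    # Assumed rule: keep short ASCII alphanumeric extensions such as ".pdf".
    body = ext[1:]
    if not (0 < len(body) <= 10 and body.isascii() and body.isalnum()):
        ext = ""
    return f"{user_id}/{document_id}/{uuid.uuid4()}{ext}"


key = build_object_key("u-1", "d-1", "../../Quarterly Report.PDF")
no_ext_key = build_object_key("u-1", "d-1", "README")
```

Deriving the key from a fresh UUID on every call means two uploads of the same file never collide, and the original filename only needs to live in documents.filename.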
Pattern 4: MinIO Bucket Initialization at Startup
What: On first docker compose up, MinIO starts with an empty state. The application must create the docuvault bucket if it doesn't exist. This is done in the FastAPI lifespan, not in user request handlers.
Example:
# main.py lifespan extension
import asyncio
from contextlib import asynccontextmanager

from fastapi import FastAPI
from minio import Minio

from config import settings
from db.session import engine

@asynccontextmanager
async def lifespan(app: FastAPI):
    # PostgreSQL engine + pool are created when db.session is imported
    # MinIO bucket initialization
    minio_client = Minio(
        endpoint=settings.minio_endpoint,
        access_key=settings.minio_access_key,
        secret_key=settings.minio_secret_key,
        secure=False,
    )
    exists = await asyncio.to_thread(
        minio_client.bucket_exists, bucket_name=settings.minio_bucket
    )
    if not exists:
        await asyncio.to_thread(
            minio_client.make_bucket, bucket_name=settings.minio_bucket
        )
    app.state.minio = minio_client
    yield
    await engine.dispose()
Pattern 5: Celery App + Redis Broker Configuration
What: A single celery_app.py module defines the Celery application. Tasks are defined as decorated functions. FastAPI route handlers call .delay() to enqueue; the celery-worker container processes them.
Redis URL format (with password, Docker internal network):
redis://:${REDIS_PASSWORD}@redis:6379/0
The : before the password with no username is the correct format when Redis is configured with requirepass but no ACL users. [CITED: docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/redis.html via WebSearch]
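A password containing URL-reserved characters (@, /, :) would break this format unless it is percent-encoded. The sketch below builds the URL with the standard library; build_redis_url is a hypothetical helper, and it assumes the consuming client percent-decodes the password when parsing the URL.

```python
from urllib.parse import quote


def build_redis_url(password: str, host: str = "redis", port: int = 6379, db: int = 0) -> str:
    """Return redis://:<password>@host:port/db with an empty username.

    The password is percent-encoded so reserved characters cannot be
    mistaken for URL delimiters.
    """
    return f"redis://:{quote(password, safe='')}@{host}:{port}/{db}"


plain_url = build_redis_url("s3cret")
encoded_url = build_redis_url("p@ss/word")
```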
Example:
# celery_app.py
import os
from celery import Celery

# include= makes the worker import the task module at startup; without it,
# "celery -A celery_app worker" would not register extract_and_classify.
celery_app = Celery("docuvault", include=["tasks.document_tasks"])
celery_app.conf.broker_url = os.environ.get("REDIS_URL", "redis://redis:6379/0")
celery_app.conf.result_backend = os.environ.get("REDIS_URL", "redis://redis:6379/0")
celery_app.conf.task_serializer = "json"
celery_app.conf.result_serializer = "json"
celery_app.conf.accept_content = ["json"]
celery_app.conf.task_routes = {
    "tasks.document_tasks.*": {"queue": "documents"},
}

# tasks/document_tasks.py
from celery_app import celery_app

@celery_app.task(name="tasks.document_tasks.extract_and_classify")
def extract_and_classify(document_id: str) -> dict:
    # Celery tasks are SYNCHRONOUS functions; do NOT use async def here.
    # Use asyncio.run() sparingly or run sync equivalents of extractor/classifier.
    from services import extractor, classifier
    ...

# api/documents.py: calling the task
from tasks.document_tasks import extract_and_classify

@router.post("/upload")
async def upload_document(...):
    ...
    # Replace: background_tasks.add_task(classifier.classify_document, doc_id)
    # With:
    extract_and_classify.delay(str(saved_doc.id))
    return meta
Critical: Celery tasks are synchronous. Worker processes execute task functions as ordinary blocking calls and do not run an asyncio event loop. Calling async def functions inside a Celery task therefore requires asyncio.run(), which creates and closes a new event loop on every task invocation. This is acceptable for Phase 1 since the existing extractor.py and classifier.py services already have sync and async entry points, but keep tasks pure-sync where possible. [VERIFIED via WebSearch cross-checked with official docs]
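As an illustration of the asyncio.run() bridge, the sketch below runs one coroutine to completion from a plain synchronous function, which is the shape a Celery task body would take. classify_text is a hypothetical stand-in for an async service call, not the project's classifier.

```python
import asyncio


async def classify_text(text: str) -> dict:
    """Hypothetical async service call used only for this sketch."""
    await asyncio.sleep(0)  # placeholder for real async I/O
    return {"topic": "invoice" if "invoice" in text.lower() else "other"}


def extract_and_classify_sync(text: str) -> dict:
    """Synchronous task body: runs a single coroutine to completion.

    asyncio.run() creates a fresh event loop, runs the coroutine, and closes
    the loop, so loop-bound objects must not be reused across invocations.
    """
    return asyncio.run(classify_text(text))


result = extract_and_classify_sync("Invoice #2231")
```

Because each call gets its own loop, anything bound to a loop (for example an async database engine) would need to be created inside the coroutine rather than at module import time.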
Worker startup command:
celery -A celery_app worker --loglevel=info -Q documents
Pattern 6: Docker Compose Health Checks + depends_on
What: Each infrastructure service has a healthcheck definition. The backend service uses depends_on: condition: service_healthy to wait for all three (postgres, minio, redis) before starting.
Example:
services:
  postgres:
    image: postgres:17-alpine
    environment:
      POSTGRES_DB: docuvault
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
    volumes:
      - postgres_data:/var/lib/postgresql/data
      - ./docker/postgres/initdb.d:/docker-entrypoint-initdb.d:ro
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres -d docuvault"]
      interval: 10s
      timeout: 5s
      retries: 5
      start_period: 10s

  minio:
    image: minio/minio:latest
    command: server /data --console-address ":9001"
    environment:
      MINIO_ROOT_USER: ${MINIO_ROOT_USER}
      MINIO_ROOT_PASSWORD: ${MINIO_ROOT_PASSWORD}
    ports:
      - "9000:9000"
      - "9001:9001"
    volumes:
      - minio_data:/data
    healthcheck:
      # curl is removed from recent MinIO images; use the /minio/health/live HTTP endpoint
      # from the host. Inside the container, mc is available:
      test: ["CMD", "mc", "ready", "local"]
      interval: 10s
      timeout: 5s
      retries: 5
      start_period: 15s

  redis:
    image: redis:7-alpine
    command: redis-server --requirepass ${REDIS_PASSWORD}
    healthcheck:
      test: ["CMD", "redis-cli", "-a", "${REDIS_PASSWORD}", "ping"]
      interval: 10s
      timeout: 3s
      retries: 5

  backend:
    # build/image, environment, and ports omitted from this excerpt
    depends_on:
      postgres:
        condition: service_healthy
      minio:
        condition: service_healthy
      redis:
        condition: service_healthy

volumes:
  postgres_data:
  minio_data:
MinIO healthcheck note: curl was removed from MinIO's Docker image in October 2023. The mc ready local command is the current recommended healthcheck inside the container. The /minio/health/live HTTP endpoint (returns 200 OK) is still valid for external probing but cannot be used inside the container without curl. [CITED: github.com/minio/minio/issues/18389]
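On the application side, the extended /health endpoint (D-07) has to combine several connectivity probes into one response. The sketch below shows one way to do that with plain asyncio; run_health_checks, the "degraded" status string, the 503 status code, and the 2-second timeout are assumptions for illustration, and the probes here are fakes standing in for a SELECT 1 and a bucket_exists call.

```python
import asyncio
from collections.abc import Awaitable, Callable

Probe = Callable[[], Awaitable[bool]]


async def run_health_checks(probes: dict[str, Probe], timeout: float = 2.0) -> tuple[int, dict]:
    """Run all probes concurrently and return (http_status, response_body)."""

    async def safe(probe: Probe) -> bool:
        try:
            return bool(await asyncio.wait_for(probe(), timeout))
        except Exception:  # a failing or timed-out probe counts as "down"
            return False

    results = await asyncio.gather(*(safe(p) for p in probes.values()))
    checks = dict(zip(probes, results))
    healthy = all(checks.values())
    body = {"status": "ok" if healthy else "degraded", "checks": checks}
    return (200 if healthy else 503), body


async def _up() -> bool:
    return True


async def _down() -> bool:
    raise ConnectionError("connection refused")


status_ok, body_ok = asyncio.run(run_health_checks({"postgres": _up, "minio": _up}))
status_bad, body_bad = asyncio.run(run_health_checks({"postgres": _up, "minio": _down}))
```

A FastAPI handler could return the body with the computed status code, so a failed dependency surfaces as a non-2xx response to whatever is polling /health.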
Pattern 7: PostgreSQL Two-User Init Script
What: The official PostgreSQL Docker image runs scripts in /docker-entrypoint-initdb.d/ on first start (empty volume). A SQL script provisions two users: docuvault_migrate (DDL) and docuvault_app (runtime, restricted).
When to use: First docker compose up with a fresh volume. Idempotency across re-runs is not required because init scripts run only once, when the data directory is empty.
Example:
-- docker/postgres/initdb.d/01-init-users.sql
-- Runs as the POSTGRES_USER superuser on first container start only.

-- Migration user: DDL privileges (CREATE TABLE, ALTER TABLE, CREATE INDEX)
CREATE USER docuvault_migrate WITH PASSWORD 'PLACEHOLDER_MIGRATE_PASSWORD';
GRANT ALL PRIVILEGES ON DATABASE docuvault TO docuvault_migrate;
-- PostgreSQL 15+ no longer lets ordinary roles create objects in schema public
-- by default, so the migration user needs an explicit schema-level grant.
GRANT USAGE, CREATE ON SCHEMA public TO docuvault_migrate;

-- App user: runtime DML only (SELECT, INSERT, UPDATE, DELETE)
CREATE USER docuvault_app WITH PASSWORD 'PLACEHOLDER_APP_PASSWORD';
GRANT CONNECT ON DATABASE docuvault TO docuvault_app;

-- Grant schema-level privileges AFTER migration user creates the schema
-- This must run after alembic upgrade head, OR grant in a second script.
-- Pattern: grant via a post-migration step or grant within the migration itself:
-- GRANT USAGE ON SCHEMA public TO docuvault_app;
-- GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA public TO docuvault_app;
-- ALTER DEFAULT PRIVILEGES IN SCHEMA public
--   GRANT SELECT, INSERT, UPDATE, DELETE ON TABLES TO docuvault_app;
Important: The GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES must be run AFTER Alembic has created the tables, because ON ALL TABLES applies only to existing tables. Use ALTER DEFAULT PRIVILEGES so future tables (from future migrations) are also accessible. This can be done at the end of the first Alembic migration file, or in a post-migration Docker entrypoint hook.
Recommended approach for Phase 1: Run the GRANT as the last step of the 0001_initial_schema.py migration using op.execute() as the docuvault_migrate user (which has full privileges). [ASSUMED — no official doc confirming this is the standard Alembic pattern, but it follows from standard PostgreSQL privilege management]
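A sketch of what that final migration step could look like: the statements are generated by a small helper so they can be checked in a unit test, then passed one by one to op.execute() inside upgrade(). The helper name app_grant_statements is hypothetical, and the sketch assumes all tables live in schema public.

```python
def app_grant_statements(app_role: str = "docuvault_app", schema: str = "public") -> list[str]:
    """Return the GRANT statements that give the app role DML-only access."""
    dml = "SELECT, INSERT, UPDATE, DELETE"
    return [
        # Let the role resolve objects inside the schema.
        f"GRANT USAGE ON SCHEMA {schema} TO {app_role}",
        # Cover the tables created earlier in this migration.
        f"GRANT {dml} ON ALL TABLES IN SCHEMA {schema} TO {app_role}",
        # Cover tables created later by the same (migration) role.
        f"ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} GRANT {dml} ON TABLES TO {app_role}",
    ]


statements = app_grant_statements()
```

In 0001_initial_schema.py the last lines of upgrade() would then be a loop such as: for stmt in app_grant_statements(): op.execute(stmt). If any table used serial columns, the app role would presumably also need sequence privileges; that is outside what this sketch covers.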
Pattern 8: StorageBackend ABC (Mirrors ai/ Pattern)
What: storage/base.py defines StorageBackend as an abstract base class with the same structure as ai/base.py. storage/__init__.py provides a get_storage_backend() factory. storage/minio_backend.py is the Phase 1 implementation.
Example:
# storage/base.py
from abc import ABC, abstractmethod

class StorageBackend(ABC):
    @abstractmethod
    async def put_object(
        self, user_id: str, document_id: str,
        file_bytes: bytes, extension: str, content_type: str,
    ) -> str:
        """Store object; return the object_key used."""

    @abstractmethod
    async def get_object(self, object_key: str) -> bytes:
        """Retrieve object bytes by key."""

    @abstractmethod
    async def delete_object(self, object_key: str) -> None:
        """Delete object by key."""

    @abstractmethod
    async def presigned_get_url(self, object_key: str, expires_minutes: int = 60) -> str:
        """Return a time-limited download URL."""

    @abstractmethod
    async def health_check(self) -> bool:
        """Return True if backend is reachable."""

# storage/__init__.py
from config import settings
from storage.base import StorageBackend  # needed for the return annotation
from storage.minio_backend import MinIOBackend

def get_storage_backend() -> StorageBackend:
    return MinIOBackend(
        endpoint=settings.minio_endpoint,
        access_key=settings.minio_access_key,
        secret_key=settings.minio_secret_key,
        bucket=settings.minio_bucket,
        secure=False,
    )
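One benefit of the ABC is that tests can swap in a backend that needs no MinIO container. The sketch below is a hypothetical in-memory test double; it repeats a trimmed copy of the interface (without presigned_get_url) only so that the example runs on its own.

```python
import asyncio
import uuid
from abc import ABC, abstractmethod


class StorageBackend(ABC):
    """Trimmed copy of the interface, repeated so the sketch is self-contained."""

    @abstractmethod
    async def put_object(self, user_id: str, document_id: str,
                         file_bytes: bytes, extension: str, content_type: str) -> str: ...

    @abstractmethod
    async def get_object(self, object_key: str) -> bytes: ...

    @abstractmethod
    async def delete_object(self, object_key: str) -> None: ...

    @abstractmethod
    async def health_check(self) -> bool: ...


class InMemoryBackend(StorageBackend):
    """Hypothetical test double: keeps objects in a dict instead of MinIO."""

    def __init__(self) -> None:
        self._objects: dict[str, bytes] = {}

    async def put_object(self, user_id: str, document_id: str,
                         file_bytes: bytes, extension: str, content_type: str) -> str:
        key = f"{user_id}/{document_id}/{uuid.uuid4()}{extension}"
        self._objects[key] = file_bytes
        return key

    async def get_object(self, object_key: str) -> bytes:
        return self._objects[object_key]

    async def delete_object(self, object_key: str) -> None:
        self._objects.pop(object_key, None)

    async def health_check(self) -> bool:
        return True


async def _demo() -> tuple[str, bytes, bool]:
    backend = InMemoryBackend()
    key = await backend.put_object("u1", "d1", b"%PDF-1.7", ".pdf", "application/pdf")
    data = await backend.get_object(key)
    await backend.delete_object(key)
    return key, data, key in backend._objects


key, data, still_there = asyncio.run(_demo())
```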
Anti-Patterns to Avoid
- Sync SQLAlchemy in async context: Using create_engine() instead of create_async_engine() in FastAPI will block the event loop on every database call. Use create_async_engine throughout.
- Calling await session.commit() then accessing lazy-loaded attributes: Always set expire_on_commit=False or explicitly refresh after commit.
- Connecting Alembic using DATABASE_URL (restricted user): The restricted docuvault_app user has no DDL privileges. Alembic migrations will fail with permission denied errors. Alembic must always use DATABASE_MIGRATE_URL.
- Using async def for Celery task functions: Celery workers do not run an asyncio event loop. Define tasks as def, not async def. Wrap any async calls with asyncio.run() if unavoidable, but prefer sync implementations in tasks.
- Storing human-readable filename as MinIO object key: Object keys must be UUID-based ({user_id}/{document_id}/{uuid4()}{ext}). Filenames are stored ONLY in the documents.filename DB column. Putting human filenames in the key enables path traversal and makes key prediction trivial.
- Using minio_client.bucket_exists() inside async handlers without asyncio.to_thread: The MinIO SDK is synchronous; calling it directly from async def will block the event loop.
- Bare redis-cli ping healthcheck against a password-protected Redis: For Redis with requirepass, the healthcheck must pass -a $REDIS_PASSWORD to redis-cli. A bare redis-cli ping will return NOAUTH and be treated as unhealthy.
Don't Hand-Roll
| Problem | Don't Build | Use Instead | Why |
|---|---|---|---|
| Async PostgreSQL session management | Custom connection/context manager | SQLAlchemy async_sessionmaker + Depends(get_db) | Handles connection pooling, transaction boundaries, error cleanup, and the expire_on_commit edge case |
| Database schema migrations | Manual CREATE TABLE scripts in Python | Alembic | Manages migration history, rollbacks, auto-generation from ORM models, and multi-environment DSN configuration |
| MinIO object lifecycle | Custom S3-like HTTP client | minio Python SDK | Handles multipart uploads, signature v4, presigned URL expiry, retry logic, and connection pooling |
| Background task distribution | Thread pools or asyncio.create_task() | Celery + Redis | Cross-instance task distribution, retry on failure, task result storage |
| Docker service ordering | sleep commands in Compose entrypoints | healthcheck + depends_on: condition: service_healthy | Deterministic, declarative; sleep is a race condition |
| PostgreSQL privilege management | Per-table GRANT scripts written by hand | ALTER DEFAULT PRIVILEGES in Alembic migration | Future migrations automatically inherit privileges; hand-written grants go stale |
Key insight: The existing filelock-based services/storage.py uses at least 6 custom concurrency primitives to solve problems that PostgreSQL's transaction isolation and MinIO's atomic object operations solve at the infrastructure level. The rewrite simplifies the code while gaining correctness guarantees.
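To make the "atomic UPDATE instead of a file lock" idea concrete, the sketch below claims a document for processing with a single conditional UPDATE and checks the affected row count. It uses the standard-library sqlite3 module as a stand-in for PostgreSQL so that it runs anywhere; the status column and its values are hypothetical and not part of the Phase 1 schema as described here.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE documents (id TEXT PRIMARY KEY, status TEXT NOT NULL)")
conn.execute("INSERT INTO documents VALUES ('doc-1', 'pending')")
conn.commit()


def claim_document(conn: sqlite3.Connection, document_id: str) -> bool:
    """Atomically move a document from 'pending' to 'processing'.

    The WHERE clause makes the check and the write one statement, so only
    one caller can see rowcount == 1 for a given document.
    """
    cur = conn.execute(
        "UPDATE documents SET status = 'processing' "
        "WHERE id = ? AND status = 'pending'",
        (document_id,),
    )
    conn.commit()
    return cur.rowcount == 1


first_claim = claim_document(conn, "doc-1")   # wins the claim
second_claim = claim_document(conn, "doc-1")  # row no longer matches 'pending'
```

The same statement shape works through SQLAlchemy against PostgreSQL, where the database rather than a per-instance lock file decides which caller wins.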
Common Pitfalls
Pitfall 1: expire_on_commit=True (the default) Causes MissingGreenlet
What goes wrong: After await session.commit(), accessing any ORM object attribute triggers a new SELECT query. In async context, if there is no active session scope, SQLAlchemy raises sqlalchemy.exc.MissingGreenlet: greenlet_spawn has not been called.
Why it happens: The default Session.expire_on_commit=True marks objects as "expired" post-commit. The next attribute access triggers a lazy load, which needs a sync greenlet context (not available in asyncio).
How to avoid: Always set expire_on_commit=False in async_sessionmaker. [CITED: docs.sqlalchemy.org]
Warning signs: MissingGreenlet in tracebacks after commit; attribute access on model instances outside async with session blocks.
Pitfall 2: Alembic env.py Not Importing All Models
What goes wrong: alembic revision --autogenerate generates an empty migration even though models were defined.
Why it happens: Alembic's target_metadata must be set to Base.metadata, and all model modules must be imported BEFORE target_metadata is accessed in env.py. Python only knows about models that have been imported.
How to avoid: In migrations/env.py, explicitly import all model modules:
from db import models # noqa: F401 — must import to register with Base.metadata
target_metadata = models.Base.metadata
Warning signs: Empty op. blocks in generated migrations; tables not appearing in migration history.
Pitfall 3: MinIO put_object Requires io.BytesIO.seek(0) Before Use
What goes wrong: put_object reads 0 bytes if the io.BytesIO object's file pointer is at the end (e.g., after writing to it).
Why it happens: io.BytesIO.write() advances the pointer to the end of the data. put_object starts reading from the current position.
How to avoid: Always call data.seek(0) before passing a BytesIO to put_object. Or construct the BytesIO from the complete bytes directly: io.BytesIO(file_bytes) starts the pointer at 0.
Warning signs: MinIO reports successful upload but object is 0 bytes; or OSError: stream having not enough data.
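The pointer behaviour can be reproduced with the standard library alone. The sketch below does not call MinIO. It only shows what any reader that starts at the current position would receive.

```python
import io

buffer = io.BytesIO()
buffer.write(b"hello docuvault")   # pointer now sits at the end of the data

read_without_seek = buffer.read()  # a reader starting here gets nothing
buffer.seek(0)                     # rewind to the start
read_after_seek = buffer.read()

# Constructing the buffer from complete bytes starts the pointer at 0.
fresh = io.BytesIO(b"hello docuvault")
fresh_position = fresh.tell()

print(read_without_seek, read_after_seek, fresh_position)
```

Building the `BytesIO` directly from the uploaded bytes is the simpler of the two options, since there is no rewind step to forget.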
Pitfall 4: PostgreSQL Init Script GRANT Timing
What goes wrong: docuvault_app user gets permission denied on tables even after GRANT ... ON ALL TABLES.
Why it happens: GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA public only applies to tables that exist at the time of the GRANT. Tables created by Alembic after the init script runs are not covered.
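How to avoid: Run ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT, INSERT, UPDATE, DELETE ON TABLES TO docuvault_app; in the Alembic initial migration (as docuvault_migrate user, which owns the tables), before the first op.create_table call: default privileges only apply to objects created after the statement runs. From then on they cover every table created by the same migration user. Serial keys such as audit_log.id also need a matching ALTER DEFAULT PRIVILEGES ... GRANT USAGE, SELECT ON SEQUENCES, otherwise INSERTs fail on the sequence.
Warning signs: First docker compose up works; second run after alembic upgrade head fails with "permission denied for table" errors (SQLSTATE 42501).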
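As a rough sketch of the ordering, the statements can be generated by a small helper and passed to `op.execute()` at the top of the initial migration's `upgrade()`, ahead of any `op.create_table()` call. The helper name is hypothetical, and the sequence grant is an assumption added here for serial keys such as `audit_log.id`.

```python
# Hypothetical helper: builds the SQL that a migration would hand to op.execute().
APP_ROLE = "docuvault_app"  # runtime role named in this document


def default_privilege_statements(app_role: str, schema: str = "public") -> list[str]:
    """Return the statements to run BEFORE any table is created.

    DDL cannot take bind parameters, so the role and schema are formatted in.
    Both must be trusted constants, never request input.
    """
    return [
        f"ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} "
        f"GRANT SELECT, INSERT, UPDATE, DELETE ON TABLES TO {app_role}",
        # Assumption: serial/identity keys need sequence access for INSERTs.
        f"ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} "
        f"GRANT USAGE, SELECT ON SEQUENCES TO {app_role}",
    ]


statements = default_privilege_statements(APP_ROLE)
for sql in statements:
    print(sql)
```

Inside the migration this would read roughly as `for sql in statements: op.execute(sql)`, executed as `docuvault_migrate` so that the defaults attach to the role that creates the tables.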
Pitfall 5: Redis Healthcheck Without Authentication
What goes wrong: redis-cli ping returns NOAUTH Authentication required when Redis is started with requirepass. Docker Compose treats non-zero exit as unhealthy. Backend never starts.
Why it happens: redis-cli ping without -a doesn't pass the password.
How to avoid: Use redis-cli -a ${REDIS_PASSWORD} ping in the healthcheck test field. Note that this logs a warning about passing password on command line — acceptable for a healthcheck, not for production scripts.
Warning signs: backend service stuck at Waiting for redis to be healthy; redis-cli ping showing NOAUTH in container logs.
Pitfall 6: MinIO mc ready local Healthcheck Not Available Without mc
What goes wrong: The mc ready local healthcheck depends on the mc binary. It is present in the official minio/minio Docker image, so the check works there. A third-party or stripped MinIO image may lack mc, and the healthcheck then fails on every probe.
How to avoid: Stick to the official minio/minio:latest image. If a custom image is needed, probe the /minio/health/live HTTP endpoint from a sidecar or from the host, since the container itself may not ship curl.
Pitfall 7: Celery Worker Cannot Import FastAPI App Module
What goes wrong: Celery worker Docker container imports celery_app.py, which transitively imports the FastAPI app or lifespan, which tries to open database connections or access app.state.
Why it happens: Shared imports between the FastAPI app and Celery tasks create circular dependencies at module load time.
How to avoid: Keep celery_app.py minimal (Celery configuration only). Task functions in tasks/ import services directly, not via main.py or any router. The Celery worker starts with celery -A celery_app worker — it never starts FastAPI.
Code Examples
Full v1 SQLAlchemy ORM Schema (Phase 1 Migration Target)
# db/models.py
import uuid
from datetime import datetime, timezone

from sqlalchemy import (
    Boolean, BigInteger, ForeignKey, Index, String, Text,
    TIMESTAMP, UniqueConstraint, Integer
)
from sqlalchemy.dialects.postgresql import UUID, INET, JSONB
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
from sqlalchemy.sql import func


def now_utc():
    return datetime.now(timezone.utc)


class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"
    id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    handle: Mapped[str] = mapped_column(String, unique=True, nullable=False)
    email: Mapped[str] = mapped_column(String, unique=True, nullable=False)
    password_hash: Mapped[str] = mapped_column(Text, nullable=False)
    totp_secret: Mapped[str | None] = mapped_column(Text, nullable=True)
    totp_enabled: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
    role: Mapped[str] = mapped_column(String, nullable=False, default="user")
    is_active: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True)
    ai_provider: Mapped[str | None] = mapped_column(Text, nullable=True)
    ai_model: Mapped[str | None] = mapped_column(Text, nullable=True)
    default_storage_backend: Mapped[str] = mapped_column(String, nullable=False, default="minio")
    created_at: Mapped[datetime] = mapped_column(TIMESTAMP(timezone=True), nullable=False, server_default=func.now())


class Quota(Base):
    __tablename__ = "quotas"
    user_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("users.id", ondelete="CASCADE"), primary_key=True)
    limit_bytes: Mapped[int] = mapped_column(BigInteger, nullable=False, default=104857600)  # 100 MB
    used_bytes: Mapped[int] = mapped_column(BigInteger, nullable=False, default=0)


class RefreshToken(Base):
    __tablename__ = "refresh_tokens"
    id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    user_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("users.id", ondelete="CASCADE"), nullable=False)
    token_hash: Mapped[str] = mapped_column(Text, unique=True, nullable=False)
    expires_at: Mapped[datetime] = mapped_column(TIMESTAMP(timezone=True), nullable=False)
    revoked: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
    created_at: Mapped[datetime] = mapped_column(TIMESTAMP(timezone=True), nullable=False, server_default=func.now())
    __table_args__ = (Index("ix_refresh_tokens_user_revoked", "user_id", "revoked"),)

class Folder(Base):
    __tablename__ = "folders"
    id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    user_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("users.id", ondelete="CASCADE"), nullable=False)
    parent_id: Mapped[uuid.UUID | None] = mapped_column(UUID(as_uuid=True), ForeignKey("folders.id", ondelete="CASCADE"), nullable=True)
    name: Mapped[str] = mapped_column(Text, nullable=False)
    created_at: Mapped[datetime] = mapped_column(TIMESTAMP(timezone=True), nullable=False, server_default=func.now())
    __table_args__ = (UniqueConstraint("user_id", "parent_id", "name"),)


class Document(Base):
    __tablename__ = "documents"
    id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    # user_id is NULLABLE in Phase 1 (D-03); Phase 2 migration adds NOT NULL
    user_id: Mapped[uuid.UUID | None] = mapped_column(UUID(as_uuid=True), ForeignKey("users.id", ondelete="CASCADE"), nullable=True)
    folder_id: Mapped[uuid.UUID | None] = mapped_column(UUID(as_uuid=True), ForeignKey("folders.id", ondelete="SET NULL"), nullable=True)
    filename: Mapped[str] = mapped_column(Text, nullable=False)  # original human-readable name
    object_key: Mapped[str] = mapped_column(Text, nullable=False)  # MinIO key: {user_id}/{doc_id}/{uuid4}{ext}
    content_type: Mapped[str] = mapped_column(Text, nullable=False)
    size_bytes: Mapped[int] = mapped_column(BigInteger, nullable=False, default=0)
    storage_backend: Mapped[str] = mapped_column(String, nullable=False, default="minio")
    extracted_text: Mapped[str | None] = mapped_column(Text, nullable=True)
    status: Mapped[str] = mapped_column(String, nullable=False, default="pending")
    created_at: Mapped[datetime] = mapped_column(TIMESTAMP(timezone=True), nullable=False, server_default=func.now())
    updated_at: Mapped[datetime] = mapped_column(TIMESTAMP(timezone=True), nullable=False, server_default=func.now())
    __table_args__ = (
        Index("ix_documents_user_folder", "user_id", "folder_id"),
        Index("ix_documents_user_created", "user_id", "created_at"),
    )


class Topic(Base):
    __tablename__ = "topics"
    id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    user_id: Mapped[uuid.UUID | None] = mapped_column(UUID(as_uuid=True), ForeignKey("users.id", ondelete="CASCADE"), nullable=True)
    name: Mapped[str] = mapped_column(Text, nullable=False)
    description: Mapped[str] = mapped_column(Text, nullable=False, default="")
    color: Mapped[str] = mapped_column(String(7), nullable=False, default="#6366f1")
    __table_args__ = (UniqueConstraint("user_id", "name"),)


class DocumentTopic(Base):
    __tablename__ = "document_topics"
    document_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("documents.id", ondelete="CASCADE"), primary_key=True)
    topic_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("topics.id", ondelete="CASCADE"), primary_key=True)

class Share(Base):
    __tablename__ = "shares"
    id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    document_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("documents.id", ondelete="CASCADE"), nullable=False)
    owner_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("users.id", ondelete="CASCADE"), nullable=False)
    recipient_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("users.id", ondelete="CASCADE"), nullable=False)
    permission: Mapped[str] = mapped_column(String, nullable=False, default="view")
    created_at: Mapped[datetime] = mapped_column(TIMESTAMP(timezone=True), nullable=False, server_default=func.now())
    __table_args__ = (
        UniqueConstraint("document_id", "recipient_id"),
        Index("ix_shares_recipient", "recipient_id"),
    )


class AuditLog(Base):
    __tablename__ = "audit_log"
    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    user_id: Mapped[uuid.UUID | None] = mapped_column(UUID(as_uuid=True), ForeignKey("users.id", ondelete="SET NULL"), nullable=True)
    actor_id: Mapped[uuid.UUID | None] = mapped_column(UUID(as_uuid=True), ForeignKey("users.id", ondelete="SET NULL"), nullable=True)
    event_type: Mapped[str] = mapped_column(Text, nullable=False)
    resource_id: Mapped[uuid.UUID | None] = mapped_column(UUID(as_uuid=True), nullable=True)
    ip_address: Mapped[str | None] = mapped_column(INET, nullable=True)
    # "metadata" is a reserved attribute name on Declarative classes, so the
    # column keeps its DB name while the Python attribute is event_metadata.
    event_metadata: Mapped[dict | None] = mapped_column("metadata", JSONB, nullable=True)
    created_at: Mapped[datetime] = mapped_column(TIMESTAMP(timezone=True), nullable=False, server_default=func.now())
    __table_args__ = (
        Index("ix_audit_user_created", "user_id", "created_at"),
        Index("ix_audit_event_created", "event_type", "created_at"),
    )


class CloudConnection(Base):
    __tablename__ = "cloud_connections"
    id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    user_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("users.id", ondelete="CASCADE"), nullable=False)
    provider: Mapped[str] = mapped_column(String, nullable=False)
    display_name: Mapped[str] = mapped_column(Text, nullable=False)
    credentials_enc: Mapped[str] = mapped_column(Text, nullable=False)
    status: Mapped[str] = mapped_column(String, nullable=False, default="ACTIVE")
    connected_at: Mapped[datetime] = mapped_column(TIMESTAMP(timezone=True), nullable=False, server_default=func.now())
    __table_args__ = (Index("ix_cloud_connections_user", "user_id"),)


class Group(Base):
    """v2 stub: empty table, seeded for schema completeness (PROJECT.md)."""
    __tablename__ = "groups"
    id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    name: Mapped[str] = mapped_column(Text, unique=True, nullable=False)
    created_at: Mapped[datetime] = mapped_column(TIMESTAMP(timezone=True), nullable=False, server_default=func.now())
Config Extension for New Env Vars
# config.py (extended)
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    # pydantic-settings v2 style; replaces the deprecated inner "class Config"
    model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8")

    # Existing
    data_dir: str = "/app/data"
    # Phase 1 additions
    database_url: str = "postgresql+psycopg://docuvault_app:changeme@postgres/docuvault"
    database_migrate_url: str = "postgresql+psycopg://docuvault_migrate:changeme@postgres/docuvault"
    minio_endpoint: str = "minio:9000"
    minio_access_key: str = "docuvault_app"
    minio_secret_key: str = "changeme"
    minio_bucket: str = "docuvault"
    redis_url: str = "redis://:changeme@redis:6379/0"
    secret_key: str = "CHANGEME"  # documented for Phase 2; not read in Phase 1


settings = Settings()
State of the Art
| Old Approach | Current Approach | When Changed | Impact |
|---|---|---|---|
| asyncpg as the only async PostgreSQL dialect | psycopg v3 supports both sync + async via one package | psycopg 3.0 (2021); SQLAlchemy dialect since 2.0 (2023) | Single driver for Alembic + FastAPI; no separate sync/async packages |
| alembic init (sync template) | alembic init -t async for async engine migrations | Alembic 1.7+ | env.py template pre-configured for asyncio; no manual async wiring |
| sessionmaker(..., class_=AsyncSession) used as the async session factory | async_sessionmaker is a first-class API in SQLAlchemy 2.0 | SQLAlchemy 2.0 (2023) | Cleaner factory pattern without subclassing |
| MinIO Docker image included curl for healthchecks | curl removed from image; mc ready local is the new healthcheck | October 2023 | Existing tutorials with curl -f healthcheck will silently fail on current images |
| FastAPI BackgroundTasks for async post-request work | Celery + Redis for distributed, reliable task queues | Ongoing | BackgroundTasks is per-instance and has no retry; Celery is cross-instance |
Deprecated/outdated:
- filelock dependency: can be removed from backend/requirements.txt once services/storage.py is replaced (CONCERNS.md item 14 identifies the unused shutil import; same cleanup applies to filelock).
- Per-document .lock files in data/metadata/: deleted with data/ directory contents (D-04).
- psycopg2 (old driver): not installed and not needed; psycopg v3 is the replacement.
- Sync file I/O in async handlers (CONCERNS.md item 6): resolved for metadata by switching to async SQLAlchemy. The MinIO Python SDK is synchronous, so object reads and writes still have to run off the event loop (for example via asyncio.to_thread).
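On the last item: any blocking call that remains inside an async handler can be moved to a worker thread with `asyncio.to_thread`. The sketch below uses a hypothetical `blocking_upload` function as a stand-in for a synchronous SDK call. It is not the project's storage code.

```python
import asyncio
import threading
import time


def blocking_upload(data: bytes) -> dict:
    """Hypothetical stand-in for a synchronous SDK call such as an object upload."""
    time.sleep(0.05)  # simulate blocking network I/O
    return {"size": len(data), "thread": threading.current_thread().name}


async def upload(data: bytes) -> dict:
    # The blocking function runs in a worker thread; the event loop stays free.
    return await asyncio.to_thread(blocking_upload, data)


async def main() -> list[dict]:
    # The two uploads overlap instead of running back to back.
    return await asyncio.gather(upload(b"abc"), upload(b"defgh"))


main_thread = threading.current_thread().name
results = asyncio.run(main())
print(results)
```

The same wrapper shape applies to any synchronous client used from a FastAPI route, at the cost of occupying one thread-pool slot per in-flight call.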
Assumptions Log
| # | Claim | Section | Risk if Wrong |
|---|---|---|---|
| A1 | Running GRANT ... ON ALL TABLES inside the Alembic initial migration as docuvault_migrate is the standard pattern for privilege handoff to docuvault_app | Pattern 7 (PostgreSQL init script) | If the migration user lacks permission to GRANT to another user, privileges must be set manually or via a separate script, which delays testing |
| A2 | The Celery worker container can import db/models.py and services/ directly without starting FastAPI (no circular import) | Pattern 5 (Celery) | If service modules import FastAPI components at module level, a refactor is needed before worker tasks can import services |
| A3 | minio/minio:latest Docker image includes mc for the mc ready local healthcheck | Pattern 6 (Docker Compose) | If mc is not in the image, healthcheck must use a shell-based TCP probe or alternative; confirmed via GitHub issue discussion [CITED: github.com/minio/minio/issues/18389] but version-specific |
Open Questions
- PostgreSQL version to pin in Docker Compose
  - What we know: Any PostgreSQL 14+ supports gen_random_uuid(), JSONB, INET, and TIMESTAMPTZ used in the schema.
  - What's unclear: Whether to use postgres:16, postgres:17, or postgres:17-alpine.
  - Recommendation: Use postgres:17-alpine (smallest image, current stable, alpine is well-suited for Docker Compose dev setups).
- MinIO version pinning
  - What we know: minio/minio:latest has mc available for healthchecks; curl was removed in late 2023.
  - What's unclear: Whether to pin to a specific release tag (e.g., RELEASE.2025-09-07T16-13-09Z) or use :latest.
  - Recommendation: Pin to a specific RELEASE tag for reproducibility; update as part of a maintenance task. [ASSUMED: no strong official guidance on whether :latest is appropriate for production-adjacent Docker Compose]
- Topics table migration: existing topic names from data/topics.json
  - What we know: D-04 deletes data/ contents. Topics stored in topics.json are test data and are deleted.
  - What's unclear: The existing api/topics.py and frontend/src/stores/topics.js need updating to read from PostgreSQL instead of the flat file. The API shape should remain the same (list of objects with id, name, description, color).
  - Recommendation: The planner must include a task for updating api/topics.py to use async SQLAlchemy ORM queries against the topics table.
- Celery task vs direct service call for text extraction + classification
  - What we know: The current api/documents.py calls await classifier.classify_document() inside the route handler. This needs to move to a Celery task.
  - What's unclear: Whether Phase 1 should move ALL of extraction + classification into a Celery task (full async flow) or just wire up the infrastructure with a placeholder task and migrate the logic in Phase 3.
  - Recommendation: Phase 1 should wire the full task (extract + classify) in Celery, because the walking skeleton requirement says "AI classification workflow completes successfully." A placeholder task that doesn't classify would fail the success criteria.
Environment Availability
| Dependency | Required By | Available | Version | Fallback |
|---|---|---|---|---|
| Docker | Docker Compose services | ✓ | 29.5.0 | — |
| Python 3.12 | Backend (in Docker image) | ✓ (host: 3.14.5; Docker: 3.12 pinned) | 3.12 in image | — |
| PostgreSQL (via Docker) | Database tier | ✓ (via Docker) | 17 (image) | — |
| MinIO (via Docker) | Object storage | ✓ (via Docker) | latest | — |
| Redis (via Docker) | Celery broker, Phase 2 rate limiting | ✓ (via Docker) | 7-alpine | — |
| pytest | Backend test runner | ✓ (host pip3) | existing | — |
Missing dependencies with no fallback: None. Missing dependencies with fallback: None.
Validation Architecture
Test Framework
| Property | Value |
|---|---|
| Framework | pytest with pytest-asyncio (existing) |
| Config file | backend/pytest.ini (existing; asyncio_mode = auto) |
| Quick run command | cd backend && pytest tests/test_health.py tests/test_documents.py tests/test_storage.py -x |
| Full suite command | cd backend && pytest -v |
Phase Requirements → Test Map
| Req ID | Behavior | Test Type | Automated Command | File Exists? |
|---|---|---|---|---|
| STORE-01 | Upload stores metadata in PostgreSQL and bytes in MinIO | integration | pytest tests/test_documents.py::test_upload_stores_to_postgres_and_minio -x | ❌ Wave 0 |
| STORE-01 | List documents reads from PostgreSQL (not filesystem) | integration | pytest tests/test_documents.py::test_list_reads_from_db -x | ❌ Wave 0 |
| STORE-02 | MinIO object key matches {user_id}/{document_id}/{uuid4}{ext} pattern | unit | pytest tests/test_storage.py::test_object_key_schema -x | ❌ Wave 0 |
| STORE-02 | Human-readable filename is NOT in the object key | unit | pytest tests/test_storage.py::test_filename_not_in_object_key -x | ❌ Wave 0 |
| STORE-07 | /health returns PostgreSQL + MinIO connectivity (not just {"status": "ok"}) | smoke | pytest tests/test_health.py::test_health_checks_postgres_and_minio -x | ❌ Wave 0 |
| STORE-07 (implicit) | Storage service has no file locks; concurrent uploads do not corrupt state | integration | pytest tests/test_documents.py::test_concurrent_uploads -x | ❌ Wave 0 |
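
For the STORE-07 row, one possible shape of the health logic is a small aggregator that runs one probe per dependency and reports each result. This is a sketch under assumptions: `check_health`, the probe names, and the response layout are hypothetical, and the fake probes stand in for a `SELECT 1` and a MinIO bucket check.

```python
import asyncio
from collections.abc import Awaitable, Callable

# A probe succeeds by returning and fails by raising.
Probe = Callable[[], Awaitable[None]]


async def check_health(probes: dict[str, Probe], timeout: float = 2.0) -> dict:
    """Run all probes concurrently and report per-dependency status."""

    async def run(probe: Probe) -> str:
        try:
            await asyncio.wait_for(probe(), timeout)
            return "ok"
        except Exception as exc:  # also covers the timeout error
            return f"error: {type(exc).__name__}"

    names = list(probes)
    results = await asyncio.gather(*(run(probes[name]) for name in names))
    checks = dict(zip(names, results))
    status = "ok" if all(value == "ok" for value in checks.values()) else "degraded"
    return {"status": status, "checks": checks}


# Hypothetical probes used only to exercise the aggregator.
async def fake_postgres_probe() -> None:
    return None


async def fake_minio_probe() -> None:
    raise ConnectionError("minio unreachable")


report = asyncio.run(
    check_health({"postgres": fake_postgres_probe, "minio": fake_minio_probe})
)
print(report)
```

Passing probes in as callables keeps the smoke test independent of live services: the test injects fakes, while the route wires in the real database and MinIO checks.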
Sampling Rate
- Per task commit: cd backend && pytest tests/test_health.py tests/test_storage.py -x
- Per wave merge: cd backend && pytest -v
- Phase gate: Full suite green before /gsd:verify-work
Wave 0 Gaps
- tests/test_storage.py: covers STORE-02 (object key schema, filename isolation)
- tests/test_documents.py: extend for PostgreSQL/MinIO-backed upload/list (STORE-01)
- tests/test_health.py: extend for PostgreSQL + MinIO connectivity probes (STORE-07)
- tests/conftest.py: add async engine + session fixtures; add MinIO mock or test bucket fixture
- Update tests/conftest.py to monkeypatch db/session.py paths (not just config.py paths)
Existing tests: test_documents.py, test_topics.py, test_settings.py test the OLD flat-file storage layer. They will break after services/storage.py is replaced. These must be ported (not deleted) as part of Phase 1.
Security Domain
Applicable ASVS Categories
| ASVS Category | Applies | Standard Control |
|---|---|---|
| V2 Authentication | No — Phase 1 has no auth | Phase 2 |
| V3 Session Management | No — Phase 1 has no sessions | Phase 2 |
| V4 Access Control | Partial — object key isolation in MinIO backend | user_id prefix enforced in MinIOBackend.put_object() |
| V5 Input Validation | Yes: file upload content type + size | Enforce the existing ALLOWED_MIME_TYPES allowlist (defined but currently unenforced per CONCERNS.md item 1) |
| V6 Cryptography | No — Phase 1 has no credential encryption | Phase 5 |
Known Threat Patterns for This Phase
| Pattern | STRIDE | Standard Mitigation |
|---|---|---|
| Object key prediction / path traversal | Tampering | UUID-based object keys ({user_id}/{document_id}/{uuid4}{ext}); never accept object keys from request parameters |
| Database superuser credentials in app DSN | Elevation of Privilege | Two-DSN pattern: docuvault_app (restricted) for runtime, docuvault_migrate (DDL) for Alembic only |
| MinIO credentials with bucket admin rights | Elevation of Privilege | App-level access key pair (MINIO_ACCESS_KEY / MINIO_SECRET_KEY) with read/write on docuvault bucket only; root credentials not used by app |
| Redis unauthenticated in Docker network | Information Disclosure | requirepass set on Redis; REDIS_URL includes password; Celery broker and app use authenticated URL |
| SQL injection via ORM | Tampering | SQLAlchemy ORM / parameterized queries throughout; zero raw string interpolation (matches CLAUDE.md SEC-03) |
| Sensitive data in MinIO object key | Information Disclosure | Human-readable filenames stored in DB only; object key is UUID-based and non-predictable |
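
The two object-key rows above can be illustrated with a small key builder. This is a sketch, not the project's storage code: `build_object_key` and its extension filter are hypothetical, and how a null `user_id` (D-03) maps to the prefix is left open here.

```python
import uuid
from pathlib import PurePosixPath


def build_object_key(user_id: uuid.UUID, document_id: uuid.UUID, filename: str) -> str:
    """Return {user_id}/{document_id}/{uuid4}{ext}.

    Only the extension is taken from the client-supplied filename.
    """
    ext = PurePosixPath(filename).suffix.lower()
    # Keep the extension only if it is short and alphanumeric; otherwise drop it.
    if not (1 < len(ext) <= 11 and ext[1:].isalnum()):
        ext = ""
    return f"{user_id}/{document_id}/{uuid.uuid4()}{ext}"


user_id, document_id = uuid.uuid4(), uuid.uuid4()
key = build_object_key(user_id, document_id, "../../Quarterly Report.PDF")
parts = key.split("/")
print(key)
```

Because every path segment is generated server-side, a traversal attempt in the filename never reaches the key, and the human-readable name stays in the `documents.filename` column only.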
Sources
Primary (HIGH confidence)
- docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html: async engine setup, async_sessionmaker, expire_on_commit=False, FastAPI lifespan integration
- alembic.sqlalchemy.org/en/latest/cookbook.html#using-asyncio-with-alembic: async env.py pattern
- github.com/sqlalchemy/alembic/blob/main/alembic/templates/async/env.py: official async env.py template code
- github.com/minio/minio-py/blob/master/docs/API.md: put_object, presigned_get_object, constructor signatures
- github.com/minio/minio/issues/18389: curl removal from MinIO image; mc ready local as replacement
- docs.min.io/enterprise/aistor-object-store/operations/monitoring/healthcheck-probe/: /minio/health/live endpoint documented
- docs.docker.com/reference/compose-file/services/#healthcheck: healthcheck + depends_on: condition: service_healthy syntax
Secondary (MEDIUM confidence)
- docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/redis.html: Redis URL format verified via WebSearch; Celery docs site was unreachable during research session
- testdriven.io/blog/fastapi-and-celery/: Celery + FastAPI project structure and .delay() pattern
- WebSearch results cross-referenced with official docs for psycopg install extras, Redis broker URL format, PostgreSQL init script pattern
Tertiary (LOW confidence)
- None — all key claims cross-verified with at least one authoritative source
Metadata
Confidence breakdown:
- Standard stack: HIGH (all packages verified on PyPI via pip3 index versions, slopcheck [OK] for all 6 core packages)
- Architecture: HIGH (patterns drawn from SQLAlchemy official docs, Alembic official template, and MinIO official GitHub)
- Pitfalls: HIGH (each pitfall sourced from official documentation or confirmed GitHub issues, not community blog posts only)
- Celery configuration: MEDIUM (Celery docs site was unreachable; URL format cross-verified via WebSearch + community sources)
Research date: 2026-05-21
Valid until: 2026-06-21 for stable stack; MinIO healthcheck pattern should be re-verified if the Docker image version changes significantly