--- phase: 01-infrastructure-foundation plan: 05 type: execute wave: 4 depends_on: - 01-04 files_modified: - backend/main.py - backend/api/documents.py - backend/api/topics.py - backend/celery_app.py - backend/tasks/__init__.py - backend/tasks/document_tasks.py - backend/services/classifier.py - backend/config.py - backend/tests/conftest.py - backend/tests/test_documents.py - backend/data autonomous: false requirements: - STORE-01 - STORE-07 user_setup: [] tags: - api-wiring - lifespan - celery - cutover - walking-skeleton must_haves: truths: - "`backend/main.py` lifespan opens the MinIO client, auto-creates the `docuvault` bucket if missing, attaches it to `app.state.minio`, and disposes the SQLAlchemy engine on shutdown" - "`backend/main.py` `/health` endpoint returns `{\"status\": \"ok\"|\"degraded\", \"checks\": {\"postgres\": \"ok\"|\"error: ...\", \"minio\": \"ok\"|\"error: ...\"}}` (D-07)" - "`backend/api/documents.py` upload, list, get, delete, classify endpoints all inject `session: AsyncSession = Depends(get_db)` and call the async `services.storage.*` functions" - "`backend/api/topics.py` list/create/update/delete/suggest endpoints all inject the session dependency" - "`backend/celery_app.py` instantiates a Celery app with broker + result_backend from `REDIS_URL`, JSON serialization, and a `documents` queue route" - "`backend/tasks/document_tasks.py` declares a sync `def extract_and_classify(document_id: str) -> dict` Celery task that the upload handler calls via `.delay(...)`" - "FastAPI `BackgroundTasks` usage is removed (STORE-08 satisfied for Phase 1; was never directly used in current codebase but the `await classifier.classify_document` inline call in the upload handler is replaced with `.delay()`)" - "All legacy flat-file constants and helpers (`UPLOADS_DIR`, `METADATA_DIR`, `TOPICS_FILE`, `ensure_data_dirs`) are removed from `backend/config.py`; the `data/` directory contents are deleted (D-04)" - "Existing `tests/test_documents.py` sync tests 
are DELETED (cutover) and the `_async` variants from Plan 02 have their `@pytest.mark.xfail` markers removed; the legacy `client` (sync TestClient) fixture is removed from conftest.py" - "Walking-skeleton end-to-end verification passes: `docker compose up` boots all services healthy; a real PDF upload through `POST /api/documents/upload` persists to PostgreSQL + MinIO; Celery worker logs show `extract_and_classify` ran; the document appears in `GET /api/documents`" artifacts: - path: "backend/main.py" provides: "Lifespan with engine + MinIO bucket init; extended /health" contains: "app.state.minio" - path: "backend/celery_app.py" provides: "Celery app with Redis broker + JSON serialization + documents queue" contains: "Celery(\"docuvault\")" - path: "backend/tasks/document_tasks.py" provides: "extract_and_classify Celery task" contains: "@celery_app.task" - path: "backend/api/documents.py" provides: "Async route handlers using AsyncSession + Celery .delay()" contains: "session: AsyncSession = Depends(get_db)" - path: "backend/api/topics.py" provides: "Async route handlers using AsyncSession" contains: "session: AsyncSession = Depends(get_db)" - path: "backend/services/classifier.py" provides: "Updated to accept session and called from a sync wrapper inside Celery tasks" contains: "async def classify_document" - path: "backend/config.py" provides: "Cleaned up Pydantic Settings — legacy data-dir constants removed" contains: "class Settings(BaseSettings)" key_links: - from: "backend/api/documents.py upload handler" to: "backend/tasks/document_tasks.extract_and_classify" via: "extract_and_classify.delay(str(saved['id']))" pattern: "extract_and_classify\\.delay" - from: "backend/main.py lifespan" to: "MinIO bucket auto-create" via: "make_bucket if not bucket_exists" pattern: "make_bucket|bucket_exists" - from: "backend/main.py /health" to: "AsyncSessionLocal + minio_client.bucket_exists" via: "probe queries" pattern: "AsyncSessionLocal|bucket_exists" - from: 
"backend/celery_app.py" to: "REDIS_URL env var" via: "os.environ.get('REDIS_URL', ...)" pattern: "REDIS_URL" --- Complete the Phase 1 cutover: wire every API route to the async storage layer, replace inline classification with a Celery `.delay()` call, extend the FastAPI lifespan with MinIO bucket creation + engine disposal, rewrite `/health` to probe PostgreSQL + MinIO (D-07), introduce `celery_app.py` + `tasks/document_tasks.py`, remove every legacy flat-file artifact (D-04), delete the legacy sync tests + sync TestClient fixture, and verify the walking skeleton end-to-end against a live Docker stack. Purpose: This plan closes the loop. After it ships, ROADMAP.md Phase 1 success criteria #1, #3, and #4 are all satisfied (criterion #2 was satisfied in Plan 03). Phase 1 ends with a usable single-user app whose entire internal architecture is the multi-user-ready PostgreSQL + MinIO + Celery stack. Output: Updated `backend/main.py`, `backend/api/documents.py`, `backend/api/topics.py`, `backend/services/classifier.py`; new `backend/celery_app.py` + `backend/tasks/document_tasks.py`; cleaned `backend/config.py`; final test-suite cutover; deletion of `data/` directory; and a passing end-to-end walking-skeleton verification checkpoint. @$HOME/.claude/get-shit-done/workflows/execute-plan.md @$HOME/.claude/get-shit-done/templates/summary.md @CLAUDE.md @.planning/PROJECT.md @.planning/ROADMAP.md @.planning/STATE.md @.planning/phases/01-infrastructure-foundation/01-CONTEXT.md @.planning/phases/01-infrastructure-foundation/01-RESEARCH.md @.planning/phases/01-infrastructure-foundation/01-PATTERNS.md @.planning/phases/01-infrastructure-foundation/SKELETON.md @.planning/phases/01-infrastructure-foundation/01-04-SUMMARY.md @backend/services/storage.py @backend/db/models.py @backend/db/session.py @backend/deps/db.py @backend/storage/__init__.py After Plan 04, the async storage layer is in place. This plan wires consumers. 
Existing `api/documents.py` consumer points (must be ported to async + session injection): - `storage.save_upload(content, file.filename, mime)` → `await storage.save_upload(session, content, file.filename, mime)` - `storage.save_metadata(meta)` → `await storage.save_metadata(session, meta)` - `storage.list_metadata(topic=topic)` → `await storage.list_metadata(session, topic=topic)` - `storage.get_metadata(doc_id)` → `await storage.get_metadata(session, doc_id)` - `storage.delete_document(doc_id)` → `await storage.delete_document(session, doc_id)` - `await classifier.classify_document(saved["id"])` → `extract_and_classify.delay(saved["id"])` (Celery task — STORE-08) Existing `api/topics.py` consumer points: - `storage.load_topics()` → `await storage.load_topics(session)` - `storage.topic_doc_counts()` → `await storage.topic_doc_counts(session)` - `storage.create_topic(...)` → `await storage.create_topic(session, ...)` - `storage.update_topic(...)` → `await storage.update_topic(session, ...)` - `storage.delete_topic(...)` → `await storage.delete_topic(session, ...)` - `storage.get_metadata(...)` → `await storage.get_metadata(session, ...)` (used by `/suggest`) Existing `services/classifier.py` consumer points (called by both the soon-removed inline upload path and the new Celery task; module signature changes from `async def classify_document(doc_id)` accepting no session to `async def classify_document(session, doc_id)`): - Used inside the Celery task wrapper via `asyncio.run(classify_document(session, doc_id))` after manually opening a session `api/settings.py` — KEEP AS-IS. The `settings.json` flat file lives until Phase 2 (D-03 settings deferred); the `services/storage.load_settings()` / `save_settings()` functions remain sync per Plan 04. 
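The `asyncio.run(...)` bridge noted above for the Celery task wrapper can be sketched without Celery or SQLAlchemy installed. In this minimal sketch, `open_session` and the body of `classify_document` are hypothetical stand-ins (the real code would use `AsyncSessionLocal` and `services.classifier`), and the `classification_error` key follows the Task 1 behavior list. Treat it as an illustration of the pattern, not the project's implementation.

```python
import asyncio
import inspect
from contextlib import asynccontextmanager


@asynccontextmanager
async def open_session():
    """Hypothetical stand-in for `AsyncSessionLocal()`."""
    session = {"open": True}
    try:
        yield session
    finally:
        session["open"] = False


async def classify_document(session, doc_id: str) -> list:
    """Hypothetical stand-in for the session-aware classifier."""
    if not session["open"]:
        raise RuntimeError("session closed")
    if doc_id == "bad":
        raise RuntimeError("model unavailable")
    await asyncio.sleep(0)  # placeholder for real async I/O
    return ["invoices"]


async def _run(document_id: str) -> dict:
    # All async work shares one session opened inside the event loop.
    async with open_session() as session:
        try:
            topics = await classify_document(session, document_id)
        except Exception as exc:
            # Classification failures are reported, not raised.
            return {
                "document_id": document_id,
                "status": "classification_failed",
                "classification_error": str(exc),
            }
        return {"document_id": document_id, "status": "classified", "topics": topics}


def extract_and_classify(document_id: str) -> dict:
    # Plain `def`: the worker calls this synchronously, and asyncio.run
    # creates (and closes) a fresh event loop for each invocation.
    return asyncio.run(_run(document_id))


# The entry point must stay a plain function, mirroring the plan's verify step.
assert not inspect.iscoroutinefunction(extract_and_classify)
```

Because every `asyncio.run` call starts a new event loop, it may be worth checking that the shared async engine does not hand pooled connections from one loop to the next between task invocations (for example by using `NullPool` in the worker process).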
`main.py` lifespan contract (current → new):

```python
# current
async def lifespan(app):
    ensure_data_dirs()
    yield

# new
async def lifespan(app):
    # MinIO bucket auto-create
    minio_client = Minio(settings.minio_endpoint, access_key=..., secret_key=..., secure=False)
    if not await asyncio.to_thread(minio_client.bucket_exists, settings.minio_bucket):
        await asyncio.to_thread(minio_client.make_bucket, settings.minio_bucket)
    app.state.minio = minio_client
    yield
    await engine.dispose()
```

`/health` response contract (D-07):

```json
{
  "status": "ok",
  "checks": {"postgres": "ok", "minio": "ok"}
}
```

Or `"status": "degraded"` if any check is not `"ok"`.

Task 1: Introduce backend/celery_app.py + backend/tasks/document_tasks.py and update services/classifier.py

backend/celery_app.py, backend/tasks/__init__.py, backend/tasks/document_tasks.py, backend/services/classifier.py

- `from celery_app import celery_app` imports the configured Celery instance
- `celery_app.conf.broker_url` and `celery_app.conf.result_backend` both read from `REDIS_URL` env var (falling back to `redis://redis:6379/0` if unset)
- `celery_app.conf.task_serializer == "json"` and `celery_app.conf.accept_content == ["json"]`
- `celery_app.conf.task_routes` routes `tasks.document_tasks.*` to the `documents` queue
- `tasks.document_tasks.extract_and_classify(document_id: str)` is a plain `def` (NOT `async def`) decorated with `@celery_app.task(name="tasks.document_tasks.extract_and_classify")`
- The task opens a fresh `AsyncSession` via `asyncio.run(...)` around the async body, calls `services.extractor.extract_text(...)` on the bytes pulled from MinIO via `MinIOBackend.get_object`, persists the extracted text via `services.storage.save_metadata`, then calls `services.classifier.classify_document(session, doc_id)` and persists the result
- Failures in classification do not raise — they update the document's `status` to `"classification_failed"` and store the error string in a `classification_error` field on the
returned dict (parity with the existing non-fatal-classification pattern in `api/documents.py`) - `services/classifier.py` is updated to accept a `session: AsyncSession` as its first arg; the previous `storage.get_metadata(doc_id)` becomes `await storage.get_metadata(session, doc_id)`; same pattern for `storage.load_settings()` (still sync, no change), `storage.load_topics(session)`, `storage.create_topic(session, name)`, `storage.update_document_topics(session, doc_id, topics)` - `tasks/__init__.py` exists (empty file is acceptable) so `tasks/` is recognized as a package - .planning/phases/01-infrastructure-foundation/01-RESEARCH.md (Pattern 5 — Celery + Redis configuration; Pitfall 7 — keep celery_app.py minimal to avoid circular imports; Anti-Pattern: do not use async def for Celery task functions) - .planning/phases/01-infrastructure-foundation/01-PATTERNS.md (backend/celery_app.py + backend/tasks/document_tasks.py sections) - .planning/phases/01-infrastructure-foundation/01-CONTEXT.md (D-08 Celery+Redis; D-10 celery-worker service exists per Plan 01) - backend/services/classifier.py (read in full — every `storage.*` call site needs an `await ... (session, ...)` rewrite) - backend/services/extractor.py (read once — verify which function name is used, then call it from the Celery task; do NOT modify this file) - backend/db/session.py (Plan 03 output — confirm `AsyncSessionLocal` is the exported symbol) - backend/services/storage.py (Plan 04 output — confirm async function signatures the task will call) Create `backend/celery_app.py` with minimal imports per Pitfall 7: `import os`, `from celery import Celery`. Instantiate `celery_app = Celery("docuvault")`. 
Configure: `celery_app.conf.broker_url = os.environ.get("REDIS_URL", "redis://redis:6379/0")`, `celery_app.conf.result_backend = os.environ.get("REDIS_URL", "redis://redis:6379/0")`, `celery_app.conf.task_serializer = "json"`, `celery_app.conf.result_serializer = "json"`, `celery_app.conf.accept_content = ["json"]`, `celery_app.conf.task_routes = {"tasks.document_tasks.*": {"queue": "documents"}}`. Then call `celery_app.autodiscover_tasks(["tasks"], related_name="document_tasks", force=True)` so that tasks under `tasks/` register without an explicit import. The `related_name` argument is needed because autodiscovery otherwise looks for a module named `tasks` inside each listed package (here `tasks.tasks`) and would silently skip `tasks/document_tasks.py`. Critically, DO NOT import `from config import settings` here — `config.py` triggers Pydantic Settings env-loading that may pull in FastAPI-related side effects in some setups. Read `REDIS_URL` directly from `os.environ`.

Create `backend/tasks/__init__.py` as an empty file.

Create `backend/tasks/document_tasks.py`. Imports: `import asyncio`, `from celery_app import celery_app`, `from db.session import AsyncSessionLocal`, `from services import storage, extractor, classifier`, `from storage import get_storage_backend`.
Define the task:

```python
import uuid

from db.models import Document


@celery_app.task(name="tasks.document_tasks.extract_and_classify")
def extract_and_classify(document_id: str) -> dict:
    # Celery task functions stay plain `def`; asyncio.run drives the async body.
    return asyncio.run(_run(document_id))


async def _run(document_id: str) -> dict:
    async with AsyncSessionLocal() as session:
        meta = await storage.get_metadata(session, document_id)
        if meta is None:
            return {"document_id": document_id, "status": "not_found"}

        # Fetch the bytes from MinIO so the extractor can read them
        backend = get_storage_backend()
        try:
            # The metadata dict does not carry object_key in v1, so read it from
            # the DB row. Key shape: {user_id}/{doc_id}/{uuid4}{ext}
            doc = await session.get(Document, uuid.UUID(document_id))
            if doc is None or not doc.object_key:
                return {"document_id": document_id, "status": "missing_object"}
            file_bytes = await backend.get_object(doc.object_key)
            # Use whichever bytes-based helper services/extractor.py exposes
            if hasattr(extractor, "extract_text_from_bytes"):
                text = extractor.extract_text_from_bytes(file_bytes, doc.content_type)
            else:
                text = extractor.extract_text_bytes(file_bytes, doc.content_type)
            meta["extracted_text"] = text
            await storage.save_metadata(session, meta)
        except Exception as e:
            return {"document_id": document_id, "status": "extract_failed", "error": str(e)}

        try:
            topics = await classifier.classify_document(session, document_id)
            return {"document_id": document_id, "status": "classified", "topics": topics}
        except Exception as e:
            # Non-fatal — preserve the existing convention from api/documents.py line 54-56.
            # Roll back first so a failed flush inside the classifier cannot block the commit.
            await session.rollback()
            doc.status = "classification_failed"
            await session.commit()
            return {
                "document_id": document_id,
                "status": "classification_failed",
                "classification_error": str(e),
            }
```

Note: If `services/extractor.py` only exposes `extract_text(path, mime)` (file-path-based), add a new helper `extract_text_from_bytes(file_bytes: bytes, mime: str)` to `services/extractor.py` that writes `file_bytes` to a
`tempfile.NamedTemporaryFile(suffix=...)`, calls the existing `extract_text(tmp.name, mime)`, and unlinks the temp file. Do not modify any other behavior in `services/extractor.py`. Update `backend/services/classifier.py`: change `async def classify_document(doc_id: str, topic_names: list[str] | None = None)` to `async def classify_document(session: AsyncSession, doc_id: str, topic_names: list[str] | None = None)`. Add `from sqlalchemy.ext.asyncio import AsyncSession` at the top. Replace `storage.get_metadata(doc_id)` → `await storage.get_metadata(session, doc_id)`. Replace `storage.load_settings()` → `storage.load_settings()` (unchanged — Phase 1 keeps the flat file; this is sync). Replace `storage.load_topics()` → `await storage.load_topics(session)` (note signature change — adapter call). Replace `storage.create_topic(name.strip())` → `await storage.create_topic(session, name.strip())`. Replace `storage.update_document_topics(doc_id, final_topics)` → `await storage.update_document_topics(session, doc_id, final_topics)`. Apply the same session-injection treatment to `suggest_topics_for_document(session, doc_id)`. Preserve `MAX_AI_CHARS = 8_000` and every other line verbatim. 
cd /Users/nik/Documents/Progamming/document_scanner/backend && python3 -c " import os os.environ.setdefault('REDIS_URL', 'redis://localhost:6379/0') from celery_app import celery_app assert celery_app.conf.task_serializer == 'json' assert celery_app.conf.accept_content == ['json'] assert 'tasks.document_tasks.*' in celery_app.conf.task_routes assert celery_app.conf.task_routes['tasks.document_tasks.*'] == {'queue': 'documents'} from tasks.document_tasks import extract_and_classify import inspect assert not inspect.iscoroutinefunction(extract_and_classify), 'Celery task must be sync def (not async def)' # Verify the task is registered with Celery registered = celery_app.tasks assert 'tasks.document_tasks.extract_and_classify' in registered, f'task not registered; have: {list(registered.keys())[-5:]}' import services.classifier as cl sig = inspect.signature(cl.classify_document) assert list(sig.parameters.keys())[0] == 'session', f'classify_document first param should be session, got: {list(sig.parameters.keys())}' print('celery-task-ok') " - `backend/celery_app.py` exists and contains `celery_app = Celery("docuvault")` - `backend/celery_app.py` does NOT import from `config` (Pitfall 7 — verifiable: `grep -c "from config\|import config" backend/celery_app.py | grep -q "^0$"`) - `backend/celery_app.py` contains `task_routes = {"tasks.document_tasks.*": {"queue": "documents"}}` - `backend/tasks/__init__.py` exists - `backend/tasks/document_tasks.py` contains `@celery_app.task(name="tasks.document_tasks.extract_and_classify")` - `backend/tasks/document_tasks.py` defines `def extract_and_classify` as a sync `def` (NOT `async def`) — verifiable via the inline `inspect.iscoroutinefunction` assertion - `backend/tasks/document_tasks.py` uses `asyncio.run` to invoke the async body (verifiable: `grep -c "asyncio.run" backend/tasks/document_tasks.py >= 1`) - `backend/services/classifier.py` first parameter of `classify_document` is `session` (verified by the inline signature 
inspection) - `backend/services/classifier.py` calls `await storage.get_metadata(session, doc_id)` and `await storage.update_document_topics(session, doc_id, ...)` - `services/extractor.py` either already exposes a bytes-based extraction function OR a new `extract_text_from_bytes` helper is added; in either case the Celery task can import and call it without raising on import — verifiable via `python3 -c "from services import extractor; assert hasattr(extractor, 'extract_text_from_bytes') or hasattr(extractor, 'extract_text_bytes') or hasattr(extractor, 'extract_text')"` exits 0 - The Verify command prints `celery-task-ok` Celery is wired with a Redis-backed broker; the `extract_and_classify` task is registered and discoverable; `services/classifier.py` is session-aware; the Phase 1 background worker contract is in place. Task 2: Wire backend/main.py lifespan + /health, rewrite backend/api/documents.py and backend/api/topics.py to async session injection backend/main.py, backend/api/documents.py, backend/api/topics.py - `GET /health` returns HTTP 200 with body `{"status": "ok", "checks": {"postgres": "ok", "minio": "ok"}}` when both services are healthy - `GET /health` returns HTTP 200 with body `{"status": "degraded", "checks": {"postgres": "error: ...", "minio": "ok"}}` (or analogous shape) when one service is unreachable — `/health` never returns 5xx - `POST /api/documents/upload` calls `await storage.save_upload(session, ...)` then `extract_and_classify.delay(str(saved["id"]))` if `auto_classify` is true; the response shape preserves `{"id", "original_name", "filename", "mime_type", "size_bytes", "extracted_text", "topics", "created_at", "classified_at"}` so the frontend continues to work - `GET /api/documents` calls `await storage.list_metadata(session, topic=topic)` and paginates the result - `GET /api/documents/{doc_id}` and `DELETE /api/documents/{doc_id}` use the session dependency - `POST /api/documents/{doc_id}/classify` injects the session and either 
calls the Celery task with `.delay(...)` or `await classifier.classify_document(session, doc_id, topic_names)` synchronously and returns the result; choose the synchronous in-route call for this endpoint because it has historically returned the topic list (preserve behavior — Phase 4 may change this) - `GET /api/topics`, `POST /api/topics`, `PATCH /api/topics/{topic_id}`, `DELETE /api/topics/{topic_id}`, `POST /api/topics/suggest` all inject the session dependency - `backend/main.py` lifespan creates the MinIO client, auto-creates the `docuvault` bucket if absent, stores it on `app.state.minio`, and disposes `engine` on shutdown - `backend/main.py` no longer calls `ensure_data_dirs()` (legacy) - backend/main.py (current 34-line file — preserve `app = FastAPI(...)`, `app.add_middleware(CORSMiddleware, ...)`, `app.include_router(...)` calls; replace only the lifespan body and the `/health` handler) - backend/api/documents.py (current 102 lines — read every route handler; preserve every `HTTPException` message and the `ALLOWED_MIME_TYPES` set verbatim) - backend/api/topics.py (current 73 lines — read every route handler; preserve Pydantic models `TopicCreate`, `TopicUpdate`, `SuggestRequest` verbatim) - backend/services/storage.py (Plan 04 output — async function signatures) - backend/db/session.py (Plan 03 output — `AsyncSessionLocal`, `engine`) - backend/deps/db.py (Plan 03 output — `get_db`) - backend/tasks/document_tasks.py (Task 1 output — `extract_and_classify`) - .planning/phases/01-infrastructure-foundation/01-PATTERNS.md (backend/main.py + backend/api/documents.py + backend/api/topics.py sections) - .planning/phases/01-infrastructure-foundation/01-RESEARCH.md (Pattern 4 — MinIO bucket initialization at startup) - .planning/phases/01-infrastructure-foundation/01-CONTEXT.md (D-07 /health extended) Rewrite `backend/main.py`. 
Imports: keep `from contextlib import asynccontextmanager`, `from fastapi import FastAPI, Request`, `from fastapi.middleware.cors import CORSMiddleware`, `from api.documents import router as documents_router`, `from api.topics import router as topics_router`, `from api.settings import router as settings_router`. Add `import asyncio`, `from minio import Minio`, `from sqlalchemy import text`, `from db.session import engine, AsyncSessionLocal`, `from config import settings`. DO NOT import `ensure_data_dirs`.

Rewrite the lifespan function:

```python
@asynccontextmanager
async def lifespan(app: FastAPI):
    # MinIO bucket initialization (RESEARCH.md Pattern 4)
    minio_client = Minio(
        settings.minio_endpoint,
        access_key=settings.minio_access_key,
        secret_key=settings.minio_secret_key,
        secure=False,
    )
    exists = await asyncio.to_thread(minio_client.bucket_exists, settings.minio_bucket)
    if not exists:
        await asyncio.to_thread(minio_client.make_bucket, settings.minio_bucket)
    app.state.minio = minio_client
    yield
    await engine.dispose()
```

Rewrite the `/health` handler (preserving `@app.get("/health")` and `async def`):

```python
@app.get("/health")
async def health(request: Request):
    checks = {}
    try:
        async with AsyncSessionLocal() as session:
            await session.execute(text("SELECT 1"))
        checks["postgres"] = "ok"
    except Exception as e:
        checks["postgres"] = f"error: {type(e).__name__}: {e}"
    try:
        ok = await asyncio.to_thread(request.app.state.minio.bucket_exists, settings.minio_bucket)
        checks["minio"] = "ok" if ok else "error: bucket missing"
    except Exception as e:
        checks["minio"] = f"error: {type(e).__name__}: {e}"
    status = "ok" if all(v == "ok" for v in checks.values()) else "degraded"
    return {"status": status, "checks": checks}
```

Keep the existing CORS middleware (`allow_origins=["*"]` for Phase 1 — Phase 2 locks down).

Rewrite `backend/api/documents.py`.
Imports: replace `from services import storage, extractor, classifier` with `from sqlalchemy.ext.asyncio import AsyncSession`, `from deps.db import get_db`, `from services import storage, extractor`, `from tasks.document_tasks import extract_and_classify`, `from services import classifier` (only used by the `/classify` endpoint), and keep `from fastapi import APIRouter, UploadFile, File, Form, HTTPException, Query, Depends` (add `Depends`). Preserve the router definition and `ALLOWED_MIME_TYPES` set. For every route, append `session: AsyncSession = Depends(get_db)` to the signature. For each storage call, prepend `await` and pass `session` as the first arg: - `upload_document`: read content, validate empty, generate filename + mime, call `await storage.save_upload(session, content, file.filename or "upload", mime)`, then `text = extractor.extract_text(saved["path"], mime)` — wait: `saved["path"]` is now the object_key, not a filesystem path. CHANGE the extraction step: pull bytes already in memory (`content`) and call the new `extractor.extract_text_from_bytes(content, mime)` helper introduced in Task 1. Build the `meta` dict exactly as before (preserve the keys), call `await storage.save_metadata(session, meta)`. If `auto_classify` is True, call `extract_and_classify.delay(saved["id"])` (NOTE: this re-fetches from MinIO inside the worker — acceptable for Phase 1; Phase 3 will pass the bytes through Redis directly if perf demands). Return `meta`. CHANGE: since classification is now async via Celery, the response no longer includes `topics` populated by the inline classifier call — set `meta["topics"] = []` and `meta["classified_at"] = None` and rely on the worker to update the DB row. Document this in a comment as the Phase 1 cutover behavior. - `list_documents`: `docs = await storage.list_metadata(session, topic=topic)` then preserve the existing pagination math. - `get_document`: `meta = await storage.get_metadata(session, doc_id)`; raise 404 if `None`. 
- `delete_document`: `ok = await storage.delete_document(session, doc_id)`; raise 404 if `False`. - `classify_document` (the `/classify` route): `meta = await storage.get_metadata(session, doc_id)`; raise 404 if None; preserve the inline `await classifier.classify_document(session, doc_id, topic_names)` call (this endpoint historically returned the topic list synchronously — keep that behavior; the upload-time path is the async one). Rewrite `backend/api/topics.py` similarly: add `Depends` import, `from sqlalchemy.ext.asyncio import AsyncSession`, `from deps.db import get_db`. Add `session: AsyncSession = Depends(get_db)` to every route. Wrap every `storage.*` call with `await` and prepend `session`: - `list_topics`: `topics = await storage.load_topics(session)`, `counts = await storage.topic_doc_counts(session)`. - `create_topic`: `topic = await storage.create_topic(session, body.name, body.description, body.color)`. - `update_topic`: `topic = await storage.update_topic(session, topic_id, name=body.name, description=body.description, color=body.color)`. - `delete_topic`: `name = await storage.delete_topic(session, topic_id)`. - `suggest_topics`: `meta = await storage.get_metadata(session, body.document_id)`; if None, 404; then `await classifier.suggest_topics_for_document(session, body.document_id)`. Preserve every Pydantic model and HTTPException message verbatim. 
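The `/health` behavior wired in this task (each probe caught individually, a blocking client call moved off the event loop, and `ok` versus `degraded` derived from the individual checks) can be exercised in isolation. The sketch below uses only the standard library; `run_check`, `summarize`, `blocking_bucket_exists`, and `failing_postgres_probe` are hypothetical names standing in for the real SQLAlchemy and MinIO calls.

```python
import asyncio
from typing import Awaitable, Callable, Dict


async def run_check(probe: Callable[[], Awaitable[bool]], failure: str) -> str:
    """Run one probe and turn any outcome into a status string (never raises)."""
    try:
        return "ok" if await probe() else f"error: {failure}"
    except Exception as exc:
        return f"error: {type(exc).__name__}: {exc}"


def summarize(checks: Dict[str, str]) -> dict:
    """Overall status is 'ok' only when every individual check is 'ok'."""
    status = "ok" if all(v == "ok" for v in checks.values()) else "degraded"
    return {"status": status, "checks": checks}


def blocking_bucket_exists(bucket: str) -> bool:
    """Hypothetical stand-in for a synchronous MinIO client call."""
    return bucket == "docuvault"


async def failing_postgres_probe() -> bool:
    """Hypothetical stand-in for a `SELECT 1` probe against an unreachable DB."""
    raise ConnectionError("connection refused")


async def health() -> dict:
    checks = {
        "postgres": await run_check(failing_postgres_probe, "query failed"),
        # asyncio.to_thread keeps the blocking call off the event loop.
        "minio": await run_check(
            lambda: asyncio.to_thread(blocking_bucket_exists, "docuvault"),
            "bucket missing",
        ),
    }
    return summarize(checks)
```

Since every probe is wrapped, the handler can always return a normal response body, which is what lets `/health` report `degraded` instead of failing with a 5xx.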
cd /Users/nik/Documents/Progamming/document_scanner/backend && python3 -c " import os os.environ.setdefault('REDIS_URL', 'redis://localhost:6379/0') os.environ.setdefault('DATABASE_URL', 'postgresql+psycopg://docuvault_app:changeme_app@localhost:5432/docuvault') import inspect from main import app # Confirm /health route exists with new shape routes = {r.path for r in app.routes} assert '/health' in routes, 'health route missing' # Confirm session injection on documents and topics from api.documents import upload_document, list_documents, get_document, delete_document from api.topics import list_topics, create_topic, update_topic, delete_topic for fn in [upload_document, list_documents, get_document, delete_document, list_topics, create_topic, update_topic, delete_topic]: sig = inspect.signature(fn) params = list(sig.parameters) assert 'session' in params, f'{fn.__name__} missing session param: {params}' print('routes-wired-ok') " - `backend/main.py` no longer contains `ensure_data_dirs` (verifiable: `grep -c "ensure_data_dirs" backend/main.py | grep -q "^0$"`) - `backend/main.py` contains `from minio import Minio`, `from db.session import engine, AsyncSessionLocal`, `from config import settings` - `backend/main.py` lifespan contains `app.state.minio = minio_client` - `backend/main.py` lifespan contains `await engine.dispose()` - `backend/main.py` `/health` contains both `SELECT 1` and `bucket_exists` probes - `backend/main.py` `/health` returns the shape `{"status": ..., "checks": {"postgres": ..., "minio": ...}}` (verifiable by inspecting the return statement and by Task 3 live test) - `backend/api/documents.py` contains `from deps.db import get_db` and `from tasks.document_tasks import extract_and_classify` - `backend/api/documents.py` upload handler contains `extract_and_classify.delay(` (Celery enqueue) — verifiable via `grep -c "extract_and_classify.delay" backend/api/documents.py >= 1` - `backend/api/documents.py` upload handler no longer contains `await 
classifier.classify_document` in the upload path (the only remaining classifier call is on the `/classify` endpoint) — verifiable via `grep -c "await classifier.classify_document" backend/api/documents.py | grep -q "^1$"` - Every route in `backend/api/documents.py` and `backend/api/topics.py` contains `session: AsyncSession = Depends(get_db)` (verifiable via `grep -c "session: AsyncSession = Depends(get_db)" backend/api/documents.py >= 5` and similarly `>= 5` for topics.py) - The Verify command prints `routes-wired-ok` - `cd backend && python3 -m pytest tests/ -v --collect-only` exits 0 (collection succeeds — no import errors) FastAPI lifespan creates the MinIO bucket and disposes the engine; `/health` probes both services; all document and topic routes use async session injection; upload-time classification is queued via Celery; the `/classify` endpoint remains synchronous for compatibility. Task 3: Final cutover — delete legacy data/, prune config.py, prune conftest.py + test_documents.py legacy fixtures and sync tests, unfail async ports backend/config.py, backend/tests/conftest.py, backend/tests/test_documents.py, backend/tests/test_health.py, backend/data - `backend/data/` directory and all its contents are deleted (D-04). The git repo no longer tracks this directory. - `backend/config.py` no longer declares `DATA_DIR`, `UPLOADS_DIR`, `METADATA_DIR`, `TOPICS_FILE`, `ensure_data_dirs` (legacy flat-file constants). It retains `DEFAULT_SETTINGS`, `DEFAULT_SYSTEM_PROMPT`, the Pydantic `Settings` class, and the module-level `settings = Settings()`. `SETTINGS_FILE` is RETAINED (still used for the Phase-2-deferred settings JSON file path) but its value is rebased onto the new `settings.data_dir` field rather than a removed module-level constant. - `backend/tests/conftest.py` no longer defines the autouse `isolated_data_dir` fixture (the flat-file scaffold). It no longer defines the sync `client` fixture (which built a `TestClient`). 
The only fixtures remaining are `db_session`, `async_client`, `sample_txt`, `sample_pdf`. Tests are rewired to use `async_client` everywhere. - `backend/tests/test_documents.py` no longer contains any of the legacy sync test functions (`test_upload_txt_no_classify`, `test_upload_pdf_no_classify`, `test_list_documents`, `test_list_documents_filter_by_topic`, `test_get_document`, `test_get_document_not_found`, `test_delete_document`, `test_delete_document_not_found`, `test_upload_empty_file`) — they are DELETED. The `_async`-suffixed tests from Plan 02 have their `@pytest.mark.xfail` markers REMOVED and now run as live tests. - `backend/tests/test_health.py` has the `@pytest.mark.xfail` marker removed from `test_health_checks_postgres_and_minio`; the old sync `test_health(client)` is REPLACED with `test_health_status_ok_sync_deprecated` that is `@pytest.mark.skip(reason="legacy sync client removed in plan 05")` OR deleted entirely. Choose deletion — cleaner. - The full pytest run reports zero XFAIL/SKIPPED for Phase 1 tests except where the underlying service (live PostgreSQL/MinIO/Redis) is unavailable in the test env (in which case the integration tests are skipped via a fixture that probes for service availability — see action below). 
- backend/config.py (Plan 01 output — identify exactly which legacy constants must be removed) - backend/tests/conftest.py (Plan 02 output — verify which fixtures need to go) - backend/tests/test_documents.py (Plan 02 output — verify which test functions to delete vs keep) - backend/tests/test_health.py (Plan 02 output — same scrutiny) - backend/services/storage.py (Plan 04 output — confirm it still imports `SETTINGS_FILE` from config; we will preserve `SETTINGS_FILE` as a derived path) - backend/api/settings.py (read — uses `services.storage.load_settings/save_settings` which depend on `SETTINGS_FILE`) - .planning/phases/01-infrastructure-foundation/01-CONTEXT.md (D-04 delete `data/` contents) Step 1 — delete legacy data: run `git rm -rf backend/data/` (if tracked) and `rm -rf backend/data/`. Add `backend/data/` to `.gitignore`. If `services/storage.py` or any other file still references `UPLOADS_DIR`/`METADATA_DIR`/`TOPICS_FILE`, fix those references first (per Plan 04 they should already be gone for documents/topics; `SETTINGS_FILE` is the only remaining legacy path). Step 2 — prune `backend/config.py`. Read the current file (post-Plan 01). 
Remove:
- `DATA_DIR = Path(os.environ.get("DATA_DIR", "/app/data"))`
- `UPLOADS_DIR = DATA_DIR / "uploads"`
- `METADATA_DIR = DATA_DIR / "metadata"`
- `TOPICS_FILE = DATA_DIR / "topics.json"`
- the `def ensure_data_dirs():` function
- the `import json` at the top if no longer used (keep it if `DEFAULT_SETTINGS` interpolates from JSON anywhere; currently it does not)
- the `import os` (no longer needed since `Settings` reads env via Pydantic)

Preserve:
- `from pathlib import Path` (used by `SETTINGS_FILE`)
- `DEFAULT_SYSTEM_PROMPT` (used by `services/classifier.py`)
- `DEFAULT_SETTINGS` (used by `services/storage.load_settings` fallback)
- `class Settings(BaseSettings):` with all Phase 1 fields (Plan 01 output)
- `settings = Settings()` module instance

Rebase `SETTINGS_FILE` as a derived path computed from `settings.data_dir`: `SETTINGS_FILE = Path(settings.data_dir) / "settings.json"`, placed AFTER the `settings = Settings()` line. Add a comment: `# SETTINGS_FILE: still flat-file in Phase 1; migrates to users.ai_provider in Phase 2`.

Step 3: prune `backend/tests/conftest.py`:
- DELETE the entire `isolated_data_dir` fixture (the autouse one that monkey-patches `config.DATA_DIR` etc.)
- DELETE the sync `client` fixture (`with TestClient(app) as c: yield c`)
- KEEP the `db_session`, `async_client`, `sample_txt`, `sample_pdf` fixtures introduced in Plan 02. The async fixtures previously skipped gracefully when `deps.db` did not exist; make that a hard dependency by removing the `try/except ImportError: pytest.skip(...)` wrapper inside them, because `deps.db.get_db` now exists.
- ADD a new session-scoped fixture (`pytest_asyncio.fixture(scope="session")`, or a plain `pytest.fixture(scope="session")` since the probe itself is synchronous) named `live_services_available` that probes localhost:5432, localhost:9000, localhost:6379 via `socket.create_connection(..., timeout=1)`; if any probe fails, the fixture yields `False`; otherwise `True`.

Update the `async_client` fixture (or add a new `live_async_client` fixture) to use an actual PostgreSQL + MinIO when `live_services_available` is True, falling back to the in-memory aiosqlite engine when False. Integration tests that need real MinIO must be skipped when the stack is down (reason: "docker compose not running"); unit tests using only the in-memory DB do not need the skip. A `pytest.mark.skipif(...)` condition is evaluated at collection time and cannot read a fixture value, so either call `pytest.skip(...)` inside a fixture or test that receives `live_services_available`, or take the simpler approach (acceptable): detect via env var `INTEGRATION=1` in the `skipif` condition; if unset, skip integration tests.

Step 4: prune `backend/tests/test_documents.py`:
- DELETE the legacy sync tests: `test_upload_txt_no_classify`, `test_upload_pdf_no_classify`, `test_list_documents`, `test_list_documents_filter_by_topic`, `test_get_document`, `test_get_document_not_found`, `test_delete_document`, `test_delete_document_not_found`, `test_upload_empty_file`. (Plan 02 left them in place during the cutover; this plan completes the deletion.)
- On every `_async`-suffixed test added in Plan 02, REMOVE the `@pytest.mark.xfail(strict=False, reason="async storage layer implemented in plan 05")` marker.
- Update any test that previously referenced `import services.storage as st; st.update_document_topics(...)` to use the async ORM API via the `db_session` fixture: `from db.models import Document, DocumentTopic; from sqlalchemy import insert; ...`. For tests that need a topic-tagged document, build it via the API itself (call `POST /api/topics` then `PATCH /api/documents/.../classify`).

Step 5: prune `backend/tests/test_health.py`:
- DELETE the legacy `def test_health(client):` (it used the sync TestClient fixture, which is gone).
- REMOVE the `@pytest.mark.xfail` marker from `test_health_checks_postgres_and_minio`.
- If the live services are unavailable (`live_services_available` is False, or `INTEGRATION` is unset), this test should be skipped using the same mechanism chosen in Step 3.
Step 6: run the full suite end-to-end against the in-memory engine: `cd backend && python3 -m pytest tests/ -v` should exit 0 with the storage tests PASSED, the alembic tests PASSED (or SKIPPED if no PostgreSQL is available; the in-memory aiosqlite test path covers them), the health test SKIPPED or PASSED, and the async document tests PASSED or SKIPPED depending on `live_services_available`.

Verify command: `cd /Users/nik/Documents/Progamming/document_scanner && [ ! -d backend/data ] && echo "data-dir-deleted" && grep -c "DATA_DIR\|UPLOADS_DIR\|METADATA_DIR\|TOPICS_FILE\|ensure_data_dirs" backend/config.py | awk '{exit ($1 == 0) ? 0 : 1}' && echo "config-pruned" && cd backend && python3 -m pytest tests/ -v 2>&1 | tail -20`

- `backend/data/` directory does not exist (verifiable: `[ ! -d backend/data ]` exits 0)
- `.gitignore` contains `backend/data/` (verifiable: `grep -Fx "backend/data/" .gitignore` exits 0)
- `backend/config.py` no longer mentions `DATA_DIR`, `UPLOADS_DIR`, `METADATA_DIR`, `TOPICS_FILE`, `ensure_data_dirs` (verifiable via the Verify command's grep-c check)
- `backend/config.py` still defines `DEFAULT_SETTINGS`, `DEFAULT_SYSTEM_PROMPT`, `class Settings(BaseSettings)`, `settings = Settings()`, and `SETTINGS_FILE = Path(...) / "settings.json"`
- `backend/tests/conftest.py` no longer defines `isolated_data_dir` (verifiable: `grep -c "def isolated_data_dir" backend/tests/conftest.py | grep -q "^0$"`)
- `backend/tests/conftest.py` no longer defines a sync `client` fixture using `TestClient` (verifiable: `grep -c "TestClient" backend/tests/conftest.py | grep -q "^0$"`)
- `backend/tests/test_documents.py` no longer contains the legacy sync test names. The `\b` anchor does not match before `_`, so the `_filter_by_topic` and `_not_found` variants are listed explicitly (verifiable: `grep -cE "^def test_(upload_txt_no_classify|upload_pdf_no_classify|list_documents|list_documents_filter_by_topic|get_document|get_document_not_found|delete_document|delete_document_not_found|upload_empty_file)\b" backend/tests/test_documents.py | grep -q "^0$"`)
- `backend/tests/test_documents.py` no longer contains `@pytest.mark.xfail` markers (the cutover removes them; verifiable: `grep -c "@pytest.mark.xfail" backend/tests/test_documents.py | grep -q "^0$"`)
- `backend/tests/test_health.py` no longer contains `@pytest.mark.xfail` (verifiable: `grep -c "@pytest.mark.xfail" backend/tests/test_health.py | grep -q "^0$"`)
- `cd backend && python3 -m pytest tests/ -v 2>&1` shows 0 FAILED and 0 ERROR lines. The check counts with `grep -c` rather than `wc -l`, because BSD/macOS `wc` pads its output with spaces and would never match `^0$` (verifiable: `python3 -m pytest tests/ 2>&1 | grep -cE "^FAILED|^ERROR" | grep -q "^0$"`)
- The Verify command output shows `data-dir-deleted` and `config-pruned`

The Phase 1 cutover is complete: no flat-file artifacts remain in code or on disk; the test suite uses only async fixtures; the legacy tests have been deleted; the async ports of every legacy test run as first-class tests.
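The service probe from Step 3 can be sketched with the standard library alone. This is a minimal illustration, not the project's actual `conftest.py`: the helper names `port_is_open` and `integration_enabled` and the `SERVICE_ENDPOINTS` mapping are hypothetical, while the hosts, ports, one-second timeout, and the `INTEGRATION=1` switch come from the plan text.

```python
import os
import socket

# Endpoints named in Step 3 (PostgreSQL, MinIO, Redis on localhost).
# The dictionary itself is an illustrative assumption.
SERVICE_ENDPOINTS = {
    "postgres": ("localhost", 5432),
    "minio": ("localhost", 9000),
    "redis": ("localhost", 6379),
}


def port_is_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS failures.
        return False


def live_services_available(endpoints=SERVICE_ENDPOINTS, timeout: float = 1.0) -> bool:
    """True only if every listed endpoint accepts a TCP connection."""
    return all(port_is_open(host, port, timeout) for host, port in endpoints.values())


def integration_enabled(environ=os.environ) -> bool:
    """The simpler switch from Step 3: integration tests run only when INTEGRATION=1."""
    return environ.get("INTEGRATION") == "1"
```

In a `conftest.py`, the body of `live_services_available` could back the session-scoped fixture of the same name, and `integration_enabled()` could feed a module-level `pytest.mark.skipif(not integration_enabled(), reason="docker compose not running")` marker, since it is a plain function that can be called at collection time.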
Task 4: End-to-end walking-skeleton verification (docker compose up + real PDF upload + Celery worker; verification only)

- .planning/phases/01-infrastructure-foundation/SKELETON.md (the success contract for this checkpoint)
- .planning/phases/01-infrastructure-foundation/01-03-SUMMARY.md (the Alembic upgrade output from Plan 03)
- .planning/phases/01-infrastructure-foundation/01-04-SUMMARY.md (the storage rewrite summary from Plan 04)

Plans 01-05 together: a fully wired DocuVault backend running on Docker Compose with PostgreSQL + MinIO + Redis + Celery + FastAPI. This checkpoint verifies the walking-skeleton end-to-end: a real document upload via the rewritten API persists metadata to PostgreSQL, stores bytes in MinIO with a UUID-based object key, enqueues extraction + classification on Redis, and the Celery worker processes the task. `GET /health` returns `postgres: ok` and `minio: ok`. ROADMAP.md Phase 1 success criteria #1, #3, and #4 are verified live here (#2 was verified in Plan 03).

From the project root:

1. Ensure `.env` exists with all variables from `.env.example` filled in: `cp .env.example .env` (if not present) and replace each `changeme_*` placeholder with a value of your choice. The DATABASE_URL/DATABASE_MIGRATE_URL passwords MUST match the hardcoded passwords in `docker/postgres/initdb.d/01-init-users.sql` from Plan 01 (which itself was committed during Wave 1). The REDIS_URL password must match REDIS_PASSWORD.
2. Tear down any prior state: `docker compose down -v` (the `-v` deletes the postgres_data and minio_data named volumes so the init script will re-run).
3. Boot everything: `docker compose up --build -d`. Wait ~30 seconds.
4. Verify all services are healthy: `docker compose ps`. The `STATUS` column must show `Up (healthy)` for `postgres`, `minio`, `redis`, `backend`, AND `celery-worker`. If any is `unhealthy`, capture `docker compose logs ` and resolve before continuing.
5. Apply the migration against the live DB: `docker compose exec backend bash -lc "cd /app && alembic upgrade head"`. Must exit 0 with `Running upgrade -> 0001`.
6. Hit the health endpoint: `curl -s http://localhost:8000/health | python3 -m json.tool`. The response MUST be:
   ```
   {
     "status": "ok",
     "checks": {
       "postgres": "ok",
       "minio": "ok"
     }
   }
   ```
7. Upload a real PDF or text file. Pick any small PDF (or use `printf 'Test document about invoices and contracts.' > /tmp/test.txt` first). Then:
   ```
   curl -s -X POST http://localhost:8000/api/documents/upload \
     -F "file=@/tmp/test.txt;type=text/plain" \
     -F "auto_classify=true" | python3 -m json.tool
   ```
   Confirm the response includes:
   - `"id"`: a 36-character UUID string
   - `"original_name": "test.txt"`
   - `"size_bytes"` matching the file size
   - `"topics": []` (classification is async; the Celery worker fills this in seconds later)
8. Confirm the document landed in PostgreSQL: `docker compose exec postgres psql -U docuvault_app -d docuvault -c "SELECT id, filename, object_key, status FROM documents ORDER BY created_at DESC LIMIT 1;"`. Expect exactly one row; `object_key` starts with `null-user/` (D-03 sentinel from Plan 04); `status` is `pending` initially, then `classified` or `classification_failed` after the worker runs.
9. Confirm the document landed in MinIO. The object key from step 8 will look like `null-user//.txt`. Either use the MinIO web console at `http://localhost:9001` (log in with `MINIO_ROOT_USER` / `MINIO_ROOT_PASSWORD` from `.env`), navigate to the `docuvault` bucket, and confirm the object exists with non-zero size, OR use `mc`: `docker compose exec minio mc alias set local http://localhost:9000 $MINIO_ROOT_USER $MINIO_ROOT_PASSWORD` then `docker compose exec minio mc ls local/docuvault/null-user/`. The `$MINIO_ROOT_USER` and `$MINIO_ROOT_PASSWORD` variables are expanded by the host shell, so export them from `.env` first.
10. Confirm the Celery worker processed the task: `docker compose logs celery-worker | tail -30`. Look for a `Task tasks.document_tasks.extract_and_classify[...] received` line followed by `succeeded` or a structured error. If the task succeeded, run: `curl -s http://localhost:8000/api/documents | python3 -m json.tool`. The response should show one item with `extracted_text` populated and possibly `topics` populated by the AI classifier, depending on AI provider config. If no `ANTHROPIC_API_KEY` / `OPENAI_API_KEY` is set, classification will fail gracefully and `status` will be `classification_failed`; that is acceptable for this walking-skeleton check, because the storage layer worked.
11. Delete the document: `curl -s -X DELETE http://localhost:8000/api/documents/` returns `{"success": true}`. Then confirm the MinIO object is gone: `docker compose exec minio mc ls local/docuvault/null-user//` returns empty or "Object does not exist".
12. Run the test suite against the live stack: `docker compose exec -e INTEGRATION=1 backend bash -lc "cd /app && pytest tests/ -v"`. Every test PASSED, zero FAILED, zero XFAIL. Skipped integration tests are acceptable on a host-only run where `INTEGRATION` is unset; with `INTEGRATION=1` inside the container and live services, they must run and pass.

All 12 verification steps succeed. The walking skeleton is live: PDF → API → PostgreSQL + MinIO + Celery → extracted text → classification → DB row. ROADMAP.md Phase 1 is complete.

Common failures and fixes:
- `/health` reports `postgres: error: ConnectionRefusedError`: postgres healthcheck didn't gate startup; check `depends_on: condition: service_healthy` is set on `backend` and `celery-worker`. Inspect `docker compose ps` and `docker compose logs postgres`.
- `/health` reports `minio: error: bucket missing`: the lifespan bucket-create failed. Check `docker compose logs backend` for the `make_bucket` error. Likely cause: `MINIO_ACCESS_KEY` / `MINIO_SECRET_KEY` mismatch. The lifespan client connects with app-level keys, but MinIO only knows about the root user on first boot.
Workaround for Phase 1: temporarily set `MINIO_ACCESS_KEY=$MINIO_ROOT_USER` and `MINIO_SECRET_KEY=$MINIO_ROOT_PASSWORD` in `.env` (Phase 2 will set up an app-level access policy via `mc admin user add` during MinIO init).
- Celery worker logs show `[ERROR/MainProcess] consumer: Cannot connect to redis://...`: the REDIS_PASSWORD or REDIS_URL is wrong, or the password contains a special character not URL-encoded. Re-confirm `REDIS_URL` form `redis://:@redis:6379/0`.
- Upload returns 500 with `MissingGreenlet`: a session attribute access happened after commit; verify `expire_on_commit=False` in `db/session.py`.
- Task never runs: `docker compose logs celery-worker` shows it can't import `tasks.document_tasks`; verify `tasks/__init__.py` exists and `celery_app.autodiscover_tasks(["tasks"])` is called.

Type "approved" once steps 1-12 all pass. If any step fails, describe the failure mode and we resume with a fix plan.

## Trust Boundaries

| Boundary | Description |
|----------|-------------|
| Browser → FastAPI | HTTP/JSON (Phase 1: CORS `*`; Phase 2 locks down); multipart upload bytes traverse this boundary |
| FastAPI → Celery / Redis | Task payload is the document_id string only; no user input passed |
| FastAPI lifespan → MinIO | Bucket auto-create at startup; client persists on `app.state.minio` |
| Celery worker → MinIO + PostgreSQL | Worker re-fetches bytes from MinIO and reads/writes Document row |

## STRIDE Threat Register

| Threat ID | Category | Component | Disposition | Mitigation Plan |
|-----------|----------|-----------|-------------|-----------------|
| T-01-05-01 | Spoofing | Unauthenticated upload endpoint accepting any client | accept | Phase 1 has no auth (D-03: user_id nullable); upload accessible to anyone reaching `localhost:8000`. Phase 2 adds JWT + CSRF + rate-limit. Documented in SKELETON.md "Out of Scope". |
| T-01-05-02 | Tampering | MIME-type spoofing on upload | mitigate | The existing `ALLOWED_MIME_TYPES` set in `api/documents.py` is preserved verbatim. Phase 4 (DOC-02) adds magic-byte verification before download/preview. |
| T-01-05-03 | Information Disclosure | `/health` revealing internal error class names | mitigate | `/health` error strings format as `f"error: {type(e).__name__}: {e}"`, which exposes the Python exception class name; this is acceptable for an internal/dev endpoint in Phase 1. Phase 2 will trim to `"error"` or `"unhealthy"` once the endpoint is reachable from the internet. Documented note in `main.py`. |
| T-01-05-04 | Tampering | Celery task receives untrusted document_id and might query arbitrary rows | mitigate | `extract_and_classify` only takes a `document_id` string from the upload path, never from a user query parameter. Task code does `session.get(Document, uuid.UUID(document_id))`, which raises `ValueError` for non-UUID input; no SQL injection vector. Document row lookup is single-row by primary key only. |
| T-01-05-05 | Denial of Service | Lifespan bucket-create on every reboot blocks startup | mitigate | `if not bucket_exists: make_bucket` is idempotent and fast on warm starts. If MinIO is unreachable at startup, lifespan raises and the FastAPI app fails to boot; this is intentional and surfaces the failure to Compose's `depends_on: condition: service_healthy` (which gated startup but cannot catch a post-startup MinIO crash). |
| T-01-05-06 | Information Disclosure | `app.state.minio` reused across handlers | accept | The client holds connection state but no per-user credentials. All app handlers see the same `app.state.minio`, which is acceptable since Phase 1 has no per-user isolation. Phase 5 will introduce per-user `StorageBackend` instances for cloud backends. |
| T-01-05-SC | Tampering | npm/pip installs | N/A | No new package installs in this plan; all dependencies were added in Plan 01 and verified via RESEARCH.md Package Legitimacy Audit. |

- Tasks 1-3 are autonomous; Task 4 is a blocking human-verify checkpoint.
- After Task 4 approval, ROADMAP.md Phase 1 success criteria #1 (docker compose up healthy), #3 (extract + classify pipeline works), and #4 (MinIO key schema enforced) are all live-verified. Criterion #2 was verified in Plan 03 Task 3.
- `docker compose exec -e INTEGRATION=1 backend bash -lc "cd /app && pytest tests/ -v"` exits 0 with zero FAILED.
- `backend/main.py` lifespan creates the MinIO bucket and disposes the engine; `/health` returns the postgres+minio shape per D-07.
- `backend/api/documents.py` and `backend/api/topics.py` are entirely async-session-driven; upload-time classification is queued via Celery `.delay()`.
- `backend/celery_app.py` and `backend/tasks/document_tasks.py` are wired and discoverable.
- `backend/services/classifier.py` accepts an `AsyncSession`.
- `backend/config.py` is pruned of legacy flat-file constants.
- `backend/data/` is deleted; `tests/test_documents.py` is async-only; `tests/conftest.py` no longer ships a sync TestClient fixture.
- `docker compose up` boots healthy; the walking skeleton end-to-end check from Task 4 passes.

Create `.planning/phases/01-infrastructure-foundation/01-05-SUMMARY.md` when done. Include: the exact `/health` JSON response observed at step 6 of Task 4, the actual MinIO object key produced at steps 7-9, the Celery task log line from step 10, and any deviations from the plan (e.g., the temporary MinIO-root-as-app-key workaround called out in `if-broken`).
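The `/health` response shape (D-07) and the error-string format from T-01-05-03 can be illustrated with a small, framework-free sketch. The function names `run_check` and `build_health` and the idea of passing probes as zero-argument callables are assumptions made for illustration; the real endpoint in `backend/main.py` is async and probes `AsyncSessionLocal` and `bucket_exists` directly.

```python
from typing import Callable, Dict


def run_check(probe: Callable[[], None]) -> str:
    """Run one probe; return "ok" or "error: <ExceptionClass>: <message>"."""
    try:
        probe()
        return "ok"
    except Exception as e:  # a health endpoint reports failures instead of raising
        return f"error: {type(e).__name__}: {e}"


def build_health(probes: Dict[str, Callable[[], None]]) -> dict:
    """Aggregate per-service results into the {"status", "checks"} payload."""
    checks = {name: run_check(probe) for name, probe in probes.items()}
    status = "ok" if all(result == "ok" for result in checks.values()) else "degraded"
    return {"status": status, "checks": checks}


if __name__ == "__main__":
    def postgres_probe() -> None:
        # Stand-in for a `SELECT 1` against the database (hypothetical).
        pass

    def minio_probe() -> None:
        # Stand-in for a failing bucket check (hypothetical).
        raise ConnectionRefusedError("refused")

    print(build_health({"postgres": postgres_probe, "minio": minio_probe}))
```

Keeping the aggregation separate from the probes makes the Phase 2 change described in T-01-05-03 (trimming error strings to `"error"` or `"unhealthy"`) a one-line edit in `run_check`.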