curo1305 fdc32d431d docs(03): create Phase 3 execution plan — document migration & multi-user isolation
5 plans across 5 sequential waves covering: Alembic migration 0003 (null-user
cleanup, NOT NULL constraint, quota reconciliation), presigned MinIO PUT upload
flow with atomic quota enforcement, auth guards on all document/topic endpoints,
flat-file settings retirement + per-user AI classification, and frontend quota bar
with 3-step XHR upload progress.

Verification passed across all 12 dimensions. All 8 phase requirements covered
(STORE-03/04/05/06, SEC-04, DOC-03/04/05).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-23 13:36:28 +02:00

Phase 3: Document Migration & Multi-User Isolation - Pattern Map

Mapped: 2026-05-23
Files analyzed: 14 new/modified files
Analogs found: 14 / 14


File Classification

| New/Modified File | Role | Data Flow | Closest Analog | Match Quality |
| --- | --- | --- | --- | --- |
| backend/migrations/versions/0003_multi_user_isolation.py | migration | batch | backend/migrations/versions/0002_add_backup_codes_and_password_must_change.py | exact |
| backend/storage/base.py | ABC | request-response | backend/storage/base.py (self: add abstract method) | exact |
| backend/storage/minio_backend.py | service | request-response | backend/storage/minio_backend.py (self: add methods mirroring presigned_get_url) | exact |
| backend/api/documents.py | controller | request-response | backend/api/auth.py (auth-gated CRUD + ownership) | role-match |
| backend/api/topics.py | controller | CRUD | backend/api/admin.py (auth-gated CRUD with role guard) | role-match |
| backend/api/auth.py | controller | request-response | backend/api/auth.py (self: add GET /me/quota endpoint) | exact |
| backend/services/classifier.py | service | transform | backend/services/classifier.py (self: change signature) | exact |
| backend/services/storage.py | service | CRUD | backend/services/storage.py (self: remove settings, add quota SQL) | exact |
| backend/tasks/document_tasks.py | task | batch | backend/tasks/email_tasks.py (DB lookup within async task body) | role-match |
| backend/celery_app.py | config | event-driven | backend/celery_app.py (self: add beat_schedule) | exact |
| backend/config.py | config | | backend/config.py (self: add optional env vars) | exact |
| frontend/src/stores/documents.js | store | request-response | frontend/src/stores/auth.js (multi-step async action) | role-match |
| frontend/src/components/layout/AppSidebar.vue | component | request-response | frontend/src/components/layout/AppSidebar.vue (self: add footer widget) | exact |
| frontend/src/components/layout/QuotaBar.vue | component | request-response | frontend/src/components/auth/PasswordStrengthBar.vue | role-match |
| frontend/src/components/upload/UploadProgress.vue | component | event-driven | frontend/src/components/upload/UploadProgress.vue (self: add progress bar) | exact |

Pattern Assignments

backend/migrations/versions/0003_multi_user_isolation.py (migration, batch)

Analog: backend/migrations/versions/0002_add_backup_codes_and_password_must_change.py and backend/migrations/versions/0001_initial_schema.py

Header / revision identifiers pattern (0002, lines 1-18):

"""Add backup_codes table and password_must_change column to users.

Revision ID: 0002
Revises: 0001
Create Date: 2026-05-22
...
"""
from __future__ import annotations

import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
from alembic import op

revision = "0002"
down_revision = "0001"
branch_labels = None
depends_on = None

Phase 3 migration: revision = "0003", down_revision = "0002". Add a plain-language docstring listing all steps (cleanup, NOT NULL, quota reconcile, index).

Data-manipulation ops pattern (0002, lines 26-53):

def upgrade() -> None:
    # ── 1. Add password_must_change to users ──────────────────────────────────
    op.add_column(
        "users",
        sa.Column(
            "password_must_change",
            sa.Boolean(),
            nullable=False,
            server_default="false",
        ),
    )

    # ── 2. Create backup_codes table ──────────────────────────────────────────
    op.create_table(...)
    op.create_index("ix_backup_codes_user_id", "backup_codes", ["user_id"])

    # ── Privilege grants (Pitfall 4) ───────────────────────────────────────────
    op.execute(
        "GRANT SELECT, INSERT, UPDATE, DELETE ON backup_codes TO docuvault_app;"
    )

Phase 3 uses op.execute(text(...)) for DELETE/UPDATE statements (no new tables, so no GRANT needed per Finding 1). Numbered section comments (# ── 1., # ── 2.) are the established style.

Downgrade pattern (0002, lines 56-63):

def downgrade() -> None:
    op.drop_index("ix_backup_codes_user_id", table_name="backup_codes")
    op.drop_table("backup_codes")
    op.drop_column("users", "password_must_change")

Phase 3 downgrade() reverses the NOT NULL constraint via op.alter_column('documents', 'user_id', nullable=True) and drops ix_topics_user_id. The MinIO and row-delete steps cannot be reversed — document this explicitly in the downgrade docstring.

Inline synchronous MinIO pattern for migration (RESEARCH.md Finding 1 — no codebase analog, copy from research):

# In upgrade() — call synchronous MinIO SDK directly, NOT the async MinIOBackend wrapper
import os
from minio import Minio

_client = Minio(
    endpoint=os.environ["MINIO_ENDPOINT"],
    access_key=os.environ["MINIO_ACCESS_KEY"],
    secret_key=os.environ["MINIO_SECRET_KEY"],
    secure=False,
)
for key in _keys_to_delete:
    try:
        _client.remove_object(os.environ.get("MINIO_BUCKET", "docuvault"), key)
    except Exception:
        pass  # object already gone — safe to skip

Quota reconciliation SQL (RESEARCH.md D-03 / Finding 4):

from sqlalchemy import text  # not among the 0002 imports; add it to the 0003 header

op.execute(
    text(
        "UPDATE quotas SET used_bytes = ("
        "  SELECT COALESCE(SUM(size_bytes), 0) FROM documents"
        "  WHERE documents.user_id = quotas.user_id"
        ")"
    )
)
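The reconciliation statement can be exercised in isolation before it goes into the migration. The following is a minimal sketch under stated assumptions: it runs against an in-memory SQLite database rather than PostgreSQL, and the two tables are reduced to the columns the statement touches (the real schema is not shown in this document). It illustrates that a user with no documents ends at 0 rather than NULL because of COALESCE.

```python
import sqlite3

RECONCILE_SQL = (
    "UPDATE quotas SET used_bytes = ("
    "  SELECT COALESCE(SUM(size_bytes), 0) FROM documents"
    "  WHERE documents.user_id = quotas.user_id"
    ")"
)


def reconcile_demo() -> dict:
    """Run the reconciliation UPDATE against a tiny, assumed schema."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE quotas (user_id TEXT PRIMARY KEY, used_bytes INTEGER, limit_bytes INTEGER);
        CREATE TABLE documents (id INTEGER PRIMARY KEY, user_id TEXT, size_bytes INTEGER);
        -- Stale counters: neither value matches the documents table.
        INSERT INTO quotas VALUES ('alice', 999, 1000), ('bob', 123, 1000);
        INSERT INTO documents (user_id, size_bytes) VALUES ('alice', 100), ('alice', 250);
        """
    )
    conn.execute(RECONCILE_SQL)
    rows = conn.execute("SELECT user_id, used_bytes FROM quotas").fetchall()
    conn.close()
    return dict(rows)


result = reconcile_demo()
print(result)
```

In the migration itself this step has to run after the null-user rows are deleted, otherwise the sum would be taken over rows that are about to disappear.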

backend/storage/base.py (ABC, request-response)

Analog: backend/storage/base.py lines 47-49, the existing presigned_get_url abstract method; the new generate_presigned_put_url follows the same signature style.

Existing abstract method to copy style from (lines 21-34 and 47-49):

@abstractmethod
async def put_object(
    self,
    user_id: str,
    document_id: str,
    file_bytes: bytes,
    extension: str,
    content_type: str,
) -> str:
    """Store bytes and return the generated object key.

    The key MUST follow the STORE-02 schema: {user_id}/{document_id}/{uuid4()}{ext}.
    The human-readable filename MUST NOT appear in the returned key.
    """
    ...

@abstractmethod
async def presigned_get_url(self, object_key: str, expires_minutes: int = 60) -> str:
    """Return a time-limited pre-signed download URL for the object."""
    ...

Add two new abstract methods after presigned_get_url using the same style:

@abstractmethod
async def generate_presigned_put_url(
    self,
    object_key: str,
    expires_minutes: int = 15,
) -> str:
    """Return a time-limited pre-signed PUT URL for direct browser upload.

    The object_key MUST be pre-computed before calling this method and stored
    in the Document row so stat_object() at confirm time uses the same key.
    Uses the public-facing client (not the Docker-internal client) so the URL
    is browser-resolvable (RESEARCH.md Finding 3).
    """
    ...

@abstractmethod
async def stat_object(self, object_key: str) -> int:
    """Return the size in bytes of an existing object.

    Raises minio.error.S3Error with code='NoSuchKey' if the object does not exist.
    Call at confirm time to get the authoritative file size (D-07).
    """
    ...
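One way to see how the two new methods cooperate is an in-memory test double. The sketch below is hypothetical and not part of the codebase: StorageBackend is reduced to the two new methods, and InMemoryBackend, simulate_browser_put, and the memory:// URL scheme are invented for illustration. It walks through the intended order: pre-compute the key, hand out a PUT URL, then read the authoritative size at confirm time.

```python
import asyncio
from abc import ABC, abstractmethod


class StorageBackend(ABC):
    """Reduced stand-in for storage/base.py: only the two new methods."""

    @abstractmethod
    async def generate_presigned_put_url(self, object_key: str, expires_minutes: int = 15) -> str: ...

    @abstractmethod
    async def stat_object(self, object_key: str) -> int: ...


class InMemoryBackend(StorageBackend):
    """Hypothetical test double that keeps objects in a dict."""

    def __init__(self) -> None:
        self._objects = {}

    async def generate_presigned_put_url(self, object_key: str, expires_minutes: int = 15) -> str:
        # Fake URL: a real backend returns a signed URL from the storage server.
        return f"memory://upload/{object_key}?expires={expires_minutes * 60}"

    async def stat_object(self, object_key: str) -> int:
        # KeyError stands in for the S3Error(NoSuchKey) raised by the MinIO backend.
        return len(self._objects[object_key])

    def simulate_browser_put(self, object_key: str, data: bytes) -> None:
        self._objects[object_key] = data


async def demo():
    backend = InMemoryBackend()
    key = "user-1/doc-1/3f2b.pdf"  # pre-computed and stored on the Document row
    url = await backend.generate_presigned_put_url(key)
    backend.simulate_browser_put(key, b"%PDF-1.7 fake")
    size = await backend.stat_object(key)  # confirm step reads the real size
    return url, size


url, size = asyncio.run(demo())
print(url, size)
```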

backend/storage/minio_backend.py (service, request-response)

Analog: backend/storage/minio_backend.py — existing presigned_get_url and delete_object methods demonstrate both asyncio.to_thread patterns.

Imports and constructor pattern (lines 1-46):

import asyncio
import io
import uuid
from datetime import timedelta

from minio import Minio

from storage.base import StorageBackend


class MinIOBackend(StorageBackend):
    def __init__(
        self,
        endpoint: str,
        access_key: str,
        secret_key: str,
        bucket: str,
        secure: bool = False,
    ) -> None:
        self._bucket = bucket
        self._client = Minio(
            endpoint=endpoint,
            access_key=access_key,
            secret_key=secret_key,
            secure=secure,
        )

Phase 3 adds a public_endpoint parameter to __init__ (optional, since the code below falls back to endpoint when it is empty) and creates self._public_client as a second Minio instance. Add after the self._client construction:

        # Second client for presigned URL generation: uses the browser-accessible
        # hostname (RESEARCH.md Finding 3 / MINIO_PUBLIC_ENDPOINT env var).
        # Falls back to the internal endpoint if public_endpoint is not configured.
        self._public_client = Minio(
            endpoint=public_endpoint or endpoint,
            access_key=access_key,
            secret_key=secret_key,
            secure=secure,
        )

Simple asyncio.to_thread pattern (lines 86-88):

async def delete_object(self, object_key: str) -> None:
    """Delete an object from MinIO by key."""
    await asyncio.to_thread(self._client.remove_object, self._bucket, object_key)

New stat_object follows the same single-liner pattern:

async def stat_object(self, object_key: str) -> int:
    """Return the authoritative file size in bytes via stat_object (D-07).

    Raises minio.error.S3Error(code='NoSuchKey') if object does not exist.
    """
    result = await asyncio.to_thread(self._client.stat_object, self._bucket, object_key)
    return result.size

Existing presigned_get_url pattern (lines 90-99) — copy for generate_presigned_put_url, using self._public_client instead of self._client:

async def presigned_get_url(
    self, object_key: str, expires_minutes: int = 60
) -> str:
    """Return a time-limited pre-signed download URL."""
    return await asyncio.to_thread(
        self._client.presigned_get_object,
        self._bucket,
        object_key,
        timedelta(minutes=expires_minutes),
    )

New method:

async def generate_presigned_put_url(
    self,
    object_key: str,
    expires_minutes: int = 15,
) -> str:
    """Return a presigned PUT URL using the public-facing client.

    Uses self._public_client so the URL contains the browser-resolvable hostname,
    not the Docker-internal 'minio:9000' hostname (RESEARCH.md Finding 3).
    """
    return await asyncio.to_thread(
        self._public_client.presigned_put_object,
        self._bucket,
        object_key,
        timedelta(minutes=expires_minutes),
    )
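The effect of the second client can be illustrated without a MinIO server. This is a sketch only: FakeSyncClient is a hypothetical stand-in that mimics the call shape of presigned_put_object and does no signing, and files.example.test is a placeholder hostname. It shows which hostname ends up in the URL with and without public_endpoint, and that the blocking call is pushed to a worker thread via asyncio.to_thread.

```python
import asyncio
from datetime import timedelta
from typing import Optional


class FakeSyncClient:
    """Hypothetical blocking client; mimics only the call shape used above."""

    def __init__(self, endpoint: str) -> None:
        self.endpoint = endpoint

    def presigned_put_object(self, bucket: str, key: str, expires: timedelta) -> str:
        # A real SDK signs the URL; here we only show which host appears in it.
        seconds = int(expires.total_seconds())
        return f"http://{self.endpoint}/{bucket}/{key}?X-Amz-Expires={seconds}"


class Backend:
    def __init__(self, endpoint: str, bucket: str, public_endpoint: Optional[str] = None) -> None:
        self._bucket = bucket
        self._client = FakeSyncClient(endpoint)
        # Same fallback as the constructor snippet: reuse the internal endpoint.
        self._public_client = FakeSyncClient(public_endpoint or endpoint)

    async def generate_presigned_put_url(self, key: str, expires_minutes: int = 15) -> str:
        return await asyncio.to_thread(
            self._public_client.presigned_put_object,
            self._bucket,
            key,
            timedelta(minutes=expires_minutes),
        )


async def demo():
    with_public = Backend("minio:9000", "docuvault", public_endpoint="files.example.test")
    without_public = Backend("minio:9000", "docuvault")
    return (
        await with_public.generate_presigned_put_url("u/d/k.pdf"),
        await without_public.generate_presigned_put_url("u/d/k.pdf"),
    )


public_url, internal_url = asyncio.run(demo())
print(public_url)
print(internal_url)
```

Without public_endpoint the URL still carries the Docker-internal hostname, which is the failure mode Finding 3 describes.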

backend/api/documents.py (controller, request-response)

Analog: backend/api/auth.py (auth-gated handlers) and backend/api/admin.py (ownership + role guards). Current backend/api/documents.py is replaced.

Imports pattern — copy from api/auth.py lines 22-39, adapted for documents:

from __future__ import annotations

import uuid

from fastapi import APIRouter, Depends, HTTPException, Query, status
from pydantic import BaseModel
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

from config import settings
from db.models import Document, Quota, User
from deps.auth import get_current_user
from deps.db import get_db
from services import storage
from tasks.document_tasks import extract_and_classify
from storage import get_storage_backend

Auth guard + admin 403 pattern — derive from deps/auth.py get_current_admin (lines 78-92); add a get_regular_user dependency:

async def get_regular_user(user: User = Depends(get_current_user)) -> User:
    """Reject admin accounts on all /api/documents/* endpoints (D-16, SC4).

    Admin accounts cannot access document content (CLAUDE.md + SEC-04).
    Returns 403 (not 404) — the admin knows document endpoints exist.
    """
    if user.role == "admin":
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Admin accounts cannot access document content",
        )
    return user

Ownership assertion pattern — derived from RESEARCH.md Finding 7; matches 404-for-cross-user rule (D-16):

try:
    doc = await session.get(Document, uuid.UUID(doc_id))
except ValueError:  # malformed UUID in the path: treat as not found
    doc = None
if doc is None or doc.user_id != current_user.id:
    raise HTTPException(status_code=404, detail="Document not found")

This identical two-condition check appears in every handler that takes a doc_id path parameter.
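Because the check repeats in every handler, it may help to see it as a plain function that can be unit-tested without FastAPI or a database. The sketch below is an assumption-based illustration: Doc, find_owned, and the dict-backed store are hypothetical names, and the function additionally maps a malformed id to the same outcome, since uuid.UUID() raises ValueError on bad input.

```python
import uuid
from dataclasses import dataclass
from typing import Optional


@dataclass
class Doc:
    id: uuid.UUID
    user_id: uuid.UUID


def find_owned(docs: dict, raw_id: str, current_user_id: uuid.UUID) -> Optional[Doc]:
    """Return the document only if it exists and belongs to the caller.

    Malformed ids, missing rows, and other users' rows all collapse to None,
    so a handler can map every case to the same 404.
    """
    try:
        doc_id = uuid.UUID(raw_id)
    except ValueError:
        return None
    doc = docs.get(doc_id)
    if doc is None or doc.user_id != current_user_id:
        return None
    return doc


alice, bob = uuid.uuid4(), uuid.uuid4()
alice_doc = Doc(id=uuid.uuid4(), user_id=alice)
store = {alice_doc.id: alice_doc}

outcomes = {
    "owner": find_owned(store, str(alice_doc.id), alice) is alice_doc,
    "other_user": find_owned(store, str(alice_doc.id), bob) is None,
    "missing": find_owned(store, str(uuid.uuid4()), alice) is None,
    "malformed": find_owned(store, "not-a-uuid", alice) is None,
}
print(outcomes)
```

Returning the same result for "missing" and "other_user" is what keeps a cross-user probe from learning whether a document id exists.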

Atomic quota enforcement pattern — from RESEARCH.md Finding 4; use sqlalchemy.text() with bound params, fetchone() to detect quota exceeded:

from sqlalchemy import text

result = await session.execute(
    text("""
        UPDATE quotas
        SET used_bytes = used_bytes + :delta
        WHERE user_id = :uid
          AND (used_bytes + :delta) <= limit_bytes
        RETURNING used_bytes, limit_bytes
    """),
    {"delta": file_size, "uid": str(current_user.id)},
)
row = result.fetchone()
if row is None:
    quota_row = await session.execute(
        text("SELECT used_bytes, limit_bytes FROM quotas WHERE user_id = :uid"),
        {"uid": str(current_user.id)},
    )
    q = quota_row.fetchone()
    raise HTTPException(
        status_code=413,
        detail={
            "used_bytes": q.used_bytes if q else 0,
            "limit_bytes": q.limit_bytes if q else 0,
            "rejected_bytes": file_size,
        },
    )

Atomic quota decrement pattern — from RESEARCH.md Finding 4 (delete handler):

await session.execute(
    text("""
        UPDATE quotas
        SET used_bytes = GREATEST(0, used_bytes - :delta)
        WHERE user_id = :uid
    """),
    {"delta": doc.size_bytes, "uid": str(doc.user_id)},
)
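The two quota statements can be tried out together on a throwaway database. What follows is a sketch with loud assumptions: it uses in-memory SQLite instead of PostgreSQL, detects a rejected reservation via rowcount instead of RETURNING, and spells GREATEST(0, x) as SQLite's two-argument MAX(0, x). The helper names reserve, release, and used are invented for illustration.

```python
import sqlite3

RESERVE_SQL = """
    UPDATE quotas
    SET used_bytes = used_bytes + :delta
    WHERE user_id = :uid
      AND (used_bytes + :delta) <= limit_bytes
"""
# SQLite has no GREATEST(); the two-argument MAX() is the scalar equivalent.
RELEASE_SQL = """
    UPDATE quotas
    SET used_bytes = MAX(0, used_bytes - :delta)
    WHERE user_id = :uid
"""


def reserve(conn, uid: str, delta: int) -> bool:
    """True if the increment was applied, False if it would exceed the limit."""
    cur = conn.execute(RESERVE_SQL, {"delta": delta, "uid": uid})
    return cur.rowcount == 1


def release(conn, uid: str, delta: int) -> None:
    conn.execute(RELEASE_SQL, {"delta": delta, "uid": uid})


def used(conn, uid: str) -> int:
    row = conn.execute("SELECT used_bytes FROM quotas WHERE user_id = ?", (uid,)).fetchone()
    return row[0]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE quotas (user_id TEXT PRIMARY KEY, used_bytes INTEGER, limit_bytes INTEGER)")
conn.execute("INSERT INTO quotas VALUES ('alice', 900, 1000)")

first = reserve(conn, "alice", 100)   # fits exactly: 900 + 100 <= 1000
second = reserve(conn, "alice", 1)    # rejected: 1000 + 1 > 1000
after_reserve = used(conn, "alice")
release(conn, "alice", 5000)          # over-release is clamped at zero
after_release = used(conn, "alice")
print(first, second, after_reserve, after_release)
```

The check and the increment live in one UPDATE, so two concurrent uploads cannot both pass a separate read-then-write check; the database serializes the row update.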

Existing simple handler pattern to preserve (current api/documents.py lines 70-80):

@router.get("")
async def list_documents(
    topic: Optional[str] = Query(None),
    page: int = Query(1, ge=1),
    per_page: int = Query(20, ge=1, le=100),
    session: AsyncSession = Depends(get_db),
):
    docs = await storage.list_metadata(session, topic=topic)
    ...

Phase 3 adds current_user: User = Depends(get_regular_user) to this and every other handler.


backend/api/topics.py (controller, CRUD)

Analog: backend/api/topics.py (self, modify) with backend/api/admin.py auth guard style.

Current handler without auth (lines 29-35):

@router.get("")
async def list_topics(session: AsyncSession = Depends(get_db)):
    topics = await storage.load_topics(session)
    ...

Phase 3 adds current_user: User = Depends(get_current_user) and passes current_user.id to the service layer for namespace filtering.

Topic namespace query pattern — from RESEARCH.md Finding 6; use or_ / is_:

from sqlalchemy import or_, select
from db.models import Topic

stmt = select(Topic).where(
    or_(Topic.user_id == current_user.id, Topic.user_id.is_(None))
).order_by(Topic.name)
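The visibility rule behind this query (own topics plus system topics, nothing from other users) can be checked with plain SQL. This is a minimal sketch assuming a reduced two-column topics table in in-memory SQLite; the topic names and user ids are made up.

```python
import sqlite3

VISIBLE_TOPICS_SQL = """
    SELECT name FROM topics
    WHERE user_id = :uid OR user_id IS NULL
    ORDER BY name
"""


def visible_topics(conn, uid: str) -> list:
    """Names of the caller's own topics plus system topics (user_id IS NULL)."""
    return [row[0] for row in conn.execute(VISIBLE_TOPICS_SQL, {"uid": uid})]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE topics (name TEXT, user_id TEXT)")
conn.executemany(
    "INSERT INTO topics VALUES (?, ?)",
    [("Invoices", None), ("Taxes", "alice"), ("Recipes", "bob"), ("Contracts", None)],
)

alice_topics = visible_topics(conn, "alice")
bob_topics = visible_topics(conn, "bob")
print(alice_topics, bob_topics)
```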

Admin-only endpoint pattern — copy from api/admin.py lines 133-137:

@router.get("/users")
async def list_users(
    session: AsyncSession = Depends(get_db),
    _admin: User = Depends(get_current_admin),
) -> dict:
    ...

For POST /api/admin/topics, use _admin: User = Depends(get_current_admin) as the guard. The new admin topics endpoint is served under the /api/admin prefix and lives in api/admin.py; append it there rather than creating a new file, unless the planner decides otherwise.


backend/api/auth.py (controller, request-response — add GET /me/quota)

Analog: backend/api/auth.py lines 383-387 — the existing GET /api/auth/me endpoint is the direct model.

Existing get_me handler pattern (lines 383-387):

@router.get("/me")
async def get_me(current_user: User = Depends(get_current_user)):
    """Return the current user's profile (requires valid Bearer token)."""
    return _user_dict(current_user)

New GET /api/auth/me/quota follows the same pattern but adds session:

@router.get("/me/quota")
async def get_my_quota(
    current_user: User = Depends(get_current_user),
    session: AsyncSession = Depends(get_db),
):
    """Return the current user's quota usage (STORE-04)."""
    q = await session.get(Quota, current_user.id)
    if q is None:
        raise HTTPException(status_code=404, detail="Quota not found")
    return {"used_bytes": q.used_bytes, "limit_bytes": q.limit_bytes}

Add from db.models import Quota to the imports at the top of api/auth.py (the User import is already there on line 33).


backend/services/classifier.py (service, transform)

Analog: backend/services/classifier.py (self — signature change) and backend/ai/__init__.py (factory).

Current classify_document signature (lines 18-23):

async def classify_document(
    session: AsyncSession,
    doc_id: str,
    topic_names: list[str] | None = None,
) -> list[str]:

Phase 3 signature adds two optional keyword parameters:

async def classify_document(
    session: AsyncSession,
    doc_id: str,
    topic_names: list[str] | None = None,
    ai_provider: str | None = None,   # NEW — from user.ai_provider (D-14)
    ai_model: str | None = None,      # NEW — from user.ai_model (D-15)
) -> list[str]:

Current provider construction (to replace) (lines 33-35):

settings = storage.load_settings()
system_prompt = settings.get("system_prompt", "")
provider = get_provider(settings)

Replace with minimal-change dict construction (RESEARCH.md Finding 12):

from config import settings as app_settings

_ai_provider = ai_provider or app_settings.default_ai_provider
_ai_model = ai_model or app_settings.default_ai_model
system_prompt = app_settings.system_prompt or _DEFAULT_SYSTEM_PROMPT
_settings = {
    "active_provider": _ai_provider,
    "providers": {
        _ai_provider: {"model": _ai_model},
    },
}
provider = get_provider(_settings)

_DEFAULT_SYSTEM_PROMPT is a module-level constant with the hardcoded fallback (D-13).
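The fallback chain and the dict shape can be captured in a small pure function, which makes the behavior easy to test. This is a hypothetical sketch: build_provider_settings is not an existing function, the two constants mirror the defaults proposed for config.py in this plan, and "example-provider" / "example-model" are placeholder values.

```python
from typing import Optional

DEFAULT_AI_PROVIDER = "ollama"   # mirrors default_ai_provider in config.py
DEFAULT_AI_MODEL = "llama3.2"    # mirrors default_ai_model in config.py


def build_provider_settings(ai_provider: Optional[str], ai_model: Optional[str]) -> dict:
    """Resolve per-user overrides, then build the dict shape get_provider() expects."""
    provider = ai_provider or DEFAULT_AI_PROVIDER
    model = ai_model or DEFAULT_AI_MODEL
    return {"active_provider": provider, "providers": {provider: {"model": model}}}


defaults_only = build_provider_settings(None, None)
user_override = build_provider_settings("example-provider", "example-model")
empty_strings = build_provider_settings("", "")   # empty strings also fall back
provider_only = build_provider_settings("example-provider", None)
print(defaults_only)
print(provider_only)
```

Note the last case: a user who sets only a provider gets that provider paired with the global default model. Whether that pairing is always valid depends on the provider, so it may be worth deciding explicitly during planning.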

Topic namespace filter for classifier: use the or_ query from RESEARCH.md Finding 6. The load_topics(session) call on line 39 must be replaced with a user-scoped query that receives the document owner's user_id:

# Replaces: all_topics = await storage.load_topics(session)
all_topics = await storage.load_topics_for_user(session, user_id=doc.user_id)

load_topics_for_user is a new function to add to services/storage.py.

New topic creation scoped to user namespace (D-11) — replace line 52:

# Before:
await storage.create_topic(session, name.strip())
# After (user_id from doc.user_id ensures per-user namespace):
await storage.create_topic(session, name.strip(), user_id=doc.user_id)

backend/services/storage.py (service, CRUD)

Analog: backend/services/storage.py (self — remove functions, add new ones).

Functions to remove entirely (lines 416-450):

  • load_settings() — lines 416-426
  • save_settings() — lines 428-434
  • mask_api_key() — lines 437-440
  • settings_masked() — lines 443-449

Imports to remove from lines 36-37:

from config import DEFAULT_SETTINGS, SETTINGS_FILE

Replace with:

from config import settings as app_settings

save_upload removal (lines 86-136): the function that accepted file bytes and uploaded them to MinIO is deleted in Phase 3. The upload-url handler in api/documents.py creates the Document row and the presigned URL directly. The functions to retain are save_metadata, get_metadata, and list_metadata.

list_metadata must add user filter (current lines 178-196):

async def list_metadata(
    session: AsyncSession, topic: Optional[str] = None
) -> list:
    stmt = select(Document).order_by(Document.created_at.desc())

Phase 3: add user_id parameter:

async def list_metadata(
    session: AsyncSession,
    user_id: uuid.UUID,
    topic: Optional[str] = None,
) -> list:
    stmt = select(Document).where(Document.user_id == user_id).order_by(Document.created_at.desc())

delete_document quota decrement (currently lines 199-222 — no quota logic): After the MinIO delete attempt, add the quota decrement before await session.delete(doc):

# Atomic quota decrement (STORE-06, D-07)
await session.execute(
    text("UPDATE quotas SET used_bytes = GREATEST(0, used_bytes - :delta) WHERE user_id = :uid"),
    {"delta": doc.size_bytes, "uid": str(doc.user_id)},
)

create_topic user_id parameter (current signature, lines 328-355):

async def create_topic(
    session: AsyncSession,
    name: str,
    description: str = "",
    color: str = "#6366f1",
) -> dict:
    q = await session.execute(
        select(Topic).where(sql_func.lower(Topic.name) == name.lower())
    )

Phase 3 adds user_id: uuid.UUID | None = None and scopes the deduplication query (RESEARCH.md Finding 6 Risk 5):

async def create_topic(
    session: AsyncSession,
    name: str,
    description: str = "",
    color: str = "#6366f1",
    user_id: uuid.UUID | None = None,  # NEW — None = system topic
) -> dict:
    q = await session.execute(
        select(Topic).where(
            sql_func.lower(Topic.name) == name.lower(),
            Topic.user_id == user_id,  # scope deduplication by namespace
        )
    )
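The scoped deduplication depends on NULL being matched correctly for system topics. SQLAlchemy renders a comparison against Python None as IS NULL, so Topic.user_id == user_id works when user_id is None; in hand-written SQL, user_id = NULL would never match. The sketch below illustrates the intended semantics under assumptions: it uses in-memory SQLite and its null-safe IS operator, and topic_exists is a hypothetical helper.

```python
import sqlite3
from typing import Optional


def topic_exists(conn, name: str, user_id: Optional[str]) -> bool:
    """Case-insensitive lookup scoped to one namespace (None = system topics)."""
    # SQLite's IS operator treats NULL IS NULL as true, unlike '='.
    row = conn.execute(
        "SELECT 1 FROM topics WHERE lower(name) = lower(?) AND user_id IS ?",
        (name, user_id),
    ).fetchone()
    return row is not None


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE topics (name TEXT, user_id TEXT)")
conn.executemany(
    "INSERT INTO topics VALUES (?, ?)",
    [("Invoices", None), ("Taxes", "alice")],
)

checks = {
    "system_match": topic_exists(conn, "invoices", None),
    "own_match": topic_exists(conn, "TAXES", "alice"),
    "other_namespace": topic_exists(conn, "Taxes", "bob"),
    "system_not_user": topic_exists(conn, "Invoices", "alice"),
}
print(checks)
```

The last case shows a consequence of strict scoping: a user can create a topic whose name duplicates a system topic, because the lookup only searches the user's own namespace.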

New load_topics_for_user function (add after load_topics):

async def load_topics_for_user(
    session: AsyncSession,
    user_id: uuid.UUID,
) -> list:
    """Return system topics (user_id IS NULL) + user's own topics, ordered by name."""
    from sqlalchemy import or_
    q = await session.execute(
        select(Topic).where(
            or_(Topic.user_id == user_id, Topic.user_id.is_(None))
        ).order_by(Topic.name)
    )
    return [
        {"id": str(t.id), "name": t.name, "description": t.description, "color": t.color}
        for t in q.scalars()
    ]

backend/tasks/document_tasks.py (task, batch)

Analog: backend/tasks/email_tasks.py _run_send_security_alert (lines 50-73) — the pattern for doing a DB lookup inside an async task body; and existing _run function for the overall structure.

Existing task entry-point pattern (document_tasks.py lines 22-25):

@celery_app.task(name="tasks.document_tasks.extract_and_classify")
def extract_and_classify(document_id: str) -> dict:
    """Synchronous Celery entry-point — delegates to async _run via asyncio.run."""
    return asyncio.run(_run(document_id))

New cleanup_abandoned_uploads task follows the same pattern:

@celery_app.task(name="tasks.document_tasks.cleanup_abandoned_uploads")
def cleanup_abandoned_uploads() -> dict:
    """Synchronous entry-point — delegates to async _cleanup via asyncio.run."""
    return asyncio.run(_cleanup_abandoned())

Existing _run DB lookup and deferred import style (lines 28-48):

async def _run(document_id: str) -> dict:
    import uuid as _uuid

    from db.session import AsyncSessionLocal
    from db.models import Document
    from services import extractor, classifier
    from storage import get_storage_backend

    async with AsyncSessionLocal() as session:
        try:
            doc_uuid = _uuid.UUID(document_id)
        except ValueError:
            return {"document_id": document_id, "status": "invalid_id"}

        doc = await session.get(Document, doc_uuid)
        if doc is None:
            return {"document_id": document_id, "status": "not_found"}

Phase 3 _run adds a second DB lookup for AI config after fetching the document — copy the session.get pattern for the User model:

        # Resolve per-user AI config (D-14, DOC-03, DOC-05)
        from config import settings  # deferred import, same style as the others in _run
        from db.models import User
        user = await session.get(User, doc.user_id)
        ai_provider = (user.ai_provider if user else None) or settings.default_ai_provider
        ai_model = (user.ai_model if user else None) or settings.default_ai_model

Then pass to classifier:

        topics = await classifier.classify_document(
            session, document_id,
            ai_provider=ai_provider,
            ai_model=ai_model,
        )

_cleanup_abandoned structure (modeled on email_tasks.py _run_send_security_alert, lines 51-73):

async def _cleanup_abandoned() -> dict:
    from datetime import datetime, timezone, timedelta
    from sqlalchemy import select

    from db.session import AsyncSessionLocal
    from db.models import Document
    from storage import get_storage_backend

    cutoff = datetime.now(timezone.utc) - timedelta(hours=1)
    async with AsyncSessionLocal() as session:
        result = await session.execute(
            select(Document).where(
                Document.status == "pending",
                Document.created_at < cutoff,
            )
        )
        docs = result.scalars().all()
        backend = get_storage_backend()
        cleaned = 0
        for doc in docs:
            try:
                if doc.object_key:
                    await backend.delete_object(doc.object_key)
            except Exception:
                pass  # MinIO object may not exist yet — safe to ignore
            await session.delete(doc)
            cleaned += 1
        await session.commit()
    return {"cleaned": cleaned}
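The selection rule used by the cleanup task (status is pending and the row is older than the cutoff) can be tested on its own, without a database or storage backend. The following is a sketch under assumptions: PendingDoc and select_abandoned are hypothetical names, and "ready" is a placeholder for whatever non-pending status the Document model actually uses.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class PendingDoc:
    id: str
    status: str
    created_at: datetime


def select_abandoned(docs: list, now: datetime, max_age: timedelta = timedelta(hours=1)) -> list:
    """Return ids of documents still 'pending' and strictly older than max_age."""
    cutoff = now - max_age
    return [d.id for d in docs if d.status == "pending" and d.created_at < cutoff]


now = datetime(2026, 5, 23, 12, 0, tzinfo=timezone.utc)
docs = [
    PendingDoc("old-pending", "pending", now - timedelta(hours=2)),
    PendingDoc("fresh-pending", "pending", now - timedelta(minutes=10)),
    PendingDoc("old-confirmed", "ready", now - timedelta(hours=2)),
    PendingDoc("exactly-one-hour", "pending", now - timedelta(hours=1)),
]
abandoned = select_abandoned(docs, now)
print(abandoned)
```

Because the comparison is strict, a row created exactly at the cutoff survives until the next run. Combined with a 30-minute schedule, an abandoned upload can therefore linger for roughly 90 minutes before it is removed.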

backend/celery_app.py (config, event-driven)

Analog: backend/celery_app.py (self — add beat_schedule). Existing task_routes block is the model.

Existing task_routes pattern (lines 31-34):

celery_app.conf.task_routes = {
    "tasks.document_tasks.*": {"queue": "documents"},
    "tasks.email_tasks.*": {"queue": "email"},
}

Add beat schedule after task_routes using the same conf assignment style:

from datetime import timedelta as _timedelta

celery_app.conf.beat_schedule = {
    "cleanup-abandoned-uploads": {
        "task": "tasks.document_tasks.cleanup_abandoned_uploads",
        "schedule": _timedelta(minutes=30),
    },
}
celery_app.conf.timezone = "UTC"

Import timedelta with an underscore alias (_timedelta) to follow the module's "keep imports minimal, avoid pulling in config" convention (see module docstring line 6).


backend/config.py (config)

Analog: backend/config.py (self: add four optional env vars).

Existing optional string field pattern (lines 40-44):

# SMTP (Phase 2 — D-01)
smtp_host: str = ""
smtp_port: int = 587
smtp_user: str = ""
smtp_password: str = ""

Phase 3 additions follow the same pattern — typed field with a documented default:

# MinIO public endpoint (Phase 3 — presigned URL Docker hostname fix, RESEARCH.md Finding 3)
minio_public_endpoint: str = ""  # falls back to minio_endpoint if empty

# AI classification defaults (Phase 3 — D-13, D-15)
system_prompt: str = ""              # SYSTEM_PROMPT env var; hardcoded fallback in classifier.py
default_ai_provider: str = "ollama"  # DEFAULT_AI_PROVIDER env var
default_ai_model: str = "llama3.2"   # DEFAULT_AI_MODEL env var

Remove from config.py: SETTINGS_FILE = Path(...) (line 60), DEFAULT_SYSTEM_PROMPT (lines 62-67), and DEFAULT_SETTINGS dict (lines 69-91). These are flat-file artifacts (D-12).


frontend/src/stores/documents.js (store, request-response)

Analog: frontend/src/stores/documents.js (self — replace upload) and frontend/src/stores/auth.js (multi-step async action with error handling).

Existing store skeleton pattern (documents.js lines 1-10):

import { defineStore } from 'pinia'
import { ref } from 'vue'
import * as api from '../api/client.js'

export const useDocumentsStore = defineStore('documents', () => {
  const documents = ref([])
  const total = ref(0)
  const loading = ref(false)
  const error = ref(null)

Add per-upload progress state:

  const uploadProgress = ref({})  // { [filename]: 0-100 }

Multi-step async action pattern — from stores/auth.js login function (lines 55-83), which handles branching outcomes:

  async function login(email, password, options = {}) {
    loading.value = true
    error.value = null
    try {
      const data = await api.login({...})
      if (data.requires_totp) { return { requires_totp: true } }
      // ...
      accessToken.value = data.access_token
      user.value = data.user
    } catch (e) {
      error.value = e.message
      throw e
    } finally {
      loading.value = false
    }
  }

Phase 3 upload action follows the same try/catch/finally pattern for the three-step flow. The XHR PUT step uses a helper (not api.* which uses fetch):

  async function upload(file, autoClassify = true) {
    const key = file.name
    error.value = null  // reset stale errors, as the login action does
    uploadProgress.value[key] = 0
    try {
      // Step 1: get presigned URL
      const { upload_url, document_id } = await api.getUploadUrl(file.name, file.type)
      // Step 2: PUT directly to MinIO with XHR progress
      await uploadToMinIO(upload_url, file, (pct) => {
        uploadProgress.value[key] = pct
      })
      // Step 3: confirm
      const doc = await api.confirmUpload(document_id)
      documents.value.unshift(doc)
      total.value++
      return doc
    } catch (e) {
      error.value = e.message
      throw e
    } finally {
      delete uploadProgress.value[key]
    }
  }

XHR upload helper (RESEARCH.md Finding 9 — no codebase analog; define in api/client.js or at top of stores/documents.js):

function uploadToMinIO(url, file, onProgress) {
  return new Promise((resolve, reject) => {
    const xhr = new XMLHttpRequest()
    xhr.upload.addEventListener('progress', e => {
      if (e.lengthComputable) onProgress(Math.round(e.loaded / e.total * 100))
    })
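    xhr.addEventListener('load', () =>
      // Treat only 2xx as success; a 3xx response is not a completed upload.
      xhr.status >= 200 && xhr.status < 300 ? resolve() : reject(new Error(`PUT failed: ${xhr.status}`))
    )
    xhr.addEventListener('error', () => reject(new Error('Network error during upload')))
    xhr.addEventListener('abort', () => reject(new Error('Upload aborted')))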
    xhr.open('PUT', url)
    xhr.setRequestHeader('Content-Type', file.type || 'application/octet-stream')
    xhr.send(file)
  })
}

Note: XHR does NOT send the Authorization: Bearer header — presigned URLs are self-authenticating. Do not add the auth header to the PUT request.


frontend/src/components/layout/AppSidebar.vue (component, request-response)

Analog: frontend/src/components/layout/AppSidebar.vue (self — add quota bar widget in the bottom section).

Existing bottom section pattern (lines 57-103):

<!-- Settings + Admin link -->
<div class="px-3 py-4 border-t border-gray-100">
  <!-- Admin link (admin users only) -->
  <router-link v-if="authStore.user?.role === 'admin'" to="/admin" class="nav-link">...</router-link>
  <router-link to="/settings" class="nav-link">...</router-link>

  <!-- User identity footer -->
  <div v-if="authStore.user" class="flex items-center gap-3 px-4 py-3 border-t border-gray-100 mt-2 -mx-3">
    ...sign out button...
  </div>
</div>

Insert <QuotaBar /> component between the nav links and the user identity footer. Import via <script setup>.

Existing <script setup> import pattern (lines 107-119):

import { useRouter } from 'vue-router'
import { useTopicsStore } from '../../stores/topics.js'
import { useAuthStore } from '../../stores/auth.js'

const topicsStore = useTopicsStore()
const authStore = useAuthStore()

Add:

import QuotaBar from './QuotaBar.vue'

QuotaBar fetches its own data in onMounted, so it needs no props from AppSidebar.


frontend/src/components/layout/QuotaBar.vue (component, NEW — request-response)

Analog: frontend/src/components/auth/PasswordStrengthBar.vue for the visual progress-bar pattern, and frontend/src/components/layout/AppSidebar.vue for the <script setup> + store consumption style.

<script setup> with reactive data fetched on mount — pattern from stores/auth.js and the sidebar's store usage:

<script setup>
import { ref, computed, onMounted } from 'vue'
import * as api from '../../api/client.js'

const usedBytes = ref(0)
const limitBytes = ref(0)

const pct = computed(() =>
  limitBytes.value > 0 ? Math.min(100, (usedBytes.value / limitBytes.value) * 100) : 0
)
const barColor = computed(() =>
  pct.value >= 95 ? 'bg-red-500' : pct.value >= 80 ? 'bg-amber-400' : 'bg-indigo-400'
)
const label = computed(() => {
  const used = (usedBytes.value / 1048576).toFixed(1)
  const limit = (limitBytes.value / 1048576).toFixed(0)
  return `${used} MB of ${limit} MB`
})

async function fetchQuota() {
  try {
    const data = await api.getMyQuota()
    usedBytes.value = data.used_bytes
    limitBytes.value = data.limit_bytes
  } catch {
    // Not yet authenticated or endpoint not yet available — silently ignore
  }
}

onMounted(fetchQuota)
</script>
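The percentage and colour thresholds are simple enough to pin down with boundary values. The sketch below restates them in Python purely as a checkable reference, for example if the same thresholds are later needed in a backend test; quota_percent and bar_color are hypothetical names, and the component itself stays in JavaScript. The percentage is computed multiply-first here to keep the boundary cases exact in floating point.

```python
def quota_percent(used_bytes: int, limit_bytes: int) -> float:
    """Usage as a percentage clamped to 100; 0 when no limit is known."""
    if limit_bytes <= 0:
        return 0.0
    return min(100.0, used_bytes * 100 / limit_bytes)


def bar_color(pct: float) -> str:
    """Same thresholds as the barColor computed property."""
    if pct >= 95:
        return "bg-red-500"
    if pct >= 80:
        return "bg-amber-400"
    return "bg-indigo-400"


cases = {
    (0, 0): bar_color(quota_percent(0, 0)),            # no quota loaded yet
    (799, 1000): bar_color(quota_percent(799, 1000)),  # just under the warning band
    (800, 1000): bar_color(quota_percent(800, 1000)),  # warning band starts at 80
    (950, 1000): bar_color(quota_percent(950, 1000)),  # critical band starts at 95
    (2000, 1000): bar_color(quota_percent(2000, 1000)),  # over limit, clamped to 100
}
print(cases)
```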

Template pattern — Tailwind progress bar (mirrors PasswordStrengthBar.vue's bar-within-container structure):

<template>
  <div class="px-4 pb-3">
    <p class="text-xs text-gray-400 mb-1">Storage</p>
    <div class="w-full h-1.5 bg-gray-200 rounded-full overflow-hidden">
      <div
        class="h-full rounded-full transition-all"
        :class="barColor"
        :style="{ width: `${pct}%` }"
      ></div>
    </div>
    <p class="text-xs text-gray-400 mt-1">{{ label }}</p>
  </div>
</template>

frontend/src/components/upload/UploadProgress.vue (component, event-driven)

Analog: frontend/src/components/upload/UploadProgress.vue (self — add progress bar inside existing row).

Existing row structure (lines 6-28):

<div v-for="item in items" :key="item.name" class="flex items-center gap-3 ...">
  <div class="flex-1 min-w-0">
    <p class="text-sm font-medium text-gray-800 truncate">{{ item.name }}</p>
    <p v-if="item.error" class="text-xs text-red-500 mt-0.5">{{ item.error }}</p>
    <p v-else-if="item.done" class="text-xs text-green-600 mt-0.5">
      Done{{ ... }}
    </p>
    <p v-else class="text-xs text-gray-400 mt-0.5">Uploading…</p>
  </div>
  <div class="shrink-0">
    <!-- spinner / check / error icons -->
  </div>
</div>

Add progress bar after the status text lines, inside <div class="flex-1 min-w-0">:

<!-- Progress bar during upload (item.progress is 0-100, undefined when done/error) -->
<div
  v-if="!item.done && !item.error && item.progress !== undefined"
  class="w-full h-1 bg-gray-200 rounded-full mt-1 overflow-hidden"
>
  <div
    class="h-full bg-indigo-400 rounded-full transition-all"
    :style="{ width: `${item.progress}%` }"
  ></div>
</div>

The item shape gains a progress field (0-100, set from uploadProgress state in useDocumentsStore).

Existing defineProps pattern (lines 33-35):

defineProps({
  items: { type: Array, default: () => [] },
})

No change needed — progress is a field on each item object, not a separate prop.


Shared Patterns

Authentication / Auth Guard

Source: backend/deps/auth.py lines 38-75 (get_current_user) and lines 78-92 (get_current_admin)
Apply to: All handlers in api/documents.py, api/topics.py, and the new GET /api/auth/me/quota endpoint

from fastapi import Depends, HTTPException
from deps.auth import get_current_user, get_current_admin
from db.models import User

# Regular user guard (all document endpoints):
current_user: User = Depends(get_current_user)

# Admin-only guard (admin topics, admin quota):
_admin: User = Depends(get_current_admin)

# Derived "no admin" guard — define once in api/documents.py, reuse across all handlers:
async def get_regular_user(user: User = Depends(get_current_user)) -> User:
    if user.role == "admin":
        raise HTTPException(403, "Admin accounts cannot access document content")
    return user

Ownership Assertion (SEC-04)

Source: RESEARCH.md Finding 7 / CONTEXT.md D-16
Apply to: Every handler in api/documents.py that accepts a doc_id path parameter

doc = await session.get(Document, uuid.UUID(doc_id))
if doc is None or doc.user_id != current_user.id:
    raise HTTPException(status_code=404, detail="Document not found")
# Returns 404 (not 403) for both "not found" and "wrong owner" — SEC-04 / D-16
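The ownership check above can be exercised in isolation. The following is a minimal sketch, not the project's handler code: `NotFound`, `Doc`, and `load_owned_document` are hypothetical stand-ins for `HTTPException(status_code=404)`, the `Document` model, and the `session.get` lookup. It also assumes that a malformed `doc_id` should be reported as "not found"; the source does not specify this, and `uuid.UUID(doc_id)` raises `ValueError` on malformed input unless it is caught.

```python
import uuid
from dataclasses import dataclass


class NotFound(Exception):
    """Hypothetical stand-in for HTTPException(status_code=404)."""


@dataclass
class Doc:
    """Hypothetical stand-in for the Document ORM model."""
    id: uuid.UUID
    user_id: uuid.UUID


def load_owned_document(store: dict, doc_id: str, current_user_id: uuid.UUID) -> Doc:
    """Return the document only if it exists and belongs to the caller."""
    try:
        key = uuid.UUID(doc_id)
    except ValueError:
        # Assumption: a malformed ID gets the same answer as a missing document.
        raise NotFound("Document not found") from None

    doc = store.get(key)  # stands in for `await session.get(Document, key)`
    if doc is None or doc.user_id != current_user_id:
        # Same error for "missing" and "wrong owner", so existence is not leaked.
        raise NotFound("Document not found")
    return doc
```

Because every failure path raises the same error, a caller cannot distinguish another user's document from one that does not exist.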

Atomic Quota SQL

Source: RESEARCH.md Finding 4 / CONTEXT.md D-07
Apply to: api/documents.py confirm endpoint (increment) and services/storage.py delete_document (decrement)

from sqlalchemy import text

# Increment (confirm endpoint):
result = await session.execute(
    text("""
        UPDATE quotas
        SET used_bytes = used_bytes + :delta
        WHERE user_id = :uid
          AND (used_bytes + :delta) <= limit_bytes
        RETURNING used_bytes, limit_bytes
    """),
    {"delta": file_size, "uid": str(user_id)},
)
# No row returned means the increment would exceed limit_bytes (or the user has no quota row).

# Decrement (delete):
await session.execute(
    text("UPDATE quotas SET used_bytes = GREATEST(0, used_bytes - :delta) WHERE user_id = :uid"),
    {"delta": size_bytes, "uid": str(user_id)},
)
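The semantics of the conditional UPDATE can be checked without a PostgreSQL instance. The sketch below swaps in the standard-library `sqlite3` module for the project's SQLAlchemy session, so it differs in two labeled ways: it detects a rejected increment through `rowcount` rather than through the `RETURNING` row, and it uses SQLite's two-argument `MAX` in place of `GREATEST`. The function names `try_reserve` and `release` are hypothetical.

```python
import sqlite3


def try_reserve(conn: sqlite3.Connection, user_id: str, delta: int) -> bool:
    """Add `delta` bytes to the user's usage only if the result stays within the limit.

    The check and the increment happen in one UPDATE statement, so there is
    no window between reading used_bytes and writing it back.
    """
    cur = conn.execute(
        "UPDATE quotas SET used_bytes = used_bytes + :delta "
        "WHERE user_id = :uid AND (used_bytes + :delta) <= limit_bytes",
        {"delta": delta, "uid": user_id},
    )
    # Zero rows updated: over quota, or no quota row for this user.
    return cur.rowcount == 1


def release(conn: sqlite3.Connection, user_id: str, delta: int) -> None:
    """Subtract `delta` bytes, clamping at zero (SQLite MAX ~ PostgreSQL GREATEST)."""
    conn.execute(
        "UPDATE quotas SET used_bytes = MAX(0, used_bytes - :delta) WHERE user_id = :uid",
        {"delta": delta, "uid": user_id},
    )
```

In the SQLAlchemy version shown above, the equivalent check would presumably be whether the `RETURNING` clause produced a row (for example, `result.first()` being `None` on rejection); how the confirm endpoint maps that to an HTTP status is not stated in this section.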

asyncio.to_thread for MinIO SDK

Source: backend/storage/minio_backend.py lines 63-70, 86-88, 90-99
Apply to: All new methods in MinIOBackend (generate_presigned_put_url, stat_object)

# Simple single-call pattern:
await asyncio.to_thread(self._client.remove_object, self._bucket, object_key)

# With return value:
return await asyncio.to_thread(
    self._client.presigned_get_object,
    self._bucket,
    object_key,
    timedelta(minutes=expires_minutes),
)
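As an illustration of how a new method might follow the same pattern, here is a minimal sketch that wraps a blocking call in `asyncio.to_thread`. `FakeBlockingClient` and `SketchBackend` are hypothetical; the fake client only imitates the shape of a synchronous SDK call (bucket, object key, expiry as a `timedelta`) and returns a made-up URL, so nothing here should be read as the real MinIOBackend implementation.

```python
import asyncio
import time
from datetime import timedelta


class FakeBlockingClient:
    """Hypothetical stand-in for a synchronous object-storage SDK client."""

    def presigned_put_object(self, bucket: str, key: str, expires: timedelta) -> str:
        time.sleep(0.05)  # simulate blocking work inside the SDK
        return f"https://minio.invalid/{bucket}/{key}?expires={int(expires.total_seconds())}"


class SketchBackend:
    """Hypothetical backend showing the to_thread wrapper only."""

    def __init__(self, client: FakeBlockingClient, bucket: str) -> None:
        self._client = client
        self._bucket = bucket

    async def generate_presigned_put_url(self, object_key: str, expires_minutes: int = 15) -> str:
        # Run the blocking call in a worker thread so the event loop stays responsive.
        return await asyncio.to_thread(
            self._client.presigned_put_object,
            self._bucket,
            object_key,
            timedelta(minutes=expires_minutes),
        )


async def demo() -> list:
    backend = SketchBackend(FakeBlockingClient(), "docs")
    # Both calls run in worker threads and can overlap instead of blocking the loop.
    return await asyncio.gather(
        backend.generate_presigned_put_url("u1/a.pdf", 10),
        backend.generate_presigned_put_url("u1/b.pdf"),
    )
```

Positional arguments are passed to `asyncio.to_thread` after the callable, which is why the wrapper takes the bound method followed by its arguments rather than calling it directly.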

Celery Task Structure (sync entry-point + async body)

Source: backend/tasks/document_tasks.py lines 22-25 and backend/tasks/email_tasks.py lines 19-36
Apply to: New cleanup_abandoned_uploads task in document_tasks.py

@celery_app.task(name="tasks.document_tasks.cleanup_abandoned_uploads")
def cleanup_abandoned_uploads() -> dict:
    return asyncio.run(_cleanup_abandoned())

async def _cleanup_abandoned() -> dict:
    # All imports deferred here (celery_app.py circular import rule)
    from db.session import AsyncSessionLocal
    ...
    async with AsyncSessionLocal() as session:
        ...

Frontend API Client request() Helper

Source: frontend/src/api/client.js lines 11-41
Apply to: All new API functions (getUploadUrl, confirmUpload, getMyQuota) added to api/client.js

// GET:
export function getMyQuota() {
  return request('/api/auth/me/quota')
}

// POST with JSON body:
export function getUploadUrl(filename, contentType) {
  return request('/api/documents/upload-url', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ filename, content_type: contentType }),
  })
}

export function confirmUpload(documentId) {
  return request(`/api/documents/${documentId}/confirm`, { method: 'POST' })
}

Pinia Store Action Pattern

Source: frontend/src/stores/documents.js lines 25-29 and frontend/src/stores/auth.js lines 55-83
Apply to: Modified upload action in stores/documents.js

// Pattern: loading flag + try/catch/finally + error ref
async function someAction(...args) {
  loading.value = true
  error.value = null
  try {
    const data = await api.someApiCall(...args)
    // update state
    return data
  } catch (e) {
    error.value = e.message
    throw e
  } finally {
    loading.value = false
  }
}

No Analog Found

All files have analogs. The following files have partial analogs (research patterns used instead of codebase patterns for specific sub-sections):

| File | Sub-section | Reason | Source |
|---|---|---|---|
| backend/migrations/versions/0003_multi_user_isolation.py | Inline MinIO SDK calls | No existing migration has side-effect MinIO calls | RESEARCH.md Finding 1 |
| backend/api/documents.py | Atomic quota SQL | No existing handler uses raw text() UPDATE | RESEARCH.md Finding 4 |
| frontend/src/stores/documents.js | XHR upload helper | No existing XHR code in codebase (fetch-only) | RESEARCH.md Finding 9 |
| frontend/src/components/layout/QuotaBar.vue | Full component (NEW) | No progress-bar component with onMounted fetch exists | PasswordStrengthBar.vue (visual style only) |

Metadata

Analog search scope: backend/ (all .py), frontend/src/ (all .js, .vue)
Files scanned: 14 source files read in full
Pattern extraction date: 2026-05-23