# Phase 3: Document Migration & Multi-User Isolation - Pattern Map

**Mapped:** 2026-05-23
**Files analyzed:** 14 new/modified files
**Analogs found:** 14 / 14

---

## File Classification

| New/Modified File | Role | Data Flow | Closest Analog | Match Quality |
|---|---|---|---|---|
| `backend/migrations/versions/0003_multi_user_isolation.py` | migration | batch | `backend/migrations/versions/0002_add_backup_codes_and_password_must_change.py` | exact |
| `backend/storage/base.py` | ABC | request-response | `backend/storage/base.py` (self — add abstract method) | exact |
| `backend/storage/minio_backend.py` | service | request-response | `backend/storage/minio_backend.py` (self — add methods mirroring `presigned_get_url`) | exact |
| `backend/api/documents.py` | controller | request-response | `backend/api/auth.py` (auth-gated CRUD + ownership) | role-match |
| `backend/api/topics.py` | controller | CRUD | `backend/api/admin.py` (auth-gated CRUD with role guard) | role-match |
| `backend/api/auth.py` | controller | request-response | `backend/api/auth.py` (self — add `GET /me/quota` endpoint) | exact |
| `backend/services/classifier.py` | service | transform | `backend/services/classifier.py` (self — change signature) | exact |
| `backend/services/storage.py` | service | CRUD | `backend/services/storage.py` (self — remove settings, add quota SQL) | exact |
| `backend/tasks/document_tasks.py` | task | batch | `backend/tasks/email_tasks.py` (DB lookup within async task body) | role-match |
| `backend/celery_app.py` | config | event-driven | `backend/celery_app.py` (self — add beat_schedule) | exact |
| `backend/config.py` | config | — | `backend/config.py` (self — add optional env vars) | exact |
| `frontend/src/stores/documents.js` | store | request-response | `frontend/src/stores/auth.js` (multi-step async action) | role-match |
| `frontend/src/components/layout/AppSidebar.vue` | component | request-response | `frontend/src/components/layout/AppSidebar.vue` (self — add footer widget) | exact |
| `frontend/src/components/layout/QuotaBar.vue` | component | request-response | `frontend/src/components/auth/PasswordStrengthBar.vue` | role-match |
| `frontend/src/components/upload/UploadProgress.vue` | component | event-driven | `frontend/src/components/upload/UploadProgress.vue` (self — add progress bar) | exact |

---

## Pattern Assignments

### `backend/migrations/versions/0003_multi_user_isolation.py` (migration, batch)

**Analog:** `backend/migrations/versions/0002_add_backup_codes_and_password_must_change.py` and `backend/migrations/versions/0001_initial_schema.py`

**Header / revision identifiers pattern** (0002, lines 1-18):

```python
"""Add backup_codes table and password_must_change column to users.

Revision ID: 0002
Revises: 0001
Create Date: 2026-05-22 ...
"""
from __future__ import annotations

import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

from alembic import op

revision = "0002"
down_revision = "0001"
branch_labels = None
depends_on = None
```

Phase 3 migration: `revision = "0003"`, `down_revision = "0002"`. Add a plain-language docstring listing all steps (cleanup, NOT NULL, quota reconcile, index).

**Data-manipulation ops pattern** (0002, lines 26-53):

```python
def upgrade() -> None:
    # ── 1. Add password_must_change to users ──────────────────────────────────
    op.add_column(
        "users",
        sa.Column(
            "password_must_change",
            sa.Boolean(),
            nullable=False,
            server_default="false",
        ),
    )

    # ── 2. Create backup_codes table ──────────────────────────────────────────
    op.create_table(...)
    op.create_index("ix_backup_codes_user_id", "backup_codes", ["user_id"])

    # ── Privilege grants (Pitfall 4) ───────────────────────────────────────────
    op.execute(
        "GRANT SELECT, INSERT, UPDATE, DELETE ON backup_codes TO docuvault_app;"
    )
```

Phase 3 uses `op.execute(text(...))` for DELETE/UPDATE statements (no new tables, so no GRANT needed per Finding 1). Numbered section comments (`# ── 1.`, `# ── 2.`) are the established style.
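The data steps named in the migration docstring (cleanup, quota reconcile) can be rehearsed against an in-memory SQLite database before they are written as `op.execute(text(...))` calls. The following is a minimal sketch, not the migration itself: the toy schema, the helper name `run_cleanup_steps`, and the assumption that "cleanup" means deleting documents whose `user_id` is NULL are all illustrative and not confirmed by the source.

```python
import sqlite3


def run_cleanup_steps(conn: sqlite3.Connection) -> list[str]:
    """Rehearse the data steps on a toy schema (hypothetical helper).

    Returns the object keys whose stored objects would need deleting.
    """
    # 1. Collect object keys of ownerless documents BEFORE deleting the rows.
    #    Assumption: cleanup targets rows where user_id IS NULL.
    keys = [
        row[0]
        for row in conn.execute(
            "SELECT object_key FROM documents WHERE user_id IS NULL"
        )
    ]
    # 2. Delete the ownerless rows so user_id could become NOT NULL afterwards.
    conn.execute("DELETE FROM documents WHERE user_id IS NULL")
    # 3. Recompute every quota from the surviving rows.
    conn.execute(
        "UPDATE quotas SET used_bytes = ("
        " SELECT COALESCE(SUM(size_bytes), 0) FROM documents"
        " WHERE documents.user_id = quotas.user_id"
        ")"
    )
    return keys


conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE documents (
        id INTEGER PRIMARY KEY, user_id TEXT, object_key TEXT, size_bytes INTEGER
    );
    CREATE TABLE quotas (
        user_id TEXT PRIMARY KEY, used_bytes INTEGER, limit_bytes INTEGER
    );
    INSERT INTO documents (user_id, object_key, size_bytes) VALUES
        ('u1', 'u1/d1/a.pdf', 100),
        ('u1', 'u1/d2/b.pdf', 50),
        (NULL, 'legacy/c.pdf', 999);
    INSERT INTO quotas VALUES ('u1', 0, 1000), ('u2', 777, 1000);
    """
)
orphan_keys = run_cleanup_steps(conn)
quota_rows = dict(conn.execute("SELECT user_id, used_bytes FROM quotas"))
```

Collecting the keys before the DELETE matters because the rows are the only record of which objects exist. The `COALESCE(..., 0)` resets a user with no remaining documents to zero instead of NULL.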
**Schema change pattern** (0002, lines 56-63):

```python
def downgrade() -> None:
    op.drop_index("ix_backup_codes_user_id", table_name="backup_codes")
    op.drop_table("backup_codes")
    op.drop_column("users", "password_must_change")
```

Phase 3 `downgrade()` reverses the `NOT NULL` constraint via `op.alter_column('documents', 'user_id', nullable=True)` and drops `ix_topics_user_id`. The MinIO and row-delete steps cannot be reversed — document this explicitly in the downgrade docstring.

**Inline synchronous MinIO pattern for migration** (RESEARCH.md Finding 1 — no codebase analog, copy from research):

```python
# In upgrade() — call synchronous MinIO SDK directly, NOT the async MinIOBackend wrapper
import os
from minio import Minio

_client = Minio(
    endpoint=os.environ["MINIO_ENDPOINT"],
    access_key=os.environ["MINIO_ACCESS_KEY"],
    secret_key=os.environ["MINIO_SECRET_KEY"],
    secure=False,
)
for key in _keys_to_delete:
    try:
        _client.remove_object(os.environ.get("MINIO_BUCKET", "docuvault"), key)
    except Exception:
        pass  # object already gone — safe to skip
```

**Quota reconciliation SQL** (RESEARCH.md D-03 / Finding 4):

```python
from sqlalchemy import text  # module-level import in the migration

op.execute(
    text(
        "UPDATE quotas SET used_bytes = ("
        " SELECT COALESCE(SUM(size_bytes), 0) FROM documents"
        " WHERE documents.user_id = quotas.user_id"
        ")"
    )
)
```

---

### `backend/storage/base.py` (ABC, request-response)

**Analog:** `backend/storage/base.py` lines 47-49 — existing `presigned_get_url` abstract method; new `generate_presigned_put_url` follows identical signature style.

**Existing abstract method to copy style from** (lines 21-34 and 47-49):

```python
    @abstractmethod
    async def put_object(
        self,
        user_id: str,
        document_id: str,
        file_bytes: bytes,
        extension: str,
        content_type: str,
    ) -> str:
        """Store bytes and return the generated object key.

        The key MUST follow the STORE-02 schema: {user_id}/{document_id}/{uuid4()}{ext}.
        The human-readable filename MUST NOT appear in the returned key.
        """
        ...

    @abstractmethod
    async def presigned_get_url(self, object_key: str, expires_minutes: int = 60) -> str:
        """Return a time-limited pre-signed download URL for the object."""
        ...
```

Add two new abstract methods after `presigned_get_url` using the same style:

```python
    @abstractmethod
    async def generate_presigned_put_url(
        self,
        object_key: str,
        expires_minutes: int = 15,
    ) -> str:
        """Return a time-limited pre-signed PUT URL for direct browser upload.

        The object_key MUST be pre-computed before calling this method and stored
        in the Document row so stat_object() at confirm time uses the same key.
        Uses the public-facing client (not the Docker-internal client) so the URL
        is browser-resolvable (RESEARCH.md Finding 3).
        """
        ...

    @abstractmethod
    async def stat_object(self, object_key: str) -> int:
        """Return the size in bytes of an existing object.

        Raises minio.error.S3Error with code='NoSuchKey' if the object does not exist.
        Call at confirm time to get the authoritative file size (D-07).
        """
        ...
```

---

### `backend/storage/minio_backend.py` (service, request-response)

**Analog:** `backend/storage/minio_backend.py` — existing `presigned_get_url` and `delete_object` methods demonstrate both `asyncio.to_thread` patterns.

**Imports and constructor pattern** (lines 1-46):

```python
import asyncio
import io
import uuid
from datetime import timedelta

from minio import Minio

from storage.base import StorageBackend


class MinIOBackend(StorageBackend):
    def __init__(
        self,
        endpoint: str,
        access_key: str,
        secret_key: str,
        bucket: str,
        secure: bool = False,
    ) -> None:
        self._bucket = bucket
        self._client = Minio(
            endpoint=endpoint,
            access_key=access_key,
            secret_key=secret_key,
            secure=secure,
        )
```

Phase 3 adds a `public_endpoint` parameter and creates `self._public_client` as a second `Minio` instance. Add after `self._client` construction:

```python
        # Second client for presigned URL generation — uses the browser-accessible
        # hostname (RESEARCH.md Finding 3 / MINIO_PUBLIC_ENDPOINT env var).
        # Falls back to the internal client if public_endpoint is not configured.
        self._public_client = Minio(
            endpoint=public_endpoint or endpoint,
            access_key=access_key,
            secret_key=secret_key,
            secure=secure,
        )
```

**Simple `asyncio.to_thread` pattern** (lines 86-88):

```python
    async def delete_object(self, object_key: str) -> None:
        """Delete an object from MinIO by key."""
        await asyncio.to_thread(self._client.remove_object, self._bucket, object_key)
```

New `stat_object` follows the same `asyncio.to_thread` pattern and then returns the size from the result:

```python
    async def stat_object(self, object_key: str) -> int:
        """Return the authoritative file size in bytes via stat_object (D-07).

        Raises minio.error.S3Error(code='NoSuchKey') if object does not exist.
        """
        result = await asyncio.to_thread(self._client.stat_object, self._bucket, object_key)
        return result.size
```

**Existing `presigned_get_url` pattern** (lines 90-99) — copy for `generate_presigned_put_url`, using `self._public_client` instead of `self._client`:

```python
    async def presigned_get_url(
        self, object_key: str, expires_minutes: int = 60
    ) -> str:
        """Return a time-limited pre-signed download URL."""
        return await asyncio.to_thread(
            self._client.presigned_get_object,
            self._bucket,
            object_key,
            timedelta(minutes=expires_minutes),
        )
```

New method:

```python
    async def generate_presigned_put_url(
        self,
        object_key: str,
        expires_minutes: int = 15,
    ) -> str:
        """Return a presigned PUT URL using the public-facing client.

        Uses self._public_client so the URL contains the browser-resolvable
        hostname, not the Docker-internal 'minio:9000' hostname (RESEARCH.md Finding 3).
        """
        return await asyncio.to_thread(
            self._public_client.presigned_put_object,
            self._bucket,
            object_key,
            timedelta(minutes=expires_minutes),
        )
```

---

### `backend/api/documents.py` (controller, request-response)

**Analog:** `backend/api/auth.py` (auth-gated handlers) and `backend/api/admin.py` (ownership + role guards). Current `backend/api/documents.py` is replaced.
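Because the replacement handlers hand the browser a presigned PUT URL, the hostname baked into that URL decides whether the upload can work at all. The sketch below illustrates the internal/public client split and the `asyncio.to_thread` wrapping in isolation. `FakeClient` and `SketchBackend` are hypothetical stand-ins, not the real `minio.Minio` or `MinIOBackend`, and the URL format is made up for the demonstration.

```python
import asyncio
from datetime import timedelta


class FakeClient:
    """Blocking stand-in for an SDK client; NOT the real minio.Minio class."""

    def __init__(self, endpoint: str) -> None:
        self.endpoint = endpoint

    def presigned_put_object(self, bucket: str, key: str, expires: timedelta) -> str:
        # Illustrative URL shape only; a real presigned URL carries a signature.
        seconds = int(expires.total_seconds())
        return f"http://{self.endpoint}/{bucket}/{key}?expires={seconds}"


class SketchBackend:
    """Hypothetical backend showing the two-client arrangement."""

    def __init__(self, endpoint: str, bucket: str, public_endpoint: str = "") -> None:
        self._bucket = bucket
        self._client = FakeClient(endpoint)
        # Fall back to the internal endpoint when no public one is configured.
        self._public_client = FakeClient(public_endpoint or endpoint)

    async def generate_presigned_put_url(
        self, object_key: str, expires_minutes: int = 15
    ) -> str:
        # Run the blocking call in a worker thread so the event loop stays free.
        return await asyncio.to_thread(
            self._public_client.presigned_put_object,
            self._bucket,
            object_key,
            timedelta(minutes=expires_minutes),
        )


with_public = SketchBackend("minio:9000", "docuvault", public_endpoint="localhost:9000")
without_public = SketchBackend("minio:9000", "docuvault")
public_url = asyncio.run(with_public.generate_presigned_put_url("u1/d1/f.pdf"))
fallback_url = asyncio.run(without_public.generate_presigned_put_url("u1/d1/f.pdf"))
```

With the fallback in place, a deployment that leaves the public endpoint empty still produces URLs, but they carry the internal hostname and would only resolve for clients on the same network.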
**Imports pattern** — copy from `api/auth.py` lines 22-39, adapted for documents:

```python
from __future__ import annotations

import uuid
from typing import Optional  # needed by the `topic` parameter of list_documents

from fastapi import APIRouter, Depends, HTTPException, Query, status
from pydantic import BaseModel
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

from config import settings
from db.models import Document, Quota, User
from deps.auth import get_current_user
from deps.db import get_db
from services import storage
from tasks.document_tasks import extract_and_classify
from storage import get_storage_backend
```

**Auth guard + admin 403 pattern** — derive from `deps/auth.py` `get_current_admin` (lines 78-92); add a `get_regular_user` dependency:

```python
async def get_regular_user(user: User = Depends(get_current_user)) -> User:
    """Reject admin accounts on all /api/documents/* endpoints (D-16, SC4).

    Admin accounts cannot access document content (CLAUDE.md + SEC-04).
    Returns 403 (not 404) — the admin knows document endpoints exist.
    """
    if user.role == "admin":
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Admin accounts cannot access document content",
        )
    return user
```

**Ownership assertion pattern** — derived from RESEARCH.md Finding 7; matches 404-for-cross-user rule (D-16):

```python
doc = await session.get(Document, uuid.UUID(doc_id))
if doc is None or doc.user_id != current_user.id:
    raise HTTPException(status_code=404, detail="Document not found")
```

This identical two-condition check appears in every handler that takes a `doc_id` path parameter.
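One edge the two-condition check does not cover on its own: `uuid.UUID(doc_id)` raises `ValueError` for a malformed id, which would surface as a server error unless it is caught. The sketch below shows the same check with that case folded into the not-found outcome. It is framework-free for illustration: `NotFound` stands in for `HTTPException(status_code=404)`, `Doc` and the dict-based `store` are hypothetical, and treating a malformed id as "not found" is an assumption, not a decision recorded in the source.

```python
import uuid
from dataclasses import dataclass


class NotFound(Exception):
    """Stands in for HTTPException(status_code=404) in this sketch."""


@dataclass
class Doc:
    id: uuid.UUID
    user_id: uuid.UUID


def get_owned_document(store: dict, doc_id: str, current_user_id: uuid.UUID) -> Doc:
    try:
        key = uuid.UUID(doc_id)
    except ValueError:
        # Assumption: a malformed id is reported exactly like a missing document.
        raise NotFound("Document not found") from None
    doc = store.get(key)
    # Same response for "missing" and "wrong owner", so existence is not leaked.
    if doc is None or doc.user_id != current_user_id:
        raise NotFound("Document not found")
    return doc


def outcome(store: dict, doc_id: str, user_id: uuid.UUID) -> str:
    try:
        get_owned_document(store, doc_id, user_id)
        return "ok"
    except NotFound as exc:
        return str(exc)


alice, bob = uuid.uuid4(), uuid.uuid4()
doc = Doc(id=uuid.uuid4(), user_id=alice)
store = {doc.id: doc}

results = [
    outcome(store, str(doc.id), alice),        # owner
    outcome(store, str(doc.id), bob),          # other user
    outcome(store, str(uuid.uuid4()), alice),  # unknown id
    outcome(store, "not-a-uuid", alice),       # malformed id
]
```

Declaring the path parameter as `uuid.UUID` in the handler signature would be another way to reject malformed ids, at the cost of a 422 response instead of a 404.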
**Atomic quota enforcement pattern** — from RESEARCH.md Finding 4; use `sqlalchemy.text()` with bound params, `fetchone()` to detect quota exceeded:

```python
from sqlalchemy import text

result = await session.execute(
    text("""
        UPDATE quotas
        SET used_bytes = used_bytes + :delta
        WHERE user_id = :uid
          AND (used_bytes + :delta) <= limit_bytes
        RETURNING used_bytes, limit_bytes
    """),
    {"delta": file_size, "uid": str(current_user.id)},
)
row = result.fetchone()
if row is None:
    quota_row = await session.execute(
        text("SELECT used_bytes, limit_bytes FROM quotas WHERE user_id = :uid"),
        {"uid": str(current_user.id)},
    )
    q = quota_row.fetchone()
    raise HTTPException(
        status_code=413,
        detail={
            "used_bytes": q.used_bytes if q else 0,
            "limit_bytes": q.limit_bytes if q else 0,
            "rejected_bytes": file_size,
        },
    )
```

**Atomic quota decrement pattern** — from RESEARCH.md Finding 4 (delete handler):

```python
await session.execute(
    text("""
        UPDATE quotas
        SET used_bytes = GREATEST(0, used_bytes - :delta)
        WHERE user_id = :uid
    """),
    {"delta": doc.size_bytes, "uid": str(doc.user_id)},
)
```

**Existing simple handler pattern to preserve** (current `api/documents.py` lines 70-80):

```python
@router.get("")
async def list_documents(
    topic: Optional[str] = Query(None),
    page: int = Query(1, ge=1),
    per_page: int = Query(20, ge=1, le=100),
    session: AsyncSession = Depends(get_db),
):
    docs = await storage.list_metadata(session, topic=topic)
    ...
```

Phase 3 adds `current_user: User = Depends(get_regular_user)` to this and every other handler.

---

### `backend/api/topics.py` (controller, CRUD)

**Analog:** `backend/api/topics.py` (self, modify) with `backend/api/admin.py` auth guard style.

**Current handler without auth** (lines 29-35):

```python
@router.get("")
async def list_topics(session: AsyncSession = Depends(get_db)):
    topics = await storage.load_topics(session)
    ...
```

Phase 3 adds `current_user: User = Depends(get_current_user)` and passes `current_user.id` to the service layer for namespace filtering.

**Topic namespace query pattern** — from RESEARCH.md Finding 6; use `or_` / `is_`:

```python
from sqlalchemy import or_, select
from db.models import Topic

stmt = select(Topic).where(
    or_(Topic.user_id == current_user.id, Topic.user_id.is_(None))
).order_by(Topic.name)
```

**Admin-only endpoint pattern** — copy from `api/admin.py` lines 133-137:

```python
@router.get("/users")
async def list_users(
    session: AsyncSession = Depends(get_db),
    _admin: User = Depends(get_current_admin),
) -> dict:
    ...
```

For `POST /api/admin/topics`, use `_admin: User = Depends(get_current_admin)` as the guard. The new admin topics router prefix is `/api/admin` and lives in `api/admin.py` (append there, do not create a new file unless the planner decides otherwise).

---

### `backend/api/auth.py` (controller, request-response — add `GET /me/quota`)

**Analog:** `backend/api/auth.py` lines 383-387 — the existing `GET /api/auth/me` endpoint is the direct model.

**Existing `get_me` handler pattern** (lines 383-387):

```python
@router.get("/me")
async def get_me(current_user: User = Depends(get_current_user)):
    """Return the current user's profile (requires valid Bearer token)."""
    return _user_dict(current_user)
```

New `GET /api/auth/me/quota` follows the same pattern but adds `session`:

```python
@router.get("/me/quota")
async def get_my_quota(
    current_user: User = Depends(get_current_user),
    session: AsyncSession = Depends(get_db),
):
    """Return the current user's quota usage (STORE-04)."""
    q = await session.get(Quota, current_user.id)
    if q is None:
        raise HTTPException(status_code=404, detail="Quota not found")
    return {"used_bytes": q.used_bytes, "limit_bytes": q.limit_bytes}
```

Add `from db.models import Quota` to the imports at the top of `api/auth.py` (the `User` import is already there on line 33).
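The `used_bytes` and `limit_bytes` values this endpoint returns stay trustworthy only if every change to them goes through a single conditional `UPDATE`, so that two concurrent uploads cannot both pass a separate "check then write". The sketch below demonstrates that idea on SQLite for easy local experimentation. It is an adaptation, not the project's PostgreSQL statement: `cursor.rowcount` replaces `RETURNING`, two-argument `MAX` replaces `GREATEST`, and the helper names `try_reserve` and `release` are hypothetical.

```python
import sqlite3


def try_reserve(conn: sqlite3.Connection, user_id: str, delta: int) -> bool:
    """Add delta to used_bytes only if the result stays within limit_bytes."""
    cur = conn.execute(
        "UPDATE quotas SET used_bytes = used_bytes + :delta "
        "WHERE user_id = :uid AND (used_bytes + :delta) <= limit_bytes",
        {"delta": delta, "uid": user_id},
    )
    # rowcount is 0 when the WHERE clause rejected the update (quota exceeded
    # or no quota row for this user).
    return cur.rowcount == 1


def release(conn: sqlite3.Connection, user_id: str, delta: int) -> None:
    """Subtract delta from used_bytes, clamping at zero."""
    conn.execute(
        "UPDATE quotas SET used_bytes = MAX(0, used_bytes - :delta) "
        "WHERE user_id = :uid",
        {"delta": delta, "uid": user_id},
    )


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE quotas ("
    "user_id TEXT PRIMARY KEY, used_bytes INTEGER, limit_bytes INTEGER)"
)
conn.execute("INSERT INTO quotas VALUES ('u1', 900, 1000)")

fits_exactly = try_reserve(conn, "u1", 100)  # 900 + 100 is within the limit
over_limit = try_reserve(conn, "u1", 1)      # 1000 + 1 is not
release(conn, "u1", 5000)                    # larger than used_bytes, clamps
used_after = conn.execute(
    "SELECT used_bytes FROM quotas WHERE user_id = 'u1'"
).fetchone()[0]
```

The clamp in `release` guards against drift in the other direction: if a recorded size is ever larger than what was reserved, the counter bottoms out at zero instead of going negative.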

---

### `backend/services/classifier.py` (service, transform)

**Analog:** `backend/services/classifier.py` (self — signature change) and `backend/ai/__init__.py` (factory).

**Current `classify_document` signature** (lines 18-23):

```python
async def classify_document(
    session: AsyncSession,
    doc_id: str,
    topic_names: list[str] | None = None,
) -> list[str]:
```

Phase 3 signature adds two optional keyword parameters:

```python
async def classify_document(
    session: AsyncSession,
    doc_id: str,
    topic_names: list[str] | None = None,
    ai_provider: str | None = None,  # NEW — from user.ai_provider (D-14)
    ai_model: str | None = None,     # NEW — from user.ai_model (D-15)
) -> list[str]:
```

**Current provider construction (to replace)** (lines 33-35):

```python
settings = storage.load_settings()
system_prompt = settings.get("system_prompt", "")
provider = get_provider(settings)
```

Replace with minimal-change dict construction (RESEARCH.md Finding 12):

```python
from config import settings as app_settings

_ai_provider = ai_provider or app_settings.default_ai_provider
_ai_model = ai_model or app_settings.default_ai_model
system_prompt = app_settings.system_prompt or _DEFAULT_SYSTEM_PROMPT

_settings = {
    "active_provider": _ai_provider,
    "providers": {
        _ai_provider: {"model": _ai_model},
    },
}
provider = get_provider(_settings)
```

`_DEFAULT_SYSTEM_PROMPT` is a module-level constant with the hardcoded fallback (D-13).

**Topic namespace filter for classifier** — use `or_` query from RESEARCH.md Finding 6. The `load_topics(session)` call in line 39 must be replaced with a user-scoped query. Pass `user_id` into the topic lookup:

```python
# Replaces: all_topics = await storage.load_topics(session)
all_topics = await storage.load_topics_for_user(session, user_id=doc.user_id)
```

`load_topics_for_user` is a new function to add to `services/storage.py`.
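The provider fallback described above is easy to get subtly wrong, so it can help to isolate it as a pure function. This is a minimal sketch under stated assumptions: `AppDefaults` and `build_provider_settings` are hypothetical names, the dict shape copies the one shown for `get_provider`, and the non-default provider and model strings are placeholders.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class AppDefaults:
    """Hypothetical stand-in for the app-level settings object."""

    default_ai_provider: str = "ollama"
    default_ai_model: str = "llama3.2"


def build_provider_settings(
    ai_provider: Optional[str],
    ai_model: Optional[str],
    defaults: AppDefaults,
) -> dict:
    # `or` treats both None and "" as "not set", so an empty column value
    # also falls back to the application default.
    provider = ai_provider or defaults.default_ai_provider
    model = ai_model or defaults.default_ai_model
    return {
        "active_provider": provider,
        "providers": {provider: {"model": model}},
    }


defaults = AppDefaults()
no_overrides = build_provider_settings(None, None, defaults)
full_override = build_provider_settings("example-provider", "example-model", defaults)
provider_only = build_provider_settings("example-provider", "", defaults)
```

The `provider_only` case is worth noticing: when a user sets a provider but no model, the default model name gets paired with the user's provider, and that model may not exist there. Whether to reject or special-case that combination is a design decision the source does not settle.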
**New topic creation scoped to user namespace** (D-11) — replace line 52:

```python
# Before:
await storage.create_topic(session, name.strip())
# After (user_id from doc.user_id ensures per-user namespace):
await storage.create_topic(session, name.strip(), user_id=doc.user_id)
```

---

### `backend/services/storage.py` (service, CRUD)

**Analog:** `backend/services/storage.py` (self — remove functions, add new ones).

**Functions to remove entirely** (lines 416-450):

- `load_settings()` — lines 416-426
- `save_settings()` — lines 428-434
- `mask_api_key()` — lines 437-440
- `settings_masked()` — lines 443-449

**Imports to remove** from lines 36-37:

```python
from config import DEFAULT_SETTINGS, SETTINGS_FILE
```

Replace with:

```python
from config import settings as app_settings
```

**`save_upload` removal** (lines 86-136) — the function that accepted file bytes and uploaded to MinIO is deleted in Phase 3. The upload-url handler in `api/documents.py` creates the `Document` row and presigned URL directly. The functions to retain are `save_metadata`, `get_metadata`, and `list_metadata`.
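With `save_upload` gone, the upload-url handler has to compute the object key itself before it asks for a presigned URL. A minimal sketch of that step follows, assuming the STORE-02 key layout `{user_id}/{document_id}/{uuid4()}{ext}`. The helper name `build_object_key` and the choice to lower-case the extension are illustrative assumptions.

```python
import os
import uuid


def build_object_key(user_id: uuid.UUID, document_id: uuid.UUID, filename: str) -> str:
    """Build a storage key that keeps only the extension of the user's filename."""
    # Only the extension survives; the human-readable name never enters the key.
    # Lower-casing is an assumption made for this sketch.
    ext = os.path.splitext(filename)[1].lower()
    return f"{user_id}/{document_id}/{uuid.uuid4()}{ext}"


user_id = uuid.uuid4()
document_id = uuid.uuid4()
key = build_object_key(user_id, document_id, "Tax Return 2025.PDF")
parts = key.split("/")
```

Storing this key on the `Document` row at creation time is what lets the confirm step call `stat_object` on exactly the same object the browser uploaded to.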
**`list_metadata` must add user filter** (current lines 178-196):

```python
async def list_metadata(
    session: AsyncSession, topic: Optional[str] = None
) -> list:
    stmt = select(Document).order_by(Document.created_at.desc())
```

Phase 3: add `user_id` parameter:

```python
async def list_metadata(
    session: AsyncSession,
    user_id: uuid.UUID,
    topic: Optional[str] = None,
) -> list:
    stmt = select(Document).where(Document.user_id == user_id).order_by(Document.created_at.desc())
```

**`delete_document` quota decrement** (currently lines 199-222 — no quota logic): After the MinIO delete attempt, add the quota decrement before `await session.delete(doc)`:

```python
# Atomic quota decrement (STORE-06, D-07)
await session.execute(
    text("UPDATE quotas SET used_bytes = GREATEST(0, used_bytes - :delta) WHERE user_id = :uid"),
    {"delta": doc.size_bytes, "uid": str(doc.user_id)},
)
```

**`create_topic` user_id parameter** (current signature, lines 328-355):

```python
async def create_topic(
    session: AsyncSession,
    name: str,
    description: str = "",
    color: str = "#6366f1",
) -> dict:
    q = await session.execute(
        select(Topic).where(sql_func.lower(Topic.name) == name.lower())
    )
```

Phase 3 adds `user_id: uuid.UUID | None = None` and scopes the deduplication query (RESEARCH.md Finding 6 Risk 5):

```python
async def create_topic(
    session: AsyncSession,
    name: str,
    description: str = "",
    color: str = "#6366f1",
    user_id: uuid.UUID | None = None,  # NEW — None = system topic
) -> dict:
    q = await session.execute(
        select(Topic).where(
            sql_func.lower(Topic.name) == name.lower(),
            Topic.user_id == user_id,  # scope deduplication by namespace
        )
    )
```

**New `load_topics_for_user` function** (add after `load_topics`):

```python
async def load_topics_for_user(
    session: AsyncSession,
    user_id: uuid.UUID,
) -> list:
    """Return system topics (user_id IS NULL) + user's own topics, ordered by name."""
    from sqlalchemy import or_

    q = await session.execute(
        select(Topic).where(
            or_(Topic.user_id == user_id, Topic.user_id.is_(None))
        ).order_by(Topic.name)
    )
    return [
        {"id": str(t.id), "name": t.name, "description": t.description, "color": t.color}
        for t in q.scalars()
    ]
```

---

### `backend/tasks/document_tasks.py` (task, batch)

**Analog:** `backend/tasks/email_tasks.py` `_run_send_security_alert` (lines 50-73) — the pattern for doing a DB lookup inside an async task body; and existing `_run` function for the overall structure.

**Existing task entry-point pattern** (document_tasks.py lines 22-25):

```python
@celery_app.task(name="tasks.document_tasks.extract_and_classify")
def extract_and_classify(document_id: str) -> dict:
    """Synchronous Celery entry-point — delegates to async _run via asyncio.run."""
    return asyncio.run(_run(document_id))
```

New `cleanup_abandoned_uploads` task follows the same pattern:

```python
@celery_app.task(name="tasks.document_tasks.cleanup_abandoned_uploads")
def cleanup_abandoned_uploads() -> dict:
    """Synchronous entry-point — delegates to async _cleanup via asyncio.run."""
    return asyncio.run(_cleanup_abandoned())
```

**Existing `_run` DB lookup and deferred import style** (lines 28-48):

```python
async def _run(document_id: str) -> dict:
    import uuid as _uuid
    from db.session import AsyncSessionLocal
    from db.models import Document
    from services import extractor, classifier
    from storage import get_storage_backend

    async with AsyncSessionLocal() as session:
        try:
            doc_uuid = _uuid.UUID(document_id)
        except ValueError:
            return {"document_id": document_id, "status": "invalid_id"}

        doc = await session.get(Document, doc_uuid)
        if doc is None:
            return {"document_id": document_id, "status": "not_found"}
```

Phase 3 `_run` adds a second DB lookup for AI config after fetching the document — copy the `session.get` pattern for the `User` model:

```python
# Resolve per-user AI config (D-14, DOC-03, DOC-05)
from config import settings  # deferred, like the other imports in _run
from db.models import User

user = await session.get(User, doc.user_id)
ai_provider = (user.ai_provider if user else None) or settings.default_ai_provider
ai_model = (user.ai_model if user else None) or settings.default_ai_model
```

Then pass to classifier:

```python
topics = await classifier.classify_document(
    session,
    document_id,
    ai_provider=ai_provider,
    ai_model=ai_model,
)
```

**`_cleanup_abandoned` structure** (modeled on `email_tasks.py` `_run_send_security_alert`, lines 51-73):

```python
async def _cleanup_abandoned() -> dict:
    from datetime import datetime, timezone, timedelta
    from sqlalchemy import select
    from db.session import AsyncSessionLocal
    from db.models import Document
    from storage import get_storage_backend

    cutoff = datetime.now(timezone.utc) - timedelta(hours=1)
    async with AsyncSessionLocal() as session:
        result = await session.execute(
            select(Document).where(
                Document.status == "pending",
                Document.created_at < cutoff,
            )
        )
        docs = result.scalars().all()
        backend = get_storage_backend()
        cleaned = 0
        for doc in docs:
            try:
                if doc.object_key:
                    await backend.delete_object(doc.object_key)
            except Exception:
                pass  # MinIO object may not exist yet — safe to ignore
            await session.delete(doc)
            cleaned += 1
        await session.commit()
    return {"cleaned": cleaned}
```

---

### `backend/celery_app.py` (config, event-driven)

**Analog:** `backend/celery_app.py` (self — add beat_schedule). Existing `task_routes` block is the model.

**Existing task_routes pattern** (lines 31-34):

```python
celery_app.conf.task_routes = {
    "tasks.document_tasks.*": {"queue": "documents"},
    "tasks.email_tasks.*": {"queue": "email"},
}
```

Add beat schedule after `task_routes` using the same conf assignment style:

```python
from datetime import timedelta as _timedelta

celery_app.conf.beat_schedule = {
    "cleanup-abandoned-uploads": {
        "task": "tasks.document_tasks.cleanup_abandoned_uploads",
        "schedule": _timedelta(minutes=30),
    },
}
celery_app.conf.timezone = "UTC"
```

Import `timedelta` with an underscore alias (`_timedelta`) to follow the module's "keep imports minimal, avoid pulling in config" convention (see module docstring line 6).
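The one-hour cutoff and the 30-minute beat interval interact: a pending upload is only removed on the first beat tick after it crosses the cutoff. The selection rule can be exercised without a database or Celery, as in this sketch. `UploadRow`, `select_abandoned`, and the `"ready"` status value are hypothetical stand-ins; only `"pending"` and the two durations come from the pattern itself.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class UploadRow:
    """Hypothetical stand-in for the fields of Document the cleanup reads."""

    object_key: str
    status: str
    created_at: datetime


def select_abandoned(
    rows: list,
    now: datetime,
    max_age: timedelta = timedelta(hours=1),
) -> list:
    """Return rows that are still pending and older than the cutoff."""
    cutoff = now - max_age
    return [r for r in rows if r.status == "pending" and r.created_at < cutoff]


now = datetime(2026, 5, 23, 12, 0, tzinfo=timezone.utc)
rows = [
    UploadRow("a", "pending", now - timedelta(hours=2)),     # stale, pending
    UploadRow("b", "pending", now - timedelta(minutes=10)),  # still in flight
    UploadRow("c", "ready", now - timedelta(days=1)),        # old but confirmed
]
abandoned_keys = [r.object_key for r in select_abandoned(rows, now)]

# A row that crosses the cutoff just after a tick waits one full interval.
worst_case_lifetime = timedelta(hours=1) + timedelta(minutes=30)
```

Passing `now` in as a parameter, instead of calling `datetime.now` inside the function, is what makes the rule testable with fixed timestamps.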

---

### `backend/config.py` (config)

**Analog:** `backend/config.py` (self — add four optional env vars).

**Existing optional string field pattern** (lines 40-44):

```python
    # SMTP (Phase 2 — D-01)
    smtp_host: str = ""
    smtp_port: int = 587
    smtp_user: str = ""
    smtp_password: str = ""
```

Phase 3 additions follow the same pattern — typed field with a documented default:

```python
    # MinIO public endpoint (Phase 3 — presigned URL Docker hostname fix, RESEARCH.md Finding 3)
    minio_public_endpoint: str = ""  # falls back to minio_endpoint if empty

    # AI classification defaults (Phase 3 — D-13, D-15)
    system_prompt: str = ""               # SYSTEM_PROMPT env var; hardcoded fallback in classifier.py
    default_ai_provider: str = "ollama"   # DEFAULT_AI_PROVIDER env var
    default_ai_model: str = "llama3.2"    # DEFAULT_AI_MODEL env var
```

Remove from `config.py`: `SETTINGS_FILE = Path(...)` (line 60), `DEFAULT_SYSTEM_PROMPT` (lines 62-67), and `DEFAULT_SETTINGS` dict (lines 69-91). These are flat-file artifacts (D-12).

---

### `frontend/src/stores/documents.js` (store, request-response)

**Analog:** `frontend/src/stores/documents.js` (self — replace `upload`) and `frontend/src/stores/auth.js` (multi-step async action with error handling).
**Existing store skeleton pattern** (documents.js lines 1-10):

```javascript
import { defineStore } from 'pinia'
import { ref } from 'vue'
import * as api from '../api/client.js'

export const useDocumentsStore = defineStore('documents', () => {
  const documents = ref([])
  const total = ref(0)
  const loading = ref(false)
  const error = ref(null)
```

Add per-upload progress state:

```javascript
  const uploadProgress = ref({})  // { [filename]: 0-100 }
```

**Multi-step async action pattern** — from `stores/auth.js` `login` function (lines 55-83), which handles branching outcomes:

```javascript
async function login(email, password, options = {}) {
  loading.value = true
  error.value = null
  try {
    const data = await api.login({...})
    if (data.requires_totp) {
      return { requires_totp: true }
    }
    // ...
    accessToken.value = data.access_token
    user.value = data.user
  } catch (e) {
    error.value = e.message
    throw e
  } finally {
    loading.value = false
  }
}
```

Phase 3 `upload` action follows the same try/catch/finally pattern for the three-step flow.
The XHR PUT step uses a helper (not `api.*` which uses fetch): ```javascript async function upload(file, autoClassify = true) { const key = file.name uploadProgress.value[key] = 0 try { // Step 1: get presigned URL const { upload_url, document_id } = await api.getUploadUrl(file.name, file.type) // Step 2: PUT directly to MinIO with XHR progress await uploadToMinIO(upload_url, file, (pct) => { uploadProgress.value[key] = pct }) // Step 3: confirm const doc = await api.confirmUpload(document_id) documents.value.unshift(doc) total.value++ return doc } catch (e) { error.value = e.message throw e } finally { delete uploadProgress.value[key] } } ``` **XHR upload helper** (RESEARCH.md Finding 9 — no codebase analog; define in `api/client.js` or at top of `stores/documents.js`): ```javascript function uploadToMinIO(url, file, onProgress) { return new Promise((resolve, reject) => { const xhr = new XMLHttpRequest() xhr.upload.addEventListener('progress', e => { if (e.lengthComputable) onProgress(Math.round(e.loaded / e.total * 100)) }) xhr.addEventListener('load', () => xhr.status < 400 ? resolve() : reject(new Error(`PUT failed: ${xhr.status}`)) ) xhr.addEventListener('error', () => reject(new Error('Network error during upload'))) xhr.open('PUT', url) xhr.setRequestHeader('Content-Type', file.type || 'application/octet-stream') xhr.send(file) }) } ``` Note: XHR does NOT send the `Authorization: Bearer` header — presigned URLs are self-authenticating. Do not add the auth header to the PUT request. --- ### `frontend/src/components/layout/AppSidebar.vue` (component, request-response) **Analog:** `frontend/src/components/layout/AppSidebar.vue` (self — add quota bar widget in the bottom section). **Existing bottom section pattern** (lines 57-103): ```html
... ...
...sign out button...
``` Insert `` component between the nav links and the user identity footer. Import via ` ``` **Template pattern — Tailwind progress bar** (mirrors `PasswordStrengthBar.vue`'s bar-within-container structure): ```html ``` --- ### `frontend/src/components/upload/UploadProgress.vue` (component, event-driven) **Analog:** `frontend/src/components/upload/UploadProgress.vue` (self — add progress bar inside existing row). **Existing row structure** (lines 6-28): ```html

{{ item.name }}

{{ item.error }}

Done{{ ... }}

Uploading…

``` Add progress bar after the status text lines, inside `
`: ```html
``` The `item` shape gains a `progress` field (0-100, set from `uploadProgress` state in `useDocumentsStore`). **Existing `defineProps` pattern** (lines 33-35): ```javascript defineProps({ items: { type: Array, default: () => [] }, }) ``` No change needed — `progress` is a field on each item object, not a separate prop. --- ## Shared Patterns ### Authentication / Auth Guard **Source:** `backend/deps/auth.py` lines 38-75 (`get_current_user`) and lines 78-92 (`get_current_admin`) **Apply to:** All handlers in `api/documents.py`, `api/topics.py`, and the new `GET /api/auth/me/quota` endpoint ```python from deps.auth import get_current_user, get_current_admin from db.models import User # Regular user guard (all document endpoints): current_user: User = Depends(get_current_user) # Admin-only guard (admin topics, admin quota): _admin: User = Depends(get_current_admin) # Derived "no admin" guard — define once in api/documents.py, reuse across all handlers: async def get_regular_user(user: User = Depends(get_current_user)) -> User: if user.role == "admin": raise HTTPException(403, "Admin accounts cannot access document content") return user ``` ### Ownership Assertion (SEC-04) **Source:** RESEARCH.md Finding 7 / CONTEXT.md D-16 **Apply to:** Every handler in `api/documents.py` that accepts a `doc_id` path parameter ```python doc = await session.get(Document, uuid.UUID(doc_id)) if doc is None or doc.user_id != current_user.id: raise HTTPException(status_code=404, detail="Document not found") # Returns 404 (not 403) for both "not found" and "wrong owner" — SEC-04 / D-16 ``` ### Atomic Quota SQL **Source:** RESEARCH.md Finding 4 / CONTEXT.md D-07 **Apply to:** `api/documents.py` confirm endpoint (increment) and `services/storage.py` `delete_document` (decrement) ```python from sqlalchemy import text # Increment (confirm endpoint): result = await session.execute( text(""" UPDATE quotas SET used_bytes = used_bytes + :delta WHERE user_id = :uid AND (used_bytes + :delta) <= 
limit_bytes RETURNING used_bytes, limit_bytes """), {"delta": file_size, "uid": str(user_id)}, ) # Decrement (delete): await session.execute( text("UPDATE quotas SET used_bytes = GREATEST(0, used_bytes - :delta) WHERE user_id = :uid"), {"delta": size_bytes, "uid": str(user_id)}, ) ``` ### asyncio.to_thread for MinIO SDK **Source:** `backend/storage/minio_backend.py` lines 63-70, 86-88, 90-99 **Apply to:** All new methods in `MinIOBackend` (`generate_presigned_put_url`, `stat_object`) ```python # Simple single-call pattern: await asyncio.to_thread(self._client.remove_object, self._bucket, object_key) # With return value: return await asyncio.to_thread( self._client.presigned_get_object, self._bucket, object_key, timedelta(minutes=expires_minutes), ) ``` ### Celery Task Structure (sync entry-point + async body) **Source:** `backend/tasks/document_tasks.py` lines 22-25 and `backend/tasks/email_tasks.py` lines 19-36 **Apply to:** New `cleanup_abandoned_uploads` task in `document_tasks.py` ```python @celery_app.task(name="tasks.document_tasks.cleanup_abandoned_uploads") def cleanup_abandoned_uploads() -> dict: return asyncio.run(_cleanup_abandoned()) async def _cleanup_abandoned() -> dict: # All imports deferred here (celery_app.py circular import rule) from db.session import AsyncSessionLocal ... async with AsyncSessionLocal() as session: ... 
``` ### Frontend API Client `request()` Helper **Source:** `frontend/src/api/client.js` lines 11-41 **Apply to:** All new API functions (`getUploadUrl`, `confirmUpload`, `getMyQuota`) added to `api/client.js` ```javascript // GET: export function getMyQuota() { return request('/api/auth/me/quota') } // POST with JSON body: export function getUploadUrl(filename, contentType) { return request('/api/documents/upload-url', { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ filename, content_type: contentType }), }) } export function confirmUpload(documentId) { return request(`/api/documents/${documentId}/confirm`, { method: 'POST' }) } ``` ### Pinia Store Action Pattern **Source:** `frontend/src/stores/documents.js` lines 25-29 and `frontend/src/stores/auth.js` lines 55-83 **Apply to:** Modified `upload` action in `stores/documents.js` ```javascript // Pattern: loading flag + try/catch/finally + error ref async function someAction(...args) { loading.value = true error.value = null try { const data = await api.someApiCall(args) // update state return data } catch (e) { error.value = e.message throw e } finally { loading.value = false } } ``` --- ## No Analog Found All files have analogs. 
The following files have partial analogs (research patterns used instead of codebase patterns for specific sub-sections):

| File | Sub-section | Reason | Source |
|---|---|---|---|
| `backend/migrations/versions/0003_multi_user_isolation.py` | Inline MinIO SDK calls | No existing migration has side-effect MinIO calls | RESEARCH.md Finding 1 |
| `backend/api/documents.py` | Atomic quota SQL | No existing handler uses raw `text()` UPDATE | RESEARCH.md Finding 4 |
| `frontend/src/stores/documents.js` | XHR upload helper | No existing XHR code in codebase (fetch-only) | RESEARCH.md Finding 9 |
| `frontend/src/components/layout/QuotaBar.vue` | Full component (NEW) | No progress-bar component with `onMounted` fetch exists | PasswordStrengthBar.vue (visual style only) |

---

## Metadata

**Analog search scope:** `backend/` (all .py), `frontend/src/` (all .js, .vue)
**Files scanned:** 14 source files read in full
**Pattern extraction date:** 2026-05-23