kite/.planning/phases/03-document-migration-multi-user-isolation/03-PATTERNS.md
curo1305 fdc32d431d docs(03): create Phase 3 execution plan — document migration & multi-user isolation
5 plans across 5 sequential waves covering: Alembic migration 0003 (null-user
cleanup, NOT NULL constraint, quota reconciliation), presigned MinIO PUT upload
flow with atomic quota enforcement, auth guards on all document/topic endpoints,
flat-file settings retirement + per-user AI classification, and frontend quota bar
with 3-step XHR upload progress.

Verification passed across all 12 dimensions. All 8 phase requirements covered
(STORE-03/04/05/06, SEC-04, DOC-03/04/05).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-23 13:36:28 +02:00


# Phase 3: Document Migration & Multi-User Isolation - Pattern Map
**Mapped:** 2026-05-23

**Files analyzed:** 15 new/modified files

**Analogs found:** 15 / 15

---
## File Classification
| New/Modified File | Role | Data Flow | Closest Analog | Match Quality |
|---|---|---|---|---|
| `backend/migrations/versions/0003_multi_user_isolation.py` | migration | batch | `backend/migrations/versions/0002_add_backup_codes_and_password_must_change.py` | exact |
| `backend/storage/base.py` | ABC | request-response | `backend/storage/base.py` (self — add abstract method) | exact |
| `backend/storage/minio_backend.py` | service | request-response | `backend/storage/minio_backend.py` (self — add methods mirroring `presigned_get_url`) | exact |
| `backend/api/documents.py` | controller | request-response | `backend/api/auth.py` (auth-gated CRUD + ownership) | role-match |
| `backend/api/topics.py` | controller | CRUD | `backend/api/admin.py` (auth-gated CRUD with role guard) | role-match |
| `backend/api/auth.py` | controller | request-response | `backend/api/auth.py` (self — add `GET /me/quota` endpoint) | exact |
| `backend/services/classifier.py` | service | transform | `backend/services/classifier.py` (self — change signature) | exact |
| `backend/services/storage.py` | service | CRUD | `backend/services/storage.py` (self — remove settings, add quota SQL) | exact |
| `backend/tasks/document_tasks.py` | task | batch | `backend/tasks/email_tasks.py` (DB lookup within async task body) | role-match |
| `backend/celery_app.py` | config | event-driven | `backend/celery_app.py` (self — add beat_schedule) | exact |
| `backend/config.py` | config | — | `backend/config.py` (self — add optional env vars) | exact |
| `frontend/src/stores/documents.js` | store | request-response | `frontend/src/stores/auth.js` (multi-step async action) | role-match |
| `frontend/src/components/layout/AppSidebar.vue` | component | request-response | `frontend/src/components/layout/AppSidebar.vue` (self — add footer widget) | exact |
| `frontend/src/components/layout/QuotaBar.vue` | component | request-response | `frontend/src/components/auth/PasswordStrengthBar.vue` | role-match |
| `frontend/src/components/upload/UploadProgress.vue` | component | event-driven | `frontend/src/components/upload/UploadProgress.vue` (self — add progress bar) | exact |
---
## Pattern Assignments
### `backend/migrations/versions/0003_multi_user_isolation.py` (migration, batch)
**Analog:** `backend/migrations/versions/0002_add_backup_codes_and_password_must_change.py` and `backend/migrations/versions/0001_initial_schema.py`
**Header / revision identifiers pattern** (0002, lines 1-18):
```python
"""Add backup_codes table and password_must_change column to users.
Revision ID: 0002
Revises: 0001
Create Date: 2026-05-22
...
"""
from __future__ import annotations
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
from alembic import op
revision = "0002"
down_revision = "0001"
branch_labels = None
depends_on = None
```
Phase 3 migration: `revision = "0003"`, `down_revision = "0002"`. Add a plain-language docstring listing all steps (cleanup, NOT NULL, quota reconcile, index).
**Data-manipulation ops pattern** (0002, lines 26-53):
```python
def upgrade() -> None:
    # ── 1. Add password_must_change to users ──────────────────────────────────
    op.add_column(
        "users",
        sa.Column(
            "password_must_change",
            sa.Boolean(),
            nullable=False,
            server_default="false",
        ),
    )
    # ── 2. Create backup_codes table ──────────────────────────────────────────
    op.create_table(...)
    op.create_index("ix_backup_codes_user_id", "backup_codes", ["user_id"])
    # ── Privilege grants (Pitfall 4) ───────────────────────────────────────────
    op.execute(
        "GRANT SELECT, INSERT, UPDATE, DELETE ON backup_codes TO docuvault_app;"
    )
```
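Phase 3 uses `op.execute(text(...))` for DELETE/UPDATE statements. Import `text` from `sqlalchemy`, or call `sa.text`, since the module already imports `sqlalchemy as sa`. No new tables are created, so no GRANT is needed (Finding 1). Numbered section comments (`# ── 1.`, `# ── 2.`) are the established style.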
**Schema change pattern** (0002, lines 56-63):
```python
def downgrade() -> None:
    op.drop_index("ix_backup_codes_user_id", table_name="backup_codes")
    op.drop_table("backup_codes")
    op.drop_column("users", "password_must_change")
```
Phase 3 `downgrade()` reverses the `NOT NULL` constraint via `op.alter_column('documents', 'user_id', nullable=True)` and drops `ix_topics_user_id`. The MinIO and row-delete steps cannot be reversed — document this explicitly in the downgrade docstring.
**Inline synchronous MinIO pattern for migration** (RESEARCH.md Finding 1 — no codebase analog, copy from research):
```python
# In upgrade(): call the synchronous MinIO SDK directly, NOT the async MinIOBackend wrapper
import os
from minio import Minio

_client = Minio(
    endpoint=os.environ["MINIO_ENDPOINT"],
    access_key=os.environ["MINIO_ACCESS_KEY"],
    secret_key=os.environ["MINIO_SECRET_KEY"],
    secure=False,
)
# _keys_to_delete: object keys of the null-user documents removed in the cleanup step
for key in _keys_to_delete:
    try:
        _client.remove_object(os.environ.get("MINIO_BUCKET", "docuvault"), key)
    except Exception:
        pass  # object already gone; safe to skip
```
**Quota reconciliation SQL** (RESEARCH.md D-03 / Finding 4):
```python
from sqlalchemy import text

op.execute(
    text(
        "UPDATE quotas SET used_bytes = ("
        " SELECT COALESCE(SUM(size_bytes), 0) FROM documents"
        " WHERE documents.user_id = quotas.user_id"
        ")"
    )
)
```
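The reconciliation logic can be checked without a Postgres instance by replaying the statement against an in-memory SQLite database. This is a sketch only. It models just the `user_id`, `used_bytes`, `limit_bytes` and `size_bytes` columns, and the rows are invented for illustration. It shows why `COALESCE` matters: a user with no remaining documents is reset to `0` rather than `NULL`.

```python
import sqlite3

# In-memory stand-ins for the quotas and documents tables (columns assumed from the SQL above).
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE quotas (user_id TEXT PRIMARY KEY, used_bytes INTEGER, limit_bytes INTEGER);
    CREATE TABLE documents (id INTEGER PRIMARY KEY, user_id TEXT, size_bytes INTEGER);
    -- Drifted counters: alice is over-counted, bob has no documents left at all.
    INSERT INTO quotas VALUES ('alice', 999, 1000), ('bob', 250, 1000);
    INSERT INTO documents (user_id, size_bytes) VALUES ('alice', 100), ('alice', 40);
    """
)

# Same statement as the migration: recompute used_bytes from the documents table.
conn.execute(
    "UPDATE quotas SET used_bytes = ("
    " SELECT COALESCE(SUM(size_bytes), 0) FROM documents"
    " WHERE documents.user_id = quotas.user_id"
    ")"
)
reconciled = dict(conn.execute("SELECT user_id, used_bytes FROM quotas"))
print(reconciled)  # alice -> 140 (100 + 40), bob -> 0 (SUM is NULL, COALESCE yields 0)
```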
---
### `backend/storage/base.py` (ABC, request-response)
**Analog:** `backend/storage/base.py` lines 47-49: the existing `presigned_get_url` abstract method. The new `generate_presigned_put_url` follows the identical signature style.
**Existing abstract method to copy style from** (lines 21-34 and 47-49):
```python
@abstractmethod
async def put_object(
    self,
    user_id: str,
    document_id: str,
    file_bytes: bytes,
    extension: str,
    content_type: str,
) -> str:
    """Store bytes and return the generated object key.
    The key MUST follow the STORE-02 schema: {user_id}/{document_id}/{uuid4()}{ext}.
    The human-readable filename MUST NOT appear in the returned key.
    """
    ...

@abstractmethod
async def presigned_get_url(self, object_key: str, expires_minutes: int = 60) -> str:
    """Return a time-limited pre-signed download URL for the object."""
    ...
```
Add two new abstract methods after `presigned_get_url` using the same style:
```python
@abstractmethod
async def generate_presigned_put_url(
    self,
    object_key: str,
    expires_minutes: int = 15,
) -> str:
    """Return a time-limited pre-signed PUT URL for direct browser upload.
    The object_key MUST be pre-computed before calling this method and stored
    in the Document row so stat_object() at confirm time uses the same key.
    Uses the public-facing client (not the Docker-internal client) so the URL
    is browser-resolvable (RESEARCH.md Finding 3).
    """
    ...

@abstractmethod
async def stat_object(self, object_key: str) -> int:
    """Return the size in bytes of an existing object.
    Raises minio.error.S3Error with code='NoSuchKey' if the object does not exist.
    Call at confirm time to get the authoritative file size (D-07).
    """
    ...
```
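The key must exist before the presigned URL is issued. A small key-building helper keeps the STORE-02 schema in one place. The sketch below is an assumption, not existing code. `build_object_key` is a hypothetical name, and taking the extension from the client-supplied filename is one possible design choice.

```python
import os
import uuid


def build_object_key(user_id: str, document_id: str, extension: str) -> str:
    """Hypothetical helper: STORE-02 key {user_id}/{document_id}/{uuid4()}{ext}.

    Only the file extension is used, so the human-readable filename never
    appears in the key.
    """
    return f"{user_id}/{document_id}/{uuid.uuid4()}{extension}"


original_filename = "Q3 report.pdf"
user_id = str(uuid.uuid4())
document_id = str(uuid.uuid4())

# Compute the key first and persist it on the Document row, then pass the same
# key to generate_presigned_put_url() now and to stat_object() at confirm time.
extension = os.path.splitext(original_filename)[1]  # ".pdf"
object_key = build_object_key(user_id, document_id, extension)
print(object_key)
```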
---
### `backend/storage/minio_backend.py` (service, request-response)
**Analog:** `backend/storage/minio_backend.py` — existing `presigned_get_url` and `delete_object` methods demonstrate both `asyncio.to_thread` patterns.
**Imports and constructor pattern** (lines 1-46):
```python
import asyncio
import io
import uuid
from datetime import timedelta
from minio import Minio
from storage.base import StorageBackend
class MinIOBackend(StorageBackend):
def __init__(
self,
endpoint: str,
access_key: str,
secret_key: str,
bucket: str,
secure: bool = False,
) -> None:
self._bucket = bucket
self._client = Minio(
endpoint=endpoint,
access_key=access_key,
secret_key=secret_key,
secure=secure,
)
```
Phase 3 adds an optional `public_endpoint: str = ""` parameter to `__init__` and creates `self._public_client` as a second `Minio` instance. Add after the `self._client` construction:
```python
# Second client for presigned URL generation — uses the browser-accessible
# hostname (RESEARCH.md Finding 3 / MINIO_PUBLIC_ENDPOINT env var).
# Falls back to the internal client if public_endpoint is not configured.
self._public_client = Minio(
endpoint=public_endpoint or endpoint,
access_key=access_key,
secret_key=secret_key,
secure=secure,
)
```
**Simple `asyncio.to_thread` pattern** (lines 86-88):
```python
async def delete_object(self, object_key: str) -> None:
    """Delete an object from MinIO by key."""
    await asyncio.to_thread(self._client.remove_object, self._bucket, object_key)
```
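New `stat_object` follows the same `asyncio.to_thread` pattern. It uses the internal client, because the call is server-side, and returns the object's `size`: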
```python
async def stat_object(self, object_key: str) -> int:
    """Return the authoritative file size in bytes via stat_object (D-07).
    Raises minio.error.S3Error(code='NoSuchKey') if object does not exist.
    """
    result = await asyncio.to_thread(self._client.stat_object, self._bucket, object_key)
    return result.size
```
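Every SDK call is wrapped in `asyncio.to_thread` because `minio.Minio` methods block on network I/O. The self-contained sketch below substitutes a hypothetical `FakeBlockingClient` for the real SDK. The real `stat_object` raises `S3Error`, not `KeyError`. The sketch shows that wrapped calls run in worker threads and can be awaited concurrently without stalling the event loop.

```python
import asyncio
import time
from dataclasses import dataclass


@dataclass
class FakeStat:
    size: int


class FakeBlockingClient:
    """Stand-in for the synchronous minio.Minio client (hypothetical, for illustration)."""

    def __init__(self, objects: dict[str, int]) -> None:
        self._objects = objects

    def stat_object(self, bucket: str, key: str) -> FakeStat:
        time.sleep(0.05)  # simulate a blocking network round-trip
        if key not in self._objects:
            raise KeyError(key)  # the real SDK raises S3Error(code="NoSuchKey")
        return FakeStat(size=self._objects[key])


async def stat_size(client: FakeBlockingClient, bucket: str, key: str) -> int:
    # Same shape as MinIOBackend.stat_object: run the blocking call in a worker thread.
    result = await asyncio.to_thread(client.stat_object, bucket, key)
    return result.size


async def main() -> list[int]:
    client = FakeBlockingClient({"a": 10, "b": 20, "c": 30})
    start = time.perf_counter()
    # The three blocking calls overlap in threads, so total time is ~0.05 s, not ~0.15 s.
    sizes = await asyncio.gather(*(stat_size(client, "docuvault", k) for k in "abc"))
    print(sizes, f"{time.perf_counter() - start:.2f}s")
    return list(sizes)


sizes = asyncio.run(main())
```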
**Existing `presigned_get_url` pattern** (lines 90-99) — copy for `generate_presigned_put_url`, using `self._public_client` instead of `self._client`:
```python
async def presigned_get_url(
    self, object_key: str, expires_minutes: int = 60
) -> str:
    """Return a time-limited pre-signed download URL."""
    return await asyncio.to_thread(
        self._client.presigned_get_object,
        self._bucket,
        object_key,
        timedelta(minutes=expires_minutes),
    )
```
New method:
```python
async def generate_presigned_put_url(
    self,
    object_key: str,
    expires_minutes: int = 15,
) -> str:
    """Return a presigned PUT URL using the public-facing client.
    Uses self._public_client so the URL contains the browser-resolvable hostname,
    not the Docker-internal 'minio:9000' hostname (RESEARCH.md Finding 3).
    """
    return await asyncio.to_thread(
        self._public_client.presigned_put_object,
        self._bucket,
        object_key,
        timedelta(minutes=expires_minutes),
    )
```
---
### `backend/api/documents.py` (controller, request-response)
**Analog:** `backend/api/auth.py` (auth-gated handlers) and `backend/api/admin.py` (ownership + role guards). Current `backend/api/documents.py` is replaced.
**Imports pattern** — copy from `api/auth.py` lines 22-39, adapted for documents:
```python
from __future__ import annotations
import uuid
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Query, status
from pydantic import BaseModel
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from config import settings
from db.models import Document, Quota, User
from deps.auth import get_current_user
from deps.db import get_db
from services import storage
from tasks.document_tasks import extract_and_classify
from storage import get_storage_backend
```
**Auth guard + admin 403 pattern** — derive from `deps/auth.py` `get_current_admin` (lines 78-92); add a `get_regular_user` dependency:
```python
async def get_regular_user(user: User = Depends(get_current_user)) -> User:
    """Reject admin accounts on all /api/documents/* endpoints (D-16, SC4).
    Admin accounts cannot access document content (CLAUDE.md + SEC-04).
    Returns 403 (not 404) — the admin knows document endpoints exist.
    """
    if user.role == "admin":
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Admin accounts cannot access document content",
        )
    return user
```
**Ownership assertion pattern** — derived from RESEARCH.md Finding 7; matches 404-for-cross-user rule (D-16):
```python
doc = await session.get(Document, uuid.UUID(doc_id))
if doc is None or doc.user_id != current_user.id:
    raise HTTPException(status_code=404, detail="Document not found")
```
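The snippet leaves one detail open. `uuid.UUID(doc_id)` raises `ValueError` for a malformed id, which surfaces as a 500 unless handled. Declaring the path parameter as `doc_id: uuid.UUID` would instead let FastAPI reject it with a 422. The framework-free sketch below folds the malformed-id case into the same 404 as "missing" and "owned by someone else". `NotFound`, `Doc`, and the dict standing in for `session.get` are illustrative assumptions.

```python
import uuid
from dataclasses import dataclass
from typing import Dict


class NotFound(Exception):
    """Stand-in for HTTPException(status_code=404)."""


@dataclass
class Doc:
    id: uuid.UUID
    user_id: uuid.UUID


def get_owned_document(docs: Dict[uuid.UUID, Doc], doc_id: str, current_user_id: uuid.UUID) -> Doc:
    """Return the document only if current_user owns it, else raise NotFound.

    Malformed ids, missing rows and other users' rows all yield the same 404,
    so the response never reveals whether a foreign document exists (D-16).
    """
    try:
        doc_uuid = uuid.UUID(doc_id)
    except ValueError:
        raise NotFound("Document not found") from None
    doc = docs.get(doc_uuid)  # plays the role of session.get(Document, doc_uuid)
    if doc is None or doc.user_id != current_user_id:
        raise NotFound("Document not found")
    return doc


alice, bob = uuid.uuid4(), uuid.uuid4()
doc = Doc(id=uuid.uuid4(), user_id=alice)
docs = {doc.id: doc}
```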
This identical two-condition check appears in every handler that takes a `doc_id` path parameter.
**Atomic quota enforcement pattern** — from RESEARCH.md Finding 4; use `sqlalchemy.text()` with bound params, `fetchone()` to detect quota exceeded:
```python
from sqlalchemy import text

result = await session.execute(
    text("""
        UPDATE quotas
        SET used_bytes = used_bytes + :delta
        WHERE user_id = :uid
          AND (used_bytes + :delta) <= limit_bytes
        RETURNING used_bytes, limit_bytes
    """),
    {"delta": file_size, "uid": str(current_user.id)},
)
row = result.fetchone()
if row is None:
    # No row updated: the increment would exceed limit_bytes (or no quota row exists).
    quota_row = await session.execute(
        text("SELECT used_bytes, limit_bytes FROM quotas WHERE user_id = :uid"),
        {"uid": str(current_user.id)},
    )
    q = quota_row.fetchone()
    raise HTTPException(
        status_code=413,
        detail={
            "used_bytes": q.used_bytes if q else 0,
            "limit_bytes": q.limit_bytes if q else 0,
            "rejected_bytes": file_size,
        },
    )
```
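The key property of this pattern is that the limit check and the increment happen in one statement, so no window exists between a read and a write. The SQLite sketch below is an illustration, not the production path. It reproduces the same `WHERE ... AND (used_bytes + :delta) <= limit_bytes` guard. It inspects `rowcount` instead of using `RETURNING`, so it also runs on SQLite builds older than 3.35, which lack `RETURNING`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE quotas (user_id TEXT PRIMARY KEY, used_bytes INTEGER, limit_bytes INTEGER)")
conn.execute("INSERT INTO quotas VALUES ('alice', 900, 1000)")


def try_reserve(conn: sqlite3.Connection, uid: str, delta: int) -> bool:
    """Add delta to used_bytes only if the result stays within limit_bytes.

    Check and increment are a single UPDATE, so a separate SELECT-then-UPDATE
    race between two uploads cannot occur.
    """
    cur = conn.execute(
        "UPDATE quotas SET used_bytes = used_bytes + :delta "
        "WHERE user_id = :uid AND (used_bytes + :delta) <= limit_bytes",
        {"delta": delta, "uid": uid},
    )
    conn.commit()
    return cur.rowcount == 1  # 0 rows updated means the quota would be exceeded (413)


first = try_reserve(conn, "alice", 80)   # 900 + 80 = 980 <= 1000: accepted
second = try_reserve(conn, "alice", 80)  # 980 + 80 = 1060 > 1000: rejected
used = conn.execute("SELECT used_bytes FROM quotas WHERE user_id = 'alice'").fetchone()[0]
print(first, second, used)  # True False 980
```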
**Atomic quota decrement pattern** — from RESEARCH.md Finding 4 (delete handler):
```python
await session.execute(
text("""
UPDATE quotas
SET used_bytes = GREATEST(0, used_bytes - :delta)
WHERE user_id = :uid
"""),
{"delta": doc.size_bytes, "uid": str(doc.user_id)},
)
```
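`GREATEST(0, ...)` is a floor, not the source of truth. It keeps a drifted counter from going negative, while the migration's reconciliation step is what restores accuracy. Below is a short SQLite sketch of the clamp. SQLite has no `GREATEST`, so its two-argument scalar `MAX()` stands in for it.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE quotas (user_id TEXT PRIMARY KEY, used_bytes INTEGER)")
conn.execute("INSERT INTO quotas VALUES ('alice', 100)")


def release(conn: sqlite3.Connection, uid: str, delta: int) -> int:
    """Subtract delta from used_bytes without going below zero; return the new value."""
    # SQLite's two-argument scalar MAX() plays the role of Postgres GREATEST().
    conn.execute(
        "UPDATE quotas SET used_bytes = MAX(0, used_bytes - :delta) WHERE user_id = :uid",
        {"delta": delta, "uid": uid},
    )
    row = conn.execute("SELECT used_bytes FROM quotas WHERE user_id = :uid", {"uid": uid}).fetchone()
    return row[0]


after_first = release(conn, "alice", 60)   # 100 - 60 = 40
after_second = release(conn, "alice", 60)  # 40 - 60 = -20, clamped to 0
```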
**Existing simple handler pattern to preserve** (current `api/documents.py` lines 70-80):
```python
@router.get("")
async def list_documents(
    topic: Optional[str] = Query(None),
    page: int = Query(1, ge=1),
    per_page: int = Query(20, ge=1, le=100),
    session: AsyncSession = Depends(get_db),
):
    docs = await storage.list_metadata(session, topic=topic)
    ...
```
Phase 3 adds `current_user: User = Depends(get_regular_user)` to this and every other handler.

---
### `backend/api/topics.py` (controller, CRUD)
**Analog:** `backend/api/topics.py` (self, modify) with `backend/api/admin.py` auth guard style.
**Current handler without auth** (lines 29-35):
```python
@router.get("")
async def list_topics(session: AsyncSession = Depends(get_db)):
    topics = await storage.load_topics(session)
    ...
```
Phase 3 adds `current_user: User = Depends(get_current_user)` and passes `current_user.id` to the service layer for namespace filtering.
**Topic namespace query pattern** — from RESEARCH.md Finding 6; use `or_` / `is_`:
```python
from sqlalchemy import or_, select

from db.models import Topic

stmt = select(Topic).where(
    or_(Topic.user_id == current_user.id, Topic.user_id.is_(None))
).order_by(Topic.name)
```
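The `is_(None)` half of the filter matters because SQL comparisons with `NULL` are never true. `user_id = NULL` matches nothing, so a hand-written `= NULL` would silently hide every system topic, whereas `.is_(None)` renders as `IS NULL`. The SQLite sketch below uses an illustrative table and rows to show both forms side by side.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE topics (name TEXT, user_id TEXT)")
conn.executemany(
    "INSERT INTO topics VALUES (?, ?)",
    [("Invoices", None), ("Taxes", None), ("Garden", "alice"), ("Recipes", "bob")],
)

# Correct: the user's own topics plus system topics (what or_(..., is_(None)) renders).
visible = [row[0] for row in conn.execute(
    "SELECT name FROM topics WHERE user_id = ? OR user_id IS NULL ORDER BY name", ("alice",)
)]
# Wrong: 'user_id = NULL' evaluates to NULL (not true) for every row.
broken = [row[0] for row in conn.execute(
    "SELECT name FROM topics WHERE user_id = ? OR user_id = NULL ORDER BY name", ("alice",)
)]
print(visible)  # ['Garden', 'Invoices', 'Taxes']
print(broken)   # ['Garden']
```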
**Admin-only endpoint pattern** — copy from `api/admin.py` lines 133-137:
```python
@router.get("/users")
async def list_users(
session: AsyncSession = Depends(get_db),
_admin: User = Depends(get_current_admin),
) -> dict:
```
For `POST /api/admin/topics`, guard the handler with `_admin: User = Depends(get_current_admin)`. The endpoint belongs on the existing `/api/admin` router in `api/admin.py`. Append it there rather than creating a new file, unless the planner decides otherwise.

---
### `backend/api/auth.py` (controller, request-response — add `GET /me/quota`)
**Analog:** `backend/api/auth.py` lines 383-387 — the existing `GET /api/auth/me` endpoint is the direct model.
**Existing `get_me` handler pattern** (lines 383-387):
```python
@router.get("/me")
async def get_me(current_user: User = Depends(get_current_user)):
    """Return the current user's profile (requires valid Bearer token)."""
    return _user_dict(current_user)
```
New `GET /api/auth/me/quota` follows the same pattern but adds `session`:
```python
@router.get("/me/quota")
async def get_my_quota(
    current_user: User = Depends(get_current_user),
    session: AsyncSession = Depends(get_db),
):
    """Return the current user's quota usage (STORE-04)."""
    q = await session.get(Quota, current_user.id)
    if q is None:
        raise HTTPException(status_code=404, detail="Quota not found")
    return {"used_bytes": q.used_bytes, "limit_bytes": q.limit_bytes}
```
Add `from db.models import Quota` to the imports at the top of `api/auth.py` (the `User` import is already there on line 33).

---
### `backend/services/classifier.py` (service, transform)
**Analog:** `backend/services/classifier.py` (self — signature change) and `backend/ai/__init__.py` (factory).
**Current `classify_document` signature** (lines 18-23):
```python
async def classify_document(
session: AsyncSession,
doc_id: str,
topic_names: list[str] | None = None,
) -> list[str]:
```
Phase 3 signature adds two optional keyword parameters:
```python
async def classify_document(
session: AsyncSession,
doc_id: str,
topic_names: list[str] | None = None,
ai_provider: str | None = None, # NEW — from user.ai_provider (D-14)
ai_model: str | None = None, # NEW — from user.ai_model (D-15)
) -> list[str]:
```
**Current provider construction (to replace)** (lines 33-35):
```python
settings = storage.load_settings()
system_prompt = settings.get("system_prompt", "")
provider = get_provider(settings)
```
Replace with minimal-change dict construction (RESEARCH.md Finding 12):
```python
from config import settings as app_settings
_ai_provider = ai_provider or app_settings.default_ai_provider
_ai_model = ai_model or app_settings.default_ai_model
system_prompt = app_settings.system_prompt or _DEFAULT_SYSTEM_PROMPT
_settings = {
"active_provider": _ai_provider,
"providers": {
_ai_provider: {"model": _ai_model},
},
}
provider = get_provider(_settings)
```
`_DEFAULT_SYSTEM_PROMPT` is a module-level constant with the hardcoded fallback (D-13).
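The fallback chain can be exercised in isolation. In the sketch below, `AppSettings` stands in for `config.settings`, using the `ollama` / `llama3.2` defaults proposed for `config.py`. `build_provider_settings` is a hypothetical extraction of the dict-building lines, and the non-default provider and model names are placeholders. One consequence is worth checking during planning. Provider and model fall back independently, so a user who sets only `ai_provider` inherits the default model, which may not exist for that provider.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class AppSettings:
    """Stand-in for config.settings (defaults as proposed for config.py)."""
    default_ai_provider: str = "ollama"
    default_ai_model: str = "llama3.2"


def build_provider_settings(
    app_settings: AppSettings,
    ai_provider: Optional[str] = None,
    ai_model: Optional[str] = None,
) -> dict:
    """Hypothetical extraction of the dict construction in classify_document."""
    provider = ai_provider or app_settings.default_ai_provider  # user value, else env default
    model = ai_model or app_settings.default_ai_model
    return {"active_provider": provider, "providers": {provider: {"model": model}}}


defaults = AppSettings()
no_prefs = build_provider_settings(defaults)
full_prefs = build_provider_settings(defaults, ai_provider="other-provider", ai_model="other-model")
# Provider set but model not: the model silently falls back to the Ollama default.
provider_only = build_provider_settings(defaults, ai_provider="other-provider")
```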
**Topic namespace filter for classifier** — use `or_` query from RESEARCH.md Finding 6. The `load_topics(session)` call in line 39 must be replaced with a user-scoped query. Pass `user_id` into the topic lookup:
```python
# Replaces: all_topics = await storage.load_topics(session)
all_topics = await storage.load_topics_for_user(session, user_id=doc.user_id)
```
`load_topics_for_user` is a new function to add to `services/storage.py`.
**New topic creation scoped to user namespace** (D-11) — replace line 52:
```python
# Before:
await storage.create_topic(session, name.strip())
# After (user_id from doc.user_id ensures per-user namespace):
await storage.create_topic(session, name.strip(), user_id=doc.user_id)
```
---
### `backend/services/storage.py` (service, CRUD)
**Analog:** `backend/services/storage.py` (self — remove functions, add new ones).
**Functions to remove entirely** (lines 416-450):
- `load_settings()` — lines 416-426
- `save_settings()` — lines 428-434
- `mask_api_key()` — lines 437-440
- `settings_masked()` — lines 443-449
**Imports to remove** from lines 36-37:
```python
from config import DEFAULT_SETTINGS, SETTINGS_FILE
```
Replace with:
```python
from config import settings as app_settings
```
**`save_upload` removal** (lines 86-136): the function that accepted file bytes and uploaded them to MinIO is deleted in Phase 3. The upload-url handler in `api/documents.py` creates the `Document` row and presigned URL directly. The functions to retain are `save_metadata`, `get_metadata`, and `list_metadata`.
**`list_metadata` must add a user filter** (current lines 178-196):
```python
async def list_metadata(
    session: AsyncSession, topic: Optional[str] = None
) -> list:
    stmt = select(Document).order_by(Document.created_at.desc())
```
Phase 3: add `user_id` parameter:
```python
async def list_metadata(
    session: AsyncSession,
    user_id: uuid.UUID,
    topic: Optional[str] = None,
) -> list:
    stmt = (
        select(Document)
        .where(Document.user_id == user_id)
        .order_by(Document.created_at.desc())
    )
```
**`delete_document` quota decrement** (currently lines 199-222 — no quota logic):
After the MinIO delete attempt, add the quota decrement before `await session.delete(doc)`:
```python
# Atomic quota decrement (STORE-06, D-07)
await session.execute(
text("UPDATE quotas SET used_bytes = GREATEST(0, used_bytes - :delta) WHERE user_id = :uid"),
{"delta": doc.size_bytes, "uid": str(doc.user_id)},
)
```
**`create_topic` user_id parameter** (current signature, lines 328-355):
```python
async def create_topic(
    session: AsyncSession,
    name: str,
    description: str = "",
    color: str = "#6366f1",
) -> dict:
    q = await session.execute(
        select(Topic).where(sql_func.lower(Topic.name) == name.lower())
    )
```
Phase 3 adds `user_id: uuid.UUID | None = None` and scopes the deduplication query (RESEARCH.md Finding 6 Risk 5):
```python
async def create_topic(
    session: AsyncSession,
    name: str,
    description: str = "",
    color: str = "#6366f1",
    user_id: uuid.UUID | None = None,  # NEW — None = system topic
) -> dict:
    q = await session.execute(
        select(Topic).where(
            sql_func.lower(Topic.name) == name.lower(),
            # Scope deduplication by namespace; SQLAlchemy renders `== None` as IS NULL.
            Topic.user_id == user_id,
        )
    )
```
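The scoped deduplication can be sanity-checked with SQLite, whose `IS` operator is a null-safe equality. A single statement therefore covers both `user_id IS NULL` (system topics) and `user_id = :uid` (user topics). SQLAlchemy renders `Topic.user_id == user_id` the same way for `None` and non-`None` values respectively. The table and rows are illustrative. The sketch also shows what the scoping permits: another user may reuse a name, and a user may create a personal topic whose name matches a system topic.

```python
import sqlite3
from typing import Optional

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE topics (name TEXT, user_id TEXT)")
conn.executemany(
    "INSERT INTO topics VALUES (?, ?)",
    [("Invoices", None), ("Garden", "alice")],
)


def topic_exists(conn: sqlite3.Connection, name: str, user_id: Optional[str]) -> bool:
    """Case-insensitive duplicate check scoped to one namespace (None = system topics)."""
    row = conn.execute(
        "SELECT 1 FROM topics WHERE lower(name) = lower(?) AND user_id IS ?",
        (name, user_id),
    ).fetchone()
    return row is not None
```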
**New `load_topics_for_user` function** (add after `load_topics`):
```python
async def load_topics_for_user(
    session: AsyncSession,
    user_id: uuid.UUID,
) -> list:
    """Return system topics (user_id IS NULL) + user's own topics, ordered by name."""
    from sqlalchemy import or_
    q = await session.execute(
        select(Topic).where(
            or_(Topic.user_id == user_id, Topic.user_id.is_(None))
        ).order_by(Topic.name)
    )
    return [
        {"id": str(t.id), "name": t.name, "description": t.description, "color": t.color}
        for t in q.scalars()
    ]
```
---
### `backend/tasks/document_tasks.py` (task, batch)
**Analog:** `backend/tasks/email_tasks.py` `_run_send_security_alert` (lines 50-73) — the pattern for doing a DB lookup inside an async task body; and existing `_run` function for the overall structure.
**Existing task entry-point pattern** (document_tasks.py lines 22-25):
```python
@celery_app.task(name="tasks.document_tasks.extract_and_classify")
def extract_and_classify(document_id: str) -> dict:
    """Synchronous Celery entry-point — delegates to async _run via asyncio.run."""
    return asyncio.run(_run(document_id))
```
New `cleanup_abandoned_uploads` task follows the same pattern:
```python
@celery_app.task(name="tasks.document_tasks.cleanup_abandoned_uploads")
def cleanup_abandoned_uploads() -> dict:
    """Synchronous entry-point: delegates to async _cleanup_abandoned via asyncio.run."""
    return asyncio.run(_cleanup_abandoned())
```
**Existing `_run` DB lookup and deferred import style** (lines 28-48):
```python
async def _run(document_id: str) -> dict:
    import uuid as _uuid
    from db.session import AsyncSessionLocal
    from db.models import Document
    from services import extractor, classifier
    from storage import get_storage_backend
    async with AsyncSessionLocal() as session:
        try:
            doc_uuid = _uuid.UUID(document_id)
        except ValueError:
            return {"document_id": document_id, "status": "invalid_id"}
        doc = await session.get(Document, doc_uuid)
        if doc is None:
            return {"document_id": document_id, "status": "not_found"}
```
Phase 3 `_run` adds a second DB lookup for AI config after fetching the document — copy the `session.get` pattern for the `User` model:
```python
# Resolve per-user AI config (D-14, DOC-03, DOC-05)
from config import settings
from db.models import User

user = await session.get(User, doc.user_id)
ai_provider = (user.ai_provider if user else None) or settings.default_ai_provider
ai_model = (user.ai_model if user else None) or settings.default_ai_model
```
Then pass to classifier:
```python
topics = await classifier.classify_document(
session, document_id,
ai_provider=ai_provider,
ai_model=ai_model,
)
```
**`_cleanup_abandoned` structure** (modeled on `email_tasks.py` `_run_send_security_alert`, lines 51-73):
```python
async def _cleanup_abandoned() -> dict:
    from datetime import datetime, timedelta, timezone
    from sqlalchemy import select
    from db.session import AsyncSessionLocal
    from db.models import Document
    from storage import get_storage_backend

    cutoff = datetime.now(timezone.utc) - timedelta(hours=1)
    async with AsyncSessionLocal() as session:
        result = await session.execute(
            select(Document).where(
                Document.status == "pending",
                Document.created_at < cutoff,
            )
        )
        docs = result.scalars().all()
        backend = get_storage_backend()
        cleaned = 0
        for doc in docs:
            try:
                if doc.object_key:
                    await backend.delete_object(doc.object_key)
            except Exception:
                pass  # MinIO object may not exist yet; safe to ignore
            await session.delete(doc)
            cleaned += 1
        await session.commit()
    return {"cleaned": cleaned}
```
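The selection rule can be unit-tested without a database by restating the `WHERE` clause as a plain function. `PendingDoc` and `select_abandoned` are hypothetical names for this sketch. The sketch also illustrates a constraint on the real query. The cutoff is timezone-aware, so `created_at` must be timezone-aware as well. In Python, ordering a naive against an aware `datetime` raises `TypeError`.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import List, Optional


@dataclass
class PendingDoc:
    id: str
    status: str
    created_at: datetime  # must be timezone-aware, like the cutoff
    object_key: Optional[str] = None


def select_abandoned(
    docs: List[PendingDoc], now: datetime, max_age: timedelta = timedelta(hours=1)
) -> List[PendingDoc]:
    """Same predicate as _cleanup_abandoned: status == 'pending' AND created_at < now - max_age."""
    cutoff = now - max_age
    return [d for d in docs if d.status == "pending" and d.created_at < cutoff]


now = datetime(2026, 5, 23, 12, 0, tzinfo=timezone.utc)
docs = [
    PendingDoc("stale", "pending", now - timedelta(hours=2), "u/d/x.pdf"),
    PendingDoc("fresh", "pending", now - timedelta(minutes=30)),
    PendingDoc("done", "ready", now - timedelta(hours=3), "u/d/y.pdf"),  # any non-pending status
]
abandoned = [d.id for d in select_abandoned(docs, now)]
```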
---
### `backend/celery_app.py` (config, event-driven)
**Analog:** `backend/celery_app.py` (self — add beat_schedule). Existing `task_routes` block is the model.
**Existing task_routes pattern** (lines 31-34):
```python
celery_app.conf.task_routes = {
"tasks.document_tasks.*": {"queue": "documents"},
"tasks.email_tasks.*": {"queue": "email"},
}
```
Add beat schedule after `task_routes` using the same conf assignment style:
```python
from datetime import timedelta as _timedelta
celery_app.conf.beat_schedule = {
"cleanup-abandoned-uploads": {
"task": "tasks.document_tasks.cleanup_abandoned_uploads",
"schedule": _timedelta(minutes=30),
},
}
celery_app.conf.timezone = "UTC"
```
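Combining the numbers from this phase gives the expected lifetime of an abandoned upload. The sketch assumes beat and a worker are running and that each sweep finishes promptly. A `pending` row becomes eligible one hour after creation, and the next sweep can arrive up to one schedule interval later. The 15-minute figure is the default `expires_minutes` of `generate_presigned_put_url`.

```python
from datetime import timedelta

PUT_URL_EXPIRY = timedelta(minutes=15)  # generate_presigned_put_url default
CUTOFF = timedelta(hours=1)             # created_at < now - 1h in _cleanup_abandoned
BEAT_INTERVAL = timedelta(minutes=30)   # "schedule" of cleanup-abandoned-uploads

# Eligible at creation + CUTOFF; the next sweep may come up to one interval later.
worst_case_lifetime = CUTOFF + BEAT_INTERVAL
# The cutoff outlives the upload URL, so a sweep never removes a row whose URL is
# still valid (a PUT started just before expiry may still be in flight, though).
assert CUTOFF > PUT_URL_EXPIRY
print(worst_case_lifetime)  # 1:30:00
```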
Import `timedelta` with an underscore alias (`_timedelta`) to follow the module's "keep imports minimal, avoid pulling in config" convention (see module docstring line 6).

---
### `backend/config.py` (config)
**Analog:** `backend/config.py` (self — add four optional env vars). Existing optional-with-default pattern:
**Existing optional string field pattern** (lines 40-44):
```python
# SMTP (Phase 2 — D-01)
smtp_host: str = ""
smtp_port: int = 587
smtp_user: str = ""
smtp_password: str = ""
```
Phase 3 additions follow the same pattern — typed field with a documented default:
```python
# MinIO public endpoint (Phase 3 — presigned URL Docker hostname fix, RESEARCH.md Finding 3)
minio_public_endpoint: str = "" # falls back to minio_endpoint if empty
# AI classification defaults (Phase 3 — D-13, D-15)
system_prompt: str = "" # SYSTEM_PROMPT env var; hardcoded fallback in classifier.py
default_ai_provider: str = "ollama" # DEFAULT_AI_PROVIDER env var
default_ai_model: str = "llama3.2" # DEFAULT_AI_MODEL env var
```
Remove from `config.py`: `SETTINGS_FILE = Path(...)` (line 60), `DEFAULT_SYSTEM_PROMPT` (lines 62-67), and `DEFAULT_SETTINGS` dict (lines 69-91). These are flat-file artifacts (D-12).

---
### `frontend/src/stores/documents.js` (store, request-response)
**Analog:** `frontend/src/stores/documents.js` (self — replace `upload`) and `frontend/src/stores/auth.js` (multi-step async action with error handling).
**Existing store skeleton pattern** (documents.js lines 1-10):
```javascript
import { defineStore } from 'pinia'
import { ref } from 'vue'
import * as api from '../api/client.js'
export const useDocumentsStore = defineStore('documents', () => {
const documents = ref([])
const total = ref(0)
const loading = ref(false)
const error = ref(null)
```
Add per-upload progress state:
```javascript
const uploadProgress = ref({}) // { [filename]: 0-100 }
```
**Multi-step async action pattern** — from `stores/auth.js` `login` function (lines 55-83), which handles branching outcomes:
```javascript
async function login(email, password, options = {}) {
loading.value = true
error.value = null
try {
const data = await api.login({...})
if (data.requires_totp) { return { requires_totp: true } }
// ...
accessToken.value = data.access_token
user.value = data.user
} catch (e) {
error.value = e.message
throw e
} finally {
loading.value = false
}
}
```
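Phase 3 `upload` action follows the same try/catch/finally pattern for the three-step flow. The XHR PUT step uses a dedicated helper instead of `api.*`. `api.*` is built on `fetch`, which exposes no upload-progress events, whereas XHR reports them through `xhr.upload`: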
```javascript
async function upload(file, autoClassify = true) {
  const key = file.name
  uploadProgress.value[key] = 0
  try {
    // Step 1: get presigned URL
    const { upload_url, document_id } = await api.getUploadUrl(file.name, file.type)
    // Step 2: PUT directly to MinIO with XHR progress
    await uploadToMinIO(upload_url, file, (pct) => {
      uploadProgress.value[key] = pct
    })
    // Step 3: confirm
    const doc = await api.confirmUpload(document_id)
    documents.value.unshift(doc)
    total.value++
    return doc
  } catch (e) {
    error.value = e.message
    throw e
  } finally {
    delete uploadProgress.value[key]
  }
}
```
**XHR upload helper** (RESEARCH.md Finding 9 — no codebase analog; define in `api/client.js` or at top of `stores/documents.js`):
```javascript
function uploadToMinIO(url, file, onProgress) {
  return new Promise((resolve, reject) => {
    const xhr = new XMLHttpRequest()
    xhr.upload.addEventListener('progress', e => {
      if (e.lengthComputable) onProgress(Math.round(e.loaded / e.total * 100))
    })
    // Treat only 2xx responses as success.
    xhr.addEventListener('load', () =>
      xhr.status >= 200 && xhr.status < 300
        ? resolve()
        : reject(new Error(`PUT failed: ${xhr.status}`))
    )
    xhr.addEventListener('error', () => reject(new Error('Network error during upload')))
    xhr.addEventListener('abort', () => reject(new Error('Upload aborted')))
    xhr.open('PUT', url)
    xhr.setRequestHeader('Content-Type', file.type || 'application/octet-stream')
    xhr.send(file)
  })
}
```
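Note: do not add the `Authorization: Bearer` header to the XHR PUT. Presigned URLs are self-authenticating because the signature is carried in the query string. An extra `Authorization` header would also send the API token to the storage host, and S3-compatible servers typically reject requests that combine both authentication mechanisms.

---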
### `frontend/src/components/layout/AppSidebar.vue` (component, request-response)
**Analog:** `frontend/src/components/layout/AppSidebar.vue` (self — add quota bar widget in the bottom section).
**Existing bottom section pattern** (lines 57-103):
```html
<!-- Settings + Admin link -->
<div class="px-3 py-4 border-t border-gray-100">
<!-- Admin link (admin users only) -->
<router-link v-if="authStore.user?.role === 'admin'" to="/admin" class="nav-link">...</router-link>
<router-link to="/settings" class="nav-link">...</router-link>
<!-- User identity footer -->
<div v-if="authStore.user" class="flex items-center gap-3 px-4 py-3 border-t border-gray-100 mt-2 -mx-3">
...sign out button...
</div>
</div>
```
Insert `<QuotaBar />` component between the nav links and the user identity footer. Import via `<script setup>`.
**Existing `<script setup>` import pattern** (lines 107-119):
```javascript
import { useRouter } from 'vue-router'
import { useTopicsStore } from '../../stores/topics.js'
import { useAuthStore } from '../../stores/auth.js'
const topicsStore = useTopicsStore()
const authStore = useAuthStore()
```
Add:
```javascript
import QuotaBar from './QuotaBar.vue'
```
QuotaBar fetches its own data on `onMounted` — no props needed from AppSidebar.

---
### `frontend/src/components/layout/QuotaBar.vue` (component, NEW — request-response)
**Analog:** `frontend/src/components/auth/PasswordStrengthBar.vue` for the visual progress-bar pattern, and `frontend/src/components/layout/AppSidebar.vue` for the `<script setup>` + store consumption style.
**`<script setup>` with reactive data fetched on mount** — pattern from `stores/auth.js` and the sidebar's store usage:
```javascript
<script setup>
import { ref, computed, onMounted } from 'vue'
import * as api from '../../api/client.js'
const usedBytes = ref(0)
const limitBytes = ref(0)
const pct = computed(() =>
limitBytes.value > 0 ? Math.min(100, (usedBytes.value / limitBytes.value) * 100) : 0
)
const barColor = computed(() =>
pct.value >= 95 ? 'bg-red-500' : pct.value >= 80 ? 'bg-amber-400' : 'bg-indigo-400'
)
const label = computed(() => {
const used = (usedBytes.value / 1048576).toFixed(1)
const limit = (limitBytes.value / 1048576).toFixed(0)
return `${used} MB of ${limit} MB`
})
async function fetchQuota() {
try {
const data = await api.getMyQuota()
usedBytes.value = data.used_bytes
limitBytes.value = data.limit_bytes
} catch {
// Not yet authenticated or endpoint not yet available — silently ignore
}
}
onMounted(fetchQuota)
</script>
```
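The three computed properties are simple enough to transcribe for boundary testing. The Python function below is such a transcription, offered only as a reference for expected values; the component itself stays JavaScript. It makes two details visible. The divisor 1048576 is 2^20, so the "MB" label actually shows MiB, and `pct` is capped at 100 when usage exceeds the limit. JavaScript's `toFixed` and Python's format spec can round exact halfway values differently, so the checks avoid such values.

```python
from typing import Tuple


def quota_view(used_bytes: int, limit_bytes: int) -> Tuple[float, str, str]:
    """Transcription of QuotaBar.vue's pct, barColor and label computeds."""
    pct = min(100, used_bytes / limit_bytes * 100) if limit_bytes > 0 else 0
    if pct >= 95:
        color = "bg-red-500"
    elif pct >= 80:
        color = "bg-amber-400"
    else:
        color = "bg-indigo-400"
    # 1048576 bytes = 2**20 (one MiB), exactly as in the component.
    label = f"{used_bytes / 1048576:.1f} MB of {limit_bytes / 1048576:.0f} MB"
    return pct, color, label


MIB = 2**20
print(quota_view(85 * MIB, 100 * MIB))
```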
**Template pattern — Tailwind progress bar** (mirrors `PasswordStrengthBar.vue`'s bar-within-container structure):
```html
<template>
<div class="px-4 pb-3">
<p class="text-xs text-gray-400 mb-1">Storage</p>
<div class="w-full h-1.5 bg-gray-200 rounded-full overflow-hidden">
<div
class="h-full rounded-full transition-all"
:class="barColor"
:style="{ width: `${pct}%` }"
></div>
</div>
<p class="text-xs text-gray-400 mt-1">{{ label }}</p>
</div>
</template>
```
---
### `frontend/src/components/upload/UploadProgress.vue` (component, event-driven)
**Analog:** `frontend/src/components/upload/UploadProgress.vue` (self — add progress bar inside existing row).
**Existing row structure** (lines 6-28):
```html
<div v-for="item in items" :key="item.name" class="flex items-center gap-3 ...">
  <div class="flex-1 min-w-0">
    <p class="text-sm font-medium text-gray-800 truncate">{{ item.name }}</p>
    <p v-if="item.error" class="text-xs text-red-500 mt-0.5">{{ item.error }}</p>
    <p v-else-if="item.done" class="text-xs text-green-600 mt-0.5">
      Done{{ ... }}
    </p>
    <p v-else class="text-xs text-gray-400 mt-0.5">Uploading…</p>
  </div>
  <div class="shrink-0">
    <!-- spinner / check / error icons -->
  </div>
</div>
```
Add a progress bar after the status text lines, inside `<div class="flex-1 min-w-0">`:
```html
<!-- Progress bar during upload (item.progress is 0-100, undefined when done/error) -->
<div
  v-if="!item.done && !item.error && item.progress !== undefined"
  class="w-full h-1 bg-gray-200 rounded-full mt-1 overflow-hidden"
>
  <div
    class="h-full bg-indigo-400 rounded-full transition-all"
    :style="{ width: `${item.progress}%` }"
  ></div>
</div>
```
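The bar width comes directly from `item.progress`. This sketch assumes the store fills that value from an XHR `upload.onprogress` event. That is an assumption, because the XHR helper has no codebase analog. Under it, the conversion from the event's `loaded`/`total` byte counts to the 0-100 value looks like this, written in Python to match the other sketches. Returning `None` plays the role of `undefined`, which hides the bar when the browser cannot report a total (`lengthComputable` is false).

```python
from typing import Optional


def progress_percent(loaded: int, total: int, length_computable: bool = True) -> Optional[int]:
    """Map an upload progress event (loaded/total bytes) to a 0-100 integer.

    Returns None when the total is unknown, mirroring `item.progress === undefined`.
    """
    if not length_computable or total <= 0:
        return None
    # Clamp so rounding or an over-reporting event never yields a bar wider than 100%.
    return max(0, min(100, round(loaded * 100 / total)))
```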
The `item` shape gains a `progress` field (0-100, set from `uploadProgress` state in `useDocumentsStore`).

**Existing `defineProps` pattern** (lines 33-35):
```javascript
defineProps({
  items: { type: Array, default: () => [] },
})
```
No change needed: `progress` is a field on each item object, not a separate prop.

---
## Shared Patterns
### Authentication / Auth Guard
**Source:** `backend/deps/auth.py` lines 38-75 (`get_current_user`) and lines 78-92 (`get_current_admin`)

**Apply to:** All handlers in `api/documents.py`, `api/topics.py`, and the new `GET /api/auth/me/quota` endpoint
```python
from fastapi import Depends, HTTPException

from db.models import User
from deps.auth import get_current_admin, get_current_user

# Both guards are declared as handler parameters:
#     current_user: User = Depends(get_current_user)   # regular user (all document endpoints)
#     _admin: User = Depends(get_current_admin)        # admin only (admin topics, admin quota)


# Derived "no admin" guard: define once in api/documents.py, reuse across all handlers.
async def get_regular_user(user: User = Depends(get_current_user)) -> User:
    if user.role == "admin":
        raise HTTPException(status_code=403, detail="Admin accounts cannot access document content")
    return user
```
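`get_regular_user` works by layering: it depends on `get_current_user`, so authentication is always checked before the role. The sketch below shows that ordering without FastAPI. It makes two assumptions: a hypothetical `HTTPError` stands in for `HTTPException`, and the 401 for a missing session is a guess for illustration only.

```python
from dataclasses import dataclass
from typing import Optional


class HTTPError(Exception):
    """Hypothetical stand-in for fastapi.HTTPException so the sketch runs without FastAPI."""

    def __init__(self, status_code: int, detail: str) -> None:
        super().__init__(detail)
        self.status_code = status_code
        self.detail = detail


@dataclass
class User:
    id: int
    role: str  # "admin" or any non-admin role


def get_current_user(session_user: Optional[User]) -> User:
    # The real dependency resolves the session/token; None here means "not logged in".
    # The 401 status is an assumption for this sketch.
    if session_user is None:
        raise HTTPError(401, "Not authenticated")
    return session_user


def get_regular_user(session_user: Optional[User]) -> User:
    # Layered guard: authenticate first, then apply the "no admin" role check.
    user = get_current_user(session_user)
    if user.role == "admin":
        raise HTTPError(403, "Admin accounts cannot access document content")
    return user
```

With this layering, an anonymous request never reaches the role check, so it cannot learn whether a guard is role-based.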
### Ownership Assertion (SEC-04)
**Source:** RESEARCH.md Finding 7 / CONTEXT.md D-16

**Apply to:** Every handler in `api/documents.py` that accepts a `doc_id` path parameter
```python
try:
    doc = await session.get(Document, uuid.UUID(doc_id))
except ValueError:  # malformed UUID: answer exactly as for a missing document (not a 500)
    doc = None
if doc is None or doc.user_id != current_user.id:
    raise HTTPException(status_code=404, detail="Document not found")
# Returns 404 (not 403) for both "not found" and "wrong owner" (SEC-04 / D-16)
```
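A single uniform 404 prevents callers from discovering which document IDs exist. The plain-Python sketch below demonstrates this with an ownership lookup against an in-memory dict; the `Document` shape and the exception name are hypothetical. Malformed IDs, unknown IDs, and other users' IDs all raise the same error.

```python
import uuid
from dataclasses import dataclass
from typing import Dict


class DocumentNotFound(Exception):
    """Stand-in for HTTPException(status_code=404, detail="Document not found")."""


@dataclass
class Document:
    id: uuid.UUID
    user_id: int


def get_owned_document(docs: Dict[uuid.UUID, Document], doc_id: str, user_id: int) -> Document:
    """Return the document only if `user_id` owns it; otherwise raise one uniform error."""
    try:
        key = uuid.UUID(doc_id)
    except ValueError:
        raise DocumentNotFound() from None  # a malformed id looks like a missing one
    doc = docs.get(key)
    if doc is None or doc.user_id != user_id:
        # Same exception for "missing" and "owned by someone else", so callers
        # cannot probe which document ids exist.
        raise DocumentNotFound()
    return doc
```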
### Atomic Quota SQL
**Source:** RESEARCH.md Finding 4 / CONTEXT.md D-07

**Apply to:** `api/documents.py` confirm endpoint (increment) and `services/storage.py` `delete_document` (decrement)
```python
from sqlalchemy import text

# Increment (confirm endpoint): the limit check and the write happen in one statement.
result = await session.execute(
    text("""
        UPDATE quotas
        SET used_bytes = used_bytes + :delta
        WHERE user_id = :uid
          AND (used_bytes + :delta) <= limit_bytes
        RETURNING used_bytes, limit_bytes
    """),
    {"delta": file_size, "uid": str(user_id)},
)
# None when no row matched: the upload would exceed the quota (or no quota row exists).
row = result.first()

# Decrement (delete):
await session.execute(
    text("UPDATE quotas SET used_bytes = GREATEST(0, used_bytes - :delta) WHERE user_id = :uid"),
    {"delta": size_bytes, "uid": str(user_id)},
)
```
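The increment is atomic because the limit check is in the `WHERE` clause of the same statement that writes. There is no read-then-write gap for two concurrent confirms to slip through. In PostgreSQL under READ COMMITTED, a second concurrent `UPDATE` on the same row waits for the first to commit, then re-evaluates its `WHERE` clause against the new value.

The sketch below reproduces the statement semantics (not the concurrency) with SQLite from the Python standard library. Two substitutions are assumptions of the sketch: `rowcount` stands in for `RETURNING`, and SQLite's two-argument `MAX()` stands in for PostgreSQL's `GREATEST()`.

```python
import sqlite3


def make_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE quotas (user_id TEXT PRIMARY KEY,"
        " used_bytes INTEGER NOT NULL, limit_bytes INTEGER NOT NULL)"
    )
    return conn


def try_increment(conn: sqlite3.Connection, uid: str, delta: int) -> bool:
    """Charge `delta` bytes; returns False and changes nothing if the limit would be exceeded."""
    cur = conn.execute(
        "UPDATE quotas SET used_bytes = used_bytes + :delta"
        " WHERE user_id = :uid AND (used_bytes + :delta) <= limit_bytes",
        {"delta": delta, "uid": uid},
    )
    return cur.rowcount == 1  # 0 rows updated means the guard rejected the increment


def decrement(conn: sqlite3.Connection, uid: str, delta: int) -> None:
    # Floor at zero so a double delete or drifted counter cannot go negative.
    conn.execute(
        "UPDATE quotas SET used_bytes = MAX(0, used_bytes - :delta) WHERE user_id = :uid",
        {"delta": delta, "uid": uid},
    )


def used(conn: sqlite3.Connection, uid: str) -> int:
    return conn.execute("SELECT used_bytes FROM quotas WHERE user_id = ?", (uid,)).fetchone()[0]
```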
### asyncio.to_thread for MinIO SDK
**Source:** `backend/storage/minio_backend.py` lines 63-70, 86-88, 90-99

**Apply to:** All new methods in `MinIOBackend` (`generate_presigned_put_url`, `stat_object`)
```python
# Simple single-call pattern:
await asyncio.to_thread(self._client.remove_object, self._bucket, object_key)

# With return value:
return await asyncio.to_thread(
    self._client.presigned_get_object,
    self._bucket,
    object_key,
    timedelta(minutes=expires_minutes),
)
```
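`asyncio.to_thread` runs the blocking SDK call in the event loop's default thread pool and hands back its return value. Other requests keep being served while MinIO answers. In the sketch below, a hypothetical `BlockingClient` replaces the real `Minio` client; the test checks that the calls run off the event-loop thread and that results come back in order.

```python
import asyncio
import threading
import time
from typing import List


class BlockingClient:
    """Hypothetical stand-in for the synchronous MinIO client: every call blocks."""

    def stat_object(self, bucket: str, key: str) -> dict:
        time.sleep(0.05)  # simulate network I/O
        return {"bucket": bucket, "key": key, "thread": threading.get_ident()}


async def stat_many(client: BlockingClient, bucket: str, keys: List[str]) -> list:
    # Each blocking call runs in the default thread pool, so the event loop stays
    # free and the calls overlap; gather() preserves the order of `keys`.
    return await asyncio.gather(
        *(asyncio.to_thread(client.stat_object, bucket, k) for k in keys)
    )
```

Positional arguments after the callable are forwarded unchanged. That is why the pattern can pass `self._bucket` and `object_key` straight through to the SDK method.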
### Celery Task Structure (sync entry-point + async body)
**Source:** `backend/tasks/document_tasks.py` lines 22-25 and `backend/tasks/email_tasks.py` lines 19-36

**Apply to:** New `cleanup_abandoned_uploads` task in `document_tasks.py`
```python
import asyncio


@celery_app.task(name="tasks.document_tasks.cleanup_abandoned_uploads")
def cleanup_abandoned_uploads() -> dict:
    # Celery calls a plain sync function; asyncio.run drives the async body.
    return asyncio.run(_cleanup_abandoned())


async def _cleanup_abandoned() -> dict:
    # All imports deferred here (celery_app.py circular import rule)
    from db.session import AsyncSessionLocal
    ...
    async with AsyncSessionLocal() as session:
        ...
```
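Celery invokes the task as an ordinary synchronous function, and `asyncio.run` creates a fresh event loop on each invocation to drive the async body. The Celery-free sketch below shows the same split. It assumes an "abandoned" upload is one whose presigned URL was issued but never confirmed within a cutoff; the 24 hours used here is an arbitrary choice, not from the plan. Rows are modeled as hypothetical `(object_key, created_at)` tuples.

```python
import asyncio
from datetime import datetime, timedelta, timezone
from typing import List, Optional, Tuple

# Hypothetical row shape for an upload that got a presigned URL but was never confirmed.
PendingUpload = Tuple[str, datetime]  # (object_key, created_at)


def cleanup_abandoned_uploads_sketch(
    pending: List[PendingUpload],
    now: Optional[datetime] = None,
    max_age: timedelta = timedelta(hours=24),  # assumed cutoff, not from the plan
) -> dict:
    # Sync entry point, as a Celery task body would be: one event loop per run.
    return asyncio.run(
        _cleanup_abandoned_sketch(pending, now or datetime.now(timezone.utc), max_age)
    )


async def _cleanup_abandoned_sketch(
    pending: List[PendingUpload], now: datetime, max_age: timedelta
) -> dict:
    cutoff = now - max_age
    stale = [key for key, created_at in pending if created_at < cutoff]
    # A real body would open AsyncSessionLocal here, remove each object from
    # MinIO (via asyncio.to_thread) and delete the unconfirmed rows.
    return {"deleted": len(stale), "keys": stale}
```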
### Frontend API Client `request()` Helper
**Source:** `frontend/src/api/client.js` lines 11-41

**Apply to:** All new API functions (`getUploadUrl`, `confirmUpload`, `getMyQuota`) added to `api/client.js`
```javascript
// GET:
export function getMyQuota() {
  return request('/api/auth/me/quota')
}

// POST with JSON body:
export function getUploadUrl(filename, contentType) {
  return request('/api/documents/upload-url', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ filename, content_type: contentType }),
  })
}

export function confirmUpload(documentId) {
  return request(`/api/documents/${documentId}/confirm`, { method: 'POST' })
}
```
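Together, these three client functions imply a three-step sequence:

1. Ask the backend for a presigned URL.
2. PUT the bytes directly to MinIO.
3. Confirm the upload.

The Python sketch below shows that ordering with each step injected as a callable. The response keys `upload_url` and `document_id` are assumptions, not the confirmed API shape. The property worth preserving is that confirm, which charges the quota, never runs if the PUT fails.

```python
from typing import Callable, Dict


def upload_document(
    filename: str,
    content_type: str,
    data: bytes,
    get_upload_url: Callable[[str, str], Dict],
    put_object: Callable[[str, bytes, str], None],
    confirm_upload: Callable[[str], Dict],
) -> Dict:
    """Three-step presigned upload: request URL, PUT bytes to storage, then confirm."""
    # Step 1: the backend registers the pending document and signs a PUT URL.
    ticket = get_upload_url(filename, content_type)
    # Step 2: bytes go straight to object storage; an exception here skips step 3.
    put_object(ticket["upload_url"], data, content_type)
    # Step 3: the backend (presumably via stat_object) checks the object and charges the quota.
    return confirm_upload(ticket["document_id"])
```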
### Pinia Store Action Pattern
**Source:** `frontend/src/stores/documents.js` lines 25-29 and `frontend/src/stores/auth.js` lines 55-83

**Apply to:** Modified `upload` action in `stores/documents.js`
```javascript
// Pattern: loading flag + try/catch/finally + error ref
async function someAction(...args) {
  loading.value = true
  error.value = null
  try {
    // Spread the rest parameters back out; passing `args` would send one array argument
    const data = await api.someApiCall(...args)
    // update state
    return data
  } catch (e) {
    error.value = e.message
    throw e
  } finally {
    loading.value = false
  }
}
```
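The `finally` block guarantees that the loading flag clears even when the API call throws, and the re-throw lets the calling component react as well. Below is the same control flow in a small Python class; the class is hypothetical and for illustration only.

```python
class ActionState:
    """Python analog of a store's `loading` / `error` refs (hypothetical)."""

    def __init__(self) -> None:
        self.loading = False
        self.error = None

    def run(self, fn, *args):
        self.loading = True
        self.error = None
        try:
            return fn(*args)  # forward the arguments individually
        except Exception as e:
            self.error = str(e)
            raise  # re-raise so the caller can react too
        finally:
            self.loading = False  # runs on success and on failure
```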
---
## No Analog Found
All files have analogs. The following have only partial analogs. For the listed sub-sections, the pattern comes from RESEARCH.md (or, for QuotaBar, from a visual-only analog) rather than from equivalent code already in the codebase:

| File | Sub-section | Reason | Source |
|---|---|---|---|
| `backend/migrations/versions/0003_multi_user_isolation.py` | Inline MinIO SDK calls | No existing migration has side-effect MinIO calls | RESEARCH.md Finding 1 |
| `backend/api/documents.py` | Atomic quota SQL | No existing handler uses raw `text()` UPDATE | RESEARCH.md Finding 4 |
| `frontend/src/stores/documents.js` | XHR upload helper | No existing XHR code in codebase (fetch-only) | RESEARCH.md Finding 9 |
| `frontend/src/components/layout/QuotaBar.vue` | Full component (NEW) | No progress-bar component with `onMounted` fetch exists | PasswordStrengthBar.vue (visual style only) |
---
## Metadata
**Analog search scope:** `backend/` (all .py), `frontend/src/` (all .js, .vue)

**Files scanned:** 14 source files read in full

**Pattern extraction date:** 2026-05-23