5 plans across 5 sequential waves covering: Alembic migration 0003 (null-user cleanup, NOT NULL constraint, quota reconciliation), presigned MinIO PUT upload flow with atomic quota enforcement, auth guards on all document/topic endpoints, flat-file settings retirement + per-user AI classification, and frontend quota bar with 3-step XHR upload progress. Verification passed across all 12 dimensions. All 8 phase requirements covered (STORE-03/04/05/06, SEC-04, DOC-03/04/05). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
| phase | plan | type | wave | depends_on | files_modified | autonomous | requirements | must_haves |
|---|---|---|---|---|---|---|---|---|
| 03-document-migration-multi-user-isolation | 02 | execute | 2 | | | true | | |
Purpose: This plan is the only place the atomic quota SQL lives. Plan 03-03's auth guards depend on the endpoint surface defined here, and Plan 03-05's frontend depends on the API contract defined here.

Output: 6 backend code changes + 1 docker-compose change. After this plan, an authenticated user can perform the full presigned upload/delete/quota cycle from curl; auth guards still allow anonymous access (closed in Plan 03-03).
<execution_context> @/Users/nik/.claude/get-shit-done/workflows/execute-plan.md @/Users/nik/.claude/get-shit-done/templates/summary.md </execution_context>
@.planning/ROADMAP.md
@.planning/STATE.md
@.planning/phases/03-document-migration-multi-user-isolation/03-CONTEXT.md
@.planning/phases/03-document-migration-multi-user-isolation/03-RESEARCH.md
@.planning/phases/03-document-migration-multi-user-isolation/03-PATTERNS.md
@.planning/phases/03-document-migration-multi-user-isolation/03-UI-SPEC.md
@.planning/phases/03-document-migration-multi-user-isolation/03-VALIDATION.md
@.planning/phases/03-document-migration-multi-user-isolation/03-01-SUMMARY.md
@CLAUDE.md
@backend/storage/base.py
@backend/storage/minio_backend.py
@backend/storage/__init__.py
@backend/api/documents.py
@backend/api/auth.py
@backend/services/storage.py
@backend/tasks/document_tasks.py
@backend/celery_app.py
@backend/config.py
@docker-compose.yml
From `backend/storage/base.py` (current, Phase 1):
```
from abc import ABC, abstractmethod

class StorageBackend(ABC):
    @abstractmethod
    async def put_object(self, user_id, document_id, file_bytes, extension, content_type) -> str: ...

    @abstractmethod
    async def get_object(self, object_key) -> bytes: ...

    @abstractmethod
    async def delete_object(self, object_key) -> None: ...

    @abstractmethod
    async def presigned_get_url(self, object_key, expires_minutes=60) -> str: ...

    @abstractmethod
    async def health_check(self) -> bool: ...
```
From `backend/storage/minio_backend.py` (current):
```
class MinIOBackend(StorageBackend):
    def __init__(self, endpoint, access_key, secret_key, bucket, secure=False) -> None: ...

    # Each method wraps the synchronous Minio SDK in asyncio.to_thread.
```
From CONTEXT.md decisions (D-05):
- `POST /api/documents/upload-url`: body `{"filename": str, "content_type": str}`; returns `{"upload_url": str, "document_id": str}`
- `POST /api/documents/{id}/confirm`: empty body; returns `{"id": str, "size_bytes": int, "used_bytes": int, "status": "uploaded"}`
- Quota exceeded: HTTP 413 with `detail = {"used_bytes": int, "limit_bytes": int, "rejected_bytes": int}`
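One way to pin the D-05 shapes down for backend tests is a set of typed dictionaries. This is a sketch, not a file the plan asks for: the class names and the `quota_exceeded_detail` helper are hypothetical, and only the keys and value types come from D-05.

```python
from typing import Literal, TypedDict


# Hypothetical typed view of the D-05 contract; only the keys and value
# types come from the plan, the class names are illustrative.
class UploadUrlBody(TypedDict):
    filename: str
    content_type: str


class UploadUrlResponse(TypedDict):
    upload_url: str
    document_id: str


class ConfirmResponse(TypedDict):
    id: str
    size_bytes: int
    used_bytes: int
    status: Literal["uploaded"]


class QuotaExceededDetail(TypedDict):
    # Payload of the `detail` field on an HTTP 413 from /confirm.
    used_bytes: int
    limit_bytes: int
    rejected_bytes: int


def quota_exceeded_detail(used: int, limit: int, rejected: int) -> QuotaExceededDetail:
    # Keeps the three key names in one place for handlers and tests.
    return {"used_bytes": used, "limit_bytes": limit, "rejected_bytes": rejected}
```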
From `backend/services/auth.py` (Phase 2): `def create_access_token(user_id: str, role: str) -> str`

From `backend/db/models.py`:
- `Document.user_id`: Plan 03-01 migration flips to NOT NULL
- `Document.status`: `'pending' | 'uploaded' | 'classified' | 'classification_failed'`
- `Document.object_key`
- `Document.size_bytes`
- `Quota.limit_bytes`: default 104857600 (100 MB)
- `Quota.used_bytes`

From `backend/celery_app.py` (current):
```
celery_app.conf.task_routes = {
    "tasks.document_tasks.*": {"queue": "documents"},
    "tasks.email_tasks.*": {"queue": "email"},
}
```
<threat_model>
Trust Boundaries
| Boundary | Description |
|---|---|
| browser → MinIO (direct PUT) | Untrusted browser uses time-limited presigned URL; MinIO authenticates via HMAC signature; CORS preflight must succeed |
| browser → FastAPI /confirm | Authenticated user provides only document_id; FastAPI reads size_bytes from MinIO stat (never from client) |
| FastAPI /confirm → quotas table | Concurrent /confirm calls race against the same Quota row; PostgreSQL row-level lock + atomic UPDATE WHERE clause prevents overflow |
| Celery beat → DB+MinIO | Runs as docuvault_app; deletes its own rows (status='pending', age > 1h) and own MinIO objects only |
STRIDE Threat Register
| Threat ID | Category | Component | Disposition | Mitigation Plan |
|---|---|---|---|---|
| T-03-04 | Spoofing | POST /api/documents/upload-url body filename/content_type | mitigate | object_key is computed server-side as {user_id}/{document_id}/{uuid4()}{ext} (STORE-02) — filename stored in DB column only; extension derived from Path(body.filename).suffix.lower() |
| T-03-05 | Tampering | Confirm endpoint quota delta | mitigate | size_bytes always comes from backend.stat_object(doc.object_key) (MinIO authoritative) — never from client body or request param |
| T-03-06 | Denial of Service | Concurrent /confirm uploads at quota boundary (SC2) | mitigate | Atomic SQL UPDATE quotas SET used_bytes = used_bytes + :delta WHERE (used_bytes + :delta) <= limit_bytes RETURNING used_bytes; fetchone() None → HTTP 413 (RESEARCH.md Finding 4 + Risk 2) |
| T-03-07 | Information Disclosure | Presigned URL leakage in logs | accept | 15-min TTL, single object key; leak risk acceptable for v1; do not log full URL — log only document_id |
| T-03-08 | Repudiation | Abandoned uploads pile up MinIO orphans | mitigate | Celery beat cleanup_abandoned_uploads every 30m deletes pending docs older than 1 hour and their MinIO objects (D-06) |
| T-03-09 | Information Disclosure | Browser→MinIO PUT CORS misconfiguration leaks origin | mitigate | MINIO_API_CORS_ALLOW_ORIGIN env var explicitly set to ${CORS_ORIGINS} (or http://localhost:5173 default) — not wildcard |
| T-03-10 | Tampering | Docker hostname in presigned URL (minio:9000 not browser-resolvable) | mitigate | Dual MinIO client: self._client (internal endpoint, used for stat/get/put/delete) and self._public_client (public endpoint, used for generate_presigned_put_url) per RESEARCH.md Finding 3 |
| T-03-SC | Tampering | pip installs | mitigate | No new package installs (minio + sqlalchemy already pinned in Phase 1 requirements.txt) |
</threat_model>
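Modify `backend/storage/base.py`: declare `generate_presigned_put_url(self, object_key, expires_minutes=15) -> str` and `stat_object(self, object_key) -> int` as `@abstractmethod`s on `StorageBackend`, bringing the ABC to 7 abstract methods (the verify command for this task checks for both names).

Modify `backend/storage/minio_backend.py`: extend `__init__` signature with `public_endpoint: str | None = None`; after the existing `self._client = Minio(...)` block add `self._public_client = Minio(endpoint=(public_endpoint or endpoint), access_key=access_key, secret_key=secret_key, secure=secure, region="us-east-1")`. The explicit `region` matters: without it, minio-py looks up the bucket location with a network call through the client's own endpoint before signing, and the public hostname (e.g. `localhost:9000`) is not reachable from inside the backend container. `us-east-1` is MinIO's default region; use the server's configured region if it differs. Append two new async methods (after `presigned_get_url`):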
- `generate_presigned_put_url(self, object_key, expires_minutes=15)` — `return await asyncio.to_thread(self._public_client.presigned_put_object, self._bucket, object_key, timedelta(minutes=expires_minutes))`
- `stat_object(self, object_key)` — `result = await asyncio.to_thread(self._client.stat_object, self._bucket, object_key); return result.size`
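The dual-client routing can be exercised without a running MinIO. The sketch below swaps `minio.Minio` for a hypothetical `FakeMinio` that only echoes the endpoint it was built with. It therefore shows routing rather than real signing: presigning goes through the public client, stat goes through the internal one, and an empty public endpoint falls back to the internal one. `DualClientBackend` and `FakeMinio` are illustrative names; the two async method bodies mirror the ones described above.

```python
import asyncio
from datetime import timedelta
from typing import Callable, Optional


class FakeMinio:
    """Hypothetical stand-in for minio.Minio: records its endpoint, does no signing."""

    def __init__(self, endpoint: str) -> None:
        self.endpoint = endpoint

    def presigned_put_object(self, bucket: str, key: str, expires: timedelta) -> str:
        # A real client returns a SigV4-signed URL; this only shows which host is used.
        return f"http://{self.endpoint}/{bucket}/{key}?expires={int(expires.total_seconds())}"

    def stat_object(self, bucket: str, key: str):
        class _Stat:  # minio's stat_object result exposes .size
            size = 42

        return _Stat()


class DualClientBackend:
    def __init__(
        self,
        endpoint: str,
        bucket: str,
        public_endpoint: Optional[str] = None,
        client_factory: Callable[[str], FakeMinio] = FakeMinio,
    ) -> None:
        self._bucket = bucket
        self._client = client_factory(endpoint)  # internal: stat/get/put/delete
        # None or "" falls back to the internal endpoint.
        self._public_client = client_factory(public_endpoint or endpoint)

    async def generate_presigned_put_url(self, object_key: str, expires_minutes: int = 15) -> str:
        return await asyncio.to_thread(
            self._public_client.presigned_put_object,
            self._bucket,
            object_key,
            timedelta(minutes=expires_minutes),
        )

    async def stat_object(self, object_key: str) -> int:
        result = await asyncio.to_thread(self._client.stat_object, self._bucket, object_key)
        return result.size
```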
Modify `backend/storage/__init__.py`: extend `get_storage_backend()` to pass `public_endpoint=settings.minio_public_endpoint` to the `MinIOBackend(...)` constructor.
Modify `backend/config.py`: add `minio_public_endpoint: str = ""` to the `Settings` class alongside the existing `minio_endpoint` field. Comment reference: `# RESEARCH.md Finding 3 — browser-resolvable hostname for presigned URLs`.
Modify `docker-compose.yml`:
1. In the `minio:` service `environment:` block, add `MINIO_API_CORS_ALLOW_ORIGIN: ${CORS_ORIGINS:-http://localhost:5173}` (RESEARCH.md Finding 3, T-03-09).
2. In the `backend:` service `environment:` block, add `- MINIO_PUBLIC_ENDPOINT=${MINIO_PUBLIC_ENDPOINT:-localhost:9000}`.
3. Append a new `celery-beat:` service mirroring `celery-worker` structure with `command: celery -A celery_app beat --loglevel=info` and the same `depends_on`, environment block, and build context (RESEARCH.md Finding 10).
(cd backend && python -c "from storage.base import StorageBackend; assert 'generate_presigned_put_url' in dir(StorageBackend); assert 'stat_object' in dir(StorageBackend); from storage.minio_backend import MinIOBackend; import inspect; sig = inspect.signature(MinIOBackend.__init__); assert 'public_endpoint' in sig.parameters; print('OK')") && grep -c "MINIO_API_CORS_ALLOW_ORIGIN" docker-compose.yml && grep -c "celery-beat:" docker-compose.yml && grep -c "minio_public_endpoint" backend/config.py
StorageBackend ABC has 7 abstract methods (5 original + 2 new). MinIOBackend `__init__` accepts `public_endpoint`. docker-compose.yml contains `MINIO_API_CORS_ALLOW_ORIGIN` and `celery-beat:`. config.py contains `minio_public_endpoint`. All grep counts >= 1.
Task 2: Implement upload-url, confirm, delete-with-quota, me/quota endpoints + remove old /upload + abandoned-upload Celery beat
Files: backend/api/documents.py, backend/api/auth.py, backend/services/storage.py, backend/tasks/document_tasks.py, backend/celery_app.py

Read first:
- backend/api/documents.py: existing handler structure (will be replaced)
- backend/api/auth.py: existing /me endpoint pattern, router prefix, `from db.models import BackupCode, Quota, RefreshToken, User` import line
- backend/services/storage.py: existing save_upload, delete_document, _backend() helper, _doc_to_dict, list_metadata
- backend/tasks/document_tasks.py: existing extract_and_classify task; pattern for sync entry + async body
- backend/tasks/email_tasks.py: _run_send_security_alert shape for async task body
- backend/celery_app.py: current task_routes block (model for beat_schedule append)
- backend/deps/auth.py: get_current_user signature (this plan does NOT wire auth into documents.py; Plan 03-03 does)
- .planning/phases/03-document-migration-multi-user-isolation/03-RESEARCH.md: Finding 4 (atomic quota SQL), Finding 5 (stat_object error handling), Finding 9 (/me/quota), Finding 10 (Celery beat schedule)
- .planning/phases/03-document-migration-multi-user-isolation/03-PATTERNS.md: atomic quota pattern, Celery task pattern

Behavior:
- POST /api/documents/upload-url body {filename: str, content_type: str} returns {upload_url: str, document_id: str} and inserts a Document row with user_id=None (Wave 2 placeholder; Plan 03-03 replaces with current_user.id), status="pending", object_key=f"null-user/{doc_id}/{uuid4()}{ext}", size_bytes=0
- POST /api/documents/{id}/confirm: load Document by id; call backend.stat_object(doc.object_key); catch minio.error.S3Error code="NoSuchKey" → return 422; on success set doc.size_bytes; execute atomic quota UPDATE; on row None → DELETE pending Document + remove MinIO object, return 413 {used_bytes, limit_bytes, rejected_bytes}; on success set status="uploaded", commit, enqueue extract_and_classify.delay(str(doc.id)); return {id, size_bytes, used_bytes, status}
- GET /api/auth/me/quota: load Quota by current_user.id; return {used_bytes, limit_bytes}; if quota missing return 404
- DELETE /api/documents/{id}: existing handler retained but services.storage.delete_document gains an atomic quota decrement (UPDATE quotas SET used_bytes = GREATEST(0, used_bytes - :delta) WHERE user_id = :uid) immediately before row delete
- Old POST /api/documents/upload (multipart) is removed entirely; services.storage.save_upload is removed
- tasks/document_tasks.py adds cleanup_abandoned_uploads task and async _cleanup_abandoned body that selects status="pending" AND created_at < now()-1h, deletes MinIO objects (try/except), then session.delete(doc), commits; returns {"cleaned": int}
- celery_app.py adds beat_schedule entry "cleanup-abandoned-uploads" → "tasks.document_tasks.cleanup_abandoned_uploads" with schedule timedelta(minutes=30) and timezone "UTC"
- test_upload_url_endpoint, test_confirm_endpoint, test_get_quota, test_quota_increment_atomic, test_concurrent_quota_race, test_quota_exceeded_response, test_delete_decrements_quota transition from xfail → pass (or xpass under strict=False)
Rewrite `backend/api/documents.py`. Module-level imports: `from __future__ import annotations`, `import uuid`, `from pathlib import Path`, `from typing import Optional`, `from fastapi import APIRouter, Depends, HTTPException, Query, status`, `from pydantic import BaseModel`, `from sqlalchemy import text`, `from sqlalchemy.ext.asyncio import AsyncSession`, `from db.models import Document, Quota`, `from deps.db import get_db`, `from services import classifier, storage`, `from tasks.document_tasks import extract_and_classify`, `from storage import get_storage_backend`, `from minio.error import S3Error`. Router: `router = APIRouter(prefix="/api/documents", tags=["documents"])`.
Define request model `class UploadUrlRequest(BaseModel): filename: str; content_type: str`. No response models — return plain dicts matching the documented shape.
Endpoint POST `/upload-url`: handler signature `async def request_upload_url(body: UploadUrlRequest, session: AsyncSession = Depends(get_db))`. Generate `doc_id = uuid.uuid4()`; `suffix = Path(body.filename).suffix.lower()`; `object_key = f"null-user/{doc_id}/{uuid.uuid4()}{suffix}"` (Plan 03-03 replaces `"null-user"` with `str(current_user.id)`); insert `Document(id=doc_id, user_id=None, filename=body.filename, content_type=body.content_type, size_bytes=0, storage_backend="minio", status="pending", object_key=object_key)`; `session.add(doc); await session.commit()`; call `upload_url = await get_storage_backend().generate_presigned_put_url(object_key, expires_minutes=15)`; return `{"upload_url": upload_url, "document_id": str(doc_id)}`.
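The key layout from T-03-04 can be isolated in a small pure function, which makes the "client filename never reaches the key" property easy to unit-test. `build_object_key` is a hypothetical helper, not something the plan asks for; `owner_id=None` reproduces the Wave 2 `"null-user"` prefix.

```python
import uuid
from pathlib import Path
from typing import Optional


def build_object_key(filename: str, doc_id: uuid.UUID, owner_id: Optional[uuid.UUID] = None) -> str:
    """Return '{owner}/{doc_id}/{random uuid}{ext}', computed entirely server-side.

    Only the lowercased extension of the client-supplied filename is used, so
    directory segments or the name itself never reach the object key (T-03-04).
    owner_id=None yields the Wave 2 "null-user" prefix.
    """
    owner = "null-user" if owner_id is None else str(owner_id)
    suffix = Path(filename).suffix.lower()
    return f"{owner}/{doc_id}/{uuid.uuid4()}{suffix}"
```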
Endpoint POST `/{doc_id}/confirm`: handler signature `async def confirm_upload(doc_id: str, session: AsyncSession = Depends(get_db))`.
- Parse `uid = uuid.UUID(doc_id)` (catch ValueError → 404); `doc = await session.get(Document, uid)`; if None → HTTPException(404, "Document not found").
- Try `size = await get_storage_backend().stat_object(doc.object_key)`; catch `S3Error` where `getattr(e, "code", "") == "NoSuchKey"` → HTTPException(422, "Upload not found — presigned URL may have expired"); any other `S3Error` propagates.
- Set `doc.size_bytes = size`; `await session.flush()`.
- If `doc.user_id is not None`, run the conditional atomic quota update: execute `text("UPDATE quotas SET used_bytes = used_bytes + :delta WHERE user_id = :uid AND (used_bytes + :delta) <= limit_bytes RETURNING used_bytes, limit_bytes")` with `{"delta": size, "uid": str(doc.user_id)}`; `row = result.fetchone()`.
  - If `row is None` (quota exceeded): execute `text("SELECT used_bytes, limit_bytes FROM quotas WHERE user_id = :uid")`; `q = quota_row.fetchone()`; delete the pending Document via `await session.delete(doc)`; best-effort `try: await get_storage_backend().delete_object(doc.object_key) except Exception: pass`; `await session.commit()`; raise `HTTPException(status_code=413, detail={"used_bytes": q.used_bytes if q else 0, "limit_bytes": q.limit_bytes if q else 0, "rejected_bytes": size})`.
  - Otherwise: `doc.status = "uploaded"`; `await session.commit()`; `extract_and_classify.delay(str(uid))`; return `{"id": str(uid), "size_bytes": size, "used_bytes": row.used_bytes, "status": "uploaded"}`.
- If `doc.user_id is None` (Wave 2 only): skip the quota update entirely, set `doc.status = "uploaded"`, commit, enqueue `extract_and_classify.delay(str(uid))`, return `{"id": str(uid), "size_bytes": size, "used_bytes": 0, "status": "uploaded"}`.
- After `commit()`, build the task argument and the response from `uid`, not `doc.id`: with SQLAlchemy's default `expire_on_commit=True`, reading `doc.id` after commit triggers a lazy refresh, which raises `MissingGreenlet` under `AsyncSession`.
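To see why the single conditional `UPDATE` is race-free, the sketch below replays the confirm-time check with Python's built-in `sqlite3` standing in for PostgreSQL. That substitution is an assumption for portability: SQLite serializes writers with a database lock rather than a row lock, and `cursor.rowcount == 0` plays the role of `fetchone()` returning `None` because `RETURNING` needs SQLite 3.35 or newer. Ten threads each try to reserve 30 bytes against a 100-byte limit; because the check and the increment are one statement, exactly three succeed. `make_quota_db`, `try_reserve`, and `race` are hypothetical names.

```python
import os
import sqlite3
import tempfile
import threading
from typing import List

# Same predicate as the plan's UPDATE, minus RETURNING (SQLite stand-in).
RESERVE_SQL = (
    "UPDATE quotas SET used_bytes = used_bytes + :delta "
    "WHERE user_id = :uid AND (used_bytes + :delta) <= limit_bytes"
)


def make_quota_db(user_id: str, limit_bytes: int) -> str:
    """Create a throwaway file-backed DB with one quota row; return its path."""
    fd, path = tempfile.mkstemp(suffix=".db")
    os.close(fd)
    conn = sqlite3.connect(path, isolation_level=None)
    conn.execute(
        "CREATE TABLE quotas (user_id TEXT PRIMARY KEY, "
        "used_bytes INTEGER NOT NULL, limit_bytes INTEGER NOT NULL)"
    )
    conn.execute("INSERT INTO quotas VALUES (?, 0, ?)", (user_id, limit_bytes))
    conn.close()
    return path


def try_reserve(db_path: str, user_id: str, delta: int) -> bool:
    """Check and increment in one statement; False means 'would exceed quota' (HTTP 413)."""
    # Each caller gets its own connection, like separate /confirm requests.
    conn = sqlite3.connect(db_path, timeout=10, isolation_level=None)  # autocommit
    try:
        cur = conn.execute(RESERVE_SQL, {"delta": delta, "uid": user_id})
        return cur.rowcount == 1
    finally:
        conn.close()


def race(db_path: str, user_id: str, delta: int, attempts: int) -> List[bool]:
    """Fire `attempts` concurrent reservations and collect their outcomes."""
    results: List[bool] = []
    threads = [
        threading.Thread(target=lambda: results.append(try_reserve(db_path, user_id, delta)))
        for _ in range(attempts)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results
```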
Endpoints GET `""` (list), GET `/{doc_id}` (get), DELETE `/{doc_id}` (delete), POST `/{doc_id}/classify` (reclassify): preserve the current handler bodies verbatim from existing `backend/api/documents.py` (lines 70-118) but rebound to the new router. NOTE: this plan does NOT yet add `Depends(get_current_user)` — Plan 03-03 adds auth wiring. Anonymous handlers remain in Wave 2.
Modify `backend/services/storage.py`:
- Delete `save_upload(session, file_bytes, original_name, mime_type)` function entirely (lines ~86-136) and its entry in `__all__`.
- At top imports add `from sqlalchemy import text` (current import is `from sqlalchemy import select, delete` — extend the line).
- Modify `delete_document(session, doc_id)`: after the existing `try: await _backend().delete_object(doc.object_key) ... except` block and BEFORE `await session.delete(doc)`, insert:
```
if doc.user_id is not None:
    await session.execute(
        text("UPDATE quotas SET used_bytes = GREATEST(0, used_bytes - :delta) WHERE user_id = :uid"),
        {"delta": doc.size_bytes, "uid": str(doc.user_id)},
    )
```
(Comment: "Atomic quota decrement (STORE-06, D-07). The user_id is None guard is removed in Plan 03-03.")
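The clamped decrement can be checked the same way. Again `sqlite3` stands in for PostgreSQL, with SQLite's two-argument scalar `MAX()` in place of `GREATEST()`, so repeated or mismatched decrements cannot push `used_bytes` below zero. `release_quota` and `used_bytes` are hypothetical names, and the early return mirrors the Wave 2 `doc.user_id is not None` guard.

```python
import sqlite3
from typing import Optional

# SQLite's two-argument MAX() is a scalar, playing the role of PostgreSQL's GREATEST().
RELEASE_SQL = "UPDATE quotas SET used_bytes = MAX(0, used_bytes - :delta) WHERE user_id = :uid"


def release_quota(conn: sqlite3.Connection, user_id: Optional[str], delta: int) -> None:
    """Decrement used_bytes by delta, clamped at zero; no-op for ownerless docs."""
    if user_id is None:  # mirrors the Wave 2 guard: anonymous docs have no quota row
        return
    conn.execute(RELEASE_SQL, {"delta": delta, "uid": user_id})


def used_bytes(conn: sqlite3.Connection, user_id: str) -> int:
    return conn.execute("SELECT used_bytes FROM quotas WHERE user_id = ?", (user_id,)).fetchone()[0]
```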
Modify `backend/api/auth.py`: append a `GET /me/quota` handler after the existing `/me` handler. Handler:
```
@router.get("/me/quota")
async def get_my_quota(
    current_user: User = Depends(get_current_user),
    session: AsyncSession = Depends(get_db),
):
    q = await session.get(Quota, current_user.id)
    if q is None:
        raise HTTPException(status_code=404, detail="Quota not found")
    return {"used_bytes": q.used_bytes, "limit_bytes": q.limit_bytes}
```
`Quota` is already imported on the existing `from db.models import BackupCode, Quota, RefreshToken, User` line — verify, no new import needed.
Modify `backend/tasks/document_tasks.py`: append at end of file (after `_run`):
```
@celery_app.task(name="tasks.document_tasks.cleanup_abandoned_uploads")
def cleanup_abandoned_uploads() -> dict:
    """Periodic Celery beat task: deletes Document rows with status='pending'
    older than 1 hour and their MinIO objects (D-06).
    """
    return asyncio.run(_cleanup_abandoned())


async def _cleanup_abandoned() -> dict:
    from datetime import datetime, timezone, timedelta
    from sqlalchemy import select
    from db.session import AsyncSessionLocal
    from db.models import Document
    from storage import get_storage_backend

    cutoff = datetime.now(timezone.utc) - timedelta(hours=1)
    async with AsyncSessionLocal() as session:
        result = await session.execute(
            select(Document).where(
                Document.status == "pending",
                Document.created_at < cutoff,
            )
        )
        docs = result.scalars().all()
        backend = get_storage_backend()
        cleaned = 0
        for doc in docs:
            try:
                if doc.object_key:
                    await backend.delete_object(doc.object_key)
            except Exception:
                pass  # MinIO object may not exist yet; safe to ignore
            await session.delete(doc)
            cleaned += 1
        await session.commit()
    return {"cleaned": cleaned}
```
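The selection and best-effort deletion logic in `_cleanup_abandoned` can be checked without a database or MinIO by pulling it into plain functions. This is a sketch under stated assumptions: `PendingDoc` is a hypothetical stand-in for the `Document` row, `delete_object` is any callable, and `created_at` is assumed to be timezone-aware (in this pure-Python version, comparing a naive `created_at` against the aware UTC cutoff would raise `TypeError`).

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Callable, Iterable, List, Optional


@dataclass
class PendingDoc:
    """Hypothetical stand-in for a Document row."""

    id: str
    status: str
    created_at: datetime  # must be timezone-aware to compare with the UTC cutoff
    object_key: Optional[str]


def select_abandoned(
    docs: Iterable[PendingDoc],
    now: Optional[datetime] = None,
    max_age: timedelta = timedelta(hours=1),
) -> List[PendingDoc]:
    """Same filter as the task's query: status == 'pending' AND created_at < now - max_age."""
    cutoff = (now or datetime.now(timezone.utc)) - max_age
    return [d for d in docs if d.status == "pending" and d.created_at < cutoff]


def cleanup(
    docs: Iterable[PendingDoc],
    delete_object: Callable[[str], None],
    now: Optional[datetime] = None,
) -> dict:
    """Best-effort object deletion; every selected row counts as cleaned."""
    cleaned = 0
    for doc in select_abandoned(docs, now):
        try:
            if doc.object_key:
                delete_object(doc.object_key)
        except Exception:
            pass  # the browser may never have completed the PUT
        cleaned += 1  # in the real task, session.delete(doc) happens here
    return {"cleaned": cleaned}
```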
Modify `backend/celery_app.py`: at module top add `from datetime import timedelta as _timedelta`. After the existing `celery_app.conf.task_routes = {...}` block (before `autodiscover_tasks`), append:
```
celery_app.conf.beat_schedule = {
    "cleanup-abandoned-uploads": {
        "task": "tasks.document_tasks.cleanup_abandoned_uploads",
        "schedule": _timedelta(minutes=30),
    },
}
celery_app.conf.timezone = "UTC"
```
(cd backend && pytest tests/test_documents.py::test_upload_url_endpoint tests/test_documents.py::test_confirm_endpoint tests/test_documents.py::test_get_quota tests/test_quota.py -x -q) && grep -c "generate_presigned_put_url" backend/api/documents.py && grep -c "stat_object" backend/api/documents.py && grep -c "UPDATE quotas" backend/api/documents.py && grep -v '^[[:space:]]*#' backend/services/storage.py | grep -c "GREATEST(0, used_bytes" && grep -c "cleanup_abandoned_uploads" backend/tasks/document_tasks.py && grep -c "beat_schedule" backend/celery_app.py && grep -c "/me/quota" backend/api/auth.py
All 7 listed pytest IDs pass (or xpass under strict=False). `backend/api/documents.py` contains `/upload-url`, `/{doc_id}/confirm`, calls `generate_presigned_put_url` + `stat_object` + atomic `UPDATE quotas`. `backend/services/storage.py` contains `GREATEST(0, used_bytes`. `backend/api/auth.py` contains `/me/quota`. `backend/tasks/document_tasks.py` contains `cleanup_abandoned_uploads`. `backend/celery_app.py` contains `beat_schedule`. All grep counts >= 1.
- Atomic quota race test green: `cd backend && pytest tests/test_quota.py::test_concurrent_quota_race -x -q` (SC2 for Phase 3)
- Quota rejection shape green: `cd backend && pytest tests/test_quota.py::test_quota_exceeded_response -x -q`
- Quota decrement on delete green: `cd backend && pytest tests/test_quota.py::test_delete_decrements_quota -x -q`
- Presigned upload endpoints green: `cd backend && pytest tests/test_documents.py::test_upload_url_endpoint tests/test_documents.py::test_confirm_endpoint -x -q`
- Quota read endpoint green: `cd backend && pytest tests/test_documents.py::test_get_quota -x -q`
- No legacy /upload route remains: `grep -c '"/upload"' backend/api/documents.py` (run from the repo root) returns 0
<success_criteria>
- StorageBackend ABC has `generate_presigned_put_url` and `stat_object` abstract methods
- MinIOBackend has dual MinIO client instances; presigned URL uses public client
- POST /api/documents/upload-url, POST /api/documents/{id}/confirm, GET /api/auth/me/quota exist and respond as documented
- DELETE /api/documents/{id} decrements quota atomically when doc.user_id is not None
- POST /api/documents/upload (multipart legacy) is removed; services.storage.save_upload is removed
- Celery beat schedule includes cleanup-abandoned-uploads every 30 minutes
- docker-compose.yml has MINIO_API_CORS_ALLOW_ORIGIN on minio service, MINIO_PUBLIC_ENDPOINT on backend service, and celery-beat service
- All Plan 03-01 stubs for STORE-03 / STORE-04 / STORE-05 / STORE-06 / D-05 pass or xpass </success_criteria>