docs(03): create Phase 3 execution plan — document migration & multi-user isolation
5 plans across 5 sequential waves covering: Alembic migration 0003 (null-user cleanup, NOT NULL constraint, quota reconciliation), presigned MinIO PUT upload flow with atomic quota enforcement, auth guards on all document/topic endpoints, flat-file settings retirement + per-user AI classification, and frontend quota bar with 3-step XHR upload progress.

Verification passed across all 12 dimensions. All 8 phase requirements covered (STORE-03/04/05/06, SEC-04, DOC-03/04/05).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@@ -0,0 +1,384 @@
---
phase: 03-document-migration-multi-user-isolation
plan: 02
type: execute
wave: 2
depends_on:
  - 03-01
files_modified:
  - backend/storage/base.py
  - backend/storage/minio_backend.py
  - backend/storage/__init__.py
  - backend/config.py
  - backend/api/documents.py
  - backend/api/auth.py
  - backend/services/storage.py
  - backend/tasks/document_tasks.py
  - backend/celery_app.py
  - docker-compose.yml
autonomous: true
requirements:
  - STORE-03
  - STORE-04
  - STORE-05
  - STORE-06
  - SEC-04

must_haves:
  truths:
    - "Frontend can request a presigned PUT URL targeting a browser-resolvable hostname (not 'minio:9000') for a newly created pending Document row"
    - "After the browser PUT, /confirm reads the authoritative file size from MinIO stat_object and atomically updates the user's quota, returning 413 with {used_bytes, limit_bytes, rejected_bytes} on overflow"
    - "Two concurrent /confirm calls that would together exceed the quota result in exactly one 200 and one 413"
    - "Document delete decrements quota using GREATEST(0, used_bytes - delta) atomically"
    - "Authenticated user can read their own quota via GET /api/auth/me/quota"
    - "Celery beat runs cleanup_abandoned_uploads every 30 minutes, deleting pending Document rows older than 1 hour and removing their MinIO objects"
    - "Browser PUT requests succeed against MinIO because MINIO_API_CORS_ALLOW_ORIGIN is set to the configured CORS origin"
  artifacts:
    - path: "backend/storage/base.py"
      provides: "StorageBackend ABC with generate_presigned_put_url and stat_object abstract methods"
      contains: "generate_presigned_put_url"
    - path: "backend/storage/minio_backend.py"
      provides: "MinIOBackend dual-client (internal + public) + generate_presigned_put_url + stat_object methods"
      contains: "self._public_client"
    - path: "backend/storage/__init__.py"
      provides: "get_storage_backend factory passes public_endpoint to MinIOBackend"
      contains: "public_endpoint"
    - path: "backend/api/documents.py"
      provides: "POST /api/documents/upload-url + POST /api/documents/{id}/confirm endpoints; old /upload removed"
      contains: "/upload-url"
    - path: "backend/api/auth.py"
      provides: "GET /api/auth/me/quota endpoint"
      contains: "/me/quota"
    - path: "backend/services/storage.py"
      provides: "delete_document decrements quota atomically; save_upload removed"
      contains: "GREATEST(0, used_bytes"
    - path: "backend/tasks/document_tasks.py"
      provides: "cleanup_abandoned_uploads Celery task + _cleanup_abandoned async body"
      contains: "cleanup_abandoned_uploads"
    - path: "backend/celery_app.py"
      provides: "beat_schedule entry for cleanup_abandoned_uploads every 30 minutes"
      contains: "beat_schedule"
    - path: "docker-compose.yml"
      provides: "MINIO_API_CORS_ALLOW_ORIGIN + MINIO_PUBLIC_ENDPOINT env vars; celery-beat service"
      contains: "MINIO_API_CORS_ALLOW_ORIGIN"
  key_links:
    - from: "backend/api/documents.py"
      to: "backend/storage/minio_backend.py"
      via: "backend.generate_presigned_put_url(object_key) and backend.stat_object(object_key)"
      pattern: "generate_presigned_put_url|stat_object"
    - from: "backend/api/documents.py"
      to: "quotas table"
      via: "atomic UPDATE quotas SET used_bytes = used_bytes + :delta WHERE (used_bytes + :delta) <= limit_bytes RETURNING used_bytes"
      pattern: "UPDATE quotas"
    - from: "backend/api/documents.py"
      to: "Celery (tasks.document_tasks.extract_and_classify)"
      via: "extract_and_classify.delay(str(doc.id)) after successful confirm"
      pattern: "extract_and_classify\\.delay"
    - from: "backend/celery_app.py"
      to: "backend/tasks/document_tasks.py"
      via: "beat_schedule: tasks.document_tasks.cleanup_abandoned_uploads every 30m"
      pattern: "cleanup_abandoned_uploads"
---

<objective>
Implement the presigned upload backend per CONTEXT.md D-04..D-07: replace multipart POST /upload with a 3-step flow (upload-url → browser PUT direct to MinIO → confirm) using the atomic quota UPDATE pattern from CLAUDE.md. Add GET /api/auth/me/quota for the sidebar quota bar. Wire the abandoned-upload cleanup Celery beat task per D-06.

Purpose: This plan is the only place the atomic quota SQL lives. Plan 03-03's auth guards depend on the endpoint surface defined here. Plan 03-05's frontend depends on the API contract defined here.
Output: changes to 9 backend files + 1 docker-compose change. After this plan, an authenticated user can perform the full presigned upload/delete/quota cycle from curl; the document endpoints still allow anonymous access until Plan 03-03 adds the auth guards.
</objective>

<execution_context>
@/Users/nik/.claude/get-shit-done/workflows/execute-plan.md
@/Users/nik/.claude/get-shit-done/templates/summary.md
</execution_context>

<context>
@.planning/ROADMAP.md
@.planning/STATE.md
@.planning/phases/03-document-migration-multi-user-isolation/03-CONTEXT.md
@.planning/phases/03-document-migration-multi-user-isolation/03-RESEARCH.md
@.planning/phases/03-document-migration-multi-user-isolation/03-PATTERNS.md
@.planning/phases/03-document-migration-multi-user-isolation/03-UI-SPEC.md
@.planning/phases/03-document-migration-multi-user-isolation/03-VALIDATION.md
@.planning/phases/03-document-migration-multi-user-isolation/03-01-SUMMARY.md
@CLAUDE.md

@backend/storage/base.py
@backend/storage/minio_backend.py
@backend/storage/__init__.py
@backend/api/documents.py
@backend/api/auth.py
@backend/services/storage.py
@backend/tasks/document_tasks.py
@backend/celery_app.py
@backend/config.py
@docker-compose.yml

<interfaces>
<!-- Contracts the executor needs without re-reading the codebase. -->

From backend/storage/base.py (current — Phase 1):
```
class StorageBackend(ABC):
    @abstractmethod
    async def put_object(self, user_id, document_id, file_bytes, extension, content_type) -> str
    @abstractmethod
    async def get_object(self, object_key) -> bytes
    @abstractmethod
    async def delete_object(self, object_key) -> None
    @abstractmethod
    async def presigned_get_url(self, object_key, expires_minutes=60) -> str
    @abstractmethod
    async def health_check(self) -> bool
```

From backend/storage/minio_backend.py (current):
```
class MinIOBackend(StorageBackend):
    def __init__(self, endpoint, access_key, secret_key, bucket, secure=False) -> None
    # Each method wraps the synchronous Minio SDK in asyncio.to_thread.
```
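
The `asyncio.to_thread` wrapping mentioned in the comment above might look roughly like the sketch below. `FakeBlockingClient` and `SketchBackend` are hypothetical stand-ins (not the real Minio client or the project's `MinIOBackend`), so this illustrates only the offloading pattern.

```python
import asyncio
from datetime import timedelta


class FakeBlockingClient:
    """Hypothetical stand-in for a synchronous SDK client (not the real Minio class)."""

    def presigned_put_object(self, bucket: str, key: str, expires: timedelta) -> str:
        # A real client would sign the URL; this stub only formats one.
        return f"http://localhost:9000/{bucket}/{key}?expires={int(expires.total_seconds())}"


class SketchBackend:
    def __init__(self, client: FakeBlockingClient, bucket: str) -> None:
        self._client = client
        self._bucket = bucket

    async def generate_presigned_put_url(self, object_key: str, expires_minutes: int = 15) -> str:
        # Run the blocking call in a worker thread so the event loop stays free.
        return await asyncio.to_thread(
            self._client.presigned_put_object,
            self._bucket,
            object_key,
            timedelta(minutes=expires_minutes),
        )


url = asyncio.run(
    SketchBackend(FakeBlockingClient(), "docs").generate_presigned_put_url("u/d/f.pdf")
)
```

The positional arguments after the callable are forwarded unchanged, which is why the bucket, key, and expiry appear in the same order the blocking method expects.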

From CONTEXT.md decisions (D-05):
  POST /api/documents/upload-url body: {"filename": str, "content_type": str}
  Returns: {"upload_url": str, "document_id": str}
  POST /api/documents/{id}/confirm body: empty
  Returns: {"id": str, "size_bytes": int, "used_bytes": int, "status": "uploaded"}
  Quota exceeded: HTTP 413 detail = {"used_bytes": int, "limit_bytes": int, "rejected_bytes": int}
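
As an illustration of the D-05 shapes above, the 413 detail could be typed and built as in this sketch. The names `QuotaExceededDetail`, `quota_exceeded_detail`, and `bytes_over` are hypothetical helpers introduced here, not part of the plan.

```python
from typing import TypedDict


class QuotaExceededDetail(TypedDict):
    used_bytes: int
    limit_bytes: int
    rejected_bytes: int


def quota_exceeded_detail(used_bytes: int, limit_bytes: int, size_bytes: int) -> QuotaExceededDetail:
    # rejected_bytes is the size of the upload that did not fit.
    return {"used_bytes": used_bytes, "limit_bytes": limit_bytes, "rejected_bytes": size_bytes}


def bytes_over(detail: QuotaExceededDetail) -> int:
    # How far past the limit the rejected upload would have pushed the user.
    return detail["used_bytes"] + detail["rejected_bytes"] - detail["limit_bytes"]


detail = quota_exceeded_detail(used_bytes=90, limit_bytes=100, size_bytes=20)
```

A client can use the three fields to tell the user how much space to free, without a second round trip to the quota endpoint.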

From backend/services/auth.py (Phase 2):
  def create_access_token(user_id: str, role: str) -> str

From backend/db/models.py:
  Document.user_id — Plan 03-01 migration flips to NOT NULL
  Document.status — 'pending' | 'uploaded' | 'classified' | 'classification_failed'
  Document.object_key
  Document.size_bytes
  Quota.limit_bytes — default 104857600 (100 MB)
  Quota.used_bytes

From backend/celery_app.py (current):
  celery_app.conf.task_routes = {
      "tasks.document_tasks.*": {"queue": "documents"},
      "tasks.email_tasks.*": {"queue": "email"},
  }
</interfaces>
</context>

<threat_model>
## Trust Boundaries

| Boundary | Description |
|----------|-------------|
| browser → MinIO (direct PUT) | Untrusted browser uses time-limited presigned URL; MinIO authenticates via HMAC signature; CORS preflight must succeed |
| browser → FastAPI /confirm | Authenticated user provides only document_id; FastAPI reads size_bytes from MinIO stat (never from client) |
| FastAPI /confirm → quotas table | Concurrent /confirm calls race against the same Quota row; PostgreSQL row-level lock + atomic UPDATE WHERE clause prevents overflow |
| Celery beat → DB+MinIO | Runs as docuvault_app; deletes its own rows (status='pending', age > 1h) and own MinIO objects only |

## STRIDE Threat Register

| Threat ID | Category | Component | Disposition | Mitigation Plan |
|-----------|----------|-----------|-------------|-----------------|
| T-03-04 | Spoofing | POST /api/documents/upload-url body filename/content_type | mitigate | object_key is computed server-side as {user_id}/{document_id}/{uuid4()}{ext} (STORE-02) — filename stored in DB column only; extension derived from Path(body.filename).suffix.lower() |
| T-03-05 | Tampering | Confirm endpoint quota delta | mitigate | size_bytes always comes from backend.stat_object(doc.object_key) (MinIO authoritative) — never from client body or request param |
| T-03-06 | Denial of Service | Concurrent /confirm uploads at quota boundary (SC2) | mitigate | Atomic SQL UPDATE quotas SET used_bytes = used_bytes + :delta WHERE (used_bytes + :delta) <= limit_bytes RETURNING used_bytes; fetchone() None → HTTP 413 (RESEARCH.md Finding 4 + Risk 2) |
| T-03-07 | Information Disclosure | Presigned URL leakage in logs | accept | 15-min TTL, single object key; leak risk acceptable for v1; do not log full URL — log only document_id |
| T-03-08 | Repudiation | Abandoned uploads pile up MinIO orphans | mitigate | Celery beat cleanup_abandoned_uploads every 30m deletes pending docs older than 1 hour and their MinIO objects (D-06) |
| T-03-09 | Information Disclosure | Browser→MinIO PUT CORS misconfiguration leaks origin | mitigate | MINIO_API_CORS_ALLOW_ORIGIN env var explicitly set to ${CORS_ORIGINS} (or http://localhost:5173 default) — not wildcard |
| T-03-10 | Tampering | Docker hostname in presigned URL (minio:9000 not browser-resolvable) | mitigate | Dual MinIO client: self._client (internal endpoint, used for stat/get/put/delete) and self._public_client (public endpoint, used for generate_presigned_put_url) per RESEARCH.md Finding 3 |
| T-03-SC | Tampering | pip installs | mitigate | No new package installs (minio + sqlalchemy already pinned in Phase 1 requirements.txt) |
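
The T-03-04 mitigation (server-side object key, extension taken only from the filename suffix) could be sketched as below. `build_object_key` is a hypothetical helper name; the plan itself inlines this logic in the handler.

```python
import uuid
from pathlib import Path


def build_object_key(user_id: uuid.UUID, document_id: uuid.UUID, filename: str) -> str:
    # Only the lowercased extension of the client-supplied filename is used;
    # directory parts and the base name never reach the storage key.
    suffix = Path(filename).suffix.lower()
    return f"{user_id}/{document_id}/{uuid.uuid4()}{suffix}"


user_id = uuid.uuid4()
document_id = uuid.uuid4()
key = build_object_key(user_id, document_id, "../../etc/Report.PDF")
```

Because every path segment is a server-generated UUID, a hostile filename can influence at most the trailing extension.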
</threat_model>

<tasks>

<task type="auto" tdd="true">
<name>Task 1: Extend StorageBackend ABC + MinIOBackend with dual client, presigned PUT, stat_object; add config knobs and docker-compose env</name>
<files>backend/storage/base.py, backend/storage/minio_backend.py, backend/storage/__init__.py, backend/config.py, docker-compose.yml</files>
<read_first>
- backend/storage/base.py — existing ABC method signatures and docstring style
- backend/storage/minio_backend.py — existing constructor, asyncio.to_thread pattern, presigned_get_url example
- backend/storage/__init__.py — current get_storage_backend factory
- backend/config.py — Settings class shape, SettingsConfigDict usage, existing optional-string fields
- docker-compose.yml — current minio service environment block, current celery-worker shape (model for celery-beat)
- .planning/phases/03-document-migration-multi-user-isolation/03-RESEARCH.md — Finding 2 (presigned_put_object signature), Finding 3 (dual-client + CORS), Finding 5 (stat_object), Finding 10 (celery-beat service block)
</read_first>
<behavior>
- StorageBackend ABC gains two new abstract methods: generate_presigned_put_url(object_key, expires_minutes=15) -> str and stat_object(object_key) -> int
- MinIOBackend constructor accepts optional public_endpoint: str | None = None; stores self._public_client (Minio instance pointing at public_endpoint or falling back to endpoint)
- MinIOBackend.generate_presigned_put_url uses self._public_client.presigned_put_object via asyncio.to_thread with timedelta(minutes=expires_minutes)
- MinIOBackend.stat_object uses self._client.stat_object via asyncio.to_thread and returns .size (int)
- get_storage_backend() reads settings.minio_public_endpoint and passes it to MinIOBackend
- config.py adds minio_public_endpoint: str = "" (empty falls back to minio_endpoint inside MinIOBackend)
- docker-compose.yml minio service env adds MINIO_API_CORS_ALLOW_ORIGIN: ${CORS_ORIGINS:-http://localhost:5173}; backend service env adds MINIO_PUBLIC_ENDPOINT; new celery-beat service runs celery -A celery_app beat --loglevel=info
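
One way a test could check that a presigned URL never carries the Docker-internal hostname is sketched here. `is_browser_resolvable` and the `INTERNAL_HOSTS` set are assumptions made for illustration; only the `minio` service name comes from this plan.

```python
from urllib.parse import urlparse

# Assumption: Docker-internal service names that a browser cannot resolve.
INTERNAL_HOSTS = {"minio"}


def is_browser_resolvable(upload_url: str, internal_hosts: set[str] = INTERNAL_HOSTS) -> bool:
    # Compare only the hostname; the port and signed query string are irrelevant here.
    host = urlparse(upload_url).hostname
    return host is not None and host not in internal_hosts


public_ok = is_browser_resolvable("http://localhost:9000/bucket/key?X-Amz-Signature=abc")
internal_ok = is_browser_resolvable("http://minio:9000/bucket/key?X-Amz-Signature=abc")
```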
</behavior>
<action>
Modify `backend/storage/base.py`: append two `@abstractmethod async def` blocks after `presigned_get_url`, named `generate_presigned_put_url(self, object_key: str, expires_minutes: int = 15) -> str` and `stat_object(self, object_key: str) -> int`. Docstrings cite RESEARCH.md Finding 3 (public client requirement) and Finding 5 (returns authoritative size).

Modify `backend/storage/minio_backend.py`: extend `__init__` signature with `public_endpoint: str | None = None`; after the existing `self._client = Minio(...)` block add `self._public_client = Minio(endpoint=(public_endpoint or endpoint), access_key=access_key, secret_key=secret_key, secure=secure)`. Append two new async methods (after `presigned_get_url`):
- `generate_presigned_put_url(self, object_key, expires_minutes=15)` — `return await asyncio.to_thread(self._public_client.presigned_put_object, self._bucket, object_key, timedelta(minutes=expires_minutes))`
- `stat_object(self, object_key)` — `result = await asyncio.to_thread(self._client.stat_object, self._bucket, object_key); return result.size`

Modify `backend/storage/__init__.py`: extend `get_storage_backend()` to pass `public_endpoint=settings.minio_public_endpoint` to the `MinIOBackend(...)` constructor.

Modify `backend/config.py`: add `minio_public_endpoint: str = ""` to the `Settings` class alongside the existing `minio_endpoint` field. Comment reference: `# RESEARCH.md Finding 3 — browser-resolvable hostname for presigned URLs`.
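
The fallback implied by `public_endpoint or endpoint` covers both the `None` constructor default and the `""` settings default. A minimal sketch, with `resolve_public_endpoint` as a hypothetical helper name:

```python
from typing import Optional


def resolve_public_endpoint(endpoint: str, public_endpoint: Optional[str] = None) -> str:
    # `or` treats both None and "" as "not configured" and falls back
    # to the internal endpoint.
    return public_endpoint or endpoint


unset = resolve_public_endpoint("minio:9000")
empty = resolve_public_endpoint("minio:9000", "")
explicit = resolve_public_endpoint("minio:9000", "localhost:9000")
```

This is why the settings field can stay a plain `str` with an empty default instead of an optional type.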

Modify `docker-compose.yml`:
1. In the `minio:` service `environment:` block, add `MINIO_API_CORS_ALLOW_ORIGIN: ${CORS_ORIGINS:-http://localhost:5173}` (RESEARCH.md Finding 3, T-03-09).
2. In the `backend:` service `environment:` block, add `- MINIO_PUBLIC_ENDPOINT=${MINIO_PUBLIC_ENDPOINT:-localhost:9000}`.
3. Append a new `celery-beat:` service mirroring `celery-worker` structure with `command: celery -A celery_app beat --loglevel=info` and the same `depends_on`, environment block, and build context (RESEARCH.md Finding 10).
</action>
<verify>
<automated>(cd backend && python -c "from storage.base import StorageBackend; assert 'generate_presigned_put_url' in dir(StorageBackend); assert 'stat_object' in dir(StorageBackend); from storage.minio_backend import MinIOBackend; import inspect; sig = inspect.signature(MinIOBackend.__init__); assert 'public_endpoint' in sig.parameters; print('OK')") && grep -c "MINIO_API_CORS_ALLOW_ORIGIN" docker-compose.yml && grep -c "celery-beat:" docker-compose.yml && grep -c "minio_public_endpoint" backend/config.py</automated>
</verify>
<done>
StorageBackend ABC has 7 abstract methods (5 original + 2 new). MinIOBackend `__init__` accepts `public_endpoint`. docker-compose.yml contains `MINIO_API_CORS_ALLOW_ORIGIN` and `celery-beat:`. config.py contains `minio_public_endpoint`. All grep counts >= 1.
</done>
</task>

<task type="auto" tdd="true">
<name>Task 2: Implement upload-url, confirm, delete-with-quota, me/quota endpoints + remove old /upload + abandoned-upload Celery beat</name>
<files>backend/api/documents.py, backend/api/auth.py, backend/services/storage.py, backend/tasks/document_tasks.py, backend/celery_app.py</files>
<read_first>
- backend/api/documents.py — existing handler structure (will be replaced)
- backend/api/auth.py — existing /me endpoint pattern, router prefix, `from db.models import BackupCode, Quota, RefreshToken, User` import line
- backend/services/storage.py — existing save_upload, delete_document, _backend() helper, _doc_to_dict, list_metadata
- backend/tasks/document_tasks.py — existing extract_and_classify task; pattern for sync entry + async body
- backend/tasks/email_tasks.py — _run_send_security_alert shape for async task body
- backend/celery_app.py — current task_routes block (model for beat_schedule append)
- backend/deps/auth.py — get_current_user signature (this plan does NOT wire auth into documents.py — Plan 03-03 does)
- .planning/phases/03-document-migration-multi-user-isolation/03-RESEARCH.md — Finding 4 (atomic quota SQL), Finding 5 (stat_object error handling), Finding 9 (/me/quota), Finding 10 (Celery beat schedule)
- .planning/phases/03-document-migration-multi-user-isolation/03-PATTERNS.md — atomic quota pattern, Celery task pattern
</read_first>
<behavior>
- POST /api/documents/upload-url body {filename: str, content_type: str} returns {upload_url: str, document_id: str} and inserts a Document row with user_id=None (Wave 2 placeholder — Plan 03-03 replaces with current_user.id), status="pending", object_key=f"null-user/{doc_id}/{uuid4()}{ext}", size_bytes=0
- POST /api/documents/{id}/confirm: load Document by id; call backend.stat_object(doc.object_key); catch minio.error.S3Error code="NoSuchKey" → return 422; on success set doc.size_bytes; execute atomic quota UPDATE; on row None → DELETE pending Document + remove MinIO object, return 413 {used_bytes, limit_bytes, rejected_bytes}; on success set status="uploaded", commit, enqueue extract_and_classify.delay(str(doc.id)); return {id, size_bytes, used_bytes, status}
- GET /api/auth/me/quota: load Quota by current_user.id; return {used_bytes, limit_bytes}; if quota missing return 404
- DELETE /api/documents/{id}: existing handler retained but services.storage.delete_document gains an atomic quota decrement (UPDATE quotas SET used_bytes = GREATEST(0, used_bytes - :delta) WHERE user_id = :uid) immediately before row delete
- Old POST /api/documents/upload (multipart) is removed entirely; services.storage.save_upload is removed
- tasks/document_tasks.py adds cleanup_abandoned_uploads task and async _cleanup_abandoned body that selects status="pending" AND created_at < now()-1h, deletes MinIO objects (try/except), then session.delete(doc), commits; returns {"cleaned": int}
- celery_app.py adds beat_schedule entry "cleanup-abandoned-uploads" → "tasks.document_tasks.cleanup_abandoned_uploads" with schedule timedelta(minutes=30) and timezone "UTC"
- test_upload_url_endpoint, test_confirm_endpoint, test_get_quota, test_quota_increment_atomic, test_concurrent_quota_race, test_quota_exceeded_response, test_delete_decrements_quota transition from xfail → pass (or xpass under strict=False)
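
The NoSuchKey handling in the /confirm behavior above might be sketched like this. `FakeS3Error` is a hypothetical stand-in for `minio.error.S3Error` that models only the `code` attribute, and re-raising every other error is an assumption, since the plan only specifies the NoSuchKey case.

```python
class FakeS3Error(Exception):
    """Hypothetical stand-in for minio.error.S3Error; only `code` is modelled."""

    def __init__(self, code: str) -> None:
        super().__init__(code)
        self.code = code


def status_for_stat_error(exc: Exception) -> int:
    # A missing object means the browser PUT never completed (or the URL expired).
    if getattr(exc, "code", "") == "NoSuchKey":
        return 422
    # Assumption: any other storage error is unexpected and propagates unchanged.
    raise exc


missing_status = status_for_stat_error(FakeS3Error("NoSuchKey"))
```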
</behavior>
<action>
Rewrite `backend/api/documents.py`. Module-level imports: `from __future__ import annotations`, `import uuid`, `from pathlib import Path`, `from typing import Optional`, `from fastapi import APIRouter, Depends, HTTPException, Query, status`, `from pydantic import BaseModel`, `from sqlalchemy import text`, `from sqlalchemy.ext.asyncio import AsyncSession`, `from db.models import Document, Quota`, `from deps.db import get_db`, `from services import classifier, storage`, `from tasks.document_tasks import extract_and_classify`, `from storage import get_storage_backend`, `from minio.error import S3Error`. Router: `router = APIRouter(prefix="/api/documents", tags=["documents"])`.

Define request model `class UploadUrlRequest(BaseModel): filename: str; content_type: str`. No response models — return plain dicts matching the documented shape.

Endpoint POST `/upload-url`: handler signature `async def request_upload_url(body: UploadUrlRequest, session: AsyncSession = Depends(get_db))`. Generate `doc_id = uuid.uuid4()`; `suffix = Path(body.filename).suffix.lower()`; `object_key = f"null-user/{doc_id}/{uuid.uuid4()}{suffix}"` (Plan 03-03 replaces `"null-user"` with `str(current_user.id)`); insert `Document(id=doc_id, user_id=None, filename=body.filename, content_type=body.content_type, size_bytes=0, storage_backend="minio", status="pending", object_key=object_key)`; `session.add(doc); await session.commit()`; call `upload_url = await get_storage_backend().generate_presigned_put_url(object_key, expires_minutes=15)`; return `{"upload_url": upload_url, "document_id": str(doc_id)}`.

Endpoint POST `/{doc_id}/confirm`: handler signature `async def confirm_upload(doc_id: str, session: AsyncSession = Depends(get_db))`.
1. Parse `uid = uuid.UUID(doc_id)` (catch ValueError → 404); `doc = await session.get(Document, uid)`; if None → HTTPException(404, "Document not found").
2. Try `size = await get_storage_backend().stat_object(doc.object_key)`; catch `S3Error` where `getattr(e, "code", "") == "NoSuchKey"` → HTTPException(422, "Upload not found — presigned URL may have expired").
3. Set `doc.size_bytes = size`; `await session.flush()`.
4. Conditional atomic quota update — if `doc.user_id is not None`: execute `text("UPDATE quotas SET used_bytes = used_bytes + :delta WHERE user_id = :uid AND (used_bytes + :delta) <= limit_bytes RETURNING used_bytes, limit_bytes")` with `{"delta": size, "uid": str(doc.user_id)}`; `row = result.fetchone()`.
5. If `row is None` (quota exceeded): execute SELECT `text("SELECT used_bytes, limit_bytes FROM quotas WHERE user_id = :uid")`; `q = quota_row.fetchone()`; delete the pending Document via `await session.delete(doc)`; best-effort `try: await get_storage_backend().delete_object(doc.object_key) except Exception: pass`; `await session.commit()`; raise `HTTPException(status_code=413, detail={"used_bytes": q.used_bytes if q else 0, "limit_bytes": q.limit_bytes if q else 0, "rejected_bytes": size})`.
6. On success path: `doc.status = "uploaded"`; `await session.commit()`; `extract_and_classify.delay(str(doc.id))`; return `{"id": str(doc.id), "size_bytes": size, "used_bytes": (row.used_bytes if row else 0), "status": "uploaded"}`.
7. If `doc.user_id is None` (Wave 2 only): skip quota update entirely, set `doc.status = "uploaded"`, commit, enqueue, return `{"id": str(doc.id), "size_bytes": size, "used_bytes": 0, "status": "uploaded"}`.
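
The conditional quota UPDATE used by /confirm can be tried out in isolation. The sketch below swaps in an in-memory SQLite database for PostgreSQL and checks `rowcount` instead of `RETURNING ... fetchone()`, so it shows only the check-and-increment idea; `try_reserve` is a hypothetical helper name.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE quotas (user_id TEXT PRIMARY KEY, used_bytes INTEGER, limit_bytes INTEGER)"
)
conn.execute("INSERT INTO quotas VALUES ('u1', 0, 100)")


def try_reserve(conn: sqlite3.Connection, user_id: str, delta: int) -> bool:
    # The WHERE clause makes the limit check and the increment one statement,
    # so there is no separate read-then-write window.
    cur = conn.execute(
        "UPDATE quotas SET used_bytes = used_bytes + :delta "
        "WHERE user_id = :uid AND (used_bytes + :delta) <= limit_bytes",
        {"delta": delta, "uid": user_id},
    )
    conn.commit()
    # Zero affected rows plays the role of fetchone() returning None.
    return cur.rowcount == 1


first = try_reserve(conn, "u1", 60)   # fits within the 100-byte limit
second = try_reserve(conn, "u1", 60)  # would reach 120 > 100, so it is refused
used = conn.execute("SELECT used_bytes FROM quotas WHERE user_id = 'u1'").fetchone()[0]
```

The refused call leaves `used_bytes` untouched, which is the property the concurrent-race test relies on: of two uploads that together overflow the limit, exactly one is accepted.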

Endpoints GET `""` (list), GET `/{doc_id}` (get), DELETE `/{doc_id}` (delete), POST `/{doc_id}/classify` (reclassify): preserve the current handler bodies verbatim from existing `backend/api/documents.py` (lines 70-118) but rebound to the new router. NOTE: this plan does NOT yet add `Depends(get_current_user)` — Plan 03-03 adds auth wiring. Anonymous handlers remain in Wave 2.

Modify `backend/services/storage.py`:
- Delete `save_upload(session, file_bytes, original_name, mime_type)` function entirely (lines ~86-136) and its entry in `__all__`.
- At top imports add `from sqlalchemy import text` (current import is `from sqlalchemy import select, delete` — extend the line).
- Modify `delete_document(session, doc_id)`: after the existing `try: await _backend().delete_object(doc.object_key) ... except` block and BEFORE `await session.delete(doc)`, insert:
```
if doc.user_id is not None:
    await session.execute(
        text("UPDATE quotas SET used_bytes = GREATEST(0, used_bytes - :delta) WHERE user_id = :uid"),
        {"delta": doc.size_bytes, "uid": str(doc.user_id)},
    )
```
(Comment: "Atomic quota decrement (STORE-06, D-07). The user_id is None guard is removed in Plan 03-03.")
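
The clamped decrement can be checked the same way. In this sketch SQLite's two-argument `MAX()` stands in for PostgreSQL's `GREATEST()`, and `release` is a hypothetical helper name.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE quotas (user_id TEXT PRIMARY KEY, used_bytes INTEGER)")
conn.execute("INSERT INTO quotas VALUES ('u1', 40)")


def release(conn: sqlite3.Connection, user_id: str, delta: int) -> int:
    # SQLite's scalar MAX(a, b) stands in for PostgreSQL's GREATEST(a, b).
    conn.execute(
        "UPDATE quotas SET used_bytes = MAX(0, used_bytes - :delta) WHERE user_id = :uid",
        {"delta": delta, "uid": user_id},
    )
    conn.commit()
    row = conn.execute("SELECT used_bytes FROM quotas WHERE user_id = ?", (user_id,)).fetchone()
    return row[0]


after_normal = release(conn, "u1", 30)  # 40 - 30
after_drift = release(conn, "u1", 500)  # would go negative without the clamp
```

The clamp matters when the stored `size_bytes` and the quota counter have drifted apart: the counter bottoms out at zero instead of going negative.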

Modify `backend/api/auth.py`: append a `GET /me/quota` handler after the existing `/me` handler. Handler:
```
@router.get("/me/quota")
async def get_my_quota(
    current_user: User = Depends(get_current_user),
    session: AsyncSession = Depends(get_db),
):
    q = await session.get(Quota, current_user.id)
    if q is None:
        raise HTTPException(status_code=404, detail="Quota not found")
    return {"used_bytes": q.used_bytes, "limit_bytes": q.limit_bytes}
```
`Quota` is already imported on the existing `from db.models import BackupCode, Quota, RefreshToken, User` line — verify, no new import needed.

Modify `backend/tasks/document_tasks.py`: append at end of file (after `_run`):
```
@celery_app.task(name="tasks.document_tasks.cleanup_abandoned_uploads")
def cleanup_abandoned_uploads() -> dict:
    """Periodic Celery beat task — deletes Document rows with status='pending'
    older than 1 hour and their MinIO objects (D-06).
    """
    return asyncio.run(_cleanup_abandoned())


async def _cleanup_abandoned() -> dict:
    from datetime import datetime, timezone, timedelta
    from sqlalchemy import select

    from db.session import AsyncSessionLocal
    from db.models import Document
    from storage import get_storage_backend

    cutoff = datetime.now(timezone.utc) - timedelta(hours=1)
    async with AsyncSessionLocal() as session:
        result = await session.execute(
            select(Document).where(
                Document.status == "pending",
                Document.created_at < cutoff,
            )
        )
        docs = result.scalars().all()
        backend = get_storage_backend()
        cleaned = 0
        for doc in docs:
            try:
                if doc.object_key:
                    await backend.delete_object(doc.object_key)
            except Exception:
                pass  # MinIO object may not exist yet — safe to ignore
            await session.delete(doc)
            cleaned += 1
        await session.commit()
    return {"cleaned": cleaned}
```
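
The selection rule inside `_cleanup_abandoned` can be expressed as a pure predicate, which makes the one-hour boundary easy to test without a database. `is_abandoned` and `MAX_PENDING_AGE` are hypothetical names used only in this sketch.

```python
from datetime import datetime, timedelta, timezone

MAX_PENDING_AGE = timedelta(hours=1)


def is_abandoned(status: str, created_at: datetime, now: datetime) -> bool:
    # Mirrors the query filter: status == "pending" AND created_at < now - 1h.
    return status == "pending" and created_at < now - MAX_PENDING_AGE


now = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
old_pending = is_abandoned("pending", now - timedelta(minutes=61), now)
fresh_pending = is_abandoned("pending", now - timedelta(minutes=59), now)
old_uploaded = is_abandoned("uploaded", now - timedelta(hours=3), now)
```

With a 30-minute beat interval, a pending row can survive for up to about 90 minutes: one hour to cross the cutoff plus the wait for the next scheduled run.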

Modify `backend/celery_app.py`: at module top add `from datetime import timedelta as _timedelta`. After the existing `celery_app.conf.task_routes = {...}` block (before `autodiscover_tasks`), append:
```
celery_app.conf.beat_schedule = {
    "cleanup-abandoned-uploads": {
        "task": "tasks.document_tasks.cleanup_abandoned_uploads",
        "schedule": _timedelta(minutes=30),
    },
}
celery_app.conf.timezone = "UTC"
```
</action>
<verify>
<automated>(cd backend && pytest tests/test_documents.py::test_upload_url_endpoint tests/test_documents.py::test_confirm_endpoint tests/test_documents.py::test_get_quota tests/test_quota.py -x -q 2>&1 | tail -30) && grep -c "generate_presigned_put_url" backend/api/documents.py && grep -c "stat_object" backend/api/documents.py && grep -c "UPDATE quotas" backend/api/documents.py && grep -v '^[[:space:]]*#' backend/services/storage.py | grep -c "GREATEST(0, used_bytes" && grep -c "cleanup_abandoned_uploads" backend/tasks/document_tasks.py && grep -c "beat_schedule" backend/celery_app.py && grep -c "/me/quota" backend/api/auth.py</automated>
</verify>
<done>
All 7 listed pytest IDs pass (or xpass under strict=False). `backend/api/documents.py` contains `/upload-url`, `/{doc_id}/confirm`, calls `generate_presigned_put_url` + `stat_object` + atomic `UPDATE quotas`. `backend/services/storage.py` contains `GREATEST(0, used_bytes`. `backend/api/auth.py` contains `/me/quota`. `backend/tasks/document_tasks.py` contains `cleanup_abandoned_uploads`. `backend/celery_app.py` contains `beat_schedule`. All grep counts >= 1.
</done>
</task>

</tasks>

<verification>
- Atomic quota race test green: `cd backend && pytest tests/test_quota.py::test_concurrent_quota_race -x -q` (SC2 for Phase 3)
- Quota rejection shape green: `cd backend && pytest tests/test_quota.py::test_quota_exceeded_response -x -q`
- Quota decrement on delete green: `cd backend && pytest tests/test_quota.py::test_delete_decrements_quota -x -q`
- Presigned upload endpoints green: `cd backend && pytest tests/test_documents.py::test_upload_url_endpoint tests/test_documents.py::test_confirm_endpoint -x -q`
- Quota read endpoint green: `cd backend && pytest tests/test_documents.py::test_get_quota -x -q`
- No legacy /upload route remains: `grep -c '"/upload"' backend/api/documents.py` (run from the repository root) returns 0
</verification>

<success_criteria>
- StorageBackend ABC has `generate_presigned_put_url` and `stat_object` abstract methods
- MinIOBackend has dual MinIO client instances; presigned URL uses public client
- POST /api/documents/upload-url, POST /api/documents/{id}/confirm, GET /api/auth/me/quota exist and respond as documented
- DELETE /api/documents/{id} decrements quota atomically when doc.user_id is not None
- POST /api/documents/upload (multipart legacy) is removed; services.storage.save_upload is removed
- Celery beat schedule includes cleanup-abandoned-uploads every 30 minutes
- docker-compose.yml has MINIO_API_CORS_ALLOW_ORIGIN on minio service, MINIO_PUBLIC_ENDPOINT on backend service, and celery-beat service
- All Plan 03-01 stubs for STORE-03 / STORE-04 / STORE-05 / STORE-06 / D-05 pass or xpass
</success_criteria>

<output>
Create `.planning/phases/03-document-migration-multi-user-isolation/03-02-SUMMARY.md` when done — include the exact request/response shapes for /upload-url and /confirm; note the temporary `doc.user_id is None` guard that Plan 03-03 must remove.
</output>