Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
| phase | plan | type | wave | depends_on | files_modified | autonomous | requirements | must_haves |
|---|---|---|---|---|---|---|---|---|
| 05-cloud-storage-backends | 02 | execute | 2 | | | true | | |
Purpose: All cloud backends and API handlers depend on these primitives. Establishing them before the backends prevents duplication and ensures security invariants are enforced in one place.

Output: cloud_utils.py (validate_cloud_url, encrypt_credentials, decrypt_credentials), cloud_cache.py (TTLCache singleton), updated storage/__init__.py (get_storage_backend_for_document factory).
<execution_context> @/Users/nik/.claude/get-shit-done/workflows/execute-plan.md @/Users/nik/.claude/get-shit-done/templates/summary.md </execution_context>
@.planning/PROJECT.md
@.planning/ROADMAP.md
@.planning/phases/05-cloud-storage-backends/05-CONTEXT.md
@.planning/phases/05-cloud-storage-backends/05-RESEARCH.md
@.planning/phases/05-cloud-storage-backends/05-01-SUMMARY.md

From backend/storage/__init__.py:
- def get_storage_backend() -> StorageBackend: """Returns MinIOBackend configured from settings."""

From backend/db/models.py:
- Document: storage_backend (String, nullable=False, default="minio"), user_id (UUID nullable)
- CloudConnection: id (UUID), user_id (UUID FK), provider (String), credentials_enc (Text), status (String), connected_at (TIMESTAMP)
- User: id (UUID), default_storage_backend (String, default="minio")

From backend/config.py (after Plan 01):
- settings.cloud_creds_key: str
- settings.minio_endpoint, minio_access_key, minio_secret_key, minio_bucket, minio_public_endpoint

From backend/storage/minio_backend.py:
- class MinIOBackend(StorageBackend): reference for the asyncio.to_thread() pattern

From RESEARCH.md:
- Pattern 6: SSRF validation using ipaddress + socket.getaddrinfo.
- Pattern 2: HKDF+Fernet, with a fresh HKDF instance per call (AlreadyFinalized pitfall).
- Pattern 8: TTLCache thread safety; threading.Lock required for concurrent access.
- Pattern 9: get_storage_backend_for_document factory extension.
Task 1: Create cloud_utils.py (SSRF validation + HKDF credential encryption)

backend/storage/cloud_utils.py

- backend/storage/base.py: StorageBackend ABC, 7 method signatures
- backend/config.py: settings.cloud_creds_key field name
- .planning/phases/05-cloud-storage-backends/05-RESEARCH.md: Pattern 2 (HKDF+Fernet) and Pattern 6 (SSRF)

- validate_cloud_url(url: str) -> None raises ValueError for: localhost, 127.0.0.0/8, 169.254.0.0/16, 10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16, ::1/128, fc00::/7
- validate_cloud_url resolves DNS via socket.getaddrinfo before checking IP (anti-DNS-rebinding: resolves hostname to IP then checks IP against blocked networks)
- validate_cloud_url raises ValueError for non-http/https schemes
- validate_cloud_url raises ValueError for URLs with no hostname
- _derive_fernet_key(master_key: bytes, user_id: str) -> Fernet: creates a fresh HKDF instance on every call (never reuses); uses algorithm=hashes.SHA256(), length=32, salt=user_id.encode("utf-8"), info=b"cloud-credentials"
- encrypt_credentials(master_key: bytes, user_id: str, credentials: dict) -> str: returns Fernet-encrypted JSON string (not plaintext)
- decrypt_credentials(master_key: bytes, user_id: str, credentials_enc: str) -> dict: returns original dict
- Round-trip: decrypt_credentials(master_key, uid, encrypt_credentials(master_key, uid, creds)) == creds

Create backend/storage/cloud_utils.py with a module docstring explaining SSRF prevention and the HKDF pattern.

Implement validate_cloud_url(url: str) -> None:
- Import: ipaddress, socket, urllib.parse.urlparse
- Parse URL; reject non-http/https schemes; reject missing hostname
- Define BLOCKED_NETS list: ip_network("127.0.0.0/8"), ip_network("169.254.0.0/16"),
ip_network("10.0.0.0/8"), ip_network("172.16.0.0/12"), ip_network("192.168.0.0/16"),
ip_network("::1/128"), ip_network("fc00::/7")
- Also explicitly block hostname == "localhost" string before IP resolution
- Try ipaddress.ip_address(hostname) — if that fails (not a raw IP), use
socket.getaddrinfo(hostname, None)[0][4][0] to resolve; wrap socket.gaierror
- Check resolved IP against each BLOCKED_NETS entry using addr in net
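The validation steps above could be sketched roughly as follows. This is an illustrative sketch under stated assumptions, not the final module: the error messages are placeholders, and, going slightly beyond the step list, it checks every address returned by socket.getaddrinfo rather than only the first, so a hostname with several A/AAAA records cannot hide a blocked address behind an allowed one.

```python
import ipaddress
import socket
from urllib.parse import urlparse

# Blocklist copied from the step list above.
BLOCKED_NETS = [
    ipaddress.ip_network("127.0.0.0/8"),
    ipaddress.ip_network("169.254.0.0/16"),
    ipaddress.ip_network("10.0.0.0/8"),
    ipaddress.ip_network("172.16.0.0/12"),
    ipaddress.ip_network("192.168.0.0/16"),
    ipaddress.ip_network("::1/128"),
    ipaddress.ip_network("fc00::/7"),
]


def validate_cloud_url(url: str) -> None:
    """Raise ValueError if the URL is malformed or targets a blocked network."""
    parsed = urlparse(url)
    if parsed.scheme not in ("http", "https"):
        raise ValueError(f"Unsupported URL scheme: {parsed.scheme!r}")
    hostname = parsed.hostname
    if not hostname:
        raise ValueError("URL has no hostname")
    if hostname.lower() == "localhost":
        raise ValueError("localhost is not allowed")

    try:
        # A raw IP literal needs no DNS lookup.
        addresses = [ipaddress.ip_address(hostname)]
    except ValueError:
        try:
            infos = socket.getaddrinfo(hostname, None)
        except socket.gaierror as exc:
            raise ValueError(f"Cannot resolve host {hostname!r}") from exc
        # Assumption: inspect all resolved addresses; drop any IPv6 zone suffix.
        addresses = [
            ipaddress.ip_address(info[4][0].split("%", 1)[0]) for info in infos
        ]

    for addr in addresses:
        for net in BLOCKED_NETS:
            # Membership is simply False when the IP versions differ.
            if addr in net:
                raise ValueError(f"{addr} is inside blocked network {net}")
```

URLs with IP literals never trigger a DNS lookup, so the loopback, RFC-1918 and link-local cases can be exercised offline.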
Implement _derive_fernet_key(master_key: bytes, user_id: str) -> Fernet:
- Import: base64, cryptography.hazmat.primitives.hashes, cryptography.hazmat.primitives.kdf.hkdf.HKDF, cryptography.fernet.Fernet
- Create new HKDF(...) instance each call — do NOT cache or store the instance
- Call hkdf.derive(master_key) → 32 raw bytes
- Return Fernet(base64.urlsafe_b64encode(raw_key))
Implement encrypt_credentials(master_key: bytes, user_id: str, credentials: dict) -> str:
- import json inside function body (or at top)
- Call _derive_fernet_key to get a Fernet instance
- Return f.encrypt(json.dumps(credentials).encode("utf-8")).decode("utf-8")
Implement decrypt_credentials(master_key: bytes, user_id: str, credentials_enc: str) -> dict:
- Call _derive_fernet_key to get a Fernet instance
- Return json.loads(f.decrypt(credentials_enc.encode("utf-8")))
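To make the derivation step concrete, here is a stdlib-only rendering of what HKDF-SHA256 computes for a 32-byte output (RFC 5869 extract-then-expand). It is a swapped-in illustration, not the planned implementation: the plan uses cryptography's HKDF and Fernet classes, and the function names hkdf_sha256_32 and derive_fernet_key_bytes are hypothetical. The point is only to show why salting with user_id yields an independent key per user.

```python
import base64
import hashlib
import hmac


def hkdf_sha256_32(master_key: bytes, salt: bytes, info: bytes) -> bytes:
    """HKDF (RFC 5869) with SHA-256, fixed to a single 32-byte output block."""
    # Extract: PRK = HMAC-SHA256(key=salt, msg=input key material)
    prk = hmac.new(salt, master_key, hashlib.sha256).digest()
    # Expand: T(1) = HMAC-SHA256(PRK, info || 0x01) is exactly 32 bytes
    return hmac.new(prk, info + b"\x01", hashlib.sha256).digest()


def derive_fernet_key_bytes(master_key: bytes, user_id: str) -> bytes:
    """Return the url-safe base64 key material in the format Fernet expects."""
    raw = hkdf_sha256_32(
        master_key,
        salt=user_id.encode("utf-8"),
        info=b"cloud-credentials",
    )
    return base64.urlsafe_b64encode(raw)
```

Because the function is pure, calling it twice with the same inputs gives the same key, while changing user_id changes the key. The "fresh HKDF instance per call" rule in the plan is a constraint of the cryptography library's single-use objects and does not apply to this stdlib form.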
cd /Users/nik/Documents/Progamming/document_scanner/backend && python -c "
from storage.cloud_utils import validate_cloud_url, encrypt_credentials, decrypt_credentials
# SSRF check
try:
    validate_cloud_url('http://127.0.0.1/dav')
    print('FAIL: loopback should raise')
except ValueError:
    print('OK: loopback blocked')
try:
    validate_cloud_url('http://10.0.0.1/dav')
    print('FAIL: RFC-1918 should raise')
except ValueError:
    print('OK: RFC-1918 blocked')
# Round-trip
mk = b'test-master-key-32bytes-padded!!'
uid = '550e8400-e29b-41d4-a716-446655440000'
creds = {'access_token': 'ya29.xxx', 'refresh_token': '1//xxx'}
enc = encrypt_credentials(mk, uid, creds)
assert enc != str(creds)
assert 'access_token' not in enc, 'plaintext leaked into ciphertext'
dec = decrypt_credentials(mk, uid, enc)
assert dec == creds, f'Round-trip failed: {dec}'
print('OK: encryption round-trip')
"
- backend/storage/cloud_utils.py contains def validate_cloud_url, def encrypt_credentials, def decrypt_credentials, def _derive_fernet_key
- validate_cloud_url("http://127.0.0.1/dav") raises ValueError
- validate_cloud_url("http://10.0.0.1/dav") raises ValueError
- validate_cloud_url("http://169.254.169.254/dav") raises ValueError
- validate_cloud_url("http://192.168.1.1/dav") raises ValueError
- validate_cloud_url("http://localhost/dav") raises ValueError
- Encryption round-trip: decrypt_credentials(key, uid, encrypt_credentials(key, uid, creds)) == creds
- "access_token" plaintext does NOT appear in the encrypted string
cloud_utils.py created; SSRF validation blocks all 5 network categories; HKDF round-trip verified via python -c invocation
Task 2: Create cloud_cache.py and extend storage factory
backend/services/cloud_cache.py, backend/storage/__init__.py
- backend/storage/__init__.py — current get_storage_backend() factory
- backend/storage/base.py — StorageBackend ABC
- backend/storage/minio_backend.py — MinIOBackend constructor signature
- backend/db/models.py — CloudConnection, Document, User model fields
- .planning/phases/05-cloud-storage-backends/05-RESEARCH.md — Pattern 8 (TTLCache), Pattern 9 (factory extension)
- backend/services/cloud_cache.py exports a module-level _folder_cache = TTLCache(maxsize=1000, ttl=60) and a threading.Lock()
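- get_cloud_folders_cached(user_id: str, provider: str, folder_id: str, fetch_fn: Callable[[], Awaitable[list]]) is an async function that checks the cache before calling fetch_fn; fetch_fn is a zero-argument callable returning an awaitable, since it is invoked as await fetch_fn()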
- get_storage_backend_for_document(document, user, session) is an async function added to backend/storage/__init__.py that returns MinIOBackend for storage_backend=="minio" and raises HTTPException(503) for unknown or inactive cloud connections
- existing get_storage_backend() function in __init__.py is NOT modified (existing callers unaffected)
- get_storage_backend_for_document raises HTTPException(503, detail="Cloud connection not found or inactive") when CloudConnection is missing or status != "ACTIVE"
Create backend/services/cloud_cache.py:
- Import: threading, cachetools.TTLCache, typing.Callable, typing.Awaitable
- Module-level: _folder_cache: TTLCache = TTLCache(maxsize=1000, ttl=60)
- Module-level: _folder_cache_lock = threading.Lock()
- async function get_cloud_folders_cached(user_id: str, provider: str, folder_id: str, fetch_fn) -> list:
  cache_key = f"{user_id}:{provider}:{folder_id}"
  with _folder_cache_lock: check if cache_key in _folder_cache; return cached if found
  result = await fetch_fn() # called OUTSIDE the lock so the lock is never held across an await
  with _folder_cache_lock: store result in cache
  return result
- Function invalidate_provider_cache(user_id: str, provider: str) -> None: under the lock,
  snapshots the keys (e.g. list(_folder_cache)) and deletes every key starting with
  f"{user_id}:{provider}:"; iterate over the snapshot, because mutating the cache while iterating it is unsafe
Extend backend/storage/__init__.py (add after existing get_storage_backend()):
- Import at top of file: select from sqlalchemy, HTTPException from fastapi, AsyncSession from sqlalchemy.ext.asyncio, Optional from typing
- Import: from db.models import CloudConnection, Document, User
- Import: from config import settings
- Import: from storage.cloud_utils import decrypt_credentials
- Add async function get_storage_backend_for_document(document, user, session: AsyncSession) -> StorageBackend:
  If document.storage_backend == "minio": return get_storage_backend() (existing factory)
  Otherwise: query CloudConnection where user_id=user.id AND provider=document.storage_backend AND status="ACTIVE"
  If not found: raise HTTPException(status_code=503, detail="Cloud connection not found or inactive")
  Decrypt credentials: master_key = settings.cloud_creds_key.encode(); credentials = decrypt_credentials(master_key, str(user.id), conn.credentials_enc)
  Dispatch on provider (the value of document.storage_backend):
  If provider == "google_drive": import GoogleDriveBackend; return GoogleDriveBackend(credentials)
  Elif provider == "onedrive": import OneDriveBackend; return OneDriveBackend(credentials)
  Elif provider in ("nextcloud", "webdav"): import WebDAVBackend; return WebDAVBackend(credentials["server_url"], credentials["username"], credentials["password"])
  Else: raise ValueError(f"Unknown storage backend: {document.storage_backend}")
Use lazy imports (inside the function) for cloud backends to avoid circular imports at module load time.
cd /Users/nik/Documents/Progamming/document_scanner/backend && python -c "
from services.cloud_cache import get_cloud_folders_cached, _folder_cache, _folder_cache_lock, invalidate_provider_cache
from storage import get_storage_backend, get_storage_backend_for_document
print('cloud_cache imports OK')
print('factory extension imports OK')
print(f'TTLCache maxsize={_folder_cache.maxsize}, ttl={_folder_cache.ttl}')
"
- backend/services/cloud_cache.py exists and exports _folder_cache (TTLCache), _folder_cache_lock (Lock), get_cloud_folders_cached (async), invalidate_provider_cache
- _folder_cache.maxsize == 1000 and _folder_cache.ttl == 60
- backend/storage/__init__.py exports get_storage_backend_for_document (async function)
- `from storage import get_storage_backend_for_document` imports without error
- Existing `from storage import get_storage_backend` still works (no regression)
- `python -m pytest -v --tb=short` passes with 0 failures (no import regressions)
cloud_cache.py created with TTLCache singleton and cache/invalidate helpers; storage/__init__.py has get_storage_backend_for_document; full pytest suite passes
<threat_model>
Trust Boundaries
| Boundary | Description |
|---|---|
| user-supplied URL → validate_cloud_url | Untrusted URL must be checked against SSRF blocklist before any HTTP call |
| credentials dict → Fernet ciphertext | Credentials must never appear in plaintext after this layer |
| DNS resolution → IP check | DNS-based SSRF bypass: hostname resolves to internal IP after validation |
STRIDE Threat Register
| Threat ID | Category | Component | Disposition | Mitigation Plan |
|---|---|---|---|---|
| T-05-02-01 | Tampering | validate_cloud_url — DNS resolution | mitigate | socket.getaddrinfo resolves hostname to IP before network check; validate_cloud_url called immediately before each request (not only at connect-time) per D-17; DNS rebinding window is minimized |
| T-05-02-02 | Information Disclosure | _derive_fernet_key — HKDF instance reuse | mitigate | New HKDF(...) instance created on every _derive_fernet_key call; AlreadyFinalized pitfall (RESEARCH.md Pitfall 3) prevented by construction |
| T-05-02-03 | Information Disclosure | cloud_creds_key default value | mitigate | Default "CHANGEME-32-bytes-padded!!" is clearly a placeholder; production deployment requires CLOUD_CREDS_KEY env var; docstring on Settings field documents the requirement |
| T-05-02-04 | Elevation of Privilege | get_storage_backend_for_document — cross-user | mitigate | Function receives user object from get_regular_user dep; CloudConnection query includes user_id=user.id filter; cross-user access impossible via this function |
| T-05-02-SC | Tampering | cachetools package install | mitigate | cachetools verified [OK] in RESEARCH.md slopcheck audit |
</threat_model>
<success_criteria>
- cloud_utils.py: validate_cloud_url blocks RFC-1918/loopback/link-local; HKDF round-trip correct
- cloud_cache.py: TTLCache(maxsize=1000, ttl=60) with thread-safe lock; get_cloud_folders_cached works
- storage/__init__.py: get_storage_backend_for_document added alongside existing get_storage_backend()
- pytest -v exits 0, 0 failures; test_cloud.py still all xfailed
</success_criteria>