---
phase: 05-cloud-storage-backends
plan: 02
type: execute
wave: 2
depends_on:
- "05-01"
files_modified:
- backend/storage/cloud_utils.py
- backend/services/cloud_cache.py
- backend/storage/__init__.py
autonomous: true
requirements:
- CLOUD-02
- CLOUD-07
must_haves:
  truths:
    - "validate_cloud_url() blocks all RFC-1918, loopback, and link-local addresses"
    - "encrypt_credentials / decrypt_credentials produce a correct round-trip for any JSON-serializable dict"
    - "get_storage_backend_for_document() factory returns the correct backend type from document.storage_backend"
    - "TTLCache singleton is module-level in cloud_cache.py with maxsize=1000, ttl=60"
  artifacts:
    - path: "backend/storage/cloud_utils.py"
      provides: "SSRF validation + HKDF credential encryption"
      contains: "def validate_cloud_url"
    - path: "backend/services/cloud_cache.py"
      provides: "TTLCache singleton for cloud folder listings"
      contains: "get_cloud_folders_cached"
    - path: "backend/storage/__init__.py"
      provides: "Extended factory for cloud backends"
      contains: "get_storage_backend_for_document"
  key_links:
    - from: "backend/storage/cloud_utils.py"
      to: "backend/config.py"
      via: "settings.cloud_creds_key"
      pattern: "cloud_creds_key"
    - from: "backend/storage/__init__.py"
      to: "backend/storage/cloud_utils.py"
      via: "decrypt_credentials import"
      pattern: "decrypt_credentials"
---
Create the shared utilities layer for Phase 5: SSRF-safe URL validation, HKDF+Fernet credential encryption/decryption, TTLCache for folder listings, and the extended storage backend factory.
Purpose: All cloud backends and API handlers depend on these primitives. Establishing them before the backends prevents duplication and ensures security invariants are enforced in one place.
Output: cloud_utils.py (validate_cloud_url, encrypt_credentials, decrypt_credentials), cloud_cache.py (TTLCache singleton), updated storage/__init__.py (get_storage_backend_for_document factory).
@/Users/nik/.claude/get-shit-done/workflows/execute-plan.md
@/Users/nik/.claude/get-shit-done/templates/summary.md
@.planning/PROJECT.md
@.planning/ROADMAP.md
@.planning/phases/05-cloud-storage-backends/05-CONTEXT.md
@.planning/phases/05-cloud-storage-backends/05-RESEARCH.md
@.planning/phases/05-cloud-storage-backends/05-01-SUMMARY.md
From backend/storage/__init__.py:
def get_storage_backend() -> StorageBackend:
    """Returns MinIOBackend configured from settings."""
From backend/db/models.py:
Document: storage_backend (String, nullable=False, default="minio"), user_id (UUID nullable)
CloudConnection: id (UUID), user_id (UUID FK), provider (String), credentials_enc (Text),
status (String), connected_at (TIMESTAMP)
User: id (UUID), default_storage_backend (String, default="minio")
From backend/config.py (after Plan 01):
settings.cloud_creds_key: str
settings.minio_endpoint, minio_access_key, minio_secret_key, minio_bucket, minio_public_endpoint
From backend/storage/minio_backend.py:
class MinIOBackend(StorageBackend): -- reference asyncio.to_thread() pattern
RESEARCH.md Pattern 6: SSRF validation using ipaddress + socket.getaddrinfo.
RESEARCH.md Pattern 2: HKDF+Fernet — fresh HKDF instance per call (AlreadyFinalized pitfall).
RESEARCH.md Pattern 8: TTLCache thread safety — threading.Lock required for concurrent access.
RESEARCH.md Pattern 9: get_storage_backend_for_document factory extension.
Task 1: Create cloud_utils.py — SSRF validation + HKDF credential encryption
backend/storage/cloud_utils.py
- backend/storage/base.py — StorageBackend ABC, 7 method signatures
- backend/config.py — settings.cloud_creds_key field name
- .planning/phases/05-cloud-storage-backends/05-RESEARCH.md — Pattern 2 (HKDF+Fernet) and Pattern 6 (SSRF)
- validate_cloud_url(url: str) -> None raises ValueError for: localhost, 127.0.0.0/8, 169.254.0.0/16, 10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16, ::1/128, fc00::/7
- validate_cloud_url resolves DNS via socket.getaddrinfo before checking IP (anti-DNS-rebinding: resolves hostname to IP then checks IP against blocked networks)
- validate_cloud_url raises ValueError for non-http/https schemes
- validate_cloud_url raises ValueError for URLs with no hostname
- _derive_fernet_key(master_key: bytes, user_id: str) -> Fernet: creates a fresh HKDF instance on every call (never reuses); uses algorithm=hashes.SHA256(), length=32, salt=user_id.encode("utf-8"), info=b"cloud-credentials"
- encrypt_credentials(master_key: bytes, user_id: str, credentials: dict) -> str: returns Fernet-encrypted JSON string (not plaintext)
- decrypt_credentials(master_key: bytes, user_id: str, credentials_enc: str) -> dict: returns original dict
- Round-trip: decrypt_credentials(master_key, uid, encrypt_credentials(master_key, uid, creds)) == creds
Create backend/storage/cloud_utils.py with module docstring explaining SSRF prevention and HKDF pattern.
Implement validate_cloud_url(url: str) -> None:
- Import: ipaddress, socket, urllib.parse.urlparse
- Parse URL; reject non-http/https schemes; reject missing hostname
- Define BLOCKED_NETS list: ip_network("127.0.0.0/8"), ip_network("169.254.0.0/16"),
ip_network("10.0.0.0/8"), ip_network("172.16.0.0/12"), ip_network("192.168.0.0/16"),
ip_network("::1/128"), ip_network("fc00::/7")
- Also explicitly block hostname == "localhost" string before IP resolution
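- Try ipaddress.ip_address(hostname); if that fails (not a raw IP), resolve with
socket.getaddrinfo(hostname, None) and collect the address (info[4][0]) from every returned entry, not only the first, since a hostname may resolve to several IPs; wrap socket.gaierror in ValueError
- Check every resolved IP against each BLOCKED_NETS entry using addr in net; raise ValueError on the first match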
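The steps above could be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the final module: the error messages are invented, every resolved address is checked (not only the first), and any IPv6 scope suffix is stripped before parsing.

```python
import ipaddress
import socket
from urllib.parse import urlparse

# Networks named in the plan: loopback, link-local, RFC 1918, IPv6 loopback, IPv6 ULA.
BLOCKED_NETS = [
    ipaddress.ip_network(cidr)
    for cidr in (
        "127.0.0.0/8",
        "169.254.0.0/16",
        "10.0.0.0/8",
        "172.16.0.0/12",
        "192.168.0.0/16",
        "::1/128",
        "fc00::/7",
    )
]


def validate_cloud_url(url: str) -> None:
    """Raise ValueError if the URL could reach a blocked (internal) address."""
    parsed = urlparse(url)
    if parsed.scheme not in ("http", "https"):
        raise ValueError(f"Unsupported URL scheme: {parsed.scheme!r}")
    hostname = parsed.hostname
    if not hostname:
        raise ValueError("URL has no hostname")
    if hostname.lower() == "localhost":
        raise ValueError("localhost is not allowed")

    try:
        # Raw IP literal: no DNS lookup needed.
        addresses = [ipaddress.ip_address(hostname)]
    except ValueError:
        try:
            infos = socket.getaddrinfo(hostname, None)
        except socket.gaierror as exc:
            raise ValueError(f"Cannot resolve hostname: {hostname}") from exc
        # Assumption: check every resolved address; drop any IPv6 scope id ("%eth0").
        addresses = [
            ipaddress.ip_address(info[4][0].split("%", 1)[0]) for info in infos
        ]

    for addr in addresses:
        for net in BLOCKED_NETS:
            # Membership is simply False when the IP versions differ.
            if addr in net:
                raise ValueError(f"Blocked address {addr} (in {net})")
```

Raw IP literals never trigger a DNS lookup, so the acceptance URLs can be tested offline.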
Implement _derive_fernet_key(master_key: bytes, user_id: str) -> Fernet:
- Import: base64, cryptography.hazmat.primitives.hashes, cryptography.hazmat.primitives.kdf.hkdf.HKDF, cryptography.fernet.Fernet
- Create new HKDF(...) instance each call — do NOT cache or store the instance
- Call hkdf.derive(master_key) → 32 raw bytes
- Return Fernet(base64.urlsafe_b64encode(raw_key))
Implement encrypt_credentials(master_key: bytes, user_id: str, credentials: dict) -> str:
- import json at the top of the module (decrypt_credentials needs it as well)
- Call _derive_fernet_key to get a Fernet instance
- Return f.encrypt(json.dumps(credentials).encode("utf-8")).decode("utf-8")
Implement decrypt_credentials(master_key: bytes, user_id: str, credentials_enc: str) -> dict:
- Call _derive_fernet_key to get a Fernet instance
- Return json.loads(f.decrypt(credentials_enc.encode("utf-8")))
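A minimal sketch of the three encryption helpers described above, assuming the `cryptography` package is installed. Parameter values follow the behavior list; everything else is illustrative, not the final module.

```python
import base64
import json

from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.hkdf import HKDF


def _derive_fernet_key(master_key: bytes, user_id: str) -> Fernet:
    # Build a fresh HKDF object on every call: one instance can derive only once,
    # and a second derive() on it raises AlreadyFinalized.
    hkdf = HKDF(
        algorithm=hashes.SHA256(),
        length=32,
        salt=user_id.encode("utf-8"),
        info=b"cloud-credentials",
    )
    raw_key = hkdf.derive(master_key)  # 32 raw bytes
    # Fernet expects a URL-safe base64 encoding of a 32-byte key.
    return Fernet(base64.urlsafe_b64encode(raw_key))


def encrypt_credentials(master_key: bytes, user_id: str, credentials: dict) -> str:
    f = _derive_fernet_key(master_key, user_id)
    return f.encrypt(json.dumps(credentials).encode("utf-8")).decode("utf-8")


def decrypt_credentials(master_key: bytes, user_id: str, credentials_enc: str) -> dict:
    f = _derive_fernet_key(master_key, user_id)
    return json.loads(f.decrypt(credentials_enc.encode("utf-8")))
```

Because the payload passes through JSON, the round-trip is exact only for JSON-serializable dicts with string keys. The per-user salt means a token encrypted for one user_id does not decrypt under another.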
cd /Users/nik/Documents/Progamming/document_scanner/backend && python -c "
from storage.cloud_utils import validate_cloud_url, encrypt_credentials, decrypt_credentials
# SSRF check
try:
    validate_cloud_url('http://127.0.0.1/dav')
    print('FAIL: loopback should raise')
except ValueError:
    print('OK: loopback blocked')
try:
    validate_cloud_url('http://10.0.0.1/dav')
    print('FAIL: RFC-1918 should raise')
except ValueError:
    print('OK: RFC-1918 blocked')
# Round-trip
mk = b'test-master-key-32bytes-padded!!'
uid = '550e8400-e29b-41d4-a716-446655440000'
creds = {'access_token': 'ya29.xxx', 'refresh_token': '1//xxx'}
enc = encrypt_credentials(mk, uid, creds)
assert 'access_token' not in enc, 'plaintext key name leaked into ciphertext'
dec = decrypt_credentials(mk, uid, enc)
assert dec == creds, f'Round-trip failed: {dec}'
print('OK: encryption round-trip')
"
- backend/storage/cloud_utils.py contains def validate_cloud_url, def encrypt_credentials, def decrypt_credentials, def _derive_fernet_key
- validate_cloud_url("http://127.0.0.1/dav") raises ValueError
- validate_cloud_url("http://10.0.0.1/dav") raises ValueError
- validate_cloud_url("http://169.254.169.254/dav") raises ValueError
- validate_cloud_url("http://192.168.1.1/dav") raises ValueError
- validate_cloud_url("http://localhost/dav") raises ValueError
- Encryption round-trip: decrypt_credentials(key, uid, encrypt_credentials(key, uid, creds)) == creds
- "access_token" plaintext does NOT appear in the encrypted string
cloud_utils.py created; SSRF validation rejects the five URLs listed in the acceptance criteria (loopback, 10.0.0.0/8, link-local, 192.168.0.0/16, localhost); HKDF round-trip verified via python -c invocation
Task 2: Create cloud_cache.py and extend storage factory
backend/services/cloud_cache.py, backend/storage/__init__.py
- backend/storage/__init__.py — current get_storage_backend() factory
- backend/storage/base.py — StorageBackend ABC
- backend/storage/minio_backend.py — MinIOBackend constructor signature
- backend/db/models.py — CloudConnection, Document, User model fields
- .planning/phases/05-cloud-storage-backends/05-RESEARCH.md — Pattern 8 (TTLCache), Pattern 9 (factory extension)
- backend/services/cloud_cache.py exports a module-level _folder_cache = TTLCache(maxsize=1000, ttl=60) and a threading.Lock()
- get_cloud_folders_cached(user_id: str, provider: str, folder_id: str, fetch_fn: Callable[[], Awaitable[list]]) is an async function that checks the cache before calling and awaiting fetch_fn()
- get_storage_backend_for_document(document, user, session) is an async function added to backend/storage/__init__.py that returns MinIOBackend for storage_backend=="minio" and raises HTTPException(503) for unknown or inactive cloud connections
- existing get_storage_backend() function in __init__.py is NOT modified (existing callers unaffected)
- get_storage_backend_for_document raises HTTPException(503, detail="Cloud connection not found or inactive") when CloudConnection is missing or status != "ACTIVE"
Create backend/services/cloud_cache.py:
- Import: threading, cachetools.TTLCache, typing.Callable, typing.Awaitable
- Module-level: _folder_cache: TTLCache = TTLCache(maxsize=1000, ttl=60)
- Module-level: _folder_cache_lock = threading.Lock()
- async function get_cloud_folders_cached(user_id: str, provider: str, folder_id: str, fetch_fn: Callable[[], Awaitable[list]]) -> list:
    cache_key = f"{user_id}:{provider}:{folder_id}"
    with _folder_cache_lock: check if cache_key in _folder_cache; return cached if found
    result = await fetch_fn()  # awaited OUTSIDE the lock so the lock is never held across an await
    with _folder_cache_lock: store result in cache
    return result
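- Function invalidate_provider_cache(user_id: str, provider: str) -> None: while holding the lock, takes a snapshot of the keys in
_folder_cache (e.g. list(_folder_cache.keys())) and deletes every key starting with f"{user_id}:{provider}:"; do not delete while iterating the cache directly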
Extend backend/storage/__init__.py (add after existing get_storage_backend()):
- Import at top of file: select from sqlalchemy, HTTPException from fastapi, AsyncSession from sqlalchemy.ext.asyncio, Optional from typing
- Import: from db.models import CloudConnection, Document, User
- Import: from config import settings
- Import: from storage.cloud_utils import decrypt_credentials
- Add async function get_storage_backend_for_document(document, user, session: AsyncSession) -> StorageBackend:
If document.storage_backend == "minio": return get_storage_backend() (existing factory)
Otherwise: query CloudConnection where user_id=user.id AND provider=document.storage_backend AND status="ACTIVE"
If not found: raise HTTPException(status_code=503, detail="Cloud connection not found or inactive")
Decrypt credentials: master_key = settings.cloud_creds_key.encode(); credentials = decrypt_credentials(master_key, str(user.id), conn.credentials_enc)
If provider == "google_drive": import GoogleDriveBackend; return GoogleDriveBackend(credentials)
Elif provider == "onedrive": import OneDriveBackend; return OneDriveBackend(credentials)
Elif provider in ("nextcloud", "webdav"): import WebDAVBackend; return WebDAVBackend(credentials["server_url"], credentials["username"], credentials["password"])
Else: raise ValueError(f"Unknown storage backend: {document.storage_backend}")
Use lazy imports (inside the function) for cloud backends to avoid circular imports at module load time.
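The decision order of the factory can be illustrated without FastAPI or SQLAlchemy. In the sketch below, `ServiceUnavailable`, `Connection`, `find_active_connection`, and `choose_backend` are all hypothetical stand-ins (for HTTPException(503), a CloudConnection row, the filtered query, and the factory itself), and the function returns a backend class name instead of building an instance.

```python
from dataclasses import dataclass
from typing import List, Optional


class ServiceUnavailable(Exception):
    """Hypothetical stand-in for HTTPException(status_code=503, ...)."""


@dataclass
class Connection:
    """Hypothetical stand-in for a CloudConnection row."""
    user_id: str
    provider: str
    status: str


def find_active_connection(
    connections: List[Connection], user_id: str, provider: str
) -> Optional[Connection]:
    # Mirrors the query filter: user_id AND provider AND status == "ACTIVE".
    for conn in connections:
        if (conn.user_id, conn.provider, conn.status) == (user_id, provider, "ACTIVE"):
            return conn
    return None


def choose_backend(storage_backend: str, user_id: str, connections: List[Connection]) -> str:
    """Return the name of the backend class the factory would construct."""
    if storage_backend == "minio":
        return "MinIOBackend"  # existing get_storage_backend() path
    conn = find_active_connection(connections, user_id, storage_backend)
    if conn is None:
        raise ServiceUnavailable("Cloud connection not found or inactive")
    # The real factory would decrypt conn.credentials_enc here,
    # then lazily import the matching backend class.
    if storage_backend == "google_drive":
        return "GoogleDriveBackend"
    if storage_backend == "onedrive":
        return "OneDriveBackend"
    if storage_backend in ("nextcloud", "webdav"):
        return "WebDAVBackend"
    raise ValueError(f"Unknown storage backend: {storage_backend}")
```

Because the connection lookup runs first, an unrecognized provider normally surfaces as the 503 path. The final ValueError is reached only when an ACTIVE connection row exists for a provider the factory does not know.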
cd /Users/nik/Documents/Progamming/document_scanner/backend && python -c "
from services.cloud_cache import get_cloud_folders_cached, _folder_cache, _folder_cache_lock, invalidate_provider_cache
from storage import get_storage_backend, get_storage_backend_for_document
print('cloud_cache imports OK')
print('factory extension imports OK')
print(f'TTLCache maxsize={_folder_cache.maxsize}, ttl={_folder_cache.ttl}')
"
- backend/services/cloud_cache.py exists and exports _folder_cache (TTLCache), _folder_cache_lock (Lock), get_cloud_folders_cached (async), invalidate_provider_cache
- _folder_cache.maxsize == 1000 and _folder_cache.ttl == 60
- backend/storage/__init__.py exports get_storage_backend_for_document (async function)
- `from storage import get_storage_backend_for_document` imports without error
- Existing `from storage import get_storage_backend` still works (no regression)
- `python -m pytest -v --tb=short` passes with 0 failures (no import regressions)
cloud_cache.py created with TTLCache singleton and cache/invalidate helpers; storage/__init__.py has get_storage_backend_for_document; full pytest suite passes
## Trust Boundaries
| Boundary | Description |
|----------|-------------|
| user-supplied URL → validate_cloud_url | Untrusted URL must be checked against SSRF blocklist before any HTTP call |
| credentials dict → Fernet ciphertext | Credentials must never appear in plaintext after this layer |
| DNS resolution → IP check | DNS-based SSRF bypass: hostname resolves to internal IP after validation |
## STRIDE Threat Register
| Threat ID | Category | Component | Disposition | Mitigation Plan |
|-----------|----------|-----------|-------------|-----------------|
| T-05-02-01 | Tampering | validate_cloud_url — DNS resolution | mitigate | socket.getaddrinfo resolves hostname to IP before network check; validate_cloud_url called immediately before each request (not only at connect-time) per D-17; DNS rebinding window is minimized |
| T-05-02-02 | Information Disclosure | _derive_fernet_key — HKDF instance reuse | mitigate | New HKDF(...) instance created on every _derive_fernet_key call; AlreadyFinalized pitfall (RESEARCH.md Pitfall 3) prevented by construction |
| T-05-02-03 | Information Disclosure | cloud_creds_key default value | mitigate | Default "CHANGEME-32-bytes-padded!!" is clearly a placeholder; production deployment requires CLOUD_CREDS_KEY env var; docstring on Settings field documents the requirement |
| T-05-02-04 | Elevation of Privilege | get_storage_backend_for_document — cross-user | mitigate | Function receives user object from get_regular_user dep; CloudConnection query includes user_id=user.id filter; cross-user access impossible via this function |
| T-05-02-SC | Tampering | cachetools package install | mitigate | cachetools verified [OK] in RESEARCH.md slopcheck audit |
cd /Users/nik/Documents/Progamming/document_scanner/backend && python -m pytest tests/test_cloud.py -v && python -m pytest -v --tb=short 2>&1 | tail -10
- cloud_utils.py: validate_cloud_url blocks RFC-1918/loopback/link-local; HKDF round-trip correct
- cloud_cache.py: TTLCache(maxsize=1000, ttl=60) with thread-safe lock; get_cloud_folders_cached works
- storage/__init__.py: get_storage_backend_for_document added alongside existing get_storage_backend()
- pytest -v exits 0, 0 failures; test_cloud.py still all xfailed