---
phase: 05-cloud-storage-backends
plan: 02
type: execute
wave: 2
depends_on:
  - "05-01"
files_modified:
  - backend/storage/cloud_utils.py
  - backend/services/cloud_cache.py
  - backend/storage/__init__.py
autonomous: true
requirements:
  - CLOUD-02
  - CLOUD-07
must_haves:
  truths:
    - "validate_cloud_url() blocks all RFC-1918, loopback, and link-local addresses"
    - "encrypt_credentials / decrypt_credentials produce a correct round-trip for any dict"
    - "get_storage_backend_for_document() factory returns the correct backend type from document.storage_backend"
    - "TTLCache singleton is module-level in cloud_cache.py with maxsize=1000, ttl=60"
  artifacts:
    - path: "backend/storage/cloud_utils.py"
      provides: "SSRF validation + HKDF credential encryption"
      contains: "def validate_cloud_url"
    - path: "backend/services/cloud_cache.py"
      provides: "TTLCache singleton for cloud folder listings"
      contains: "get_cloud_folders_cached"
    - path: "backend/storage/__init__.py"
      provides: "Extended factory for cloud backends"
      contains: "get_storage_backend_for_document"
  key_links:
    - from: "backend/storage/cloud_utils.py"
      to: "backend/config.py"
      via: "settings.cloud_creds_key"
      pattern: "cloud_creds_key"
    - from: "backend/storage/__init__.py"
      to: "backend/storage/cloud_utils.py"
      via: "decrypt_credentials import"
      pattern: "decrypt_credentials"
---

Create the shared utilities layer for Phase 5: SSRF-safe URL validation, HKDF+Fernet credential encryption/decryption, TTLCache for folder listings, and the extended storage backend factory.

Purpose: All cloud backends and API handlers depend on these primitives. Establishing them before the backends prevents duplication and ensures security invariants are enforced in one place.

Output: cloud_utils.py (validate_cloud_url, encrypt_credentials, decrypt_credentials), cloud_cache.py (TTLCache singleton), updated storage/__init__.py (get_storage_backend_for_document factory).
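The SSRF check named in the truths above can be sketched with the standard library alone. This is an illustration under stated assumptions, not the final cloud_utils.py: the blocked networks and the ValueError contract follow this plan, while checking every address returned by `getaddrinfo` (instead of only the first) and stripping IPv6 scope ids are additions made for the sketch.

```python
import ipaddress
import socket
from urllib.parse import urlparse

# Networks named in this plan: loopback, link-local, RFC-1918, IPv6 loopback, IPv6 ULA.
BLOCKED_NETS = [
    ipaddress.ip_network(net)
    for net in (
        "127.0.0.0/8",
        "169.254.0.0/16",
        "10.0.0.0/8",
        "172.16.0.0/12",
        "192.168.0.0/16",
        "::1/128",
        "fc00::/7",
    )
]


def validate_cloud_url(url: str) -> None:
    """Raise ValueError if the URL is malformed or points at a blocked network."""
    parsed = urlparse(url)
    if parsed.scheme not in ("http", "https"):
        raise ValueError(f"Unsupported scheme: {parsed.scheme!r}")
    hostname = parsed.hostname
    if not hostname:
        raise ValueError("URL has no hostname")
    if hostname.lower() == "localhost":
        raise ValueError("localhost is not allowed")

    try:
        # Raw IP literal: no DNS lookup needed.
        addresses = [ipaddress.ip_address(hostname)]
    except ValueError:
        try:
            infos = socket.getaddrinfo(hostname, None)
        except socket.gaierror as exc:
            raise ValueError(f"Cannot resolve host {hostname!r}") from exc
        # Assumption: check all resolved addresses and drop any "%scope" suffix.
        addresses = [
            ipaddress.ip_address(info[4][0].split("%", 1)[0]) for info in infos
        ]

    for addr in addresses:
        # Membership is False when the address and network versions differ.
        if any(addr in net for net in BLOCKED_NETS):
            raise ValueError(f"Address {addr} is in a blocked network")
```

Calling `validate_cloud_url("http://169.254.169.254/dav")` raises ValueError, while a public IP literal such as `https://8.8.8.8/dav` passes without a DNS lookup.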
@/Users/nik/.claude/get-shit-done/workflows/execute-plan.md
@/Users/nik/.claude/get-shit-done/templates/summary.md

@.planning/PROJECT.md
@.planning/ROADMAP.md
@.planning/phases/05-cloud-storage-backends/05-CONTEXT.md
@.planning/phases/05-cloud-storage-backends/05-RESEARCH.md
@.planning/phases/05-cloud-storage-backends/05-01-SUMMARY.md

From backend/storage/__init__.py:
def get_storage_backend() -> StorageBackend:
    """Returns MinIOBackend configured from settings."""

From backend/db/models.py:
Document: storage_backend (String, nullable=False, default="minio"), user_id (UUID nullable)
CloudConnection: id (UUID), user_id (UUID FK), provider (String), credentials_enc (Text), status (String), connected_at (TIMESTAMP)
User: id (UUID), default_storage_backend (String, default="minio")

From backend/config.py (after Plan 01):
settings.cloud_creds_key: str
settings.minio_endpoint, minio_access_key, minio_secret_key, minio_bucket, minio_public_endpoint

From backend/storage/minio_backend.py:
class MinIOBackend(StorageBackend): -- reference asyncio.to_thread() pattern

RESEARCH.md Pattern 6: SSRF validation using ipaddress + socket.getaddrinfo.
RESEARCH.md Pattern 2: HKDF+Fernet, with a fresh HKDF instance per call (AlreadyFinalized pitfall).
RESEARCH.md Pattern 8: TTLCache thread safety; threading.Lock required for concurrent access.
RESEARCH.md Pattern 9: get_storage_backend_for_document factory extension.
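Pattern 2 derives one Fernet key per user from the master key. The sketch below covers only the derivation step, written against the standard library (HMAC-SHA256 following RFC 5869) so that it runs without third-party packages. The plan itself uses the HKDF class from the cryptography package. The helper names `hkdf_sha256` and `derive_user_key` are hypothetical; the salt, info, and length values are the ones this plan specifies.

```python
import base64
import hashlib
import hmac


def hkdf_sha256(ikm: bytes, salt: bytes, info: bytes, length: int = 32) -> bytes:
    """HKDF with HMAC-SHA256 (RFC 5869): extract a PRK, then expand it."""
    hash_len = hashlib.sha256().digest_size
    if length > 255 * hash_len:
        raise ValueError("Requested length is too large for HKDF-SHA256")
    if not salt:
        # RFC 5869: a missing salt is treated as hash_len zero bytes.
        salt = b"\x00" * hash_len
    prk = hmac.new(salt, ikm, hashlib.sha256).digest()  # extract step

    okm = b""
    block = b""
    counter = 1
    while len(okm) < length:  # expand step: T(n) = HMAC(PRK, T(n-1) | info | n)
        block = hmac.new(prk, block + info + bytes([counter]), hashlib.sha256).digest()
        okm += block
        counter += 1
    return okm[:length]


def derive_user_key(master_key: bytes, user_id: str) -> bytes:
    """Return a per-user key in the urlsafe-base64 form that Fernet expects."""
    raw_key = hkdf_sha256(
        master_key,
        salt=user_id.encode("utf-8"),  # per-user salt, as specified in this plan
        info=b"cloud-credentials",
        length=32,
    )
    return base64.urlsafe_b64encode(raw_key)
```

Because the derivation is a pure function of (master_key, user_id), the same inputs always yield the same key and different users get unrelated keys. When the cryptography HKDF class is used instead, each instance supports a single `derive()` call, which is why the plan asks for a fresh instance per call.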
Task 1: Create cloud_utils.py (SSRF validation + HKDF credential encryption)

Files: backend/storage/cloud_utils.py

Read first:
- backend/storage/base.py: StorageBackend ABC, 7 method signatures
- backend/config.py: settings.cloud_creds_key field name
- .planning/phases/05-cloud-storage-backends/05-RESEARCH.md: Pattern 2 (HKDF+Fernet) and Pattern 6 (SSRF)

Behavior:
- validate_cloud_url(url: str) -> None raises ValueError for: localhost, 127.0.0.0/8, 169.254.0.0/16, 10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16, ::1/128, fc00::/7
- validate_cloud_url resolves DNS via socket.getaddrinfo before checking IP (anti-DNS-rebinding: resolves hostname to IP then checks IP against blocked networks)
- validate_cloud_url raises ValueError for non-http/https schemes
- validate_cloud_url raises ValueError for URLs with no hostname
- _derive_fernet_key(master_key: bytes, user_id: str) -> Fernet: creates a fresh HKDF instance on every call (never reuses); uses algorithm=hashes.SHA256(), length=32, salt=user_id.encode("utf-8"), info=b"cloud-credentials"
- encrypt_credentials(master_key: bytes, user_id: str, credentials: dict) -> str: returns Fernet-encrypted JSON string (not plaintext)
- decrypt_credentials(master_key: bytes, user_id: str, credentials_enc: str) -> dict: returns original dict
- Round-trip: decrypt_credentials(master_key, uid, encrypt_credentials(master_key, uid, creds)) == creds

Action:
Create backend/storage/cloud_utils.py with module docstring explaining SSRF prevention and HKDF pattern.
Implement validate_cloud_url(url: str) -> None:
- Import: ipaddress, socket, urllib.parse.urlparse
- Parse URL; reject non-http/https schemes; reject missing hostname
- Define BLOCKED_NETS list: ip_network("127.0.0.0/8"), ip_network("169.254.0.0/16"), ip_network("10.0.0.0/8"), ip_network("172.16.0.0/12"), ip_network("192.168.0.0/16"), ip_network("::1/128"), ip_network("fc00::/7")
- Also explicitly block hostname == "localhost" string before IP resolution
- Try ipaddress.ip_address(hostname); if that fails (not a raw IP), use socket.getaddrinfo(hostname, None)[0][4][0] to resolve; wrap socket.gaierror
- Check resolved IP against each BLOCKED_NETS entry using addr in net

Implement _derive_fernet_key(master_key: bytes, user_id: str) -> Fernet:
- Import: base64, cryptography.hazmat.primitives.hashes, cryptography.hazmat.primitives.kdf.hkdf.HKDF, cryptography.fernet.Fernet
- Create new HKDF(...) instance each call; do NOT cache or store the instance
- Call hkdf.derive(master_key) → 32 raw bytes
- Return Fernet(base64.urlsafe_b64encode(raw_key))

Implement encrypt_credentials(master_key: bytes, user_id: str, credentials: dict) -> str:
- import json at the top of the module (decrypt_credentials needs it as well)
- Call _derive_fernet_key to get a Fernet instance
- Return f.encrypt(json.dumps(credentials).encode("utf-8")).decode("utf-8")

Implement decrypt_credentials(master_key: bytes, user_id: str, credentials_enc: str) -> dict:
- Call _derive_fernet_key to get a Fernet instance
- Return json.loads(f.decrypt(credentials_enc.encode("utf-8")))

Verify:
cd /Users/nik/Documents/Progamming/document_scanner/backend && python -c "
from storage.cloud_utils import validate_cloud_url, encrypt_credentials, decrypt_credentials

# SSRF check
try:
    validate_cloud_url('http://127.0.0.1/dav')
    print('FAIL: loopback should raise')
except ValueError:
    print('OK: loopback blocked')

try:
    validate_cloud_url('http://10.0.0.1/dav')
    print('FAIL: RFC-1918 should raise')
except ValueError:
    print('OK: RFC-1918 blocked')

# Round-trip
mk = b'test-master-key-32bytes-padded!!'
uid = '550e8400-e29b-41d4-a716-446655440000'
creds = {'access_token': 'ya29.xxx', 'refresh_token': '1//xxx'}
enc = encrypt_credentials(mk, uid, creds)
assert enc != str(creds)
dec = decrypt_credentials(mk, uid, enc)
assert dec == creds, f'Round-trip failed: {dec}'
print('OK: encryption round-trip')
"

Acceptance criteria:
- backend/storage/cloud_utils.py contains def validate_cloud_url, def encrypt_credentials, def decrypt_credentials, def _derive_fernet_key
- validate_cloud_url("http://127.0.0.1/dav") raises ValueError
- validate_cloud_url("http://10.0.0.1/dav") raises ValueError
- validate_cloud_url("http://169.254.169.254/dav") raises ValueError
- validate_cloud_url("http://192.168.1.1/dav") raises ValueError
- validate_cloud_url("http://localhost/dav") raises ValueError
- Encryption round-trip: decrypt_credentials(key, uid, encrypt_credentials(key, uid, creds)) == creds
- "access_token" plaintext does NOT appear in the encrypted string

Done: cloud_utils.py created; SSRF validation blocks all 5 network categories; HKDF round-trip verified via python -c invocation

Task 2: Create cloud_cache.py and extend storage factory

Files: backend/services/cloud_cache.py, backend/storage/__init__.py

Read first:
- backend/storage/__init__.py: current get_storage_backend() factory
- backend/storage/base.py: StorageBackend ABC
- backend/storage/minio_backend.py: MinIOBackend constructor signature
- backend/db/models.py: CloudConnection, Document, User model fields
- .planning/phases/05-cloud-storage-backends/05-RESEARCH.md: Pattern 8 (TTLCache), Pattern 9 (factory extension)

Behavior:
- backend/services/cloud_cache.py exports a module-level _folder_cache = TTLCache(maxsize=1000, ttl=60) and a threading.Lock()
- get_cloud_folders_cached(user_id: str, provider: str, folder_id: str, fetch_fn: Callable[[], Awaitable[list]]) is an async function that checks cache before calling fetch_fn
- get_storage_backend_for_document(document, user, session) is an async function added to backend/storage/__init__.py
that returns MinIOBackend for storage_backend=="minio" and raises HTTPException(503) for unknown or inactive cloud connections
- existing get_storage_backend() function in __init__.py is NOT modified (existing callers unaffected)
- get_storage_backend_for_document raises HTTPException(503, detail="Cloud connection not found or inactive") when CloudConnection is missing or status != "ACTIVE"

Action:
Create backend/services/cloud_cache.py:
- Import: threading, cachetools.TTLCache, typing.Callable, typing.Awaitable
- Module-level: _folder_cache: TTLCache = TTLCache(maxsize=1000, ttl=60)
- Module-level: _folder_cache_lock = threading.Lock()
- async function get_cloud_folders_cached(user_id: str, provider: str, folder_id: str, fetch_fn) -> list:
  - cache_key = f"{user_id}:{provider}:{folder_id}"
  - with _folder_cache_lock: check if cache_key in _folder_cache; return cached if found
  - result = await fetch_fn()  # called OUTSIDE the lock to not block event loop
  - with _folder_cache_lock: store result in cache
  - return result
- Function invalidate_provider_cache(user_id: str, provider: str) -> None: while holding the lock, collects all keys starting with f"{user_id}:{provider}:" from a snapshot of _folder_cache (e.g. list(_folder_cache)) and deletes them; deleting while iterating the cache directly would raise RuntimeError

Extend backend/storage/__init__.py (add after existing get_storage_backend()):
- Import at top of file: select from sqlalchemy, HTTPException from fastapi, AsyncSession from sqlalchemy.ext.asyncio, Optional from typing
- Import: from db.models import CloudConnection, Document, User
- Import: from config import settings
- Import: from storage.cloud_utils import decrypt_credentials
- Add async function get_storage_backend_for_document(document, user, session: AsyncSession) -> StorageBackend:
  - If document.storage_backend == "minio": return get_storage_backend() (existing factory)
  - Otherwise: query CloudConnection where user_id=user.id AND provider=document.storage_backend AND status="ACTIVE"
  - If not found: raise HTTPException(status_code=503, detail="Cloud connection not found or inactive")
  - Decrypt credentials:
    master_key = settings.cloud_creds_key.encode(); credentials = decrypt_credentials(master_key, str(user.id), conn.credentials_enc)
  - If provider == "google_drive": import GoogleDriveBackend; return GoogleDriveBackend(credentials)
  - Elif provider == "onedrive": import OneDriveBackend; return OneDriveBackend(credentials)
  - Elif provider in ("nextcloud", "webdav"): import WebDAVBackend; return WebDAVBackend(credentials["server_url"], credentials["username"], credentials["password"])
  - Else: raise ValueError(f"Unknown storage backend: {document.storage_backend}")

Use lazy imports (inside the function) for cloud backends to avoid circular imports at module load time.

Verify:
cd /Users/nik/Documents/Progamming/document_scanner/backend && python -c "
from services.cloud_cache import get_cloud_folders_cached, _folder_cache, _folder_cache_lock, invalidate_provider_cache
from storage import get_storage_backend, get_storage_backend_for_document
print('cloud_cache imports OK')
print('factory extension imports OK')
print(f'TTLCache maxsize={_folder_cache.maxsize}, ttl={_folder_cache.ttl}')
"

Acceptance criteria:
- backend/services/cloud_cache.py exists and exports _folder_cache (TTLCache), _folder_cache_lock (Lock), get_cloud_folders_cached (async), invalidate_provider_cache
- _folder_cache.maxsize == 1000 and _folder_cache.ttl == 60
- backend/storage/__init__.py exports get_storage_backend_for_document (async function)
- `from storage import get_storage_backend_for_document` imports without error
- Existing `from storage import get_storage_backend` still works (no regression)
- `python -m pytest -v --tb=short` passes with 0 failures (no import regressions)

Done: cloud_cache.py created with TTLCache singleton and cache/invalidate helpers; storage/__init__.py has get_storage_backend_for_document; full pytest suite passes

## Trust Boundaries

| Boundary | Description |
|----------|-------------|
| user-supplied URL → validate_cloud_url | Untrusted URL must be checked against SSRF blocklist before any HTTP call |
| credentials dict → Fernet ciphertext | Credentials must never appear in plaintext after this layer |
| DNS resolution → IP check | DNS-based SSRF bypass: hostname resolves to internal IP after validation |

## STRIDE Threat Register

| Threat ID | Category | Component | Disposition | Mitigation Plan |
|-----------|----------|-----------|-------------|-----------------|
| T-05-02-01 | Tampering | validate_cloud_url: DNS resolution | mitigate | socket.getaddrinfo resolves hostname to IP before network check; validate_cloud_url called immediately before each request (not only at connect-time) per D-17; DNS rebinding window is minimized |
| T-05-02-02 | Information Disclosure | _derive_fernet_key: HKDF instance reuse | mitigate | New HKDF(...) instance created on every _derive_fernet_key call; AlreadyFinalized pitfall (RESEARCH.md Pitfall 3) prevented by construction |
| T-05-02-03 | Information Disclosure | cloud_creds_key default value | mitigate | Default "CHANGEME-32-bytes-padded!!" is clearly a placeholder; production deployment requires CLOUD_CREDS_KEY env var; docstring on Settings field documents the requirement |
| T-05-02-04 | Elevation of Privilege | get_storage_backend_for_document: cross-user | mitigate | Function receives user object from get_regular_user dep; CloudConnection query includes user_id=user.id filter; cross-user access impossible via this function |
| T-05-02-SC | Tampering | cachetools package install | mitigate | cachetools verified [OK] in RESEARCH.md slopcheck audit |

Verification:
cd /Users/nik/Documents/Progamming/document_scanner/backend && python -m pytest tests/test_cloud.py -v && python -m pytest -v --tb=short 2>&1 | tail -10

Success criteria:
- cloud_utils.py: validate_cloud_url blocks RFC-1918/loopback/link-local; HKDF round-trip correct
- cloud_cache.py: TTLCache(maxsize=1000, ttl=60) with thread-safe lock; get_cloud_folders_cached works
- storage/__init__.py: get_storage_backend_for_document added alongside existing get_storage_backend()
- pytest -v exits 0,
0 failures; test_cloud.py still all xfailed

Create `.planning/phases/05-cloud-storage-backends/05-02-SUMMARY.md` when done