2026-05-28 19:43:12 +02:00

---
phase: 05-cloud-storage-backends
plan: 02
type: execute
wave: 2
depends_on:
  - 05-01
files_modified:
  - backend/storage/cloud_utils.py
  - backend/services/cloud_cache.py
  - backend/storage/__init__.py
autonomous: true
requirements:
  - CLOUD-02
  - CLOUD-07
must_haves:
  truths:
    - "validate_cloud_url() blocks all RFC-1918, loopback, and link-local addresses"
    - "encrypt_credentials / decrypt_credentials produce a correct round-trip for any dict"
    - "get_storage_backend_for_document() factory returns the correct backend type from document.storage_backend"
    - "TTLCache singleton is module-level in cloud_cache.py with maxsize=1000, ttl=60"
  artifacts:
    - path: backend/storage/cloud_utils.py
      provides: "SSRF validation + HKDF credential encryption"
      contains: "def validate_cloud_url"
    - path: backend/services/cloud_cache.py
      provides: "TTLCache singleton for cloud folder listings"
      contains: "get_cloud_folders_cached"
    - path: backend/storage/__init__.py
      provides: "Extended factory for cloud backends"
      contains: "get_storage_backend_for_document"
  key_links:
    - from: backend/storage/cloud_utils.py
      to: backend/config.py
      via: settings.cloud_creds_key
      pattern: "cloud_creds_key"
    - from: backend/storage/__init__.py
      to: backend/storage/cloud_utils.py
      via: decrypt_credentials
      pattern: "import decrypt_credentials"
---

Create the shared utilities layer for Phase 5: SSRF-safe URL validation, HKDF+Fernet credential encryption/decryption, TTLCache for folder listings, and the extended storage backend factory.

Purpose: All cloud backends and API handlers depend on these primitives. Establishing them before the backends prevents duplication and ensures security invariants are enforced in one place.

Output: cloud_utils.py (validate_cloud_url, encrypt_credentials, decrypt_credentials), cloud_cache.py (TTLCache singleton), updated storage/__init__.py (get_storage_backend_for_document factory).

<execution_context> @/Users/nik/.claude/get-shit-done/workflows/execute-plan.md @/Users/nik/.claude/get-shit-done/templates/summary.md </execution_context>

@.planning/PROJECT.md
@.planning/ROADMAP.md
@.planning/phases/05-cloud-storage-backends/05-CONTEXT.md
@.planning/phases/05-cloud-storage-backends/05-RESEARCH.md
@.planning/phases/05-cloud-storage-backends/05-01-SUMMARY.md

From backend/storage/__init__.py:
- def get_storage_backend() -> StorageBackend: """Returns MinIOBackend configured from settings."""

From backend/db/models.py:
- Document: storage_backend (String, nullable=False, default="minio"), user_id (UUID nullable)
- CloudConnection: id (UUID), user_id (UUID FK), provider (String), credentials_enc (Text), status (String), connected_at (TIMESTAMP)
- User: id (UUID), default_storage_backend (String, default="minio")

From backend/config.py (after Plan 01):
- settings.cloud_creds_key: str
- settings.minio_endpoint, minio_access_key, minio_secret_key, minio_bucket, minio_public_endpoint

From backend/storage/minio_backend.py:
- class MinIOBackend(StorageBackend): reference for the asyncio.to_thread() pattern

From RESEARCH.md:
- RESEARCH.md Pattern 6: SSRF validation using ipaddress + socket.getaddrinfo.
- RESEARCH.md Pattern 2: HKDF+Fernet, with a fresh HKDF instance per call (AlreadyFinalized pitfall).
- RESEARCH.md Pattern 8: TTLCache thread safety; threading.Lock required for concurrent access.
- RESEARCH.md Pattern 9: get_storage_backend_for_document factory extension.

Task 1: Create cloud_utils.py (SSRF validation + HKDF credential encryption)

Files: backend/storage/cloud_utils.py

Read first:
- backend/storage/base.py: StorageBackend ABC, 7 method signatures
- backend/config.py: settings.cloud_creds_key field name
- .planning/phases/05-cloud-storage-backends/05-RESEARCH.md: Pattern 2 (HKDF+Fernet) and Pattern 6 (SSRF)

Behavior:
- validate_cloud_url(url: str) -> None raises ValueError for: localhost, 127.0.0.0/8, 169.254.0.0/16, 10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16, ::1/128, fc00::/7
- validate_cloud_url resolves DNS via socket.getaddrinfo before checking IP (anti-DNS-rebinding: resolves hostname to IP then checks IP against blocked networks)
- validate_cloud_url raises ValueError for non-http/https schemes
- validate_cloud_url raises ValueError for URLs with no hostname
- _derive_fernet_key(master_key: bytes, user_id: str) -> Fernet: creates a fresh HKDF instance on every call (never reuses); uses algorithm=hashes.SHA256(), length=32, salt=user_id.encode("utf-8"), info=b"cloud-credentials"
- encrypt_credentials(master_key: bytes, user_id: str, credentials: dict) -> str: returns Fernet-encrypted JSON string (not plaintext)
- decrypt_credentials(master_key: bytes, user_id: str, credentials_enc: str) -> dict: returns original dict
- Round-trip: decrypt_credentials(master_key, uid, encrypt_credentials(master_key, uid, creds)) == creds

Action:
Create backend/storage/cloud_utils.py with module docstring explaining SSRF prevention and HKDF pattern.

Implement validate_cloud_url(url: str) -> None:
- Import: ipaddress, socket, urllib.parse.urlparse
- Parse URL; reject non-http/https schemes; reject missing hostname
- Define BLOCKED_NETS list: ip_network("127.0.0.0/8"), ip_network("169.254.0.0/16"),
  ip_network("10.0.0.0/8"), ip_network("172.16.0.0/12"), ip_network("192.168.0.0/16"),
  ip_network("::1/128"), ip_network("fc00::/7")
- Also explicitly block hostname == "localhost" string before IP resolution
- Try ipaddress.ip_address(hostname) — if that fails (not a raw IP), use
  socket.getaddrinfo(hostname, None)[0][4][0] to resolve; wrap socket.gaierror
- Check resolved IP against each BLOCKED_NETS entry using addr in net
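
The steps above can be sketched roughly as follows. This is a minimal, standard-library-only illustration rather than the final module: the error messages are made up for the example, and checking every address returned by socket.getaddrinfo (instead of only the first entry, as the steps describe) is an assumption chosen here to be conservative.

```python
import ipaddress
import socket
from urllib.parse import urlparse

# Networks named in the plan: loopback, link-local, RFC 1918, IPv6 loopback, IPv6 ULA.
BLOCKED_NETS = [
    ipaddress.ip_network("127.0.0.0/8"),
    ipaddress.ip_network("169.254.0.0/16"),
    ipaddress.ip_network("10.0.0.0/8"),
    ipaddress.ip_network("172.16.0.0/12"),
    ipaddress.ip_network("192.168.0.0/16"),
    ipaddress.ip_network("::1/128"),
    ipaddress.ip_network("fc00::/7"),
]


def validate_cloud_url(url: str) -> None:
    """Raise ValueError unless url is http(s) and resolves outside BLOCKED_NETS."""
    parsed = urlparse(url)
    if parsed.scheme not in ("http", "https"):
        raise ValueError(f"Unsupported URL scheme: {parsed.scheme!r}")
    hostname = parsed.hostname
    if not hostname:
        raise ValueError("URL has no hostname")
    if hostname.lower() == "localhost":
        raise ValueError("localhost is not allowed")

    try:
        # A raw IPv4/IPv6 literal needs no DNS lookup.
        addrs = [ipaddress.ip_address(hostname)]
    except ValueError:
        try:
            infos = socket.getaddrinfo(hostname, None)
        except socket.gaierror as exc:
            raise ValueError(f"Cannot resolve hostname: {hostname}") from exc
        # Assumption beyond the plan: inspect all resolved addresses, not just the first.
        addrs = [ipaddress.ip_address(info[4][0]) for info in infos]

    for addr in addrs:
        for net in BLOCKED_NETS:
            # Membership is always False when the IP versions differ.
            if addr in net:
                raise ValueError(f"{addr} is in blocked network {net}")
```

A caller would wrap the check in try/except ValueError and refuse the request on failure; a public address such as https://93.184.216.34/dav passes without raising.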

Implement _derive_fernet_key(master_key: bytes, user_id: str) -> Fernet:
- Import: base64, cryptography.hazmat.primitives.hashes, cryptography.hazmat.primitives.kdf.hkdf.HKDF, cryptography.fernet.Fernet
- Create new HKDF(...) instance each call — do NOT cache or store the instance
- Call hkdf.derive(master_key) → 32 raw bytes
- Return Fernet(base64.urlsafe_b64encode(raw_key))

Implement encrypt_credentials(master_key: bytes, user_id: str, credentials: dict) -> str:
- import json inside function body (or at top)
- Call _derive_fernet_key to get a Fernet instance
- Return f.encrypt(json.dumps(credentials).encode("utf-8")).decode("utf-8")

Implement decrypt_credentials(master_key: bytes, user_id: str, credentials_enc: str) -> dict:
- Call _derive_fernet_key to get a Fernet instance
- Return json.loads(f.decrypt(credentials_enc.encode("utf-8")))
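
A minimal sketch of the three helpers described above, assuming the cryptography package is installed. It follows the listed parameters (SHA256, length=32, salt from user_id, info=b"cloud-credentials"); the variable names and docstrings are illustrative only.

```python
import base64
import json

from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.hkdf import HKDF


def _derive_fernet_key(master_key: bytes, user_id: str) -> Fernet:
    """Derive a per-user Fernet instance from the master key."""
    # A new HKDF object on every call: derive() can only be used once per
    # instance and raises AlreadyFinalized on a second call.
    hkdf = HKDF(
        algorithm=hashes.SHA256(),
        length=32,
        salt=user_id.encode("utf-8"),
        info=b"cloud-credentials",
    )
    raw_key = hkdf.derive(master_key)
    # Fernet expects a urlsafe-base64 encoding of exactly 32 bytes.
    return Fernet(base64.urlsafe_b64encode(raw_key))


def encrypt_credentials(master_key: bytes, user_id: str, credentials: dict) -> str:
    """Serialize credentials to JSON and return the Fernet token as text."""
    f = _derive_fernet_key(master_key, user_id)
    return f.encrypt(json.dumps(credentials).encode("utf-8")).decode("utf-8")


def decrypt_credentials(master_key: bytes, user_id: str, credentials_enc: str) -> dict:
    """Reverse encrypt_credentials; raises InvalidToken on a wrong key or user."""
    f = _derive_fernet_key(master_key, user_id)
    return json.loads(f.decrypt(credentials_enc.encode("utf-8")))
```

Because user_id is the HKDF salt, a token encrypted for one user does not decrypt under another user's derived key, even with the same master key.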
Verify:
cd /Users/nik/Documents/Progamming/document_scanner/backend && python -c "
from storage.cloud_utils import validate_cloud_url, encrypt_credentials, decrypt_credentials
import pytest
# SSRF check
try:
    validate_cloud_url('http://127.0.0.1/dav')
    print('FAIL: loopback should raise')
except ValueError:
    print('OK: loopback blocked')
try:
    validate_cloud_url('http://10.0.0.1/dav')
    print('FAIL: RFC-1918 should raise')
except ValueError:
    print('OK: RFC-1918 blocked')
# Round-trip
mk = b'test-master-key-32bytes-padded!!'
uid = '550e8400-e29b-41d4-a716-446655440000'
creds = {'access_token': 'ya29.xxx', 'refresh_token': '1//xxx'}
enc = encrypt_credentials(mk, uid, creds)
assert enc != str(creds)
dec = decrypt_credentials(mk, uid, enc)
assert dec == creds, f'Round-trip failed: {dec}'
print('OK: encryption round-trip')
"

Acceptance criteria:
- backend/storage/cloud_utils.py contains def validate_cloud_url, def encrypt_credentials, def decrypt_credentials, def _derive_fernet_key
- validate_cloud_url("http://127.0.0.1/dav") raises ValueError
- validate_cloud_url("http://10.0.0.1/dav") raises ValueError
- validate_cloud_url("http://169.254.169.254/dav") raises ValueError
- validate_cloud_url("http://192.168.1.1/dav") raises ValueError
- validate_cloud_url("http://localhost/dav") raises ValueError
- Encryption round-trip: decrypt_credentials(key, uid, encrypt_credentials(key, uid, creds)) == creds
- "access_token" plaintext does NOT appear in the encrypted string

Done: cloud_utils.py created; SSRF validation blocks all 5 network categories; HKDF round-trip verified via python -c invocation

Task 2: Create cloud_cache.py and extend storage factory

Files: backend/services/cloud_cache.py, backend/storage/__init__.py

Read first:
- backend/storage/__init__.py: current get_storage_backend() factory
- backend/storage/base.py: StorageBackend ABC
- backend/storage/minio_backend.py: MinIOBackend constructor signature
- backend/db/models.py: CloudConnection, Document, User model fields
- .planning/phases/05-cloud-storage-backends/05-RESEARCH.md: Pattern 8 (TTLCache), Pattern 9 (factory extension)

Behavior:
- backend/services/cloud_cache.py exports a module-level _folder_cache = TTLCache(maxsize=1000, ttl=60) and a threading.Lock()
- get_cloud_folders_cached(user_id: str, provider: str, folder_id: str, fetch_fn: Callable[[], Awaitable[list]]) is an async function that checks the cache before calling fetch_fn
- get_storage_backend_for_document(document, user, session) is an async function added to backend/storage/__init__.py that returns MinIOBackend for storage_backend=="minio" and raises HTTPException(503) for unknown or inactive cloud connections
- existing get_storage_backend() function in __init__.py is NOT modified (existing callers unaffected)
- get_storage_backend_for_document raises HTTPException(503, detail="Cloud connection not found or inactive") when CloudConnection is missing or status != "ACTIVE"

Action:
Create backend/services/cloud_cache.py:
- Import: threading, cachetools.TTLCache, typing.Callable, typing.Awaitable
- Module-level: _folder_cache: TTLCache = TTLCache(maxsize=1000, ttl=60)
- Module-level: _folder_cache_lock = threading.Lock()
- async function get_cloud_folders_cached(user_id: str, provider: str, folder_id: str, fetch_fn) -> list:
    cache_key = f"{user_id}:{provider}:{folder_id}"
    with _folder_cache_lock: check if cache_key in _folder_cache; return cached if found
    result = await fetch_fn()  # called OUTSIDE the lock to not block event loop
    with _folder_cache_lock: store result in cache
    return result
- Function invalidate_provider_cache(user_id: str, provider: str) -> None: iterates _folder_cache with lock and deletes all keys starting with f"{user_id}:{provider}:"

Extend backend/storage/__init__.py (add after existing get_storage_backend()):
- Import at top of file: select from sqlalchemy, HTTPException from fastapi, AsyncSession from sqlalchemy.ext.asyncio, Optional from typing
- Import: from db.models import CloudConnection, Document, User
- Import: from config import settings
- Import: from storage.cloud_utils import decrypt_credentials
- Add async function get_storage_backend_for_document(document, user, session: AsyncSession) -> StorageBackend:
  If document.storage_backend == "minio": return get_storage_backend() (existing factory)
  Otherwise: query CloudConnection where user_id=user.id AND provider=document.storage_backend AND status="ACTIVE"
  If not found: raise HTTPException(status_code=503, detail="Cloud connection not found or inactive")
  Decrypt credentials: master_key = settings.cloud_creds_key.encode(); credentials = decrypt_credentials(master_key, str(user.id), conn.credentials_enc)
  If provider == "google_drive": import GoogleDriveBackend; return GoogleDriveBackend(credentials)
  Elif provider == "onedrive": import OneDriveBackend; return OneDriveBackend(credentials)
  Elif provider in ("nextcloud", "webdav"): import WebDAVBackend; return WebDAVBackend(credentials["server_url"], credentials["username"], credentials["password"])
  Else: raise ValueError(f"Unknown storage backend: {document.storage_backend}")
  Use lazy imports (inside the function) for cloud backends to avoid circular imports at module load time.
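
The dispatch order above can be illustrated with a standalone sketch. Everything here is a hypothetical stand-in so the example runs without FastAPI, SQLAlchemy, or the project models: ServiceUnavailable replaces HTTPException(503), FakeConnection replaces a CloudConnection row with already-decrypted credentials, the lookup callable replaces the filtered query, and resolve_backend is not the planned function name.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional


class ServiceUnavailable(Exception):
    """Hypothetical stand-in for fastapi.HTTPException(status_code=503)."""


@dataclass
class FakeConnection:
    """Hypothetical stand-in for a CloudConnection row, credentials decrypted."""
    provider: str
    status: str
    credentials: dict


@dataclass
class FakeWebDAVBackend:
    """Hypothetical stand-in; the argument order follows the plan."""
    server_url: str
    username: str
    password: str


# Hypothetical lookup: provider name -> the current user's connection, or None.
Lookup = Callable[[str], Awaitable[Optional[FakeConnection]]]


async def resolve_backend(storage_backend: str, lookup: Lookup):
    """Mirror the plan's branch order: minio, connection check, provider dispatch."""
    if storage_backend == "minio":
        return "minio-default"  # the plan returns get_storage_backend() here
    conn = await lookup(storage_backend)
    if conn is None or conn.status != "ACTIVE":
        raise ServiceUnavailable("Cloud connection not found or inactive")
    if storage_backend in ("nextcloud", "webdav"):
        # In the plan, the backend class is imported lazily inside this branch.
        creds = conn.credentials
        return FakeWebDAVBackend(
            creds["server_url"], creds["username"], creds["password"]
        )
    raise ValueError(f"Unknown storage backend: {storage_backend}")


if __name__ == "__main__":
    async def lookup(provider: str) -> Optional[FakeConnection]:
        return None  # no connections stored in this demo

    print(asyncio.run(resolve_backend("minio", lookup)))
```

Note the ordering: a provider with no active connection fails with the 503 stand-in before the provider name is ever inspected, so ValueError is only reachable for an active connection whose provider has no backend class.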
Verify:
cd /Users/nik/Documents/Progamming/document_scanner/backend && python -c "
from services.cloud_cache import get_cloud_folders_cached, _folder_cache, _folder_cache_lock, invalidate_provider_cache
from storage import get_storage_backend, get_storage_backend_for_document
print('cloud_cache imports OK')
print('factory extension imports OK')
print(f'TTLCache maxsize={_folder_cache.maxsize}, ttl={_folder_cache.ttl}')
"

Acceptance criteria:
- backend/services/cloud_cache.py exists and exports _folder_cache (TTLCache), _folder_cache_lock (Lock), get_cloud_folders_cached (async), invalidate_provider_cache
- _folder_cache.maxsize == 1000 and _folder_cache.ttl == 60
- backend/storage/__init__.py exports get_storage_backend_for_document (async function)
- `from storage import get_storage_backend_for_document` imports without error
- Existing `from storage import get_storage_backend` still works (no regression)
- `python -m pytest -v --tb=short` passes with 0 failures (no import regressions)

Done: cloud_cache.py created with TTLCache singleton and cache/invalidate helpers; storage/__init__.py has get_storage_backend_for_document; full pytest suite passes

<threat_model>

Trust Boundaries

| Boundary | Description |
|---|---|
| user-supplied URL → validate_cloud_url | Untrusted URL must be checked against SSRF blocklist before any HTTP call |
| credentials dict → Fernet ciphertext | Credentials must never appear in plaintext after this layer |
| DNS resolution → IP check | DNS-based SSRF bypass: hostname resolves to internal IP after validation |

STRIDE Threat Register

| Threat ID | Category | Component | Disposition | Mitigation Plan |
|---|---|---|---|---|
| T-05-02-01 | Tampering | validate_cloud_url: DNS resolution | mitigate | socket.getaddrinfo resolves hostname to IP before network check; validate_cloud_url called immediately before each request (not only at connect-time) per D-17; DNS rebinding window is minimized |
| T-05-02-02 | Information Disclosure | _derive_fernet_key: HKDF instance reuse | mitigate | New HKDF(...) instance created on every _derive_fernet_key call; AlreadyFinalized pitfall (RESEARCH.md Pitfall 3) prevented by construction |
| T-05-02-03 | Information Disclosure | cloud_creds_key default value | mitigate | Default "CHANGEME-32-bytes-padded!!" is clearly a placeholder; production deployment requires CLOUD_CREDS_KEY env var; docstring on Settings field documents the requirement |
| T-05-02-04 | Elevation of Privilege | get_storage_backend_for_document: cross-user | mitigate | Function receives user object from get_regular_user dep; CloudConnection query includes user_id=user.id filter; cross-user access impossible via this function |
| T-05-02-SC | Tampering | cachetools package install | mitigate | cachetools verified [OK] in RESEARCH.md slopcheck audit |
</threat_model>
cd /Users/nik/Documents/Progamming/document_scanner/backend && python -m pytest tests/test_cloud.py -v && python -m pytest -v --tb=short 2>&1 | tail -10

<success_criteria>

  • cloud_utils.py: validate_cloud_url blocks RFC-1918/loopback/link-local; HKDF round-trip correct
  • cloud_cache.py: TTLCache(maxsize=1000, ttl=60) with thread-safe lock; get_cloud_folders_cached works
  • storage/__init__.py: get_storage_backend_for_document added alongside existing get_storage_backend()
  • pytest -v exits 0, 0 failures; test_cloud.py still all xfailed </success_criteria>
Create `.planning/phases/05-cloud-storage-backends/05-02-SUMMARY.md` when done