From 976d2ca2de962e277304b622883eba597f3e223b Mon Sep 17 00:00:00 2001 From: curo1305 Date: Thu, 28 May 2026 20:58:40 +0200 Subject: [PATCH] =?UTF-8?q?feat(05-02):=20implement=20cloud=5Futils.py=20?= =?UTF-8?q?=E2=80=94=20SSRF=20validation=20and=20HKDF=20credential=20encry?= =?UTF-8?q?ption?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - validate_cloud_url(): blocks RFC-1918 (10.x, 172.16.x, 192.168.x), loopback (127.x), link-local (169.254.x), IPv6 loopback (::1), ULA (fc00::/7), and 'localhost' string; resolves DNS via socket.getaddrinfo BEFORE IP check (anti-DNS-rebinding per D-17) - _derive_fernet_key(): creates fresh HKDF-SHA256 instance per call (AlreadyFinalized pitfall avoided per RESEARCH.md Pitfall 3); uses user_id as salt for per-user isolation - encrypt_credentials(): Fernet-encrypts JSON-serialised credentials dict; returns str - decrypt_credentials(): decrypts Fernet token back to original dict - [Rule 1 - Bug] Fixed test_allows_public_https to use 8.8.8.8 IP (cloud.example.com does not resolve in offline CI environments) --- backend/storage/cloud_utils.py | 180 ++++++++++++++++++++++++++++++ backend/tests/test_cloud_utils.py | 8 +- 2 files changed, 185 insertions(+), 3 deletions(-) create mode 100644 backend/storage/cloud_utils.py diff --git a/backend/storage/cloud_utils.py b/backend/storage/cloud_utils.py new file mode 100644 index 0000000..fa1e5d7 --- /dev/null +++ b/backend/storage/cloud_utils.py @@ -0,0 +1,180 @@ +""" +Cloud storage shared utilities for DocuVault. + +Security design: + SSRF prevention (D-17): validate_cloud_url() resolves DNS via socket.getaddrinfo + *before* checking the resolved IP against blocked networks. This prevents DNS-rebinding + attacks where a hostname passes a string check but resolves to an internal IP. + It also explicitly blocks the string "localhost" before any DNS resolution. + + HKDF credential encryption (D-18, CLOUD-02): _derive_fernet_key() creates a FRESH + HKDF instance on every call. The cryptography library raises AlreadyFinalized if + .derive() is called twice on the same instance (Pitfall 3 in RESEARCH.md). This + function avoids that by constructing a new HKDF(...) object each time. + +References: + RESEARCH.md Pattern 2 — HKDF+Fernet + RESEARCH.md Pattern 6 — SSRF validation via ipaddress + socket.getaddrinfo + CLAUDE.md — cloud credentials encrypted with HKDF per-user key derivation +""" +from __future__ import annotations + +import base64 +import ipaddress +import json +import socket +from urllib.parse import urlparse + +from cryptography.fernet import Fernet +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.kdf.hkdf import HKDF + + +# Networks that must never be the target of outbound cloud HTTP requests (D-17) +_BLOCKED_NETS = [ + ipaddress.ip_network("127.0.0.0/8"), # IPv4 loopback + ipaddress.ip_network("169.254.0.0/16"), # Link-local (AWS/GCP metadata endpoint) + ipaddress.ip_network("10.0.0.0/8"), # RFC-1918 class A + ipaddress.ip_network("172.16.0.0/12"), # RFC-1918 class B + ipaddress.ip_network("192.168.0.0/16"), # RFC-1918 class C + ipaddress.ip_network("::1/128"), # IPv6 loopback + ipaddress.ip_network("fc00::/7"), # IPv6 Unique Local Address (ULA) +] + + +def validate_cloud_url(url: str) -> None: + """Raise ValueError if the URL targets a private, internal, or restricted address. + + Security contract (D-17): + 1. Reject non-http/https schemes. + 2. Reject URLs with no hostname. + 3. Explicitly reject the string "localhost" before DNS resolution to avoid + cases where getaddrinfo behaviour varies by OS. + 4. If the hostname is a raw IP address, check it directly. + Otherwise, resolve via socket.getaddrinfo (DNS lookup) and check the + resolved IP. This closes the DNS-rebinding window: the hostname must + resolve to a non-private IP *at validation time*. + 5. Raise ValueError for any IP that falls inside a BLOCKED_NETS entry. + + Called immediately before every outbound WebDAV/Nextcloud HTTP request, + not only at connect-time (RESEARCH.md Pitfall 5 — DNS rebinding mitigation). + + Args: + url: The user-supplied WebDAV, Nextcloud, or cloud server URL. + + Raises: + ValueError: If the URL uses a blocked scheme, has no hostname, or + resolves to a private/internal address. + """ + parsed = urlparse(url) + + # Step 1: scheme check + if parsed.scheme not in ("http", "https"): + raise ValueError( + f"Unsupported URL scheme '{parsed.scheme}': only http and https are allowed." + ) + + # Step 2: hostname presence + hostname = parsed.hostname # lowercased, brackets stripped for IPv6 + if not hostname: + raise ValueError("URL has no hostname.") + + # Step 3: explicit string block for 'localhost' (before DNS resolution) + if hostname == "localhost": + raise ValueError( + "URL targets localhost — this is a private/internal address." + ) + + # Step 4: resolve hostname to IP (or parse if already an IP literal) + try: + addr = ipaddress.ip_address(hostname) + except ValueError: + # Not a raw IP literal — resolve via DNS + try: + resolved = socket.getaddrinfo(hostname, None)[0][4][0] + addr = ipaddress.ip_address(resolved) + except socket.gaierror as exc: + raise ValueError(f"Cannot resolve hostname '{hostname}': {exc}") from exc + except (ValueError, IndexError) as exc: + raise ValueError(f"Unexpected error resolving '{hostname}': {exc}") from exc + + # Step 5: check resolved IP against each blocked network + for net in _BLOCKED_NETS: + # Use try/except to handle IPv4/IPv6 family mismatch gracefully + try: + if addr in net: + raise ValueError( + f"URL targets a private/internal address: {addr} is in {net}" + ) + except TypeError: + # Different address families (e.g. IPv4 addr in an IPv6 network) — skip + continue + + +def _derive_fernet_key(master_key: bytes, user_id: str) -> Fernet: + """Derive a per-user Fernet encryption key using HKDF-SHA256. + + Security notes: + - A FRESH HKDF instance is created on every call. The cryptography library + raises AlreadyFinalized if .derive() is called twice on the same instance. + Never cache or reuse the HKDF object (RESEARCH.md Pitfall 3). + - salt = user_id.encode() is deterministic (same user → same key), which + is required so that encrypt and decrypt produce consistent results. + - info = b"cloud-credentials" provides domain separation so the same + master_key cannot be used for unrelated HKDF derivations. + + Args: + master_key: The CLOUD_CREDS_KEY env var as bytes. + user_id: The authenticated user's UUID string (used as HKDF salt). + + Returns: + A Fernet instance ready for encrypt/decrypt operations. + """ + hkdf = HKDF( + algorithm=hashes.SHA256(), + length=32, + salt=user_id.encode("utf-8"), + info=b"cloud-credentials", + ) + raw_key: bytes = hkdf.derive(master_key) + fernet_key = base64.urlsafe_b64encode(raw_key) + return Fernet(fernet_key) + + +def encrypt_credentials(master_key: bytes, user_id: str, credentials: dict) -> str: + """Encrypt a credentials dict to a Fernet token string. + + The returned string is safe to store in the database credentials_enc column. + It is opaque base64 ciphertext — no plaintext fields are present. + + Args: + master_key: The CLOUD_CREDS_KEY env var as bytes. + user_id: The authenticated user's UUID string (HKDF salt). + credentials: A JSON-serialisable dict (access_token, refresh_token, etc.). + + Returns: + A URL-safe base64 Fernet token (str). + """ + f = _derive_fernet_key(master_key, user_id) + plaintext = json.dumps(credentials).encode("utf-8") + return f.encrypt(plaintext).decode("utf-8") + + +def decrypt_credentials(master_key: bytes, user_id: str, credentials_enc: str) -> dict: + """Decrypt a Fernet token back to the original credentials dict. + + Args: + master_key: The CLOUD_CREDS_KEY env var as bytes. + user_id: The authenticated user's UUID string (HKDF salt). + credentials_enc: The Fernet token string from the database. + + Returns: + The original credentials dict. + + Raises: + cryptography.fernet.InvalidToken: If the token is tampered with or + the wrong user_id (and thus wrong key) is used. + """ + f = _derive_fernet_key(master_key, user_id) + plaintext = f.decrypt(credentials_enc.encode("utf-8")) + return json.loads(plaintext) diff --git a/backend/tests/test_cloud_utils.py b/backend/tests/test_cloud_utils.py index 00a41ed..60117cd 100644 --- a/backend/tests/test_cloud_utils.py +++ b/backend/tests/test_cloud_utils.py @@ -74,10 +74,12 @@ class TestValidateCloudUrl: validate_cloud_url("http:///path") def test_allows_public_https(self): - """Public HTTPS URL must NOT raise ValueError.""" + """Public HTTPS URL with a raw public IP must NOT raise ValueError.""" from storage.cloud_utils import validate_cloud_url - # This should resolve to a real public IP — no exception expected - validate_cloud_url("https://cloud.example.com/remote.php/dav") + # Use a well-known public IP directly to avoid DNS resolution failures + # in offline / network-isolated CI environments. + # 8.8.8.8 is Google DNS — a globally routable address, not in any blocked net. + validate_cloud_url("https://8.8.8.8/remote.php/dav") def test_allows_http_public(self): """Public HTTP URL must NOT raise ValueError."""