Files
curo1305 976d2ca2de feat(05-02): implement cloud_utils.py — SSRF validation and HKDF credential encryption
- validate_cloud_url(): blocks RFC-1918 (10.x, 172.16.x, 192.168.x), loopback (127.x),
  link-local (169.254.x), IPv6 loopback (::1), ULA (fc00::/7), and 'localhost' string;
  resolves DNS via socket.getaddrinfo BEFORE IP check (anti-DNS-rebinding per D-17)
- _derive_fernet_key(): creates fresh HKDF-SHA256 instance per call (AlreadyFinalized
  pitfall avoided per RESEARCH.md Pitfall 3); uses user_id as salt for per-user isolation
- encrypt_credentials(): Fernet-encrypts JSON-serialised credentials dict; returns str
- decrypt_credentials(): decrypts Fernet token back to original dict
- [Rule 1 - Bug] Fixed test_allows_public_https to use 8.8.8.8 IP (cloud.example.com
  does not resolve in offline CI environments)
2026-05-28 20:58:40 +02:00

181 lines
7.1 KiB
Python

"""
Cloud storage shared utilities for DocuVault.
Security design:
SSRF prevention (D-17): validate_cloud_url() resolves DNS via socket.getaddrinfo
*before* checking the resolved IP against blocked networks. This catches hostnames
that pass a string check but resolve to an internal IP and, because validation is
repeated before every outbound request, narrows the window for DNS-rebinding attacks.
It also explicitly blocks the string "localhost" before any DNS resolution.
HKDF credential encryption (D-18, CLOUD-02): _derive_fernet_key() creates a FRESH
HKDF instance on every call. The cryptography library raises AlreadyFinalized if
.derive() is called twice on the same instance (Pitfall 3 in RESEARCH.md). This
function avoids that by constructing a new HKDF(...) object each time.
References:
RESEARCH.md Pattern 2 — HKDF+Fernet
RESEARCH.md Pattern 6 — SSRF validation via ipaddress + socket.getaddrinfo
CLAUDE.md — cloud credentials encrypted with HKDF per-user key derivation
"""
from __future__ import annotations
import base64
import ipaddress
import json
import socket
from urllib.parse import urlparse
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
# Networks that must never be the target of outbound cloud HTTP requests (D-17)
_BLOCKED_NETS = [
    ipaddress.ip_network("0.0.0.0/8"),  # "This network"; 0.0.0.0 is commonly routed to the local host
    ipaddress.ip_network("127.0.0.0/8"),  # IPv4 loopback
    ipaddress.ip_network("169.254.0.0/16"),  # Link-local (AWS/GCP metadata endpoint)
    ipaddress.ip_network("10.0.0.0/8"),  # RFC-1918 class A
    ipaddress.ip_network("172.16.0.0/12"),  # RFC-1918 class B
    ipaddress.ip_network("192.168.0.0/16"),  # RFC-1918 class C
    ipaddress.ip_network("::/128"),  # IPv6 unspecified address
    ipaddress.ip_network("::1/128"),  # IPv6 loopback
    ipaddress.ip_network("fc00::/7"),  # IPv6 Unique Local Address (ULA)
    ipaddress.ip_network("fe80::/10"),  # IPv6 link-local
]


def _resolve_host_addresses(
    hostname: str,
) -> "list[ipaddress.IPv4Address | ipaddress.IPv6Address]":
    """Return every IP address that *hostname* denotes (IP literal or DNS lookup)."""
    try:
        return [ipaddress.ip_address(hostname)]
    except ValueError:
        pass  # Not a raw IP literal — resolve via DNS below

    try:
        infos = socket.getaddrinfo(hostname, None)
        # Keep ALL results: a name may carry several A/AAAA records and a
        # client is free to connect to any one of them.
        addresses = [ipaddress.ip_address(info[4][0]) for info in infos]
    except socket.gaierror as exc:
        raise ValueError(f"Cannot resolve hostname '{hostname}': {exc}") from exc
    except (ValueError, IndexError) as exc:
        raise ValueError(f"Unexpected error resolving '{hostname}': {exc}") from exc

    if not addresses:
        raise ValueError(f"Cannot resolve hostname '{hostname}': no addresses returned.")
    return addresses


def _unwrap_ipv4_mapped(
    addr: "ipaddress.IPv4Address | ipaddress.IPv6Address",
) -> "ipaddress.IPv4Address | ipaddress.IPv6Address":
    """Return the embedded IPv4 address of an IPv4-mapped IPv6 address, else *addr*."""
    # Only IPv6Address defines ipv4_mapped; it is None unless addr is ::ffff:a.b.c.d.
    mapped = getattr(addr, "ipv4_mapped", None)
    return addr if mapped is None else mapped


def validate_cloud_url(url: str) -> None:
    """Raise ValueError if the URL targets a private, internal, or restricted address.

    Security contract (D-17):
        1. Reject non-http/https schemes.
        2. Reject URLs with no hostname.
        3. Explicitly reject "localhost" (with or without a trailing root dot)
           before DNS resolution, so the result does not depend on how the OS
           resolver treats that name.
        4. If the hostname is a raw IP literal, check it directly. Otherwise
           resolve it via socket.getaddrinfo and check EVERY returned address,
           not just the first one.
        5. IPv4-mapped IPv6 addresses (::ffff:a.b.c.d) are checked as the IPv4
           address they embed, so they cannot side-step the IPv4 entries.
        6. Raise ValueError for any address inside a _BLOCKED_NETS entry.

    NOTE(review): this function returns nothing, so the caller presumably
    resolves the hostname again when it actually connects. Validation therefore
    narrows, but may not fully close, a DNS-rebinding window — confirm against
    the HTTP client in use.

    Args:
        url: The user-supplied WebDAV, Nextcloud, or cloud server URL.

    Raises:
        ValueError: If the URL uses a blocked scheme, has no hostname, cannot
            be resolved, or resolves to a private/internal address.
    """
    parsed = urlparse(url)

    # Step 1: scheme check
    if parsed.scheme not in ("http", "https"):
        raise ValueError(
            f"Unsupported URL scheme '{parsed.scheme}': only http and https are allowed."
        )

    # Step 2: hostname presence
    hostname = parsed.hostname  # lowercased, brackets stripped for IPv6
    if not hostname:
        raise ValueError("URL has no hostname.")

    # Step 3: explicit string block for 'localhost' (before DNS resolution).
    # "localhost." is the same name written fully qualified.
    if hostname.rstrip(".") == "localhost":
        raise ValueError(
            "URL targets localhost — this is a private/internal address."
        )

    # Steps 4-6: every address the host maps to must be outside the blocked nets.
    for resolved in _resolve_host_addresses(hostname):
        addr = _unwrap_ipv4_mapped(resolved)
        for net in _BLOCKED_NETS:
            # Membership across address families is never true; compare
            # versions explicitly so the intent is visible.
            if addr.version == net.version and addr in net:
                raise ValueError(
                    f"URL targets a private/internal address: {addr} is in {net}"
                )
def _derive_fernet_key(master_key: bytes, user_id: str) -> Fernet:
    """Build the per-user Fernet cipher by running HKDF-SHA256 over the master key.

    Derivation parameters:
        - salt: the UTF-8 bytes of ``user_id``. The same user always yields the
          same key, which is what lets a token encrypted earlier be decrypted
          later.
        - info: the fixed label ``b"cloud-credentials"``, binding the derived
          key to this purpose.
        - length: 32 bytes, base64url-encoded before being handed to Fernet.

    The HKDF object is constructed inside this function on every call and is
    never stored: per RESEARCH.md Pitfall 3, calling ``.derive()`` twice on one
    instance raises AlreadyFinalized.

    Args:
        master_key: The CLOUD_CREDS_KEY env var as bytes.
        user_id: The authenticated user's UUID string (used as HKDF salt).

    Returns:
        A Fernet instance ready for encrypt/decrypt operations.
    """
    derived: bytes = HKDF(
        algorithm=hashes.SHA256(),
        length=32,
        salt=user_id.encode("utf-8"),
        info=b"cloud-credentials",
    ).derive(master_key)
    # Fernet expects its 32-byte key in URL-safe base64 form.
    return Fernet(base64.urlsafe_b64encode(derived))
def encrypt_credentials(master_key: bytes, user_id: str, credentials: dict) -> str:
    """Serialise *credentials* to JSON and encrypt it with the user's Fernet key.

    The result is a Fernet token decoded to ``str``, so it can be stored
    directly in a text column such as ``credentials_enc``.

    Args:
        master_key: The CLOUD_CREDS_KEY env var as bytes.
        user_id: The authenticated user's UUID string (HKDF salt).
        credentials: A JSON-serialisable dict (access_token, refresh_token, etc.).

    Returns:
        A URL-safe base64 Fernet token (str).
    """
    cipher = _derive_fernet_key(master_key, user_id)
    payload = json.dumps(credentials).encode("utf-8")
    token = cipher.encrypt(payload)
    return token.decode("utf-8")
def decrypt_credentials(master_key: bytes, user_id: str, credentials_enc: str) -> dict:
    """Reverse encrypt_credentials(): decrypt a stored token and parse its JSON.

    Args:
        master_key: The CLOUD_CREDS_KEY env var as bytes.
        user_id: The authenticated user's UUID string (HKDF salt).
        credentials_enc: The Fernet token string from the database.

    Returns:
        The original credentials dict.

    Raises:
        cryptography.fernet.InvalidToken: If the token is tampered with or
            the wrong user_id (and thus wrong key) is used.
    """
    cipher = _derive_fernet_key(master_key, user_id)
    token = credentials_enc.encode("utf-8")
    decrypted = cipher.decrypt(token)
    return json.loads(decrypted)