Files
kite/backend/tests/test_cloud_utils.py
curo1305 976d2ca2de feat(05-02): implement cloud_utils.py — SSRF validation and HKDF credential encryption
- validate_cloud_url(): blocks RFC-1918 (10.x, 172.16.x, 192.168.x), loopback (127.x),
  link-local (169.254.x), IPv6 loopback (::1), ULA (fc00::/7), and 'localhost' string;
  resolves DNS via socket.getaddrinfo BEFORE IP check (anti-DNS-rebinding per D-17)
- _derive_fernet_key(): creates fresh HKDF-SHA256 instance per call (AlreadyFinalized
  pitfall avoided per RESEARCH.md Pitfall 3); uses user_id as salt for per-user isolation
- encrypt_credentials(): Fernet-encrypts JSON-serialised credentials dict; returns str
- decrypt_credentials(): decrypts Fernet token back to original dict
- [Rule 1 - Bug] Fixed test_allows_public_https to use 8.8.8.8 IP (cloud.example.com
  does not resolve in offline CI environments)
2026-05-28 20:58:40 +02:00

274 lines
12 KiB
Python

"""
Tests for cloud_utils.py (HKDF credential encryption + SSRF validation)
and cloud_cache.py (TTLCache singleton).
Plan 05-02: RED phase tests — these tests define the expected behavior
before the implementation exists. Tests for functions that are not yet
implemented raise ImportError (which counts as a failure, confirming the RED state).
Requirements: CLOUD-02 (credential encryption), D-17 (SSRF prevention), CLOUD-07 (factory).
"""
from __future__ import annotations
import pytest
# ── SSRF validation tests (D-17) ─────────────────────────────────────────────
class TestValidateCloudUrl:
    """SSRF guard checks: validate_cloud_url rejects internal targets, accepts public ones.

    ``storage.cloud_utils`` is imported inside every test, so a missing
    implementation surfaces as a failure of that individual test.
    """

    def test_blocks_loopback_ipv4(self):
        """The IPv4 loopback address 127.0.0.1 is rejected with ValueError."""
        from storage.cloud_utils import validate_cloud_url

        loopback_url = "http://127.0.0.1/dav"
        with pytest.raises(ValueError):
            validate_cloud_url(loopback_url)

    def test_blocks_rfc1918_10(self):
        """An address in the RFC-1918 10.x.x.x block is rejected with ValueError."""
        from storage.cloud_utils import validate_cloud_url

        private_url = "http://10.0.0.1/dav"
        with pytest.raises(ValueError):
            validate_cloud_url(private_url)

    def test_blocks_rfc1918_172(self):
        """An address in the RFC-1918 172.16.x.x-172.31.x.x block is rejected.

        Only the lower end of the range (172.16.0.1) is exercised here.
        """
        from storage.cloud_utils import validate_cloud_url

        private_url = "http://172.16.0.1/dav"
        with pytest.raises(ValueError):
            validate_cloud_url(private_url)

    def test_blocks_rfc1918_192(self):
        """An address in the RFC-1918 192.168.x.x block is rejected with ValueError."""
        from storage.cloud_utils import validate_cloud_url

        private_url = "http://192.168.1.1/dav"
        with pytest.raises(ValueError):
            validate_cloud_url(private_url)

    def test_blocks_link_local(self):
        """The link-local address 169.254.169.254 (AWS metadata endpoint) is rejected."""
        from storage.cloud_utils import validate_cloud_url

        metadata_url = "http://169.254.169.254/dav"
        with pytest.raises(ValueError):
            validate_cloud_url(metadata_url)

    def test_blocks_localhost_string(self):
        """The literal hostname 'localhost' is rejected with ValueError.

        The intent is rejection before DNS resolution; this test only
        observes the ValueError, not whether a lookup took place.
        """
        from storage.cloud_utils import validate_cloud_url

        localhost_url = "http://localhost/dav"
        with pytest.raises(ValueError):
            validate_cloud_url(localhost_url)

    def test_blocks_ipv6_loopback(self):
        """The bracketed IPv6 loopback address ::1 is rejected with ValueError."""
        from storage.cloud_utils import validate_cloud_url

        loopback_url = "http://[::1]/dav"
        with pytest.raises(ValueError):
            validate_cloud_url(loopback_url)

    def test_rejects_non_http_scheme(self):
        """A URL using the ftp:// scheme is rejected with ValueError."""
        from storage.cloud_utils import validate_cloud_url

        ftp_url = "ftp://example.com/dav"
        with pytest.raises(ValueError):
            validate_cloud_url(ftp_url)

    def test_rejects_missing_hostname(self):
        """A URL without any hostname component is rejected with ValueError."""
        from storage.cloud_utils import validate_cloud_url

        hostless_url = "http:///path"
        with pytest.raises(ValueError):
            validate_cloud_url(hostless_url)

    def test_allows_public_https(self):
        """An HTTPS URL pointing at a raw public IP passes validation."""
        from storage.cloud_utils import validate_cloud_url

        # A literal IP keeps the test independent of DNS, which may be
        # unavailable in offline or network-isolated CI. 8.8.8.8 (Google DNS)
        # is globally routable and outside every blocked network.
        public_url = "https://8.8.8.8/remote.php/dav"
        validate_cloud_url(public_url)

    def test_allows_http_public(self):
        """A plain-HTTP URL pointing at a public IP passes validation."""
        from storage.cloud_utils import validate_cloud_url

        # 8.8.8.8 sits in Google's public DNS segment, not in an RFC-1918 range.
        public_url = "http://8.8.8.8/dav"
        validate_cloud_url(public_url)
# ── HKDF credential encryption tests (CLOUD-02) ──────────────────────────────
class TestEncryptDecryptCredentials:
    """Round-trip and per-user isolation checks for HKDF + Fernet credential encryption.

    ``storage.cloud_utils`` is imported inside every test, so a missing
    implementation surfaces as a failure of that individual test.
    """

    # 32-byte master key shared by every test in this class.
    MASTER_KEY = b"test-master-key-32bytes-padded!!"
    # Default user id passed to the encrypt/decrypt helpers.
    USER_ID = "550e8400-e29b-41d4-a716-446655440000"

    def test_round_trip_simple(self):
        """Encrypting and then decrypting returns the original dict."""
        from storage.cloud_utils import encrypt_credentials, decrypt_credentials

        creds = {"access_token": "ya29.xxx", "refresh_token": "1//xxx"}
        enc = encrypt_credentials(self.MASTER_KEY, self.USER_ID, creds)
        dec = decrypt_credentials(self.MASTER_KEY, self.USER_ID, enc)
        assert dec == creds

    def test_encrypted_is_string(self):
        """encrypt_credentials returns a str (base64 Fernet token)."""
        from storage.cloud_utils import encrypt_credentials

        creds = {"access_token": "ya29.xxx"}
        enc = encrypt_credentials(self.MASTER_KEY, self.USER_ID, creds)
        assert isinstance(enc, str)

    def test_plaintext_not_in_ciphertext(self):
        """Neither credential values nor keys are visible in the encrypted token.

        The token is checked both as text and as decoded bytes: the textual
        check alone cannot fail for base64 output, even if the payload were
        merely encoded rather than encrypted.
        """
        import base64

        from storage.cloud_utils import encrypt_credentials

        creds = {"access_token": "ya29.abc123secret", "refresh_token": "1//refresh456"}
        enc = encrypt_credentials(self.MASTER_KEY, self.USER_ID, creds)

        assert "ya29.abc123secret" not in enc
        assert "refresh456" not in enc
        assert "access_token" not in enc

        # Assumes a standard padded URL-safe base64 Fernet token.
        raw = base64.urlsafe_b64decode(enc)
        assert b"ya29.abc123secret" not in raw
        assert b"refresh456" not in raw
        assert b"access_token" not in raw

    def test_different_users_produce_different_ciphertext(self):
        """Different user_ids yield different tokens that are not interchangeable."""
        from storage.cloud_utils import encrypt_credentials, decrypt_credentials

        creds = {"access_token": "same_token"}
        user_a = "user-id-aaaa-0000"
        user_b = "user-id-bbbb-1111"
        enc1 = encrypt_credentials(self.MASTER_KEY, user_a, creds)
        enc2 = encrypt_credentials(self.MASTER_KEY, user_b, creds)

        # Different per-user keys → different Fernet tokens
        assert enc1 != enc2

        # Fernet tokens embed a random IV, so inequality alone would also hold
        # if user_id were ignored. Prove the keys really differ: each user can
        # read their own token but not the other user's.
        assert decrypt_credentials(self.MASTER_KEY, user_a, enc1) == creds
        assert decrypt_credentials(self.MASTER_KEY, user_b, enc2) == creds
        # Error type is left open, matching test_wrong_user_id_fails_decrypt.
        with pytest.raises(Exception):
            decrypt_credentials(self.MASTER_KEY, user_b, enc1)
        with pytest.raises(Exception):
            decrypt_credentials(self.MASTER_KEY, user_a, enc2)

    def test_hkdf_not_reused(self):
        """Two successive encrypt calls both succeed (no AlreadyFinalized)."""
        from storage.cloud_utils import encrypt_credentials

        creds = {"token": "abc"}
        # If a single HKDF instance were reused, the second call would raise.
        enc1 = encrypt_credentials(self.MASTER_KEY, self.USER_ID, creds)
        enc2 = encrypt_credentials(self.MASTER_KEY, self.USER_ID, creds)
        # Reaching this point means neither call raised.
        assert isinstance(enc1, str)
        assert isinstance(enc2, str)

    def test_round_trip_nested_dict(self):
        """Round-trip preserves a nested dict that includes an expiry string."""
        from storage.cloud_utils import encrypt_credentials, decrypt_credentials

        creds = {
            "access_token": "ya29.xxx",
            "refresh_token": "1//xxx",
            "expiry": "2026-05-28T15:00:00",
            "metadata": {"scope": "drive.file"},
        }
        enc = encrypt_credentials(self.MASTER_KEY, self.USER_ID, creds)
        dec = decrypt_credentials(self.MASTER_KEY, self.USER_ID, enc)
        assert dec == creds

    def test_wrong_user_id_fails_decrypt(self):
        """Decrypting with a different user_id raises an error."""
        from storage.cloud_utils import encrypt_credentials, decrypt_credentials
        from cryptography.fernet import InvalidToken

        creds = {"access_token": "secret"}
        enc = encrypt_credentials(self.MASTER_KEY, "user-aaa", creds)
        # InvalidToken is what Fernet raises on a key mismatch. Exception is
        # accepted as well, so any error type satisfies this test.
        # NOTE(review): narrow to InvalidToken once it is confirmed that
        # decrypt_credentials does not wrap the error.
        with pytest.raises((InvalidToken, Exception)):
            decrypt_credentials(self.MASTER_KEY, "user-bbb", enc)
# ── TTLCache singleton tests (Pattern 8) ──────────────────────────────────────
class TestTTLCacheSingleton:
    """Checks for the module-level TTLCache configuration and helpers in cloud_cache.py.

    ``services.cloud_cache`` is imported inside every test, so a missing
    implementation surfaces as a failure of that individual test.
    """

    def test_cache_maxsize(self):
        """_folder_cache must have maxsize=1000."""
        from services.cloud_cache import _folder_cache

        assert _folder_cache.maxsize == 1000

    def test_cache_ttl(self):
        """_folder_cache must have ttl=60."""
        from services.cloud_cache import _folder_cache

        assert _folder_cache.ttl == 60

    def test_lock_is_threading_lock(self):
        """_folder_cache_lock exposes the acquire/release lock protocol.

        Both Lock and RLock are acceptable, so only the protocol is checked,
        not the concrete type.
        """
        from services.cloud_cache import _folder_cache_lock

        assert hasattr(_folder_cache_lock, "acquire")
        assert hasattr(_folder_cache_lock, "release")

    def test_exports_get_cloud_folders_cached(self):
        """cloud_cache must export get_cloud_folders_cached as a coroutine function."""
        import asyncio

        from services.cloud_cache import get_cloud_folders_cached

        assert asyncio.iscoroutinefunction(get_cloud_folders_cached)

    def test_exports_invalidate_provider_cache(self):
        """cloud_cache must export invalidate_provider_cache as a regular (sync) function."""
        import asyncio

        from services.cloud_cache import invalidate_provider_cache

        assert not asyncio.iscoroutinefunction(invalidate_provider_cache)

    @pytest.mark.asyncio
    async def test_get_cloud_folders_cached_caches_result(self):
        """A second call with the same arguments is served from the cache."""
        import uuid

        from services.cloud_cache import get_cloud_folders_cached

        call_count = 0

        async def fetch_fn():
            nonlocal call_count
            call_count += 1
            return [{"id": "folder1", "name": "Documents"}]

        # The cache is a process-wide singleton. A unique user id guarantees
        # the first call is a miss even when this test is re-run in the same
        # process within the TTL.
        user_id = f"test-user-{uuid.uuid4()}"
        provider = "test_provider"
        folder_id = "root"

        # First call — should invoke fetch_fn
        result1 = await get_cloud_folders_cached(user_id, provider, folder_id, fetch_fn)
        # Second call — should use the cache, not call fetch_fn again
        result2 = await get_cloud_folders_cached(user_id, provider, folder_id, fetch_fn)

        assert result1 == [{"id": "folder1", "name": "Documents"}]
        assert result2 == result1
        assert call_count == 1  # fetch_fn called only once

    @pytest.mark.asyncio
    async def test_invalidate_clears_entries(self):
        """invalidate_provider_cache removes all entries for a user+provider pair."""
        from services.cloud_cache import (
            get_cloud_folders_cached,
            invalidate_provider_cache,
            _folder_cache,
        )

        fetch_calls = 0

        async def fetch_fn():
            nonlocal fetch_calls
            fetch_calls += 1
            return [{"id": "x"}]

        user_id = "user-invalidate-test"
        provider = "myprovider"

        # Populate cache
        await get_cloud_folders_cached(user_id, provider, "folder-a", fetch_fn)
        await get_cloud_folders_cached(user_id, provider, "folder-b", fetch_fn)

        # Invalidate
        invalidate_provider_cache(user_id, provider)

        # Cache entries should be gone
        key_a = f"{user_id}:{provider}:folder-a"
        key_b = f"{user_id}:{provider}:folder-b"
        assert key_a not in _folder_cache
        assert key_b not in _folder_cache

        # The key checks above pass trivially if the cache uses another key
        # format. Confirm the invalidation through behavior as well: both
        # folders must be fetched again.
        calls_before = fetch_calls
        await get_cloud_folders_cached(user_id, provider, "folder-a", fetch_fn)
        await get_cloud_folders_cached(user_id, provider, "folder-b", fetch_fn)
        assert fetch_calls == calls_before + 2
# ── Storage factory tests (CLOUD-07) ─────────────────────────────────────────
class TestStorageFactoryImport:
    """Import-surface checks for the factory functions exposed by ``storage``."""

    def test_import_factory_function(self):
        """storage exposes get_storage_backend_for_document as a coroutine function."""
        import asyncio
        from storage import get_storage_backend_for_document as document_factory

        is_async = asyncio.iscoroutinefunction(document_factory)
        assert is_async

    def test_existing_factory_unchanged(self):
        """The existing get_storage_backend() factory still returns a MinIOBackend."""
        from storage import get_storage_backend
        from storage.minio_backend import MinIOBackend

        assert isinstance(get_storage_backend(), MinIOBackend)