""" Tests for cloud_utils.py (HKDF credential encryption + SSRF validation) and cloud_cache.py (TTLCache singleton). Plan 05-02: RED phase tests — these tests define the expected behavior before implementation exists. Tests for functions not yet implemented will ImportError (which counts as a failure, confirming the RED state). Requirements: CLOUD-02 (credential encryption), D-17 (SSRF prevention), CLOUD-07 (factory). """ from __future__ import annotations import pytest # ── SSRF validation tests (D-17) ───────────────────────────────────────────── class TestValidateCloudUrl: """Test SSRF prevention in validate_cloud_url.""" def test_blocks_loopback_ipv4(self): """127.0.0.1 must raise ValueError.""" from storage.cloud_utils import validate_cloud_url with pytest.raises(ValueError): validate_cloud_url("http://127.0.0.1/dav") def test_blocks_rfc1918_10(self): """10.x.x.x (RFC-1918 class A) must raise ValueError.""" from storage.cloud_utils import validate_cloud_url with pytest.raises(ValueError): validate_cloud_url("http://10.0.0.1/dav") def test_blocks_rfc1918_172(self): """172.16.x.x through 172.31.x.x (RFC-1918 class B) must raise ValueError.""" from storage.cloud_utils import validate_cloud_url with pytest.raises(ValueError): validate_cloud_url("http://172.16.0.1/dav") def test_blocks_rfc1918_192(self): """192.168.x.x (RFC-1918 class C) must raise ValueError.""" from storage.cloud_utils import validate_cloud_url with pytest.raises(ValueError): validate_cloud_url("http://192.168.1.1/dav") def test_blocks_link_local(self): """169.254.x.x link-local must raise ValueError (AWS metadata endpoint).""" from storage.cloud_utils import validate_cloud_url with pytest.raises(ValueError): validate_cloud_url("http://169.254.169.254/dav") def test_blocks_localhost_string(self): """'localhost' hostname must raise ValueError before DNS resolution.""" from storage.cloud_utils import validate_cloud_url with pytest.raises(ValueError): validate_cloud_url("http://localhost/dav") def test_blocks_ipv6_loopback(self): """::1 (IPv6 loopback) must raise ValueError.""" from storage.cloud_utils import validate_cloud_url with pytest.raises(ValueError): validate_cloud_url("http://[::1]/dav") def test_rejects_non_http_scheme(self): """ftp:// scheme must raise ValueError.""" from storage.cloud_utils import validate_cloud_url with pytest.raises(ValueError): validate_cloud_url("ftp://example.com/dav") def test_rejects_missing_hostname(self): """URL with no hostname must raise ValueError.""" from storage.cloud_utils import validate_cloud_url with pytest.raises(ValueError): validate_cloud_url("http:///path") def test_allows_public_https(self): """Public HTTPS URL must NOT raise ValueError.""" from storage.cloud_utils import validate_cloud_url # This should resolve to a real public IP — no exception expected validate_cloud_url("https://cloud.example.com/remote.php/dav") def test_allows_http_public(self): """Public HTTP URL must NOT raise ValueError.""" from storage.cloud_utils import validate_cloud_url # Public IP in 8.8.0.0/16 range (Google DNS segment, definitely not RFC-1918) validate_cloud_url("http://8.8.8.8/dav") # ── HKDF credential encryption tests (CLOUD-02) ────────────────────────────── class TestEncryptDecryptCredentials: """Test HKDF+Fernet credential encryption/decryption round-trip.""" MASTER_KEY = b"test-master-key-32bytes-padded!!" USER_ID = "550e8400-e29b-41d4-a716-446655440000" def test_round_trip_simple(self): """encrypt then decrypt must return the original dict.""" from storage.cloud_utils import encrypt_credentials, decrypt_credentials creds = {"access_token": "ya29.xxx", "refresh_token": "1//xxx"} enc = encrypt_credentials(self.MASTER_KEY, self.USER_ID, creds) dec = decrypt_credentials(self.MASTER_KEY, self.USER_ID, enc) assert dec == creds def test_encrypted_is_string(self): """encrypt_credentials must return a str (base64 Fernet token).""" from storage.cloud_utils import encrypt_credentials creds = {"access_token": "ya29.xxx"} enc = encrypt_credentials(self.MASTER_KEY, self.USER_ID, creds) assert isinstance(enc, str) def test_plaintext_not_in_ciphertext(self): """Plaintext credential values must NOT appear in the encrypted string.""" from storage.cloud_utils import encrypt_credentials creds = {"access_token": "ya29.abc123secret", "refresh_token": "1//refresh456"} enc = encrypt_credentials(self.MASTER_KEY, self.USER_ID, creds) assert "ya29.abc123secret" not in enc assert "refresh456" not in enc assert "access_token" not in enc def test_different_users_produce_different_ciphertext(self): """Different user_ids must produce different ciphertexts for same plaintext.""" from storage.cloud_utils import encrypt_credentials creds = {"access_token": "same_token"} enc1 = encrypt_credentials(self.MASTER_KEY, "user-id-aaaa-0000", creds) enc2 = encrypt_credentials(self.MASTER_KEY, "user-id-bbbb-1111", creds) # Different per-user keys → different Fernet tokens assert enc1 != enc2 def test_hkdf_not_reused(self): """Calling encrypt twice must not raise AlreadyFinalized (fresh HKDF per call).""" from storage.cloud_utils import encrypt_credentials creds = {"token": "abc"} # Two successive calls — if HKDF instance were reused, the second would raise enc1 = encrypt_credentials(self.MASTER_KEY, self.USER_ID, creds) enc2 = encrypt_credentials(self.MASTER_KEY, self.USER_ID, creds) # Both should succeed (not raise) assert isinstance(enc1, str) assert isinstance(enc2, str) def test_round_trip_nested_dict(self): """Round-trip works for a nested dict with expiry datetime string.""" from storage.cloud_utils import encrypt_credentials, decrypt_credentials creds = { "access_token": "ya29.xxx", "refresh_token": "1//xxx", "expiry": "2026-05-28T15:00:00", "metadata": {"scope": "drive.file"}, } enc = encrypt_credentials(self.MASTER_KEY, self.USER_ID, creds) dec = decrypt_credentials(self.MASTER_KEY, self.USER_ID, enc) assert dec == creds def test_wrong_user_id_fails_decrypt(self): """Decrypting with a different user_id must raise an error.""" from storage.cloud_utils import encrypt_credentials, decrypt_credentials from cryptography.fernet import InvalidToken creds = {"access_token": "secret"} enc = encrypt_credentials(self.MASTER_KEY, "user-aaa", creds) with pytest.raises((InvalidToken, Exception)): decrypt_credentials(self.MASTER_KEY, "user-bbb", enc) # ── TTLCache singleton tests (Pattern 8) ────────────────────────────────────── class TestTTLCacheSingleton: """Test cloud_cache.py module-level TTLCache configuration.""" def test_cache_maxsize(self): """_folder_cache must have maxsize=1000.""" from services.cloud_cache import _folder_cache assert _folder_cache.maxsize == 1000 def test_cache_ttl(self): """_folder_cache must have ttl=60.""" from services.cloud_cache import _folder_cache assert _folder_cache.ttl == 60 def test_lock_is_threading_lock(self): """_folder_cache_lock must be a threading.Lock (or RLock).""" import threading from services.cloud_cache import _folder_cache_lock # Both Lock and RLock are acceptable — check acquire/release protocol assert hasattr(_folder_cache_lock, "acquire") assert hasattr(_folder_cache_lock, "release") def test_exports_get_cloud_folders_cached(self): """cloud_cache must export the async get_cloud_folders_cached function.""" import asyncio from services.cloud_cache import get_cloud_folders_cached # Must be a coroutine function assert asyncio.iscoroutinefunction(get_cloud_folders_cached) def test_exports_invalidate_provider_cache(self): """cloud_cache must export the sync invalidate_provider_cache function.""" import asyncio from services.cloud_cache import invalidate_provider_cache # Must be a regular (not coroutine) function assert not asyncio.iscoroutinefunction(invalidate_provider_cache) @pytest.mark.asyncio async def test_get_cloud_folders_cached_caches_result(self): """get_cloud_folders_cached must return cached result on second call.""" import asyncio from services.cloud_cache import get_cloud_folders_cached, _folder_cache call_count = 0 async def fetch_fn(): nonlocal call_count call_count += 1 return [{"id": "folder1", "name": "Documents"}] user_id = "test-user-99" provider = "test_provider" folder_id = "root" # First call — should invoke fetch_fn result1 = await get_cloud_folders_cached(user_id, provider, folder_id, fetch_fn) # Second call — should use cache, not call fetch_fn again result2 = await get_cloud_folders_cached(user_id, provider, folder_id, fetch_fn) assert result1 == [{"id": "folder1", "name": "Documents"}] assert result2 == result1 assert call_count == 1 # fetch_fn called only once @pytest.mark.asyncio async def test_invalidate_clears_entries(self): """invalidate_provider_cache must remove all entries for user+provider.""" from services.cloud_cache import get_cloud_folders_cached, invalidate_provider_cache, _folder_cache async def fetch_fn(): return [{"id": "x"}] user_id = "user-invalidate-test" provider = "myprovider" # Populate cache await get_cloud_folders_cached(user_id, provider, "folder-a", fetch_fn) await get_cloud_folders_cached(user_id, provider, "folder-b", fetch_fn) # Invalidate invalidate_provider_cache(user_id, provider) # Cache entries should be gone key_a = f"{user_id}:{provider}:folder-a" key_b = f"{user_id}:{provider}:folder-b" assert key_a not in _folder_cache assert key_b not in _folder_cache # ── Storage factory tests (CLOUD-07) ───────────────────────────────────────── class TestStorageFactoryImport: """Test that get_storage_backend_for_document is importable from storage.""" def test_import_factory_function(self): """get_storage_backend_for_document must be importable from storage.""" import asyncio from storage import get_storage_backend_for_document assert asyncio.iscoroutinefunction(get_storage_backend_for_document) def test_existing_factory_unchanged(self): """Existing get_storage_backend() factory must still be importable.""" from storage import get_storage_backend from storage.minio_backend import MinIOBackend backend = get_storage_backend() assert isinstance(backend, MinIOBackend)