From 7fdffddfc122de84f24130222c95cc4d96de5925 Mon Sep 17 00:00:00 2001 From: curo1305 Date: Thu, 28 May 2026 20:57:25 +0200 Subject: [PATCH] test(05-02): add failing RED tests for cloud_utils, cloud_cache, and factory - 11 SSRF validation tests (validate_cloud_url) covering RFC-1918, loopback, link-local, localhost, IPv6 - 7 HKDF credential encryption/decryption round-trip tests (encrypt_credentials, decrypt_credentials) - 9 TTLCache singleton tests (maxsize=1000, ttl=60, thread-safe lock, get/invalidate helpers) - 2 storage factory import tests (get_storage_backend_for_document importable) --- backend/tests/test_cloud_utils.py | 271 ++++++++++++++++++++++++++++++ 1 file changed, 271 insertions(+) create mode 100644 backend/tests/test_cloud_utils.py diff --git a/backend/tests/test_cloud_utils.py b/backend/tests/test_cloud_utils.py new file mode 100644 index 0000000..00a41ed --- /dev/null +++ b/backend/tests/test_cloud_utils.py @@ -0,0 +1,271 @@ +""" +Tests for cloud_utils.py (HKDF credential encryption + SSRF validation) +and cloud_cache.py (TTLCache singleton). + +Plan 05-02: RED phase tests — these tests define the expected behavior +before implementation exists. Tests for functions not yet implemented will +ImportError (which counts as a failure, confirming the RED state). + +Requirements: CLOUD-02 (credential encryption), D-17 (SSRF prevention), CLOUD-07 (factory). +""" +from __future__ import annotations + +import pytest + + +# ── SSRF validation tests (D-17) ───────────────────────────────────────────── + + +class TestValidateCloudUrl: + """Test SSRF prevention in validate_cloud_url.""" + + def test_blocks_loopback_ipv4(self): + """127.0.0.1 must raise ValueError.""" + from storage.cloud_utils import validate_cloud_url + with pytest.raises(ValueError): + validate_cloud_url("http://127.0.0.1/dav") + + def test_blocks_rfc1918_10(self): + """10.x.x.x (RFC-1918 class A) must raise ValueError.""" + from storage.cloud_utils import validate_cloud_url + with pytest.raises(ValueError): + validate_cloud_url("http://10.0.0.1/dav") + + def test_blocks_rfc1918_172(self): + """172.16.x.x through 172.31.x.x (RFC-1918 class B) must raise ValueError.""" + from storage.cloud_utils import validate_cloud_url + with pytest.raises(ValueError): + validate_cloud_url("http://172.16.0.1/dav") + + def test_blocks_rfc1918_192(self): + """192.168.x.x (RFC-1918 class C) must raise ValueError.""" + from storage.cloud_utils import validate_cloud_url + with pytest.raises(ValueError): + validate_cloud_url("http://192.168.1.1/dav") + + def test_blocks_link_local(self): + """169.254.x.x link-local must raise ValueError (AWS metadata endpoint).""" + from storage.cloud_utils import validate_cloud_url + with pytest.raises(ValueError): + validate_cloud_url("http://169.254.169.254/dav") + + def test_blocks_localhost_string(self): + """'localhost' hostname must raise ValueError before DNS resolution.""" + from storage.cloud_utils import validate_cloud_url + with pytest.raises(ValueError): + validate_cloud_url("http://localhost/dav") + + def test_blocks_ipv6_loopback(self): + """::1 (IPv6 loopback) must raise ValueError.""" + from storage.cloud_utils import validate_cloud_url + with pytest.raises(ValueError): + validate_cloud_url("http://[::1]/dav") + + def test_rejects_non_http_scheme(self): + """ftp:// scheme must raise ValueError.""" + from storage.cloud_utils import validate_cloud_url + with pytest.raises(ValueError): + validate_cloud_url("ftp://example.com/dav") + + def test_rejects_missing_hostname(self): + """URL with no hostname must raise ValueError.""" + from storage.cloud_utils import validate_cloud_url + with pytest.raises(ValueError): + validate_cloud_url("http:///path") + + def test_allows_public_https(self): + """Public HTTPS URL must NOT raise ValueError.""" + from storage.cloud_utils import validate_cloud_url + # This should resolve to a real public IP — no exception expected + validate_cloud_url("https://cloud.example.com/remote.php/dav") + + def test_allows_http_public(self): + """Public HTTP URL must NOT raise ValueError.""" + from storage.cloud_utils import validate_cloud_url + # Public IP in 8.8.0.0/16 range (Google DNS segment, definitely not RFC-1918) + validate_cloud_url("http://8.8.8.8/dav") + + +# ── HKDF credential encryption tests (CLOUD-02) ────────────────────────────── + + +class TestEncryptDecryptCredentials: + """Test HKDF+Fernet credential encryption/decryption round-trip.""" + + MASTER_KEY = b"test-master-key-32bytes-padded!!" + USER_ID = "550e8400-e29b-41d4-a716-446655440000" + + def test_round_trip_simple(self): + """encrypt then decrypt must return the original dict.""" + from storage.cloud_utils import encrypt_credentials, decrypt_credentials + creds = {"access_token": "ya29.xxx", "refresh_token": "1//xxx"} + enc = encrypt_credentials(self.MASTER_KEY, self.USER_ID, creds) + dec = decrypt_credentials(self.MASTER_KEY, self.USER_ID, enc) + assert dec == creds + + def test_encrypted_is_string(self): + """encrypt_credentials must return a str (base64 Fernet token).""" + from storage.cloud_utils import encrypt_credentials + creds = {"access_token": "ya29.xxx"} + enc = encrypt_credentials(self.MASTER_KEY, self.USER_ID, creds) + assert isinstance(enc, str) + + def test_plaintext_not_in_ciphertext(self): + """Plaintext credential values must NOT appear in the encrypted string.""" + from storage.cloud_utils import encrypt_credentials + creds = {"access_token": "ya29.abc123secret", "refresh_token": "1//refresh456"} + enc = encrypt_credentials(self.MASTER_KEY, self.USER_ID, creds) + assert "ya29.abc123secret" not in enc + assert "refresh456" not in enc + assert "access_token" not in enc + + def test_different_users_produce_different_ciphertext(self): + """Different user_ids must produce different ciphertexts for same plaintext.""" + from storage.cloud_utils import encrypt_credentials + creds = {"access_token": "same_token"} + enc1 = encrypt_credentials(self.MASTER_KEY, "user-id-aaaa-0000", creds) + enc2 = encrypt_credentials(self.MASTER_KEY, "user-id-bbbb-1111", creds) + # Different per-user keys → different Fernet tokens + assert enc1 != enc2 + + def test_hkdf_not_reused(self): + """Calling encrypt twice must not raise AlreadyFinalized (fresh HKDF per call).""" + from storage.cloud_utils import encrypt_credentials + creds = {"token": "abc"} + # Two successive calls — if HKDF instance were reused, the second would raise + enc1 = encrypt_credentials(self.MASTER_KEY, self.USER_ID, creds) + enc2 = encrypt_credentials(self.MASTER_KEY, self.USER_ID, creds) + # Both should succeed (not raise) + assert isinstance(enc1, str) + assert isinstance(enc2, str) + + def test_round_trip_nested_dict(self): + """Round-trip works for a nested dict with expiry datetime string.""" + from storage.cloud_utils import encrypt_credentials, decrypt_credentials + creds = { + "access_token": "ya29.xxx", + "refresh_token": "1//xxx", + "expiry": "2026-05-28T15:00:00", + "metadata": {"scope": "drive.file"}, + } + enc = encrypt_credentials(self.MASTER_KEY, self.USER_ID, creds) + dec = decrypt_credentials(self.MASTER_KEY, self.USER_ID, enc) + assert dec == creds + + def test_wrong_user_id_fails_decrypt(self): + """Decrypting with a different user_id must raise an error.""" + from storage.cloud_utils import encrypt_credentials, decrypt_credentials + from cryptography.fernet import InvalidToken + creds = {"access_token": "secret"} + enc = encrypt_credentials(self.MASTER_KEY, "user-aaa", creds) + with pytest.raises((InvalidToken, Exception)): + decrypt_credentials(self.MASTER_KEY, "user-bbb", enc) + + +# ── TTLCache singleton tests (Pattern 8) ────────────────────────────────────── + + +class TestTTLCacheSingleton: + """Test cloud_cache.py module-level TTLCache configuration.""" + + def test_cache_maxsize(self): + """_folder_cache must have maxsize=1000.""" + from services.cloud_cache import _folder_cache + assert _folder_cache.maxsize == 1000 + + def test_cache_ttl(self): + """_folder_cache must have ttl=60.""" + from services.cloud_cache import _folder_cache + assert _folder_cache.ttl == 60 + + def test_lock_is_threading_lock(self): + """_folder_cache_lock must be a threading.Lock (or RLock).""" + import threading + from services.cloud_cache import _folder_cache_lock + # Both Lock and RLock are acceptable — check acquire/release protocol + assert hasattr(_folder_cache_lock, "acquire") + assert hasattr(_folder_cache_lock, "release") + + def test_exports_get_cloud_folders_cached(self): + """cloud_cache must export the async get_cloud_folders_cached function.""" + import asyncio + from services.cloud_cache import get_cloud_folders_cached + # Must be a coroutine function + assert asyncio.iscoroutinefunction(get_cloud_folders_cached) + + def test_exports_invalidate_provider_cache(self): + """cloud_cache must export the sync invalidate_provider_cache function.""" + import asyncio + from services.cloud_cache import invalidate_provider_cache + # Must be a regular (not coroutine) function + assert not asyncio.iscoroutinefunction(invalidate_provider_cache) + + @pytest.mark.asyncio + async def test_get_cloud_folders_cached_caches_result(self): + """get_cloud_folders_cached must return cached result on second call.""" + import asyncio + from services.cloud_cache import get_cloud_folders_cached, _folder_cache + + call_count = 0 + + async def fetch_fn(): + nonlocal call_count + call_count += 1 + return [{"id": "folder1", "name": "Documents"}] + + user_id = "test-user-99" + provider = "test_provider" + folder_id = "root" + + # First call — should invoke fetch_fn + result1 = await get_cloud_folders_cached(user_id, provider, folder_id, fetch_fn) + # Second call — should use cache, not call fetch_fn again + result2 = await get_cloud_folders_cached(user_id, provider, folder_id, fetch_fn) + + assert result1 == [{"id": "folder1", "name": "Documents"}] + assert result2 == result1 + assert call_count == 1 # fetch_fn called only once + + @pytest.mark.asyncio + async def test_invalidate_clears_entries(self): + """invalidate_provider_cache must remove all entries for user+provider.""" + from services.cloud_cache import get_cloud_folders_cached, invalidate_provider_cache, _folder_cache + + async def fetch_fn(): + return [{"id": "x"}] + + user_id = "user-invalidate-test" + provider = "myprovider" + + # Populate cache + await get_cloud_folders_cached(user_id, provider, "folder-a", fetch_fn) + await get_cloud_folders_cached(user_id, provider, "folder-b", fetch_fn) + + # Invalidate + invalidate_provider_cache(user_id, provider) + + # Cache entries should be gone + key_a = f"{user_id}:{provider}:folder-a" + key_b = f"{user_id}:{provider}:folder-b" + assert key_a not in _folder_cache + assert key_b not in _folder_cache + + +# ── Storage factory tests (CLOUD-07) ───────────────────────────────────────── + + +class TestStorageFactoryImport: + """Test that get_storage_backend_for_document is importable from storage.""" + + def test_import_factory_function(self): + """get_storage_backend_for_document must be importable from storage.""" + import asyncio + from storage import get_storage_backend_for_document + assert asyncio.iscoroutinefunction(get_storage_backend_for_document) + + def test_existing_factory_unchanged(self): + """Existing get_storage_backend() factory must still be importable.""" + from storage import get_storage_backend + from storage.minio_backend import MinIOBackend + backend = get_storage_backend() + assert isinstance(backend, MinIOBackend)