feat(05-02): implement cloud_cache.py and extend storage factory
- cloud_cache.py: module-level TTLCache(maxsize=1000, ttl=60) singleton with
  threading.Lock for concurrent access safety (RESEARCH.md Pattern 8 / D-16)
- get_cloud_folders_cached(): async function; calls fetch_fn OUTSIDE the lock
  to avoid blocking the event loop during cloud API calls
- invalidate_provider_cache(): removes all cache entries for a user+provider
  prefix
- storage/__init__.py: adds get_storage_backend_for_document() async factory —
  returns MinIOBackend for minio docs; for cloud docs it queries
  CloudConnection (scoped to user.id), decrypts credentials, and lazy-imports
  cloud backends to avoid circular imports; raises HTTPException(503) if the
  connection is missing or not ACTIVE (T-05-02-04)
This commit is contained in:
@@ -0,0 +1,94 @@
|
|||||||
|
"""
|
||||||
|
Cloud folder listing cache for DocuVault.
|
||||||
|
|
||||||
|
Provides a module-level TTLCache singleton for caching cloud provider folder
|
||||||
|
listings with a 60-second TTL (D-16: live API calls with 60-second in-memory TTL).
|
||||||
|
|
||||||
|
Thread-safety: cachetools.TTLCache is NOT thread-safe by itself. A threading.Lock
|
||||||
|
is required for all reads and writes (RESEARCH.md Pattern 8). The fetch function
|
||||||
|
is called OUTSIDE the lock to prevent blocking the asyncio event loop while an
|
||||||
|
outbound cloud API call is in flight.
|
||||||
|
|
||||||
|
Cache key scheme: "{user_id}:{provider}:{folder_id}"
|
||||||
|
- user_id ensures strict per-user isolation
|
||||||
|
- provider namespace-separates entries from different cloud backends
|
||||||
|
- folder_id identifies the specific folder whose listing is cached
|
||||||
|
|
||||||
|
References:
|
||||||
|
RESEARCH.md Pattern 8 — TTLCache thread safety + asyncio integration
|
||||||
|
D-16 — 60-second TTL, in-memory cache (not Redis)
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import threading
|
||||||
|
from typing import Any, Callable, Awaitable
|
||||||
|
|
||||||
|
from cachetools import TTLCache
|
||||||
|
|
||||||
|
|
||||||
|
# Module-level singleton: maxsize=1000 (sufficient for ~50 users × 20 folders),
|
||||||
|
# ttl=60 seconds per D-16.
|
||||||
|
# Keys are the "{user_id}:{provider}:{folder_id}" strings built by
# get_cloud_folders_cached(); values are the objects returned by its fetch_fn.
_folder_cache: TTLCache = TTLCache(maxsize=1000, ttl=60)
|
||||||
|
|
||||||
|
# Lock required for all _folder_cache access (cachetools.TTLCache is not thread-safe)
|
||||||
|
# Held only around the individual cache reads, writes and deletes in this
# module; it is released before fetch_fn is awaited.
_folder_cache_lock = threading.Lock()
|
||||||
|
|
||||||
|
|
||||||
|
async def get_cloud_folders_cached(
    user_id: str,
    provider: str,
    folder_id: str,
    fetch_fn: Callable[[], Awaitable[list]],
) -> list:
    """Return the cached folder listing, or fetch it and cache the result.

    The cache key is "{user_id}:{provider}:{folder_id}".

    fetch_fn is awaited OUTSIDE the lock, so a slow cloud API call never
    keeps the lock held. Two concurrent callers that both miss the cache
    will both call fetch_fn; the later store simply overwrites the earlier
    one, which is acceptable for a short-lived listing cache.

    The object returned on a hit is the cached object itself, not a copy,
    so callers should treat it as read-only.

    Args:
        user_id: The authenticated user's UUID string.
        provider: The cloud provider identifier (e.g. "google_drive").
        folder_id: The provider-native folder/directory identifier.
        fetch_fn: An async callable (no arguments) that returns the folder
            listing. Only invoked on a cache miss.

    Returns:
        The folder listing (from cache, or fresh from fetch_fn).
    """
    cache_key = f"{user_id}:{provider}:{folder_id}"

    # One subscript under the lock instead of an ``in`` test followed by
    # ``[]``: TTLCache evaluates expiry on every access, so an entry that is
    # present for the membership test can already be expired for the lookup
    # and raise KeyError. Treating KeyError as the miss signal closes that
    # window.
    with _folder_cache_lock:
        try:
            return _folder_cache[cache_key]
        except KeyError:
            pass

    # Cache miss — the lock is released here, before the outbound call.
    result = await fetch_fn()

    # Store the fresh listing under the lock.
    with _folder_cache_lock:
        _folder_cache[cache_key] = result

    return result
|
||||||
|
|
||||||
|
|
||||||
|
def invalidate_provider_cache(user_id: str, provider: str) -> None:
    """Invalidate all cached folder listings for a specific user + provider.

    Intended for events that make cached listings stale, such as a cloud
    connection being disconnected or its credentials being updated.

    Every key starting with "{user_id}:{provider}:" is removed. Entries that
    have already expired by the time they are deleted are ignored rather
    than aborting the invalidation.

    Args:
        user_id: The authenticated user's UUID string.
        provider: The cloud provider identifier to invalidate.
    """
    prefix = f"{user_id}:{provider}:"

    with _folder_cache_lock:
        # Snapshot the matching keys first; the cache must not be mutated
        # while it is being iterated.
        stale_keys = [key for key in _folder_cache if key.startswith(prefix)]

        for key in stale_keys:
            try:
                del _folder_cache[key]
            except KeyError:
                # TTLCache raises KeyError when the entry being deleted has
                # expired since the snapshot. It is gone either way, so keep
                # going and clear the remaining keys.
                continue
|
||||||
@@ -4,12 +4,29 @@ Storage backend factory for DocuVault.
|
|||||||
Mirrors backend/ai/__init__.py — exposes a get_storage_backend() factory
|
Mirrors backend/ai/__init__.py — exposes a get_storage_backend() factory
|
||||||
that returns the configured StorageBackend implementation.
|
that returns the configured StorageBackend implementation.
|
||||||
|
|
||||||
Phase 1 always returns MinIOBackend. Phase 5 will extend this factory to
|
Phase 1: get_storage_backend() always returns MinIOBackend.
|
||||||
support OneDrive, Google Drive, Nextcloud, and WebDAV backends.
|
Phase 5: get_storage_backend_for_document() extends the factory to support
|
||||||
|
OneDrive, Google Drive, Nextcloud, and WebDAV backends based on the document's
|
||||||
|
storage_backend field and the user's active cloud connections.
|
||||||
|
|
||||||
|
Security notes:
|
||||||
|
- get_storage_backend_for_document() is the only function that decrypts cloud
|
||||||
|
credentials. It receives the user object from get_regular_user dep, ensuring
|
||||||
|
the CloudConnection query is always scoped to the authenticated user (T-05-02-04).
|
||||||
|
- Cloud backend modules are imported lazily inside the function body to avoid
|
||||||
|
circular imports at module load time (RESEARCH.md Pattern 9).
|
||||||
"""
|
"""
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from fastapi import HTTPException
|
||||||
|
from sqlalchemy import select
|
||||||
|
from sqlalchemy.ext.asyncio import AsyncSession
|
||||||
|
|
||||||
|
from config import settings
|
||||||
|
from db.models import CloudConnection, Document, User
|
||||||
from storage.base import StorageBackend
|
from storage.base import StorageBackend
|
||||||
from storage.minio_backend import MinIOBackend
|
from storage.minio_backend import MinIOBackend
|
||||||
from config import settings
|
from storage.cloud_utils import decrypt_credentials
|
||||||
|
|
||||||
|
|
||||||
def get_storage_backend() -> StorageBackend:
|
def get_storage_backend() -> StorageBackend:
|
||||||
@@ -31,3 +48,78 @@ def get_storage_backend() -> StorageBackend:
|
|||||||
secure=False,
|
secure=False,
|
||||||
public_endpoint=public_ep,
|
public_endpoint=public_ep,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def get_storage_backend_for_document(
    document: Document,
    user: User,
    session: AsyncSession,
) -> StorageBackend:
    """Resolve the StorageBackend that holds the given document.

    Documents whose storage_backend is 'minio' are served by the backend
    returned from get_storage_backend(). For any other value the user's
    ACTIVE CloudConnection for that provider is loaded, its stored
    credentials are decrypted with decrypt_credentials(), and the matching
    cloud backend class is instantiated.

    Security:
      - The CloudConnection query filters on user.id, so only connections
        belonging to the supplied user can be selected (T-05-02-04).
      - Cloud backend classes are imported inside the function body, only
        for the provider actually requested, to avoid circular imports at
        module load time.
      - A missing or non-ACTIVE connection raises HTTPException(503); there
        is no silent fallback to MinIO.

    Args:
        document: The Document ORM instance; its storage_backend field
            selects the backend.
        user: The authenticated User ORM instance.
        session: An active async SQLAlchemy session.

    Returns:
        A concrete StorageBackend instance for the document's backend.

    Raises:
        HTTPException(503): If no ACTIVE connection exists for this user and
            provider. This is also the outcome for an unrecognised
            storage_backend value that has no connection row.
        ValueError: If an ACTIVE connection was found but its provider has
            no backend implementation here.
    """
    backend_name = document.storage_backend

    # Guard clause: MinIO documents need no connection lookup at all.
    if backend_name == "minio":
        return get_storage_backend()

    # Look up this user's ACTIVE connection for the document's provider.
    active_connection_query = select(CloudConnection).where(
        CloudConnection.user_id == user.id,
        CloudConnection.provider == backend_name,
        CloudConnection.status == "ACTIVE",
    )
    query_result = await session.execute(active_connection_query)
    connection = query_result.scalar_one_or_none()
    if connection is None:
        raise HTTPException(
            status_code=503,
            detail="Cloud connection not found or inactive",
        )

    # Credentials are decrypted with the configured master key and the user id.
    credentials = decrypt_credentials(
        settings.cloud_creds_key.encode(),
        str(user.id),
        connection.credentials_enc,
    )

    # Each branch imports its backend lazily and returns immediately.
    if backend_name == "google_drive":
        from storage.google_drive_backend import GoogleDriveBackend  # type: ignore[import]

        return GoogleDriveBackend(credentials)

    if backend_name == "onedrive":
        from storage.onedrive_backend import OneDriveBackend  # type: ignore[import]

        return OneDriveBackend(credentials)

    if backend_name in ("nextcloud", "webdav"):
        from storage.webdav_backend import WebDAVBackend  # type: ignore[import]

        server_url = credentials["server_url"]
        username = credentials["username"]
        password = credentials["password"]
        return WebDAVBackend(server_url, username, password)

    raise ValueError(f"Unknown storage backend: {document.storage_backend}")
|
||||||
|
|||||||
Reference in New Issue
Block a user