""" Microsoft Graph / OneDrive StorageBackend implementation for DocuVault. Design notes: - Resumable upload sessions (createUploadSession) are used for ALL uploads, regardless of file size (Pitfall 6 — Microsoft Graph's simple upload is limited to 4 MB; resumable sessions handle both small and large files). - CHUNK_SIZE = 10 MB (above Graph's 4 MB simple upload limit). - All sync MSAL calls are wrapped in asyncio.to_thread(); httpx calls are already async and awaited directly. - CloudConnectionError is imported from google_drive_backend (shared type). This keeps the exception hierarchy unified across all cloud backends. - B2 design: This backend is STATELESS. It raises CloudConnectionError but does NOT update the DB or CloudConnection objects. DB state transitions (e.g., REQUIRES_REAUTH) are handled by _call_cloud_op() in cloud.py. - _ensure_valid_token() checks expiry before each API call and calls _refresh_token() if the token is within 60 seconds of expiry. If the refresh returns None (invalid_grant), CloudConnectionError is raised. - Token key format stored in credentials dict: access_token — current OAuth bearer token refresh_token — long-lived refresh token expires_at — ISO 8601 datetime string (when the access_token expires) """ from __future__ import annotations import asyncio import datetime import io import uuid import httpx import msal from config import settings from storage.base import StorageBackend from storage.google_drive_backend import CloudConnectionError # reuse shared exception GRAPH_BASE = "https://graph.microsoft.com/v1.0" CHUNK_SIZE = 10 * 1024 * 1024 # 10 MB — above Graph's 4 MB simple upload limit (Pitfall 6) class OneDriveBackend(StorageBackend): """Microsoft Graph / OneDrive implementation of StorageBackend. Uses MSAL for token management and httpx for async HTTP to Microsoft Graph. All sync MSAL calls are wrapped in asyncio.to_thread(). The backend is stateless — it raises CloudConnectionError but never writes to the DB. """ def __init__(self, credentials: dict) -> None: """Initialise with a decrypted credentials dict. Expected keys: access_token str — current OAuth bearer token refresh_token str — long-lived refresh token expires_at str — ISO 8601 datetime string (when access_token expires) """ self._credentials = credentials # ── Internal helpers ────────────────────────────────────────────────────── def _auth_headers(self) -> dict: """Return Authorization header with current access token.""" return {"Authorization": f"Bearer {self._credentials['access_token']}"} async def _ensure_valid_token(self) -> None: """Check if the access token is expired (within 60 s buffer) and refresh if so. Raises: CloudConnectionError(reason='invalid_grant') — if the refresh token has been revoked (MSAL returns result['error'] == 'invalid_grant'). """ expires_at_str = self._credentials.get("expires_at", "") if expires_at_str: try: expires_at = datetime.datetime.fromisoformat(expires_at_str) # Make expires_at timezone-aware if needed for comparison if expires_at.tzinfo is None: expires_at = expires_at.replace(tzinfo=datetime.timezone.utc) now = datetime.datetime.now(tz=datetime.timezone.utc) if expires_at > now + datetime.timedelta(seconds=60): # Token is still valid — no refresh needed return except ValueError: # Unparseable expiry — attempt refresh to be safe pass # Token expired or expiry unparseable — attempt refresh new_creds = await self._refresh_token() if new_creds is None: raise CloudConnectionError( "OneDrive connection requires re-authentication (invalid_grant)", reason="invalid_grant", ) self._credentials = new_creds async def _refresh_token(self) -> dict | None: """Refresh the access token via MSAL. Wraps the sync MSAL call in asyncio.to_thread() to avoid blocking the event loop. Returns: Updated credentials dict on success. None if MSAL returns result['error'] == 'invalid_grant' (D-06). """ def _msal_refresh() -> dict | None: app = msal.ConfidentialClientApplication( client_id=settings.onedrive_client_id, client_credential=settings.onedrive_client_secret, authority=f"https://login.microsoftonline.com/{settings.onedrive_tenant_id}", ) result = app.acquire_token_by_refresh_token( self._credentials["refresh_token"], scopes=["Files.ReadWrite", "offline_access"], ) if result.get("error") == "invalid_grant": return None # Signal to _ensure_valid_token to raise CloudConnectionError if "access_token" not in result: # Unexpected MSAL error — treat as token_expired for retry logic return None # Build new credentials dict with refreshed tokens expires_in = result.get("expires_in", 3600) expires_at = ( datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(seconds=expires_in) ).isoformat() return { "access_token": result["access_token"], "refresh_token": result.get( "refresh_token", self._credentials["refresh_token"] ), "expires_at": expires_at, } return await asyncio.to_thread(_msal_refresh) # ── StorageBackend interface ────────────────────────────────────────────── async def put_object( self, user_id: str, document_id: str, file_bytes: bytes, extension: str, content_type: str, ) -> str: """Upload bytes to OneDrive via a resumable upload session. Uses createUploadSession for ALL files (Pitfall 6 — avoids 4 MB limit of simple upload). Chunks file in CHUNK_SIZE (10 MB) slices. Returns the OneDrive item_id as object_key. """ await self._ensure_valid_token() # Path within the user's OneDrive: docuvault/{user_id}/{document_id}{extension} remote_path = f"docuvault/{user_id}/{document_id}{extension}" create_session_url = ( f"{GRAPH_BASE}/me/drive/root:/{remote_path}:/createUploadSession" ) async with httpx.AsyncClient() as client: # Step 1: Create a resumable upload session session_response = await client.post( create_session_url, headers={**self._auth_headers(), "Content-Type": "application/json"}, json={"item": {"@microsoft.graph.conflictBehavior": "replace"}}, ) session_response.raise_for_status() upload_url = session_response.json()["uploadUrl"] # Step 2: Upload file in CHUNK_SIZE chunks total_size = len(file_bytes) item_id: str = "" offset = 0 while offset < total_size: chunk = file_bytes[offset : offset + CHUNK_SIZE] chunk_size = len(chunk) end = offset + chunk_size - 1 chunk_response = await client.put( upload_url, content=chunk, headers={ "Content-Length": str(chunk_size), "Content-Range": f"bytes {offset}-{end}/{total_size}", "Content-Type": content_type, }, ) chunk_response.raise_for_status() # The final chunk response contains the item metadata with id if chunk_response.status_code in (200, 201): item_id = chunk_response.json().get("id", "") offset += chunk_size return item_id async def get_object(self, object_key: str) -> bytes: """Download file bytes from OneDrive by item_id. Follows redirects — Microsoft Graph returns a redirect to the CDN URL. """ await self._ensure_valid_token() async with httpx.AsyncClient() as client: r = await client.get( f"{GRAPH_BASE}/me/drive/items/{object_key}/content", headers=self._auth_headers(), follow_redirects=True, ) r.raise_for_status() return r.content async def delete_object(self, object_key: str) -> None: """Delete a OneDrive item by item_id. Silent no-op on 404.""" await self._ensure_valid_token() async with httpx.AsyncClient() as client: r = await client.delete( f"{GRAPH_BASE}/me/drive/items/{object_key}", headers=self._auth_headers(), ) if r.status_code not in (204, 404): r.raise_for_status() async def presigned_get_url(self, object_key: str, expires_minutes: int = 60) -> str: """Not supported by OneDrive — raises NotImplementedError (D-14). Use get_object() for direct streaming instead. """ raise NotImplementedError( "OneDrive backend does not support presigned URLs — " "use get_object() for streaming" ) async def generate_presigned_put_url( self, object_key: str, expires_minutes: int = 15 ) -> str: """Not supported by OneDrive — raises NotImplementedError (D-14). Use put_object() for direct upload via FastAPI intermediary. """ raise NotImplementedError( "OneDrive backend does not support presigned put URLs — " "use put_object() for direct upload" ) async def stat_object(self, object_key: str) -> int: """Return the file size in bytes from OneDrive item metadata.""" await self._ensure_valid_token() async with httpx.AsyncClient() as client: r = await client.get( f"{GRAPH_BASE}/me/drive/items/{object_key}", params={"$select": "size"}, headers=self._auth_headers(), ) r.raise_for_status() return int(r.json().get("size", 0)) async def health_check(self) -> bool: """Return True if the OneDrive service is reachable and the token is valid. Makes a minimal GET /me/drive request selecting only the id field. Returns False on any error. """ try: await self._ensure_valid_token() async with httpx.AsyncClient() as client: r = await client.get( f"{GRAPH_BASE}/me/drive", params={"$select": "id"}, headers=self._auth_headers(), ) return r.is_success except Exception: return False