From bcb887e61de8e3b0777e3a655a66810a4c4b7bf2 Mon Sep 17 00:00:00 2001 From: curo1305 Date: Thu, 28 May 2026 21:10:56 +0200 Subject: [PATCH] =?UTF-8?q?feat(05-03):=20implement=20OneDriveBackend=20?= =?UTF-8?q?=E2=80=94=20Microsoft=20Graph=20StorageBackend?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - CloudConnectionError imported from google_drive_backend (shared exception type) - CHUNK_SIZE = 10 * 1024 * 1024 (10 MB — above Graph 4 MB limit, Pitfall 6) - All 7 StorageBackend methods implemented as async coroutines - Resumable upload sessions (createUploadSession) used for ALL uploads - _ensure_valid_token() checks expiry with 60s buffer, calls _refresh_token() if expired - _refresh_token() wraps msal.ConfidentialClientApplication in asyncio.to_thread() - invalid_grant → CloudConnectionError(reason='invalid_grant') per D-06 / B2 design - presigned_get_url and generate_presigned_put_url raise NotImplementedError (D-14) - delete_object silently ignores 404 (no-op per StorageBackend contract) - Backend is stateless — no DB writes (B2 design) --- backend/storage/onedrive_backend.py | 279 ++++++++++++++++++++++++++++ 1 file changed, 279 insertions(+) create mode 100644 backend/storage/onedrive_backend.py diff --git a/backend/storage/onedrive_backend.py b/backend/storage/onedrive_backend.py new file mode 100644 index 0000000..27465eb --- /dev/null +++ b/backend/storage/onedrive_backend.py @@ -0,0 +1,279 @@ +""" +Microsoft Graph / OneDrive StorageBackend implementation for DocuVault. + +Design notes: + - Resumable upload sessions (createUploadSession) are used for ALL uploads, + regardless of file size (Pitfall 6 — Microsoft Graph's simple upload is + limited to 4 MB; resumable sessions handle both small and large files). + - CHUNK_SIZE = 10 MB (above Graph's 4 MB simple upload limit). + - All sync MSAL calls are wrapped in asyncio.to_thread(); httpx calls are + already async and awaited directly. + - CloudConnectionError is imported from google_drive_backend (shared type). + This keeps the exception hierarchy unified across all cloud backends. + - B2 design: This backend is STATELESS. It raises CloudConnectionError but + does NOT update the DB or CloudConnection objects. DB state transitions + (e.g., REQUIRES_REAUTH) are handled by _call_cloud_op() in cloud.py. + - _ensure_valid_token() checks expiry before each API call and calls + _refresh_token() if the token is within 60 seconds of expiry. If the + refresh returns None (invalid_grant), CloudConnectionError is raised. + - Token key format stored in credentials dict: + access_token — current OAuth bearer token + refresh_token — long-lived refresh token + expires_at — ISO 8601 datetime string (when the access_token expires) +""" +from __future__ import annotations + +import asyncio +import datetime +import io +import uuid + +import httpx +import msal + +from config import settings +from storage.base import StorageBackend +from storage.google_drive_backend import CloudConnectionError # reuse shared exception + +GRAPH_BASE = "https://graph.microsoft.com/v1.0" +CHUNK_SIZE = 10 * 1024 * 1024 # 10 MB — above Graph's 4 MB simple upload limit (Pitfall 6) + + +class OneDriveBackend(StorageBackend): + """Microsoft Graph / OneDrive implementation of StorageBackend. + + Uses MSAL for token management and httpx for async HTTP to Microsoft Graph. + All sync MSAL calls are wrapped in asyncio.to_thread(). The backend is + stateless — it raises CloudConnectionError but never writes to the DB. + """ + + def __init__(self, credentials: dict) -> None: + """Initialise with a decrypted credentials dict. + + Expected keys: + access_token str — current OAuth bearer token + refresh_token str — long-lived refresh token + expires_at str — ISO 8601 datetime string (when access_token expires) + """ + self._credentials = credentials + + # ── Internal helpers ────────────────────────────────────────────────────── + + def _auth_headers(self) -> dict: + """Return Authorization header with current access token.""" + return {"Authorization": f"Bearer {self._credentials['access_token']}"} + + async def _ensure_valid_token(self) -> None: + """Check if the access token is expired (within 60 s buffer) and refresh if so. + + Raises: + CloudConnectionError(reason='invalid_grant') — if the refresh token has + been revoked (MSAL returns result['error'] == 'invalid_grant'). + """ + expires_at_str = self._credentials.get("expires_at", "") + if expires_at_str: + try: + expires_at = datetime.datetime.fromisoformat(expires_at_str) + # Make expires_at timezone-aware if needed for comparison + if expires_at.tzinfo is None: + expires_at = expires_at.replace(tzinfo=datetime.timezone.utc) + now = datetime.datetime.now(tz=datetime.timezone.utc) + if expires_at > now + datetime.timedelta(seconds=60): + # Token is still valid — no refresh needed + return + except ValueError: + # Unparseable expiry — attempt refresh to be safe + pass + + # Token expired or expiry unparseable — attempt refresh + new_creds = await self._refresh_token() + if new_creds is None: + raise CloudConnectionError( + "OneDrive connection requires re-authentication (invalid_grant)", + reason="invalid_grant", + ) + self._credentials = new_creds + + async def _refresh_token(self) -> dict | None: + """Refresh the access token via MSAL. + + Wraps the sync MSAL call in asyncio.to_thread() to avoid blocking + the event loop. + + Returns: + Updated credentials dict on success. + None if MSAL returns result['error'] == 'invalid_grant' (D-06). + """ + + def _msal_refresh() -> dict | None: + app = msal.ConfidentialClientApplication( + client_id=settings.onedrive_client_id, + client_credential=settings.onedrive_client_secret, + authority=f"https://login.microsoftonline.com/{settings.onedrive_tenant_id}", + ) + result = app.acquire_token_by_refresh_token( + self._credentials["refresh_token"], + scopes=["Files.ReadWrite", "offline_access"], + ) + if result.get("error") == "invalid_grant": + return None # Signal to _ensure_valid_token to raise CloudConnectionError + if "access_token" not in result: + # Unexpected MSAL error — treat as token_expired for retry logic + return None + # Build new credentials dict with refreshed tokens + expires_in = result.get("expires_in", 3600) + expires_at = ( + datetime.datetime.now(tz=datetime.timezone.utc) + + datetime.timedelta(seconds=expires_in) + ).isoformat() + return { + "access_token": result["access_token"], + "refresh_token": result.get( + "refresh_token", self._credentials["refresh_token"] + ), + "expires_at": expires_at, + } + + return await asyncio.to_thread(_msal_refresh) + + # ── StorageBackend interface ────────────────────────────────────────────── + + async def put_object( + self, + user_id: str, + document_id: str, + file_bytes: bytes, + extension: str, + content_type: str, + ) -> str: + """Upload bytes to OneDrive via a resumable upload session. + + Uses createUploadSession for ALL files (Pitfall 6 — avoids 4 MB limit + of simple upload). Chunks file in CHUNK_SIZE (10 MB) slices. + + Returns the OneDrive item_id as object_key. + """ + await self._ensure_valid_token() + + # Path within the user's OneDrive: docuvault/{user_id}/{document_id}{extension} + remote_path = f"docuvault/{user_id}/{document_id}{extension}" + create_session_url = ( + f"{GRAPH_BASE}/me/drive/root:/{remote_path}:/createUploadSession" + ) + + async with httpx.AsyncClient() as client: + # Step 1: Create a resumable upload session + session_response = await client.post( + create_session_url, + headers={**self._auth_headers(), "Content-Type": "application/json"}, + json={"item": {"@microsoft.graph.conflictBehavior": "replace"}}, + ) + session_response.raise_for_status() + upload_url = session_response.json()["uploadUrl"] + + # Step 2: Upload file in CHUNK_SIZE chunks + total_size = len(file_bytes) + item_id: str = "" + offset = 0 + + while offset < total_size: + chunk = file_bytes[offset : offset + CHUNK_SIZE] + chunk_size = len(chunk) + end = offset + chunk_size - 1 + + chunk_response = await client.put( + upload_url, + content=chunk, + headers={ + "Content-Length": str(chunk_size), + "Content-Range": f"bytes {offset}-{end}/{total_size}", + "Content-Type": content_type, + }, + ) + chunk_response.raise_for_status() + + # The final chunk response contains the item metadata with id + if chunk_response.status_code in (200, 201): + item_id = chunk_response.json().get("id", "") + + offset += chunk_size + + return item_id + + async def get_object(self, object_key: str) -> bytes: + """Download file bytes from OneDrive by item_id. + + Follows redirects — Microsoft Graph returns a redirect to the CDN URL. + """ + await self._ensure_valid_token() + async with httpx.AsyncClient() as client: + r = await client.get( + f"{GRAPH_BASE}/me/drive/items/{object_key}/content", + headers=self._auth_headers(), + follow_redirects=True, + ) + r.raise_for_status() + return r.content + + async def delete_object(self, object_key: str) -> None: + """Delete a OneDrive item by item_id. Silent no-op on 404.""" + await self._ensure_valid_token() + async with httpx.AsyncClient() as client: + r = await client.delete( + f"{GRAPH_BASE}/me/drive/items/{object_key}", + headers=self._auth_headers(), + ) + if r.status_code not in (204, 404): + r.raise_for_status() + + async def presigned_get_url(self, object_key: str, expires_minutes: int = 60) -> str: + """Not supported by OneDrive — raises NotImplementedError (D-14). + + Use get_object() for direct streaming instead. + """ + raise NotImplementedError( + "OneDrive backend does not support presigned URLs — " + "use get_object() for streaming" + ) + + async def generate_presigned_put_url( + self, object_key: str, expires_minutes: int = 15 + ) -> str: + """Not supported by OneDrive — raises NotImplementedError (D-14). + + Use put_object() for direct upload via FastAPI intermediary. + """ + raise NotImplementedError( + "OneDrive backend does not support presigned put URLs — " + "use put_object() for direct upload" + ) + + async def stat_object(self, object_key: str) -> int: + """Return the file size in bytes from OneDrive item metadata.""" + await self._ensure_valid_token() + async with httpx.AsyncClient() as client: + r = await client.get( + f"{GRAPH_BASE}/me/drive/items/{object_key}", + params={"$select": "size"}, + headers=self._auth_headers(), + ) + r.raise_for_status() + return int(r.json().get("size", 0)) + + async def health_check(self) -> bool: + """Return True if the OneDrive service is reachable and the token is valid. + + Makes a minimal GET /me/drive request selecting only the id field. + Returns False on any error. + """ + try: + await self._ensure_valid_token() + async with httpx.AsyncClient() as client: + r = await client.get( + f"{GRAPH_BASE}/me/drive", + params={"$select": "id"}, + headers=self._auth_headers(), + ) + return r.is_success + except Exception: + return False