feat(05-03): implement OneDriveBackend — Microsoft Graph StorageBackend
- CloudConnectionError imported from google_drive_backend (shared exception type) - CHUNK_SIZE = 10 * 1024 * 1024 (10 MB — above Graph 4 MB limit, Pitfall 6) - All 7 StorageBackend methods implemented as async coroutines - Resumable upload sessions (createUploadSession) used for ALL uploads - _ensure_valid_token() checks expiry with 60s buffer, calls _refresh_token() if expired - _refresh_token() wraps msal.ConfidentialClientApplication in asyncio.to_thread() - invalid_grant → CloudConnectionError(reason='invalid_grant') per D-06 / B2 design - presigned_get_url and generate_presigned_put_url raise NotImplementedError (D-14) - delete_object silently ignores 404 (no-op per StorageBackend contract) - Backend is stateless — no DB writes (B2 design)
This commit is contained in:
@@ -0,0 +1,279 @@
|
||||
"""
|
||||
Microsoft Graph / OneDrive StorageBackend implementation for DocuVault.
|
||||
|
||||
Design notes:
|
||||
- Resumable upload sessions (createUploadSession) are used for ALL uploads,
|
||||
regardless of file size (Pitfall 6 — Microsoft Graph's simple upload is
|
||||
limited to 4 MB; resumable sessions handle both small and large files).
|
||||
- CHUNK_SIZE = 10 MB (above Graph's 4 MB simple upload limit).
|
||||
- All sync MSAL calls are wrapped in asyncio.to_thread(); httpx calls are
|
||||
already async and awaited directly.
|
||||
- CloudConnectionError is imported from google_drive_backend (shared type).
|
||||
This keeps the exception hierarchy unified across all cloud backends.
|
||||
- B2 design: This backend is STATELESS. It raises CloudConnectionError but
|
||||
does NOT update the DB or CloudConnection objects. DB state transitions
|
||||
(e.g., REQUIRES_REAUTH) are handled by _call_cloud_op() in cloud.py.
|
||||
- _ensure_valid_token() checks expiry before each API call and calls
|
||||
_refresh_token() if the token is within 60 seconds of expiry. If the
|
||||
refresh returns None (invalid_grant), CloudConnectionError is raised.
|
||||
- Token key format stored in credentials dict:
|
||||
access_token — current OAuth bearer token
|
||||
refresh_token — long-lived refresh token
|
||||
expires_at — ISO 8601 datetime string (when the access_token expires)
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import datetime
|
||||
import io
|
||||
import uuid
|
||||
|
||||
import httpx
|
||||
import msal
|
||||
|
||||
from config import settings
|
||||
from storage.base import StorageBackend
|
||||
from storage.google_drive_backend import CloudConnectionError # reuse shared exception
|
||||
|
||||
# Root URL of the Microsoft Graph v1.0 REST API; request paths in this module are appended to it.
GRAPH_BASE = "https://graph.microsoft.com/v1.0"
|
||||
# Size, in bytes, of each slice sent to a resumable upload session.
# NOTE(review): Graph is understood to require fragment sizes that are multiples
# of 320 KiB (327,680 bytes); 10 * 1024 * 1024 is exactly 32 of those — confirm
# against the current createUploadSession documentation before changing it.
CHUNK_SIZE = 10 * 1024 * 1024 # 10 MB — above Graph's 4 MB simple upload limit (Pitfall 6)
|
||||
|
||||
|
||||
class OneDriveBackend(StorageBackend):
    """Microsoft Graph / OneDrive implementation of StorageBackend.

    Uses MSAL for token management and httpx for async HTTP to Microsoft Graph.
    All sync MSAL calls are wrapped in asyncio.to_thread(). The backend is
    stateless — it raises CloudConnectionError but never writes to the DB.
    Refreshed tokens are kept only on this instance (``self._credentials``).
    """

    def __init__(self, credentials: dict) -> None:
        """Initialise with a decrypted credentials dict.

        Expected keys:
            access_token  str — current OAuth bearer token
            refresh_token str — long-lived refresh token
            expires_at    str — ISO 8601 datetime string (when access_token expires)
        """
        self._credentials = credentials

    # ── Internal helpers ──────────────────────────────────────────────────────

    def _auth_headers(self) -> dict:
        """Return the Authorization header built from the current access token."""
        return {"Authorization": f"Bearer {self._credentials['access_token']}"}

    async def _ensure_valid_token(self) -> None:
        """Refresh the access token unless it is valid for more than 60 seconds.

        A missing, empty, non-string or unparseable ``expires_at`` is treated
        as "expired" so that a refresh is attempted rather than failing.

        Raises:
            CloudConnectionError(reason='invalid_grant') — if _refresh_token()
                returns None (the refresh token has been revoked).
            CloudConnectionError(reason='token_expired') — propagated from
                _refresh_token() when MSAL fails for any other reason.
        """
        raw_expiry = self._credentials.get("expires_at", "")
        if raw_expiry:
            try:
                expires_at = datetime.datetime.fromisoformat(raw_expiry)
            except (TypeError, ValueError):
                # Unparseable (ValueError) or non-string (TypeError) expiry —
                # fall through and attempt a refresh to be safe.
                expires_at = None

            if expires_at is not None:
                # A naive timestamp is interpreted as UTC so that it can be
                # compared with the timezone-aware "now" below.
                if expires_at.tzinfo is None:
                    expires_at = expires_at.replace(tzinfo=datetime.timezone.utc)
                now = datetime.datetime.now(tz=datetime.timezone.utc)
                if expires_at > now + datetime.timedelta(seconds=60):
                    # Token is still valid — no refresh needed.
                    return

        # Token expired, expiry unknown, or expiry unparseable — attempt refresh.
        new_creds = await self._refresh_token()
        if new_creds is None:
            raise CloudConnectionError(
                "OneDrive connection requires re-authentication (invalid_grant)",
                reason="invalid_grant",
            )
        self._credentials = new_creds

    async def _refresh_token(self) -> dict | None:
        """Refresh the access token via MSAL.

        Wraps the sync MSAL call in asyncio.to_thread() to avoid blocking
        the event loop.

        Returns:
            Updated credentials dict (access_token, refresh_token, expires_at)
            on success. The previous refresh token is kept when MSAL does not
            return a new one.
            None if MSAL returns result['error'] == 'invalid_grant' (D-06).

        Raises:
            CloudConnectionError(reason='token_expired') — if MSAL returns no
                access token for any reason other than invalid_grant, so that
                such failures are not reported as a revoked grant.
        """

        def _msal_refresh() -> dict | None:
            app = msal.ConfidentialClientApplication(
                client_id=settings.onedrive_client_id,
                client_credential=settings.onedrive_client_secret,
                authority=f"https://login.microsoftonline.com/{settings.onedrive_tenant_id}",
            )
            # "offline_access" must NOT be listed here: MSAL treats it (with
            # "openid" and "profile") as a reserved scope, adds it itself, and
            # raises ValueError when a caller passes it explicitly.
            result = app.acquire_token_by_refresh_token(
                self._credentials["refresh_token"],
                scopes=["Files.ReadWrite"],
            )

            if result.get("error") == "invalid_grant":
                # Signal to _ensure_valid_token to raise CloudConnectionError.
                return None

            if "access_token" not in result:
                # Unexpected MSAL error — report as token_expired, not as a
                # revoked grant, so callers can tell the two cases apart.
                raise CloudConnectionError(
                    "OneDrive token refresh failed: "
                    f"{result.get('error', 'unknown error')}",
                    reason="token_expired",
                )

            # Build new credentials dict with refreshed tokens.
            lifetime = datetime.timedelta(seconds=result.get("expires_in", 3600))
            expires_at = (
                datetime.datetime.now(tz=datetime.timezone.utc) + lifetime
            ).isoformat()
            return {
                "access_token": result["access_token"],
                "refresh_token": result.get(
                    "refresh_token", self._credentials["refresh_token"]
                ),
                "expires_at": expires_at,
            }

        return await asyncio.to_thread(_msal_refresh)

    # ── StorageBackend interface ──────────────────────────────────────────────

    async def put_object(
        self,
        user_id: str,
        document_id: str,
        file_bytes: bytes,
        extension: str,
        content_type: str,
    ) -> str:
        """Upload bytes to OneDrive and return the new item's id as object_key.

        Non-empty payloads go through a resumable upload session
        (createUploadSession, Pitfall 6) and are sent in CHUNK_SIZE slices.
        An empty payload cannot be expressed as a byte range, so it is sent
        with a single simple-upload PUT to the item's ``content`` endpoint.

        The item is stored at ``docuvault/{user_id}/{document_id}{extension}``
        in the signed-in user's drive; an existing item at that path is
        replaced.

        Raises:
            CloudConnectionError — from _ensure_valid_token().
            httpx.HTTPStatusError — if Graph answers any request with an
                error status.
            RuntimeError — if the upload finished without Graph returning an
                item id (previously this returned an empty object key).
        """
        await self._ensure_valid_token()

        # Path within the user's OneDrive: docuvault/{user_id}/{document_id}{extension}
        remote_path = f"docuvault/{user_id}/{document_id}{extension}"
        item_url = f"{GRAPH_BASE}/me/drive/root:/{remote_path}:"
        total_size = len(file_bytes)
        item_id = ""

        async with httpx.AsyncClient() as client:
            if total_size == 0:
                # Simple upload for the zero-byte case.
                # NOTE(review): relies on PUT .../content replacing an existing
                # item by default — confirm against the Graph documentation.
                empty_response = await client.put(
                    f"{item_url}/content",
                    content=b"",
                    headers={**self._auth_headers(), "Content-Type": content_type},
                )
                empty_response.raise_for_status()
                item_id = empty_response.json().get("id", "")
            else:
                # Step 1: create a resumable upload session.
                session_response = await client.post(
                    f"{item_url}/createUploadSession",
                    headers={**self._auth_headers(), "Content-Type": "application/json"},
                    json={"item": {"@microsoft.graph.conflictBehavior": "replace"}},
                )
                session_response.raise_for_status()
                upload_url = session_response.json()["uploadUrl"]

                # Step 2: upload the payload in CHUNK_SIZE slices. No
                # Authorization header is sent; only the session URL is used.
                for offset in range(0, total_size, CHUNK_SIZE):
                    chunk = file_bytes[offset : offset + CHUNK_SIZE]
                    last_byte = offset + len(chunk) - 1

                    chunk_response = await client.put(
                        upload_url,
                        content=chunk,
                        headers={
                            "Content-Length": str(len(chunk)),
                            "Content-Range": f"bytes {offset}-{last_byte}/{total_size}",
                            "Content-Type": content_type,
                        },
                    )
                    chunk_response.raise_for_status()

                    # The final chunk response contains the item metadata with id.
                    if chunk_response.status_code in (200, 201):
                        item_id = chunk_response.json().get("id", "")

        if not item_id:
            # Never hand an empty object key back to the caller.
            raise RuntimeError(
                f"OneDrive upload of '{remote_path}' completed without returning an item id"
            )
        return item_id

    async def get_object(self, object_key: str) -> bytes:
        """Download file bytes from OneDrive by item_id.

        Follows redirects — Microsoft Graph returns a redirect to the CDN URL.

        Raises:
            CloudConnectionError — from _ensure_valid_token().
            httpx.HTTPStatusError — if the final response has an error status.
        """
        await self._ensure_valid_token()
        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"{GRAPH_BASE}/me/drive/items/{object_key}/content",
                headers=self._auth_headers(),
                follow_redirects=True,
            )
            response.raise_for_status()
            return response.content

    async def delete_object(self, object_key: str) -> None:
        """Delete a OneDrive item by item_id. Silent no-op on 404.

        Raises:
            CloudConnectionError — from _ensure_valid_token().
            httpx.HTTPStatusError — for error statuses other than 404.
        """
        await self._ensure_valid_token()
        async with httpx.AsyncClient() as client:
            response = await client.delete(
                f"{GRAPH_BASE}/me/drive/items/{object_key}",
                headers=self._auth_headers(),
            )
            # 204 is success and 404 means the item is already gone; anything
            # else is checked and raises if it is an error status.
            if response.status_code not in (204, 404):
                response.raise_for_status()

    async def presigned_get_url(self, object_key: str, expires_minutes: int = 60) -> str:
        """Not supported by OneDrive — raises NotImplementedError (D-14).

        Use get_object() for direct streaming instead.
        """
        raise NotImplementedError(
            "OneDrive backend does not support presigned URLs — "
            "use get_object() for streaming"
        )

    async def generate_presigned_put_url(
        self, object_key: str, expires_minutes: int = 15
    ) -> str:
        """Not supported by OneDrive — raises NotImplementedError (D-14).

        Use put_object() for direct upload via FastAPI intermediary.
        """
        raise NotImplementedError(
            "OneDrive backend does not support presigned put URLs — "
            "use put_object() for direct upload"
        )

    async def stat_object(self, object_key: str) -> int:
        """Return the file size in bytes from OneDrive item metadata.

        Only the ``size`` field is requested; 0 is returned when the response
        does not contain it.

        Raises:
            CloudConnectionError — from _ensure_valid_token().
            httpx.HTTPStatusError — if Graph answers with an error status.
        """
        await self._ensure_valid_token()
        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"{GRAPH_BASE}/me/drive/items/{object_key}",
                params={"$select": "size"},
                headers=self._auth_headers(),
            )
            response.raise_for_status()
            return int(response.json().get("size", 0))

    async def health_check(self) -> bool:
        """Return True if the OneDrive service is reachable and the token is valid.

        Makes a minimal GET /me/drive request selecting only the id field.
        Returns False on any error, including a failed token refresh.
        """
        try:
            await self._ensure_valid_token()
            async with httpx.AsyncClient() as client:
                response = await client.get(
                    f"{GRAPH_BASE}/me/drive",
                    params={"$select": "id"},
                    headers=self._auth_headers(),
                )
                return response.is_success
        except Exception:
            # Deliberate best-effort probe: any failure means "not healthy".
            return False
|
||||
Reference in New Issue
Block a user