bcb887e61d
- CloudConnectionError imported from google_drive_backend (shared exception type) - CHUNK_SIZE = 10 * 1024 * 1024 (10 MB — above Graph 4 MB limit, Pitfall 6) - All 7 StorageBackend methods implemented as async coroutines - Resumable upload sessions (createUploadSession) used for ALL uploads - _ensure_valid_token() checks expiry with 60s buffer, calls _refresh_token() if expired - _refresh_token() wraps msal.ConfidentialClientApplication in asyncio.to_thread() - invalid_grant → CloudConnectionError(reason='invalid_grant') per D-06 / B2 design - presigned_get_url and generate_presigned_put_url raise NotImplementedError (D-14) - delete_object silently ignores 404 (no-op per StorageBackend contract) - Backend is stateless — no DB writes (B2 design)
280 lines
11 KiB
Python
280 lines
11 KiB
Python
"""
|
|
Microsoft Graph / OneDrive StorageBackend implementation for DocuVault.
|
|
|
|
Design notes:
|
|
- Resumable upload sessions (createUploadSession) are used for ALL uploads,
|
|
regardless of file size (Pitfall 6 — Microsoft Graph's simple upload is
|
|
limited to 4 MB; resumable sessions handle both small and large files).
|
|
- CHUNK_SIZE = 10 MB (above Graph's 4 MB simple upload limit).
|
|
- All sync MSAL calls are wrapped in asyncio.to_thread(); httpx calls are
|
|
already async and awaited directly.
|
|
- CloudConnectionError is imported from google_drive_backend (shared type).
|
|
This keeps the exception hierarchy unified across all cloud backends.
|
|
- B2 design: This backend is STATELESS. It raises CloudConnectionError but
|
|
does NOT update the DB or CloudConnection objects. DB state transitions
|
|
(e.g., REQUIRES_REAUTH) are handled by _call_cloud_op() in cloud.py.
|
|
- _ensure_valid_token() checks expiry before each API call and calls
|
|
_refresh_token() if the token is within 60 seconds of expiry. If the
|
|
refresh returns None (invalid_grant, or any other MSAL response without an access token), CloudConnectionError(reason='invalid_grant') is raised.
|
|
- Token key format stored in credentials dict:
|
|
access_token — current OAuth bearer token
|
|
refresh_token — long-lived refresh token
|
|
expires_at — ISO 8601 datetime string (when the access_token expires)
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import datetime
|
|
import io
|
|
import uuid
|
|
|
|
import httpx
|
|
import msal
|
|
|
|
from config import settings
|
|
from storage.base import StorageBackend
|
|
from storage.google_drive_backend import CloudConnectionError # reuse shared exception
|
|
|
|
# Root URL of the Microsoft Graph v1.0 REST API; the Graph request URLs in
# this module are built by appending a path to it.
GRAPH_BASE = "https://graph.microsoft.com/v1.0"
|
|
# Size in bytes of each fragment that put_object() sends to an upload session.
# 10 MiB is exactly 32 x 320 KiB; Microsoft Graph documents that upload-session
# fragment sizes must be multiples of 320 KiB, so keep that property if this
# value is ever changed.
CHUNK_SIZE = 10 * 1024 * 1024  # 10 MB — above Graph's 4 MB simple upload limit (Pitfall 6)
|
|
|
|
|
|
class OneDriveBackend(StorageBackend):
    """Microsoft Graph / OneDrive implementation of StorageBackend.

    Uses MSAL for token management and httpx for async HTTP to Microsoft
    Graph. The sync MSAL call is wrapped in asyncio.to_thread(). The backend
    is stateless with respect to the database — it raises
    CloudConnectionError but never writes to the DB; refreshed credentials
    are only kept on this instance.
    """

    def __init__(self, credentials: dict) -> None:
        """Initialise with a decrypted credentials dict.

        Expected keys:
            access_token   str — current OAuth bearer token
            refresh_token  str — long-lived refresh token
            expires_at     str — ISO 8601 datetime string (when access_token expires)
        """
        self._credentials = credentials

    # ── Internal helpers ──────────────────────────────────────────────────────

    def _auth_headers(self) -> dict:
        """Return the Authorization header built from the current access token."""
        return {"Authorization": f"Bearer {self._credentials['access_token']}"}

    async def _ensure_valid_token(self) -> None:
        """Refresh the access token unless it is valid for more than 60 s.

        A missing, empty, or unparseable ``expires_at`` is treated as
        "expired" so that a refresh is attempted.

        Raises:
            CloudConnectionError(reason='invalid_grant') — whenever
                _refresh_token() returns None.
        """
        expires_at_str = self._credentials.get("expires_at", "")
        if expires_at_str:
            try:
                expires_at = datetime.datetime.fromisoformat(expires_at_str)
            except (TypeError, ValueError):
                # Unparseable (or non-string) expiry — attempt refresh to be safe.
                expires_at = None

            if expires_at is not None:
                # A naive timestamp is interpreted as UTC so it can be compared
                # with the timezone-aware "now" below.
                if expires_at.tzinfo is None:
                    expires_at = expires_at.replace(tzinfo=datetime.timezone.utc)
                now = datetime.datetime.now(tz=datetime.timezone.utc)
                if expires_at > now + datetime.timedelta(seconds=60):
                    # Token is still valid — no refresh needed.
                    return

        # Token expired, about to expire, or expiry unknown — attempt refresh.
        new_creds = await self._refresh_token()
        if new_creds is None:
            raise CloudConnectionError(
                "OneDrive connection requires re-authentication (invalid_grant)",
                reason="invalid_grant",
            )
        self._credentials = new_creds

    async def _refresh_token(self) -> dict | None:
        """Refresh the access token via MSAL.

        Wraps the sync MSAL call in asyncio.to_thread() to avoid blocking
        the event loop.

        Returns:
            A new credentials dict (access_token, refresh_token, expires_at)
            on success. The previous refresh token is reused when MSAL does
            not return a new one.
            None if MSAL reports ``invalid_grant`` (D-06) or returns any
            other response that contains no access token.
        """

        def _msal_refresh() -> dict | None:
            app = msal.ConfidentialClientApplication(
                client_id=settings.onedrive_client_id,
                client_credential=settings.onedrive_client_secret,
                authority=f"https://login.microsoftonline.com/{settings.onedrive_tenant_id}",
            )
            # Do NOT pass "offline_access" (or "openid"/"profile") here: MSAL
            # treats them as reserved scopes, adds them itself, and raises
            # ValueError if the caller supplies one explicitly.
            result = app.acquire_token_by_refresh_token(
                self._credentials["refresh_token"],
                scopes=["Files.ReadWrite"],
            )
            if result.get("error") == "invalid_grant":
                return None  # Signal to _ensure_valid_token to raise CloudConnectionError
            if "access_token" not in result:
                # Any other MSAL failure. Returning None means the caller
                # reports it exactly like invalid_grant.
                # NOTE(review): confirm that forcing re-authentication is the
                # intended outcome for non-invalid_grant errors.
                return None

            # Build new credentials dict with refreshed tokens.
            expires_in = result.get("expires_in", 3600)
            expires_at = (
                datetime.datetime.now(tz=datetime.timezone.utc)
                + datetime.timedelta(seconds=expires_in)
            ).isoformat()
            return {
                "access_token": result["access_token"],
                "refresh_token": result.get(
                    "refresh_token", self._credentials["refresh_token"]
                ),
                "expires_at": expires_at,
            }

        return await asyncio.to_thread(_msal_refresh)

    # ── StorageBackend interface ──────────────────────────────────────────────

    async def put_object(
        self,
        user_id: str,
        document_id: str,
        file_bytes: bytes,
        extension: str,
        content_type: str,
    ) -> str:
        """Upload bytes to OneDrive and return the new item's id.

        Non-empty payloads go through a resumable upload session
        (createUploadSession) regardless of size (Pitfall 6 — avoids the
        4 MB limit of simple upload) and are sent in CHUNK_SIZE slices.
        A zero-byte payload cannot be expressed as a Content-Range fragment,
        so it is written with a single simple-upload PUT instead.

        The file is stored at docuvault/{user_id}/{document_id}{extension}
        in the user's drive; an existing file at that path is replaced.

        Returns:
            The OneDrive item id, used as object_key by the other methods.

        Raises:
            CloudConnectionError — if the token cannot be refreshed, or if
                Graph completes the upload without returning an item id.
            httpx.HTTPStatusError — if any Graph request returns an error status.
        """
        await self._ensure_valid_token()

        # Path within the user's OneDrive: docuvault/{user_id}/{document_id}{extension}
        remote_path = f"docuvault/{user_id}/{document_id}{extension}"
        item_url = f"{GRAPH_BASE}/me/drive/root:/{remote_path}:"
        total_size = len(file_bytes)
        item_id = ""

        async with httpx.AsyncClient() as client:
            if total_size == 0:
                # An upload session needs at least one byte range; create the
                # empty file with a simple upload instead.
                empty_response = await client.put(
                    f"{item_url}/content",
                    content=b"",
                    headers={**self._auth_headers(), "Content-Type": content_type},
                )
                empty_response.raise_for_status()
                item_id = empty_response.json().get("id", "")
            else:
                # Step 1: Create a resumable upload session.
                session_response = await client.post(
                    f"{item_url}/createUploadSession",
                    headers={**self._auth_headers(), "Content-Type": "application/json"},
                    json={"item": {"@microsoft.graph.conflictBehavior": "replace"}},
                )
                session_response.raise_for_status()
                upload_url = session_response.json()["uploadUrl"]

                # Step 2: Upload the file in CHUNK_SIZE fragments. No
                # Authorization header is sent to the upload URL.
                for offset in range(0, total_size, CHUNK_SIZE):
                    chunk = file_bytes[offset : offset + CHUNK_SIZE]
                    chunk_size = len(chunk)
                    end = offset + chunk_size - 1

                    chunk_response = await client.put(
                        upload_url,
                        content=chunk,
                        headers={
                            "Content-Length": str(chunk_size),
                            "Content-Range": f"bytes {offset}-{end}/{total_size}",
                            "Content-Type": content_type,
                        },
                    )
                    chunk_response.raise_for_status()

                    # The final chunk response contains the item metadata with id.
                    if chunk_response.status_code in (200, 201):
                        item_id = chunk_response.json().get("id", "")

        if not item_id:
            # Never hand an empty object_key back to the caller: it would be
            # stored and later produce malformed item URLs.
            # NOTE(review): reason="upload_failed" is a new value — confirm the
            # caller's CloudConnectionError handling treats it as a plain failure.
            raise CloudConnectionError(
                "OneDrive upload finished without returning an item id",
                reason="upload_failed",
            )
        return item_id

    async def get_object(self, object_key: str) -> bytes:
        """Download and return the file bytes of the OneDrive item ``object_key``.

        Redirects are followed because the /content endpoint answers with a
        redirect to the actual download URL.

        Raises:
            CloudConnectionError — if the token cannot be refreshed.
            httpx.HTTPStatusError — if Graph returns an error status.
        """
        await self._ensure_valid_token()
        async with httpx.AsyncClient() as client:
            r = await client.get(
                f"{GRAPH_BASE}/me/drive/items/{object_key}/content",
                headers=self._auth_headers(),
                follow_redirects=True,
            )
            r.raise_for_status()
            return r.content

    async def delete_object(self, object_key: str) -> None:
        """Delete the OneDrive item ``object_key``. Silent no-op on 404.

        Raises:
            CloudConnectionError — if the token cannot be refreshed.
            httpx.HTTPStatusError — for error statuses other than 404.
        """
        await self._ensure_valid_token()
        async with httpx.AsyncClient() as client:
            r = await client.delete(
                f"{GRAPH_BASE}/me/drive/items/{object_key}",
                headers=self._auth_headers(),
            )
            # 204 = deleted, 404 = already gone; both count as success.
            if r.status_code not in (204, 404):
                r.raise_for_status()

    async def presigned_get_url(self, object_key: str, expires_minutes: int = 60) -> str:
        """Not supported by OneDrive — always raises NotImplementedError (D-14).

        Use get_object() for direct streaming instead.
        """
        raise NotImplementedError(
            "OneDrive backend does not support presigned URLs — "
            "use get_object() for streaming"
        )

    async def generate_presigned_put_url(
        self, object_key: str, expires_minutes: int = 15
    ) -> str:
        """Not supported by OneDrive — always raises NotImplementedError (D-14).

        Use put_object() for direct upload via FastAPI intermediary.
        """
        raise NotImplementedError(
            "OneDrive backend does not support presigned put URLs — "
            "use put_object() for direct upload"
        )

    async def stat_object(self, object_key: str) -> int:
        """Return the file size in bytes from the OneDrive item metadata.

        Only the ``size`` field is requested; 0 is returned if the response
        does not contain it.

        Raises:
            CloudConnectionError — if the token cannot be refreshed.
            httpx.HTTPStatusError — if Graph returns an error status.
        """
        await self._ensure_valid_token()
        async with httpx.AsyncClient() as client:
            r = await client.get(
                f"{GRAPH_BASE}/me/drive/items/{object_key}",
                params={"$select": "size"},
                headers=self._auth_headers(),
            )
            r.raise_for_status()
            return int(r.json().get("size", 0))

    async def health_check(self) -> bool:
        """Return True if the OneDrive service is reachable and the token is valid.

        Makes a minimal GET /me/drive request selecting only the id field.
        Returns False on any error, including a failed token refresh.
        """
        try:
            await self._ensure_valid_token()
            async with httpx.AsyncClient() as client:
                r = await client.get(
                    f"{GRAPH_BASE}/me/drive",
                    params={"$select": "id"},
                    headers=self._auth_headers(),
                )
                return r.is_success
        except Exception:
            # Deliberately broad: a health probe reports failure, it never raises.
            return False
|