Files
kite/backend/storage/onedrive_backend.py
curo1305 bcb887e61d feat(05-03): implement OneDriveBackend — Microsoft Graph StorageBackend
- CloudConnectionError imported from google_drive_backend (shared exception type)
- CHUNK_SIZE = 10 * 1024 * 1024 (10 MB — above Graph 4 MB limit, Pitfall 6)
- All 7 StorageBackend methods implemented as async coroutines
- Resumable upload sessions (createUploadSession) used for ALL uploads
- _ensure_valid_token() checks expiry with 60s buffer, calls _refresh_token() if expired
- _refresh_token() wraps msal.ConfidentialClientApplication in asyncio.to_thread()
- invalid_grant → CloudConnectionError(reason='invalid_grant') per D-06 / B2 design
- presigned_get_url and generate_presigned_put_url raise NotImplementedError (D-14)
- delete_object silently ignores 404 (no-op per StorageBackend contract)
- Backend is stateless — no DB writes (B2 design)
2026-05-28 21:10:56 +02:00

280 lines
11 KiB
Python

"""
Microsoft Graph / OneDrive StorageBackend implementation for DocuVault.
Design notes:
- Resumable upload sessions (createUploadSession) are used for ALL uploads,
regardless of file size (Pitfall 6 — Microsoft Graph's simple upload is
limited to 4 MB; resumable sessions handle both small and large files).
- CHUNK_SIZE = 10 MiB. Graph upload sessions require every chunk except the last to be a multiple of 320 KiB; 10 MiB is exactly 32 x 320 KiB.
- All sync MSAL calls are wrapped in asyncio.to_thread(); httpx calls are
already async and awaited directly.
- CloudConnectionError is imported from google_drive_backend (shared type).
This keeps the exception hierarchy unified across all cloud backends.
- B2 design: This backend is STATELESS. It raises CloudConnectionError but
does NOT update the DB or CloudConnection objects. DB state transitions
(e.g., REQUIRES_REAUTH) are handled by _call_cloud_op() in cloud.py.
- _ensure_valid_token() checks expiry before each API call and calls
_refresh_token() if the token is within 60 seconds of expiry. If the
refresh returns None (invalid_grant), CloudConnectionError is raised.
- Token key format stored in credentials dict:
access_token — current OAuth bearer token
refresh_token — long-lived refresh token
expires_at — ISO 8601 datetime string (when the access_token expires)
"""
from __future__ import annotations

import asyncio
import datetime
import io
import urllib.parse
import uuid

import httpx
import msal

from config import settings
from storage.base import StorageBackend
from storage.google_drive_backend import CloudConnectionError  # reuse shared exception
# Root of the Microsoft Graph v1.0 REST API.
GRAPH_BASE = "https://graph.microsoft.com/v1.0"

# Upload-session chunk size (10 MiB). Graph requires every chunk except the
# last to be a multiple of 320 KiB, so the size is spelled as 32 x 320 KiB.
CHUNK_SIZE = 32 * 320 * 1024
class OneDriveBackend(StorageBackend):
    """Microsoft Graph / OneDrive implementation of StorageBackend.

    Uses MSAL for token management and httpx for async HTTP to Microsoft Graph.
    All sync MSAL calls are wrapped in asyncio.to_thread(). The backend is
    stateless with respect to the DB: it raises CloudConnectionError but never
    writes to the DB. Refreshed tokens are kept only in this instance's
    ``_credentials`` dict.
    """

    def __init__(self, credentials: dict) -> None:
        """Initialise with a decrypted credentials dict.

        Expected keys:
            access_token   str — current OAuth bearer token
            refresh_token  str — long-lived refresh token
            expires_at     str — ISO 8601 datetime string (when access_token
                                 expires); a naive value is treated as UTC
        """
        self._credentials = credentials

    # ── Internal helpers ──────────────────────────────────────────────────────

    def _auth_headers(self) -> dict:
        """Return the Authorization header carrying the current access token."""
        return {"Authorization": f"Bearer {self._credentials['access_token']}"}

    async def _ensure_valid_token(self) -> None:
        """Refresh the access token if it expires within the next 60 seconds.

        A missing, empty or unparseable ``expires_at`` is treated as expired, so
        a refresh is attempted. A trailing ``Z`` is accepted as UTC even on
        Python versions whose ``fromisoformat`` does not parse it.

        Raises:
            CloudConnectionError(reason='invalid_grant') — MSAL reported
                'invalid_grant' for the refresh token.
            CloudConnectionError(reason='token_expired') — any other refresh
                failure (raised by _refresh_token).
        """
        expires_at_str = self._credentials.get("expires_at", "")
        if expires_at_str:
            if expires_at_str.endswith("Z"):
                expires_at_str = expires_at_str[:-1] + "+00:00"
            try:
                expires_at = datetime.datetime.fromisoformat(expires_at_str)
            except ValueError:
                expires_at = None  # unparseable expiry — refresh to be safe
            if expires_at is not None:
                # Compare in UTC; a naive timestamp is assumed to be UTC.
                if expires_at.tzinfo is None:
                    expires_at = expires_at.replace(tzinfo=datetime.timezone.utc)
                now = datetime.datetime.now(tz=datetime.timezone.utc)
                if expires_at > now + datetime.timedelta(seconds=60):
                    return  # token still valid — no refresh needed

        # Token expired, about to expire, or expiry unknown — refresh it.
        new_creds = await self._refresh_token()
        if new_creds is None:
            raise CloudConnectionError(
                "OneDrive connection requires re-authentication (invalid_grant)",
                reason="invalid_grant",
            )
        self._credentials = new_creds

    async def _refresh_token(self) -> dict | None:
        """Refresh the access token via MSAL.

        The synchronous MSAL call runs in asyncio.to_thread() so it does not
        block the event loop.

        Returns:
            A new credentials dict (access_token, refresh_token, expires_at) on
            success. The current refresh token is kept if MSAL returns none.
            None if MSAL returns result['error'] == 'invalid_grant' (D-06).

        Raises:
            CloudConnectionError(reason='token_expired') — MSAL returned any
                other error, or a result without an access_token. Such failures
                do not prove the user revoked consent, so they are deliberately
                not reported as invalid_grant.
        """
        current_refresh_token = self._credentials["refresh_token"]

        def _msal_refresh() -> dict:
            app = msal.ConfidentialClientApplication(
                client_id=settings.onedrive_client_id,
                client_credential=settings.onedrive_client_secret,
                authority=f"https://login.microsoftonline.com/{settings.onedrive_tenant_id}",
            )
            # Do NOT pass "offline_access" (or "openid"/"profile"): MSAL adds
            # these reserved scopes itself and raises ValueError if given them.
            return app.acquire_token_by_refresh_token(
                current_refresh_token,
                scopes=["Files.ReadWrite"],
            )

        result = await asyncio.to_thread(_msal_refresh)

        if result.get("error") == "invalid_grant":
            return None  # _ensure_valid_token raises reason='invalid_grant'
        if "access_token" not in result:
            # NOTE(review): confirm _call_cloud_op treats reason='token_expired'
            # as a retryable (non-reauth) failure.
            error = result.get("error", "unknown_error")
            description = result.get("error_description", "")
            raise CloudConnectionError(
                f"OneDrive token refresh failed ({error}): {description}",
                reason="token_expired",
            )

        expires_in = int(result.get("expires_in", 3600))
        expires_at = (
            datetime.datetime.now(tz=datetime.timezone.utc)
            + datetime.timedelta(seconds=expires_in)
        ).isoformat()
        return {
            "access_token": result["access_token"],
            "refresh_token": result.get("refresh_token", current_refresh_token),
            "expires_at": expires_at,
        }

    async def _upload_via_session(
        self,
        client: httpx.AsyncClient,
        remote_path: str,
        file_bytes: bytes,
        content_type: str,
    ) -> str:
        """Upload non-empty ``file_bytes`` through a Graph upload session.

        ``remote_path`` must already be percent-encoded. Returns the item id
        from the final chunk's response, or '' if that response carried none.
        """
        session_response = await client.post(
            f"{GRAPH_BASE}/me/drive/root:/{remote_path}:/createUploadSession",
            headers={**self._auth_headers(), "Content-Type": "application/json"},
            json={"item": {"@microsoft.graph.conflictBehavior": "replace"}},
        )
        session_response.raise_for_status()
        # uploadUrl is pre-authenticated: chunk PUTs carry no Authorization header.
        upload_url = session_response.json()["uploadUrl"]

        total_size = len(file_bytes)
        item_id = ""
        for offset in range(0, total_size, CHUNK_SIZE):
            chunk = file_bytes[offset : offset + CHUNK_SIZE]
            end = offset + len(chunk) - 1
            chunk_response = await client.put(
                upload_url,
                content=chunk,
                headers={
                    "Content-Length": str(len(chunk)),
                    "Content-Range": f"bytes {offset}-{end}/{total_size}",
                    "Content-Type": content_type,
                },
            )
            chunk_response.raise_for_status()
            # Intermediate chunks answer 202 Accepted; the final chunk answers
            # 200/201 with the driveItem metadata, including its id.
            if chunk_response.status_code in (200, 201):
                item_id = chunk_response.json().get("id", "")
        return item_id

    # ── StorageBackend interface ──────────────────────────────────────────────

    async def put_object(
        self,
        user_id: str,
        document_id: str,
        file_bytes: bytes,
        extension: str,
        content_type: str,
    ) -> str:
        """Upload bytes to OneDrive and return the new item_id as object_key.

        The file is written to docuvault/{user_id}/{document_id}{extension} in
        the user's drive, replacing any existing file at that path.

        Non-empty files always use a resumable upload session
        (createUploadSession), sent in CHUNK_SIZE slices (Pitfall 6). A
        zero-byte file cannot be described by a Content-Range header, so it is
        sent with a single simple-upload PUT instead.

        Raises:
            httpx.HTTPStatusError — Graph rejected one of the requests.
            RuntimeError — the upload completed but Graph returned no item id.
        """
        await self._ensure_valid_token()
        # Percent-encode so characters such as '#', '?' or '%' in the
        # extension cannot break the path-addressed Graph URL ('/' is kept).
        remote_path = urllib.parse.quote(
            f"docuvault/{user_id}/{document_id}{extension}"
        )

        async with httpx.AsyncClient() as client:
            if len(file_bytes) > 0:
                item_id = await self._upload_via_session(
                    client, remote_path, file_bytes, content_type
                )
            else:
                response = await client.put(
                    f"{GRAPH_BASE}/me/drive/root:/{remote_path}:/content",
                    params={"@microsoft.graph.conflictBehavior": "replace"},
                    content=b"",
                    headers={**self._auth_headers(), "Content-Type": content_type},
                )
                response.raise_for_status()
                item_id = response.json().get("id", "")

        if not item_id:
            # Returning '' would hand callers an unusable object_key.
            raise RuntimeError(
                f"OneDrive upload to {remote_path!r} finished without an item id"
            )
        return item_id

    async def get_object(self, object_key: str) -> bytes:
        """Download file bytes from OneDrive by item_id.

        Redirects are followed because Graph answers GET .../content with a
        redirect to the actual download URL.
        """
        await self._ensure_valid_token()
        async with httpx.AsyncClient() as client:
            r = await client.get(
                f"{GRAPH_BASE}/me/drive/items/{object_key}/content",
                headers=self._auth_headers(),
                follow_redirects=True,
            )
            r.raise_for_status()
            return r.content

    async def delete_object(self, object_key: str) -> None:
        """Delete a OneDrive item by item_id.

        404 is a silent no-op, per the StorageBackend contract. Any other
        status except 204 raises httpx.HTTPStatusError.
        """
        await self._ensure_valid_token()
        async with httpx.AsyncClient() as client:
            r = await client.delete(
                f"{GRAPH_BASE}/me/drive/items/{object_key}",
                headers=self._auth_headers(),
            )
            if r.status_code not in (204, 404):
                r.raise_for_status()

    async def presigned_get_url(self, object_key: str, expires_minutes: int = 60) -> str:
        """Not supported by OneDrive — always raises NotImplementedError (D-14).

        Use get_object() for direct streaming instead.
        """
        raise NotImplementedError(
            "OneDrive backend does not support presigned URLs — "
            "use get_object() for streaming"
        )

    async def generate_presigned_put_url(
        self, object_key: str, expires_minutes: int = 15
    ) -> str:
        """Not supported by OneDrive — always raises NotImplementedError (D-14).

        Use put_object() for direct upload via the FastAPI intermediary.
        """
        raise NotImplementedError(
            "OneDrive backend does not support presigned put URLs — "
            "use put_object() for direct upload"
        )

    async def stat_object(self, object_key: str) -> int:
        """Return the item's size in bytes from OneDrive metadata (0 if absent)."""
        await self._ensure_valid_token()
        async with httpx.AsyncClient() as client:
            r = await client.get(
                f"{GRAPH_BASE}/me/drive/items/{object_key}",
                params={"$select": "size"},
                headers=self._auth_headers(),
            )
            r.raise_for_status()
            return int(r.json().get("size", 0))

    async def health_check(self) -> bool:
        """Return True if OneDrive is reachable and the token is usable.

        Makes a minimal GET /me/drive request selecting only the id field.
        Best-effort: any error, including a failed token refresh, yields False.
        """
        try:
            await self._ensure_valid_token()
            async with httpx.AsyncClient() as client:
                r = await client.get(
                    f"{GRAPH_BASE}/me/drive",
                    params={"$select": "id"},
                    headers=self._auth_headers(),
                )
                return r.is_success
        except Exception:
            return False