kite/.planning/phases/05-cloud-storage-backends/05-03-PLAN.md
2026-05-28 19:43:12 +02:00

---
phase: 05-cloud-storage-backends
plan: 03
type: execute
wave: 3
depends_on:
- "05-02"
files_modified:
- backend/storage/google_drive_backend.py
- backend/storage/onedrive_backend.py
autonomous: true
requirements:
- CLOUD-01
- CLOUD-05
- CLOUD-07
must_haves:
  truths:
    - "GoogleDriveBackend implements all 7 StorageBackend abstract methods"
    - "OneDriveBackend implements all 7 StorageBackend abstract methods"
    - "generate_presigned_put_url and presigned_get_url raise NotImplementedError on both cloud backends (D-14)"
    - "All sync SDK calls wrapped in asyncio.to_thread(); the event loop is never blocked"
    - "On-demand token refresh: 401/token-expiry error triggers transparent refresh; invalid_grant sets REQUIRES_REAUTH"
    - "Google OAuth Flow uses access_type='offline', prompt='consent' (Pitfall 1 prevention)"
    - "OneDrive uses resumable upload sessions (createUploadSession) for all files (Pitfall 6 prevention)"
  artifacts:
    - path: "backend/storage/google_drive_backend.py"
      provides: "Google Drive v3 StorageBackend implementation"
      contains: "class GoogleDriveBackend"
    - path: "backend/storage/onedrive_backend.py"
      provides: "Microsoft Graph / OneDrive StorageBackend implementation"
      contains: "class OneDriveBackend"
  key_links:
    - from: "backend/storage/google_drive_backend.py"
      to: "backend/storage/cloud_utils.py"
      via: "decrypt_credentials used by factory caller"
      pattern: "GoogleDriveBackend.__init__"
    - from: "backend/storage/onedrive_backend.py"
      to: "backend/storage/cloud_utils.py"
      via: "decrypt_credentials used by factory caller"
      pattern: "OneDriveBackend.__init__"
---
<objective>
Implement GoogleDriveBackend and OneDriveBackend — the two OAuth-based cloud StorageBackend concrete classes.
Purpose: These backends handle Google Drive v3 and Microsoft Graph file operations. Both use async-wrapped sync SDKs, on-demand token refresh, and handle the invalid_grant → REQUIRES_REAUTH transition per D-05/D-06.
Output: google_drive_backend.py and onedrive_backend.py, each implementing all 7 StorageBackend methods.
</objective>
<execution_context>
@/Users/nik/.claude/get-shit-done/workflows/execute-plan.md
@/Users/nik/.claude/get-shit-done/templates/summary.md
</execution_context>
<context>
@.planning/PROJECT.md
@.planning/ROADMAP.md
@.planning/phases/05-cloud-storage-backends/05-CONTEXT.md
@.planning/phases/05-cloud-storage-backends/05-RESEARCH.md
@.planning/phases/05-cloud-storage-backends/05-02-SUMMARY.md
</context>
<interfaces>
<!-- From backend/storage/base.py — StorageBackend ABC (all 7 methods) -->
From backend/storage/base.py:
class StorageBackend(ABC):
    async def put_object(self, user_id: str, document_id: str, file_bytes: bytes, extension: str, content_type: str) -> str: ...
    async def get_object(self, object_key: str) -> bytes: ...
    async def delete_object(self, object_key: str) -> None: ...
    async def presigned_get_url(self, object_key: str, expires_minutes: int = 60) -> str: ...
    async def health_check(self) -> bool: ...
    async def generate_presigned_put_url(self, object_key: str, expires_minutes: int = 15) -> str: ...
    async def stat_object(self, object_key: str) -> int: ...
<!-- From RESEARCH.md Pattern 3 — Google Drive OAuth Flow -->
Google Drive credential dict keys: access_token, refresh_token, expiry (ISO string), token_uri, client_id, client_secret
google_auth_oauthlib: Flow.from_client_config, flow.authorization_url(access_type="offline", prompt="consent")
google-api-python-client: googleapiclient.discovery.build("drive", "v3", credentials=creds)
service.files().create(body={...}, media_body=MediaIoBaseUpload(buf, mimetype=content_type)).execute()
service.files().get(fileId=key, fields="id,name,size").execute()
service.files().delete(fileId=key).execute()
GoogleDrive object_key = file_id returned by files().create()
<!-- From RESEARCH.md Pattern 4 — OneDrive MSAL Flow -->
OneDrive credential dict keys: access_token, refresh_token, expires_at (ISO string)
msal.ConfidentialClientApplication(client_id, client_credential=client_secret, authority=f"https://login.microsoftonline.com/{tenant_id}")
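app.acquire_token_by_refresh_token(refresh_token, scopes=["Files.ReadWrite"])  # do not pass "offline_access": MSAL Python treats it as a reserved scope, adds it itself, and raises ValueError if it appears in scopes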
Microsoft Graph: POST /me/drive/root:/{path}:/createUploadSession, then PUT chunks to uploadUrl
Microsoft Graph: GET /me/drive/items/{item_id}/content — streams bytes
Microsoft Graph: DELETE /me/drive/items/{item_id}
OneDrive object_key = item_id from upload response
<!-- From RESEARCH.md Pattern 10 — On-demand token refresh -->
Custom exception: CloudConnectionError (raised when invalid_grant detected)
On 401 / token-expiry: refresh token, update credentials_enc in conn, retry once
On invalid_grant: set conn.status = "REQUIRES_REAUTH", raise CloudConnectionError
Both backends need session + conn parameters for the refresh/update path (passed by the API layer caller)
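
The refresh-and-retry-once flow from Pattern 10 can be sketched independently of either SDK. This is a minimal illustration, not the project's implementation: `TokenExpiredError`, `call_with_refresh`, and the `on_reauth_required` callback are hypothetical names standing in for the provider's 401 error and for the API-layer code that sets `conn.status = "REQUIRES_REAUTH"`.

```python
import asyncio
from typing import Awaitable, Callable, Optional, TypeVar

T = TypeVar("T")


class CloudConnectionError(Exception):
    """Raised when the provider rejects the refresh token (invalid_grant)."""


class TokenExpiredError(Exception):
    """Hypothetical stand-in for a provider 401 / token-expiry error."""


async def call_with_refresh(
    operation: Callable[[dict], Awaitable[T]],
    refresh: Callable[[dict], Awaitable[Optional[dict]]],
    credentials: dict,
    on_reauth_required: Callable[[], None],
) -> T:
    """Run operation; on token expiry, refresh once and retry once."""
    try:
        return await operation(credentials)
    except TokenExpiredError:
        new_credentials = await refresh(credentials)
        if new_credentials is None:
            # invalid_grant: the refresh token is no longer usable.
            on_reauth_required()
            raise CloudConnectionError("connection requires re-authentication")
        # Keep the refreshed tokens so the caller can re-encrypt and persist them.
        credentials.update(new_credentials)
        # Single retry; a second failure propagates to the caller.
        return await operation(credentials)
```

Passing the refresh function and the re-auth callback in as parameters keeps the database session out of the backend classes, which is one way to satisfy the "session + conn passed by the API layer" note above.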
</interfaces>
<tasks>
<task type="auto" tdd="true">
<name>Task 1: Implement GoogleDriveBackend</name>
<files>backend/storage/google_drive_backend.py</files>
<read_first>
- backend/storage/base.py — exact signatures for all 7 abstract methods
- backend/storage/minio_backend.py — asyncio.to_thread() wrapping pattern, __init__ style
- .planning/phases/05-cloud-storage-backends/05-RESEARCH.md — Pattern 3, Pattern 7 (on-demand refresh), Pitfall 1, Pitfall 7
- backend/storage/cloud_utils.py — encrypt_credentials, decrypt_credentials signatures (for refresh path)
</read_first>
<behavior>
- GoogleDriveBackend.__init__(self, credentials: dict) stores credentials dict; builds google.oauth2.credentials.Credentials from it
- put_object: creates Drive file via service.files().create() wrapped in asyncio.to_thread(); returns Google Drive file_id as object_key
- get_object: downloads file bytes via service.files().get_media(fileId=key) wrapped in asyncio.to_thread(); returns bytes
- delete_object: calls service.files().delete(fileId=key) wrapped in asyncio.to_thread(); no-op if file not found (catch HttpError 404)
- presigned_get_url: raises NotImplementedError("Google Drive backend does not support presigned URLs")
- generate_presigned_put_url: raises NotImplementedError("Google Drive backend does not support presigned put URLs")
- stat_object: calls service.files().get(fileId=key, fields="size") wrapped in asyncio.to_thread(); returns int(metadata.get("size", 0))
- health_check: tries files().list(pageSize=1) wrapped in asyncio.to_thread(); returns True/False
- All sync googleapiclient calls wrapped in asyncio.to_thread() (Pitfall 7)
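- On-demand token refresh: _is_token_expired(e) detects googleapiclient.errors.HttpError with status 401; _refresh_google_creds(credentials) calls Credentials.refresh(google.auth.transport.requests.Request()) inside asyncio.to_thread(); returns the updated credentials dict, or None when the refresh fails with invalid_grant (google-auth reports this as google.auth.exceptions.RefreshError)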
- CloudConnectionError exception class defined in this module for invalid_grant signaling
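
The asyncio.to_thread() requirement in the list above can be illustrated without the Google SDK. In this sketch `blocking_sdk_call` is a hypothetical stand-in for a synchronous `request.execute()` call; the `ticker` task only exists to show that the event loop keeps running while the blocking call is in progress.

```python
import asyncio
import time


def blocking_sdk_call(file_id: str) -> dict:
    """Hypothetical stand-in for a synchronous SDK call such as request.execute()."""
    time.sleep(0.05)
    # The size is modeled as a string here, hence the int() conversion below.
    return {"id": file_id, "size": "42"}


async def stat_size(file_id: str) -> int:
    # The blocking call runs in a worker thread; the event loop stays free.
    metadata = await asyncio.to_thread(blocking_sdk_call, file_id)
    return int(metadata.get("size", 0))


async def demo() -> tuple:
    ticks = 0

    async def ticker() -> None:
        nonlocal ticks
        while True:
            await asyncio.sleep(0.005)
            ticks += 1

    task = asyncio.create_task(ticker())
    size = await stat_size("abc")
    task.cancel()
    return size, ticks
```

If `blocking_sdk_call` were invoked directly inside the coroutine, `ticks` would stay at zero for the duration of the call, which is the failure mode Pitfall 7 guards against.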
</behavior>
<action>
Create backend/storage/google_drive_backend.py with:
Module docstring explaining Google Drive v3 backend, asyncio.to_thread() requirement, and D-14 NotImplementedError rationale.
from __future__ import annotations
import asyncio
import datetime
import io
import uuid
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from googleapiclient.http import MediaIoBaseUpload, MediaIoBaseDownload
from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
from storage.base import StorageBackend

class CloudConnectionError(Exception): pass

class GoogleDriveBackend(StorageBackend):
    SCOPES = ["https://www.googleapis.com/auth/drive.file"]

    def __init__(self, credentials: dict) -> None:
        self._creds_dict = credentials
        self._creds = self._dict_to_google_creds(credentials)

    def _dict_to_google_creds(self, d: dict) -> Credentials:
        # Build google.oauth2.credentials.Credentials from stored dict
        # d keys: access_token, refresh_token, expiry (ISO str), token_uri, client_id, client_secret
        creds = Credentials(
            token=d["access_token"],
            refresh_token=d.get("refresh_token"),
            token_uri=d.get("token_uri", "https://oauth2.googleapis.com/token"),
            client_id=d.get("client_id"),
            client_secret=d.get("client_secret"),
        )
        if d.get("expiry"):
            creds.expiry = datetime.datetime.fromisoformat(d["expiry"])
        return creds

    def _get_service(self):
        return build("drive", "v3", credentials=self._creds, cache_discovery=False)

    async def put_object(self, user_id, document_id, file_bytes, extension, content_type) -> str:
        # Wrap the sync file create in asyncio.to_thread
        # file_metadata: name = f"{document_id}{extension}" (provider-side name)
        # Returns Drive file_id as object_key (not a path; D-02: cloud object_key = provider native ID)
        ...  # placeholder only; the delivered file must contain the full body

    async def get_object(self, object_key: str) -> bytes:
        # Use MediaIoBaseDownload to stream bytes into BytesIO, return bytes
        ...  # placeholder only

    async def delete_object(self, object_key: str) -> None:
        # Catch HttpError 404 silently; re-raise other errors
        ...  # placeholder only

    async def presigned_get_url(self, object_key: str, expires_minutes: int = 60) -> str:
        raise NotImplementedError("Google Drive backend does not support presigned URLs; use get_object() for streaming")

    async def generate_presigned_put_url(self, object_key: str, expires_minutes: int = 15) -> str:
        raise NotImplementedError("Google Drive backend does not support presigned put URLs; use put_object() for direct upload")

    async def stat_object(self, object_key: str) -> int:
        # service.files().get(fileId=object_key, fields="size").execute()
        # Return int(metadata.get("size", 0))
        ...  # placeholder only

    async def health_check(self) -> bool:
        # Try files().list(pageSize=1); return True/False
        ...  # placeholder only
All concrete method bodies must be fully implemented (not just stubs).
Each sync call must be wrapped in asyncio.to_thread(lambda: ...) or asyncio.to_thread(fn, arg).
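
One detail worth checking when converting the stored `expiry` string: google-auth has historically compared `Credentials.expiry` against a naive UTC timestamp, so a timezone-aware value parsed from an ISO string may fail that comparison. A minimal sketch of a normalizing parser follows; `parse_expiry_naive_utc` is a hypothetical helper name, and whether the stored strings carry an offset at all is an assumption.

```python
from datetime import datetime, timezone
from typing import Optional


def parse_expiry_naive_utc(value: Optional[str]) -> Optional[datetime]:
    """Parse an ISO 8601 expiry string into a naive datetime expressed in UTC."""
    if not value:
        return None
    # datetime.fromisoformat() only accepts a trailing "Z" from Python 3.11 on.
    if value.endswith("Z"):
        value = value[:-1] + "+00:00"
    parsed = datetime.fromisoformat(value)
    if parsed.tzinfo is not None:
        # Convert to UTC, then drop tzinfo to get a naive UTC value.
        parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None)
    return parsed
```

Values that are already naive are returned unchanged, on the assumption that they were stored in UTC.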
</action>
<verify>
<automated>cd /Users/nik/Documents/Progamming/document_scanner/backend && python -c "
from storage.google_drive_backend import GoogleDriveBackend, CloudConnectionError
import inspect, asyncio
# Verify all 7 methods are coroutines
for method in ['put_object','get_object','delete_object','presigned_get_url','health_check','generate_presigned_put_url','stat_object']:
    assert inspect.iscoroutinefunction(getattr(GoogleDriveBackend, method)), f'{method} not async'
# Verify NotImplementedError for presigned methods
backend = GoogleDriveBackend({'access_token':'x','refresh_token':'y','token_uri':'https://oauth2.googleapis.com/token','client_id':'c','client_secret':'s'})
async def check():
    try:
        await backend.presigned_get_url('key')
        print('FAIL: should raise NotImplementedError')
    except NotImplementedError:
        print('OK: presigned_get_url raises NotImplementedError')
    try:
        await backend.generate_presigned_put_url('key')
        print('FAIL: should raise NotImplementedError')
    except NotImplementedError:
        print('OK: generate_presigned_put_url raises NotImplementedError')
asyncio.run(check())
print('All 7 methods are coroutines: OK')
"</automated>
</verify>
<acceptance_criteria>
- backend/storage/google_drive_backend.py exists with class GoogleDriveBackend
- All 7 methods are async (inspect.iscoroutinefunction returns True)
- presigned_get_url and generate_presigned_put_url raise NotImplementedError
- CloudConnectionError class defined and importable from this module
- Import succeeds: `from storage.google_drive_backend import GoogleDriveBackend, CloudConnectionError`
- `pytest -v --tb=short` exits 0 (no import regressions)
</acceptance_criteria>
<done>GoogleDriveBackend created with all 7 methods; NotImplementedError on presigned methods; CloudConnectionError defined; pytest passes</done>
</task>
<task type="auto" tdd="true">
<name>Task 2: Implement OneDriveBackend</name>
<files>backend/storage/onedrive_backend.py</files>
<read_first>
- backend/storage/base.py — all 7 method signatures
- backend/storage/google_drive_backend.py — pattern reference (asyncio.to_thread, CloudConnectionError)
- .planning/phases/05-cloud-storage-backends/05-RESEARCH.md — Pattern 4 (MSAL), Pitfall 6 (resumable upload), Assumption A3 (invalid_grant in result["error"])
- backend/config.py — settings.onedrive_client_id, onedrive_client_secret, onedrive_tenant_id
</read_first>
<behavior>
- OneDriveBackend.__init__(self, credentials: dict) stores credentials dict (access_token, refresh_token, expires_at)
- put_object: uses Microsoft Graph createUploadSession + chunked PUT (10 MB chunks) for ALL files (Pitfall 6 — no 4 MB limit); returns OneDrive item_id as object_key
- get_object: GET https://graph.microsoft.com/v1.0/me/drive/items/{item_id}/content via httpx.AsyncClient.get with an Authorization bearer header and follow_redirects=True; returns bytes
- delete_object: DELETE https://graph.microsoft.com/v1.0/me/drive/items/{item_id}; catch 404 silently
- presigned_get_url: raises NotImplementedError
- generate_presigned_put_url: raises NotImplementedError
- stat_object: GET /me/drive/items/{item_id}?$select=size; return int(response["size"])
- health_check: GET /me/drive?$select=id; return True/False
- _refresh_token(self) -> dict | None: calls msal.ConfidentialClientApplication.acquire_token_by_refresh_token() with self._credentials["refresh_token"]; returns new credentials dict or None if result.get("error") == "invalid_grant"
- All sync msal calls wrapped in asyncio.to_thread(); httpx calls are already async (use `async with httpx.AsyncClient() as client` and await each request)
- CHUNK_SIZE = 10 * 1024 * 1024 (10 MB, above Graph's 4 MB limit)
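
The chunked PUT step can be reduced to computing byte ranges and `Content-Range` headers. The sketch below assumes the Graph upload-session rule that each chunk size is a multiple of 320 KiB (327,680 bytes), which 10 MiB satisfies (32 x 320 KiB); `iter_chunk_ranges` and `GRAPH_CHUNK_MULTIPLE` are hypothetical names.

```python
from typing import Iterator, Tuple

CHUNK_SIZE = 10 * 1024 * 1024          # 10 MiB, as in the plan
GRAPH_CHUNK_MULTIPLE = 320 * 1024      # 327,680 bytes


def iter_chunk_ranges(
    total_size: int, chunk_size: int = CHUNK_SIZE
) -> Iterator[Tuple[int, int, str]]:
    """Yield (start, end_inclusive, Content-Range header value) per chunk."""
    if chunk_size <= 0 or chunk_size % GRAPH_CHUNK_MULTIPLE != 0:
        raise ValueError("chunk_size must be a positive multiple of 320 KiB")
    start = 0
    while start < total_size:
        # The last chunk may be shorter than chunk_size.
        end = min(start + chunk_size, total_size) - 1
        yield start, end, f"bytes {start}-{end}/{total_size}"
        start = end + 1
```

Each tuple maps to one `PUT` against the session's `uploadUrl`, with `file_bytes[start:end + 1]` as the body. Note that a zero-byte file yields no ranges, so an implementation would need to handle that case separately.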
</behavior>
<action>
Create backend/storage/onedrive_backend.py with:
Module docstring explaining OneDrive/Microsoft Graph backend, resumable upload requirement (Pitfall 6), and asyncio.to_thread pattern.
from __future__ import annotations
import asyncio
import datetime
import io
import uuid
import httpx
import msal
from config import settings
from storage.base import StorageBackend
from storage.google_drive_backend import CloudConnectionError  # reuse same exception

GRAPH_BASE = "https://graph.microsoft.com/v1.0"
CHUNK_SIZE = 10 * 1024 * 1024  # 10 MB; above Graph's 4 MB simple upload limit

class OneDriveBackend(StorageBackend):
    def __init__(self, credentials: dict) -> None:
        self._credentials = credentials  # {"access_token": ..., "refresh_token": ..., "expires_at": ...}

    def _auth_headers(self) -> dict:
        return {"Authorization": f"Bearer {self._credentials['access_token']}"}

    async def _ensure_valid_token(self) -> None:
        # Check if access_token is expired (expires_at < now + 60s buffer)
        # If expired, call _refresh_token(); update self._credentials
        # If refresh returns None → raise CloudConnectionError("OneDrive connection requires re-authentication")
        ...  # placeholder only; the delivered file must contain the full body

    async def _refresh_token(self) -> dict | None:
        # Wrap msal call in asyncio.to_thread
        # Create ConfidentialClientApplication with settings.onedrive_client_id, onedrive_client_secret, authority
        # Call acquire_token_by_refresh_token(self._credentials["refresh_token"], scopes=["Files.ReadWrite"])
        # ("offline_access" is a reserved scope: MSAL Python adds it itself and raises ValueError if it is passed)
        # Return updated dict or None if result.get("error") == "invalid_grant"
        ...  # placeholder only

    async def put_object(self, user_id, document_id, file_bytes, extension, content_type) -> str:
        # 1. Ensure valid token
        # 2. POST {GRAPH_BASE}/me/drive/root:/{user_id}/{document_id}{extension}:/createUploadSession
        # 3. PUT file_bytes to uploadUrl in CHUNK_SIZE chunks
        # 4. Return item_id from final upload response
        ...  # placeholder only

    async def get_object(self, object_key: str) -> bytes:
        await self._ensure_valid_token()
        async with httpx.AsyncClient() as client:
            r = await client.get(f"{GRAPH_BASE}/me/drive/items/{object_key}/content",
                                 headers=self._auth_headers(), follow_redirects=True)
            r.raise_for_status()
            return r.content

    async def delete_object(self, object_key: str) -> None:
        await self._ensure_valid_token()
        async with httpx.AsyncClient() as client:
            r = await client.delete(f"{GRAPH_BASE}/me/drive/items/{object_key}",
                                    headers=self._auth_headers())
            if r.status_code not in (204, 404):
                r.raise_for_status()

    async def presigned_get_url(self, object_key: str, expires_minutes: int = 60) -> str:
        raise NotImplementedError("OneDrive backend does not support presigned URLs; use get_object() for streaming")

    async def generate_presigned_put_url(self, object_key: str, expires_minutes: int = 15) -> str:
        raise NotImplementedError("OneDrive backend does not support presigned put URLs; use put_object() for direct upload")

    async def stat_object(self, object_key: str) -> int:
        await self._ensure_valid_token()
        async with httpx.AsyncClient() as client:
            r = await client.get(f"{GRAPH_BASE}/me/drive/items/{object_key}",
                                 params={"$select": "size"}, headers=self._auth_headers())
            r.raise_for_status()
            return int(r.json().get("size", 0))

    async def health_check(self) -> bool:
        try:
            await self._ensure_valid_token()
            async with httpx.AsyncClient() as client:
                r = await client.get(f"{GRAPH_BASE}/me/drive", params={"$select": "id"},
                                     headers=self._auth_headers())
                return r.is_success
        except Exception:
            return False
All methods fully implemented. _ensure_valid_token and _refresh_token handle the
invalid_grant → CloudConnectionError path per D-06.
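
The two pure decisions inside `_ensure_valid_token` and `_refresh_token` (is the token within the 60 s buffer, and what does an MSAL result dict mean) can be sketched without network access. `is_token_expired` and `credentials_from_msal_result` are hypothetical helper names; treating a naive `expires_at` as UTC is an assumption about how the value is stored.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

EXPIRY_BUFFER = timedelta(seconds=60)


def is_token_expired(credentials: dict, now: Optional[datetime] = None) -> bool:
    """True when expires_at falls before now plus the 60 s safety buffer."""
    now = now or datetime.now(timezone.utc)
    expires_at = datetime.fromisoformat(credentials["expires_at"])
    if expires_at.tzinfo is None:
        # Assumption: naive timestamps were stored in UTC.
        expires_at = expires_at.replace(tzinfo=timezone.utc)
    return expires_at < now + EXPIRY_BUFFER


def credentials_from_msal_result(
    result: dict, previous: dict, now: Optional[datetime] = None
) -> Optional[dict]:
    """Map an MSAL token result to a credentials dict; None means invalid_grant."""
    if result.get("error") == "invalid_grant":
        return None
    if "access_token" not in result:
        # Any other failure is surfaced rather than treated as re-auth.
        raise RuntimeError(f"token refresh failed: {result.get('error')}")
    now = now or datetime.now(timezone.utc)
    expires_in = int(result.get("expires_in", 0))
    return {
        "access_token": result["access_token"],
        # Keep the old refresh token if the provider did not rotate it.
        "refresh_token": result.get("refresh_token", previous["refresh_token"]),
        "expires_at": (now + timedelta(seconds=expires_in)).isoformat(),
    }
```

Accepting `now` as a parameter makes both functions deterministic under test; the caller raises `CloudConnectionError` when the second function returns `None`.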
</action>
<verify>
<automated>cd /Users/nik/Documents/Progamming/document_scanner/backend && python -c "
from storage.onedrive_backend import OneDriveBackend, CHUNK_SIZE
from storage.google_drive_backend import CloudConnectionError
import inspect
for method in ['put_object','get_object','delete_object','presigned_get_url','health_check','generate_presigned_put_url','stat_object']:
    assert inspect.iscoroutinefunction(getattr(OneDriveBackend, method)), f'{method} not async'
assert CHUNK_SIZE == 10 * 1024 * 1024, f'CHUNK_SIZE should be 10MB, got {CHUNK_SIZE}'
print('All methods async: OK')
print(f'CHUNK_SIZE = {CHUNK_SIZE} bytes: OK')
import asyncio
backend = OneDriveBackend({'access_token':'x','refresh_token':'y','expires_at':'2099-01-01T00:00:00'})
async def check():
    try: await backend.presigned_get_url('key')
    except NotImplementedError: print('presigned_get_url NotImplementedError: OK')
    try: await backend.generate_presigned_put_url('key')
    except NotImplementedError: print('generate_presigned_put_url NotImplementedError: OK')
asyncio.run(check())
"</automated>
</verify>
<acceptance_criteria>
- backend/storage/onedrive_backend.py exists with class OneDriveBackend
- All 7 methods are async coroutines
- CHUNK_SIZE = 10 * 1024 * 1024 (10 MB)
- presigned_get_url and generate_presigned_put_url raise NotImplementedError
- CloudConnectionError imported from google_drive_backend (shared exception type)
- Import succeeds: `from storage.onedrive_backend import OneDriveBackend`
- `pytest -v --tb=short` exits 0
</acceptance_criteria>
<done>OneDriveBackend created with all 7 methods; resumable upload uses CHUNK_SIZE=10MB; NotImplementedError on presigned methods; pytest passes</done>
</task>
</tasks>
<threat_model>
## Trust Boundaries
| Boundary | Description |
|----------|-------------|
| GoogleDriveBackend → Google APIs | Outbound to googleapis.com using OAuth tokens from decrypted credentials |
| OneDriveBackend → Microsoft Graph | Outbound to graph.microsoft.com using MSAL-managed tokens |
| invalid_grant response → connection status | Provider error must be surfaced as REQUIRES_REAUTH, not silently swallowed |
## STRIDE Threat Register
| Threat ID | Category | Component | Disposition | Mitigation Plan |
|-----------|----------|-----------|-------------|-----------------|
| T-05-03-01 | Elevation of Privilege | GoogleDriveBackend — token in credentials dict | mitigate | Credentials dict never logged; decryption only in factory; tokens only in memory; no serialization path back to API response |
| T-05-03-02 | Spoofing | OneDriveBackend — invalid_grant detection | mitigate | result.get("error") == "invalid_grant" raises CloudConnectionError → API layer sets REQUIRES_REAUTH; per D-06, no silent failure |
| T-05-03-03 | Denial of Service | OneDriveBackend — 10MB chunked upload | accept | 10 MB chunks are within Microsoft Graph's recommended range; no larger chunks that could cause memory pressure |
| T-05-03-04 | Information Disclosure | GoogleDriveBackend — file names in Drive | accept | Drive file is named {document_id}{extension} — no human filename in provider storage (aligns with D-11 spirit) |
| T-05-03-05 | Tampering | cache_discovery=False in Google Drive build() | mitigate | Disables Google's JSON discovery cache written to /tmp; prevents directory traversal via cached discovery docs |
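
For T-05-03-01, one way to keep tokens out of log output is to mask them before a credentials dict reaches any logging or debugging path. This is a hedged sketch only; `redact_credentials` and `SENSITIVE_KEYS` are hypothetical names, and the key list is taken from the credential dict keys listed in the interfaces section.

```python
SENSITIVE_KEYS = frozenset({"access_token", "refresh_token", "client_secret"})


def redact_credentials(credentials: dict) -> dict:
    """Return a copy that is safe to log: secret values are masked."""
    return {
        key: ("***" if key in SENSITIVE_KEYS and value else value)
        for key, value in credentials.items()
    }
```

The original dict is left untouched, so the backend can keep using it while only the redacted copy is handed to loggers.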
</threat_model>
<verification>
cd /Users/nik/Documents/Progamming/document_scanner/backend && python -m pytest tests/test_cloud.py -v && python -m pytest -v --tb=short 2>&1 | tail -10
</verification>
<success_criteria>
- GoogleDriveBackend: all 7 methods async; presigned methods raise NotImplementedError; CloudConnectionError defined
- OneDriveBackend: all 7 methods async; CHUNK_SIZE=10MB; presigned methods raise NotImplementedError; CloudConnectionError imported
- pytest -v exits 0, 0 failures; test_cloud.py still all xfailed
</success_criteria>
<output>
Create `.planning/phases/05-cloud-storage-backends/05-03-SUMMARY.md` when done
</output>