feat(05-03): implement GoogleDriveBackend — Google Drive v3 StorageBackend

- CloudConnectionError(reason=) defined in this module — token_expired | invalid_grant
- All 7 StorageBackend methods implemented as async coroutines
- Every sync googleapiclient call wrapped in asyncio.to_thread() (Pitfall 7)
- cache_discovery=False on build() prevents /tmp directory traversal (T-05-03-05)
- presigned_get_url and generate_presigned_put_url raise NotImplementedError (D-14)
- HttpError 401 raises CloudConnectionError(reason='token_expired')
- HttpError 400 with 'invalid_grant' raises CloudConnectionError(reason='invalid_grant')
- HttpError 404 on delete_object is silently swallowed (no-op per contract)
- Backend is stateless — no DB writes (B2 design, D-05/D-06)
This commit is contained in:
curo1305
2026-05-28 21:07:26 +02:00
parent c406ab1081
commit 337ee8ef11
+258
View File
@@ -0,0 +1,258 @@
"""
Google Drive v3 StorageBackend implementation for DocuVault.
Design notes:
- All sync googleapiclient calls are wrapped in asyncio.to_thread() to
avoid blocking the FastAPI event loop (Pitfall 7 — google-api-python-client
is synchronous; every .execute() call makes an HTTP request).
- cache_discovery=False on build() prevents the client library from writing
a JSON discovery document to /tmp, which would be a directory traversal
risk (T-05-03-05).
- D-14: presigned_get_url and generate_presigned_put_url raise
NotImplementedError. The API upload endpoint detects cloud backends and
uses the direct put_object() path instead.
- B2 design: This backend is STATELESS. It raises CloudConnectionError but
does NOT update the DB or CloudConnection objects. DB state transitions
(e.g., REQUIRES_REAUTH) are handled by the _call_cloud_op() helper in
cloud.py (Plan 05-05), which has the DB session.
- Token key format stored in credentials dict:
access_token — current OAuth bearer token
refresh_token — long-lived refresh token
token_uri — OAuth token endpoint
client_id — OAuth client ID
client_secret — OAuth client secret
expiry — ISO 8601 string (optional)
"""
from __future__ import annotations
import asyncio
import datetime
import io
import uuid
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from googleapiclient.http import MediaIoBaseDownload, MediaIoBaseUpload
from storage.base import StorageBackend
class CloudConnectionError(Exception):
"""Raised when a cloud provider signals a non-retryable connection problem.
Attributes:
reason: "token_expired" — access token expired; API layer can refresh and retry.
"invalid_grant" — refresh token revoked; user must reconnect.
The backend never updates the DB. The API layer (_call_cloud_op in cloud.py)
catches this exception, performs the DB state transition, and decides whether
to retry or surface a 503 to the client (B2 design, D-05/D-06).
"""
def __init__(self, msg: str = "", *, reason: str = "") -> None:
super().__init__(msg)
self.reason = reason # "token_expired" | "invalid_grant"
class GoogleDriveBackend(StorageBackend):
"""Google Drive v3 implementation of StorageBackend.
Every sync googleapiclient call is wrapped in asyncio.to_thread() (Pitfall 7).
The backend is stateless — it raises CloudConnectionError on token issues
but never writes to the DB.
"""
SCOPES = ["https://www.googleapis.com/auth/drive.file"]
def __init__(self, credentials: dict) -> None:
"""Initialise with a decrypted credentials dict.
Expected keys:
access_token str — current OAuth bearer token
refresh_token str — long-lived refresh token (may be None)
token_uri str — defaults to https://oauth2.googleapis.com/token
client_id str
client_secret str
expiry str — ISO 8601 datetime string (optional)
"""
self._creds_dict = credentials
self._creds = self._dict_to_google_creds(credentials)
# ── Internal helpers ──────────────────────────────────────────────────────
def _dict_to_google_creds(self, d: dict) -> Credentials:
"""Build a google.oauth2.credentials.Credentials object from a stored dict."""
creds = Credentials(
token=d["access_token"],
refresh_token=d.get("refresh_token"),
token_uri=d.get("token_uri", "https://oauth2.googleapis.com/token"),
client_id=d.get("client_id"),
client_secret=d.get("client_secret"),
)
if d.get("expiry"):
creds.expiry = datetime.datetime.fromisoformat(d["expiry"])
return creds
def _get_service(self):
"""Build the Drive v3 service object.
cache_discovery=False prevents the client library from caching the
discovery document in /tmp — important for security (T-05-03-05).
"""
return build("drive", "v3", credentials=self._creds, cache_discovery=False)
def _handle_http_error(self, exc: HttpError) -> None:
"""Classify an HttpError and raise CloudConnectionError if appropriate."""
status = exc.resp.status
if status == 401:
# Token expired — API layer should refresh and retry
raise CloudConnectionError(
"Google Drive access token expired", reason="token_expired"
) from exc
# Check for invalid_grant in the error body
if status == 400:
body = exc.content.decode("utf-8", errors="replace").lower()
if "invalid_grant" in body:
raise CloudConnectionError(
"Google Drive refresh token revoked (invalid_grant)",
reason="invalid_grant",
) from exc
raise exc
# ── StorageBackend interface ──────────────────────────────────────────────
async def put_object(
self,
user_id: str,
document_id: str,
file_bytes: bytes,
extension: str,
content_type: str,
) -> str:
"""Upload bytes to Google Drive and return the Drive file_id as object_key.
The file is named "{document_id}{extension}" in Drive — no human filename
is used in provider storage (D-04 spirit / T-05-03-04).
Returns the Google Drive file_id (object_key for this document).
"""
file_name = f"{document_id}{extension}"
file_metadata = {"name": file_name}
buf = io.BytesIO(file_bytes)
def _upload() -> str:
service = self._get_service()
media = MediaIoBaseUpload(buf, mimetype=content_type, resumable=False)
try:
result = (
service.files()
.create(body=file_metadata, media_body=media, fields="id")
.execute()
)
return result["id"]
except HttpError as exc:
self._handle_http_error(exc)
return await asyncio.to_thread(_upload)
async def get_object(self, object_key: str) -> bytes:
"""Download file bytes from Google Drive by Drive file_id.
Uses MediaIoBaseDownload to stream the file into a BytesIO buffer.
"""
def _download() -> bytes:
service = self._get_service()
try:
request = service.files().get_media(fileId=object_key)
buf = io.BytesIO()
downloader = MediaIoBaseDownload(buf, request)
done = False
while not done:
_, done = downloader.next_chunk()
return buf.getvalue()
except HttpError as exc:
self._handle_http_error(exc)
return await asyncio.to_thread(_download)
async def delete_object(self, object_key: str) -> None:
"""Delete a Drive file by file_id. Silent no-op if the file is not found (404)."""
def _delete() -> None:
service = self._get_service()
try:
service.files().delete(fileId=object_key).execute()
except HttpError as exc:
if exc.resp.status == 404:
return # Already deleted — no-op per StorageBackend contract
self._handle_http_error(exc)
await asyncio.to_thread(_delete)
async def presigned_get_url(self, object_key: str, expires_minutes: int = 60) -> str:
"""Not supported by Google Drive — raises NotImplementedError (D-14).
Use get_object() for direct streaming instead. The API layer proxies
content through the FastAPI endpoint.
"""
raise NotImplementedError(
"Google Drive backend does not support presigned URLs — "
"use get_object() for streaming"
)
async def generate_presigned_put_url(
self, object_key: str, expires_minutes: int = 15
) -> str:
"""Not supported by Google Drive — raises NotImplementedError (D-14).
Use put_object() for direct upload via FastAPI intermediary.
"""
raise NotImplementedError(
"Google Drive backend does not support presigned put URLs — "
"use put_object() for direct upload"
)
async def stat_object(self, object_key: str) -> int:
"""Return the file size in bytes from Google Drive metadata.
Calls files().get(fileId=object_key, fields='size') and returns
int(metadata.get('size', 0)).
Note: Google Workspace files (Docs, Sheets, Slides) have no binary size
and return an empty 'size' field. DocuVault only uploads binary files,
so the 0 fallback handles this edge case.
"""
def _stat() -> int:
service = self._get_service()
try:
metadata = (
service.files()
.get(fileId=object_key, fields="size")
.execute()
)
return int(metadata.get("size", 0))
except HttpError as exc:
self._handle_http_error(exc)
return await asyncio.to_thread(_stat)
async def health_check(self) -> bool:
"""Return True if the Drive service is reachable and the token is valid.
Makes a minimal files().list(pageSize=1) call. Returns False on any error
to avoid propagating transient failures up the call stack.
"""
def _check() -> bool:
service = self._get_service()
service.files().list(pageSize=1, fields="files(id)").execute()
return True
try:
return await asyncio.to_thread(_check)
except Exception:
return False