""" Cloud storage connection management API endpoints for DocuVault. Endpoints: GET /api/cloud/oauth/initiate/{provider} — start OAuth flow (Google Drive, OneDrive) GET /api/cloud/oauth/callback/{provider} — OAuth callback: exchange code, store creds POST /api/cloud/connections/webdav — connect WebDAV / Nextcloud with credentials GET /api/cloud/connections — list all active cloud connections DELETE /api/cloud/connections/{id} — disconnect and purge credentials_enc GET /api/cloud/folders/{provider}/{folder_id} — lazy-load folder tree (TTL-cached 60s) PATCH /api/users/me/default-storage — update user's default storage backend Security invariants: - ALL endpoints use Depends(get_regular_user) — admin role gets 403 (D-18, D-19) - credentials_enc column NEVER appears in any API response (CloudConnectionOut whitelist) - DELETE /connections/{id} returns 404 for wrong-owner connections (prevents ID enumeration, D-19) - OAuth state token: secrets.token_urlsafe(32), Redis key "oauth_state:{token}", TTL 1800, single-use - write_audit_log metadata: only {"provider": provider} — no credentials, no tokens (T-05-05-08) - WebDAV server_url validated via validate_cloud_url() before backend instantiation (T-05-05-05) """ from __future__ import annotations import asyncio import secrets import uuid import urllib.parse from typing import Optional from fastapi import APIRouter, Depends, HTTPException, Request, status from fastapi.responses import RedirectResponse from pydantic import BaseModel from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from api.admin import CloudConnectionOut from config import settings from db.models import CloudConnection, User from deps.auth import get_regular_user from deps.db import get_db from services.audit import write_audit_log from storage.cloud_utils import encrypt_credentials, decrypt_credentials, validate_cloud_url # ── Router definitions ──────────────────────────────────────────────────────── router = APIRouter(prefix="/api/cloud", tags=["cloud"]) users_router = APIRouter(prefix="/api/users", tags=["users"]) # ── Provider constants ──────────────────────────────────────────────────────── VALID_OAUTH_PROVIDERS = {"google_drive", "onedrive"} VALID_WEBDAV_PROVIDERS = {"nextcloud", "webdav"} _DISPLAY_NAMES = { "google_drive": "Google Drive", "onedrive": "OneDrive", "nextcloud": "Nextcloud", "webdav": "WebDAV server", } # ── Request models ──────────────────────────────────────────────────────────── class WebDAVConnectRequest(BaseModel): """Request body for POST /api/cloud/connections/webdav.""" server_url: str username: str password: str provider: str # "nextcloud" or "webdav" class DefaultStorageRequest(BaseModel): """Request body for PATCH /api/users/me/default-storage.""" backend: str # ── _call_cloud_op helper ───────────────────────────────────────────────────── async def _call_cloud_op(conn: CloudConnection, user: User, session: AsyncSession, op_fn): """Wrap a cloud operation with transparent token refresh and invalid_grant handling. Design (B2, D-05, D-06): 1. Call op_fn() — a zero-argument async callable that performs the cloud operation. 2. On CloudConnectionError(reason="token_expired"): decrypt current credentials, refresh the access token via the provider, encrypt the new credentials, update conn.credentials_enc in the DB, rebuild the backend instance, and retry op_fn() once. 3. On CloudConnectionError(reason="invalid_grant"): set conn.status="REQUIRES_REAUTH", commit, write audit log, raise HTTPException(503). 4. All other exceptions are propagated unchanged. Args: conn: The CloudConnection ORM row for the current user + provider. user: The authenticated User ORM instance. session: An active AsyncSession (caller owns commit responsibility). op_fn: An async callable with no arguments. Must capture the backend instance in its closure. After token refresh, the caller should pass a new op_fn that captures the rebuilt backend. """ from storage.google_drive_backend import CloudConnectionError # lazy import try: return await op_fn() except CloudConnectionError as exc: if exc.reason == "token_expired": # Refresh token and retry once master_key = settings.cloud_creds_key.encode() credentials = decrypt_credentials(master_key, str(user.id), conn.credentials_enc) try: new_credentials = await _refresh_oauth_token(conn.provider, credentials) except Exception: # If refresh itself fails, treat as invalid_grant conn.status = "REQUIRES_REAUTH" await session.commit() await write_audit_log( session, event_type="cloud.requires_reauth", user_id=user.id, actor_id=user.id, resource_id=conn.id, ip_address=None, metadata_={"provider": conn.provider}, ) raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Cloud connection requires re-authentication. Please reconnect in Settings.", ) from exc # Persist new credentials conn.credentials_enc = encrypt_credentials(master_key, str(user.id), new_credentials) await session.commit() # Rebuild backend and retry rebuilt_backend = _build_backend(conn.provider, new_credentials) async def retried_op(): return await _invoke_backend_op(rebuilt_backend, op_fn) try: return await retried_op() except CloudConnectionError as retry_exc: if retry_exc.reason == "invalid_grant": conn.status = "REQUIRES_REAUTH" await session.commit() await write_audit_log( session, event_type="cloud.requires_reauth", user_id=user.id, actor_id=user.id, resource_id=conn.id, ip_address=None, metadata_={"provider": conn.provider}, ) raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Cloud connection requires re-authentication. Please reconnect in Settings.", ) from retry_exc raise elif exc.reason == "invalid_grant": conn.status = "REQUIRES_REAUTH" await session.commit() await write_audit_log( session, event_type="cloud.requires_reauth", user_id=user.id, actor_id=user.id, resource_id=conn.id, ip_address=None, metadata_={"provider": conn.provider}, ) raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Cloud connection requires re-authentication. Please reconnect in Settings.", ) from exc raise async def _refresh_oauth_token(provider: str, credentials: dict) -> dict: """Refresh the OAuth access token for google_drive or onedrive. Returns an updated credentials dict with new access_token and expiry. Raises Exception on failure (including invalid_grant from the provider). """ if provider == "google_drive": from google.oauth2.credentials import Credentials # lazy import from google.auth.transport.requests import Request # lazy import creds = Credentials( token=credentials.get("access_token"), refresh_token=credentials.get("refresh_token"), token_uri=credentials.get("token_uri", "https://oauth2.googleapis.com/token"), client_id=credentials.get("client_id"), client_secret=credentials.get("client_secret"), ) await asyncio.to_thread(creds.refresh, Request()) new_creds = dict(credentials) new_creds["access_token"] = creds.token if creds.expiry: new_creds["expiry"] = creds.expiry.isoformat() return new_creds elif provider == "onedrive": import msal # lazy import app = msal.ConfidentialClientApplication( settings.onedrive_client_id, client_credential=settings.onedrive_client_secret, authority=f"https://login.microsoftonline.com/{settings.onedrive_tenant_id}", ) result = await asyncio.to_thread( app.acquire_token_by_refresh_token, credentials.get("refresh_token", ""), scopes=["Files.ReadWrite", "offline_access"], ) if "error" in result: raise Exception(f"Token refresh failed: {result.get('error_description', result['error'])}") new_creds = dict(credentials) new_creds["access_token"] = result["access_token"] if "refresh_token" in result: new_creds["refresh_token"] = result["refresh_token"] return new_creds raise ValueError(f"Cannot refresh token for provider: {provider}") def _build_backend(provider: str, credentials: dict): """Build a cloud backend instance from provider name and credentials dict.""" if provider == "google_drive": from storage.google_drive_backend import GoogleDriveBackend # lazy import return GoogleDriveBackend(credentials) elif provider == "onedrive": from storage.onedrive_backend import OneDriveBackend # lazy import return OneDriveBackend(credentials) elif provider == "nextcloud": from storage.nextcloud_backend import NextcloudBackend # lazy import return NextcloudBackend( credentials["server_url"], credentials["username"], credentials["password"], ) elif provider == "webdav": from storage.webdav_backend import WebDAVBackend # lazy import return WebDAVBackend( credentials["server_url"], credentials["username"], credentials["password"], ) raise ValueError(f"Unknown provider: {provider}") async def _invoke_backend_op(backend, op_fn): """Call op_fn, but the backend in the closure needs to be the rebuilt one. Since op_fn is already a closure, we call it directly. This helper exists so the retry path in _call_cloud_op can call op_fn with minimal friction. """ return await op_fn() # ── Helper: upsert CloudConnection ─────────────────────────────────────────── async def _upsert_cloud_connection( session: AsyncSession, user_id: uuid.UUID, provider: str, credentials_enc: str, ) -> CloudConnection: """Insert or update a CloudConnection row for the given user + provider. If an existing row is found, updates credentials_enc and sets status=ACTIVE. If no row exists, inserts a new one. Args: session: Active async SQLAlchemy session. user_id: The authenticated user's UUID. provider: The cloud provider string (e.g. "google_drive"). credentials_enc: The HKDF+Fernet-encrypted credentials string. Returns: The CloudConnection ORM instance (added to session, not yet committed). """ result = await session.execute( select(CloudConnection).where( CloudConnection.user_id == user_id, CloudConnection.provider == provider, ) ) conn = result.scalar_one_or_none() if conn is not None: conn.credentials_enc = credentials_enc conn.status = "ACTIVE" else: conn = CloudConnection( id=uuid.uuid4(), user_id=user_id, provider=provider, display_name=_DISPLAY_NAMES.get(provider, provider), credentials_enc=credentials_enc, status="ACTIVE", ) session.add(conn) return conn # ── GET /api/cloud/oauth/initiate/{provider} ────────────────────────────────── @router.get("/oauth/initiate/{provider}") async def oauth_initiate( provider: str, request: Request, current_user: User = Depends(get_regular_user), ) -> dict: """Start the OAuth flow for Google Drive or OneDrive. Generates a CSRF state token, stores it in Redis with TTL 1800 (30 min), and returns the provider's authorization URL as JSON so the frontend can navigate using fetch() with the Bearer header (plan 05-10 fix). Returns: {"url": ""} Security: - state token is secrets.token_urlsafe(32) — 256 bits of entropy (T-05-05-01) - Redis key is single-use: deleted in the callback handler (T-05-05-02) - Only google_drive and onedrive are accepted (T-05-05-06) - Endpoint requires get_regular_user — no unauthenticated access (T-05-10-01) """ from fastapi.responses import JSONResponse # already available via fastapi if provider not in VALID_OAUTH_PROVIDERS: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Unsupported OAuth provider: {provider}. Valid providers: {sorted(VALID_OAUTH_PROVIDERS)}", ) # Pre-flight: validate credentials are configured before allocating Redis state if provider == "google_drive" and (not settings.google_client_id or not settings.google_client_secret): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Google Drive OAuth is not configured on this server. Set GOOGLE_CLIENT_ID and GOOGLE_CLIENT_SECRET in your environment.", ) if provider == "onedrive" and ( not settings.onedrive_client_id or not settings.onedrive_client_secret or not settings.onedrive_tenant_id ): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="OneDrive OAuth is not configured on this server. Set ONEDRIVE_CLIENT_ID, ONEDRIVE_CLIENT_SECRET, and ONEDRIVE_TENANT_ID in your environment.", ) state_token = secrets.token_urlsafe(32) redis_client = request.app.state.redis await redis_client.setex(f"oauth_state:{state_token}", 1800, str(current_user.id)) redirect_uri = f"{settings.backend_url}/api/cloud/oauth/callback/{provider}" if provider == "google_drive": from google_auth_oauthlib.flow import Flow # lazy import flow = Flow.from_client_config( { "web": { "client_id": settings.google_client_id, "client_secret": settings.google_client_secret, "auth_uri": "https://accounts.google.com/o/oauth2/auth", "token_uri": "https://oauth2.googleapis.com/token", } }, scopes=["https://www.googleapis.com/auth/drive.file"], redirect_uri=redirect_uri, ) authorization_url, _ = flow.authorization_url( access_type="offline", prompt="consent", state=state_token, ) return JSONResponse({"url": authorization_url}) elif provider == "onedrive": import msal # lazy import app = msal.ConfidentialClientApplication( settings.onedrive_client_id, client_credential=settings.onedrive_client_secret, authority=f"https://login.microsoftonline.com/{settings.onedrive_tenant_id}", ) auth_url = await asyncio.to_thread( app.get_authorization_request_url, scopes=["Files.ReadWrite", "offline_access"], redirect_uri=redirect_uri, state=state_token, ) return JSONResponse({"url": auth_url}) # ── GET /api/cloud/oauth/callback/{provider} ────────────────────────────────── @router.get("/oauth/callback/{provider}", response_class=RedirectResponse) async def oauth_callback( provider: str, request: Request, session: AsyncSession = Depends(get_db), ): """Handle the OAuth callback from Google Drive or OneDrive. Validates state token (single-use, from Redis), exchanges the authorization code for tokens, encrypts credentials, and upserts a CloudConnection row. On success: redirects to {settings.frontend_url}/settings?cloud_connected={provider} On any error: redirects to {settings.frontend_url}/settings?cloud_error={message} Security: - state token is looked up in Redis (T-05-05-01), deleted on first use (T-05-05-02) - Missing state → 400 redirect (not silent failure) - credentials_enc stored, never returned in response """ state = request.query_params.get("state") code = request.query_params.get("code") error_param = request.query_params.get("error") frontend_settings = f"{settings.frontend_url}/settings" try: if error_param: raise ValueError(f"OAuth provider returned error: {error_param}") if provider not in VALID_OAUTH_PROVIDERS: raise ValueError(f"Unsupported OAuth provider: {provider}") if not state: raise ValueError("Missing OAuth state parameter") redis_client = request.app.state.redis stored_user_id = await redis_client.get(f"oauth_state:{state}") if not stored_user_id: # state is missing or expired — single-use validation failed return RedirectResponse( url=f"{frontend_settings}?cloud_error={urllib.parse.quote('Invalid or expired OAuth state. Please try connecting again.')}", status_code=302, ) # Delete the Redis key immediately — single-use (T-05-05-02) await redis_client.delete(f"oauth_state:{state}") # Parse the user_id from the stored value if isinstance(stored_user_id, bytes): stored_user_id = stored_user_id.decode("utf-8") user_id = uuid.UUID(stored_user_id) # Load user from DB (the OAuth callback is not authenticated via JWT — # the state token binds this callback to the correct user session) user = await session.get(User, user_id) if user is None or not user.is_active: raise ValueError("User not found or inactive") redirect_uri = f"{settings.backend_url}/api/cloud/oauth/callback/{provider}" master_key = settings.cloud_creds_key.encode() if provider == "google_drive": from google_auth_oauthlib.flow import Flow # lazy import flow = Flow.from_client_config( { "web": { "client_id": settings.google_client_id, "client_secret": settings.google_client_secret, "auth_uri": "https://accounts.google.com/o/oauth2/auth", "token_uri": "https://oauth2.googleapis.com/token", } }, scopes=["https://www.googleapis.com/auth/drive.file"], redirect_uri=redirect_uri, ) await asyncio.to_thread(flow.fetch_token, code=code) token_credentials = flow.credentials credentials = { "access_token": token_credentials.token, "refresh_token": token_credentials.refresh_token, "token_uri": token_credentials.token_uri, "client_id": token_credentials.client_id, "client_secret": token_credentials.client_secret, "expiry": token_credentials.expiry.isoformat() if token_credentials.expiry else None, } elif provider == "onedrive": import msal # lazy import app = msal.ConfidentialClientApplication( settings.onedrive_client_id, client_credential=settings.onedrive_client_secret, authority=f"https://login.microsoftonline.com/{settings.onedrive_tenant_id}", ) result = await asyncio.to_thread( app.acquire_token_by_authorization_code, code, scopes=["Files.ReadWrite", "offline_access"], redirect_uri=redirect_uri, ) if "error" in result: raise ValueError(f"Token exchange failed: {result.get('error_description', result['error'])}") credentials = { "access_token": result["access_token"], "refresh_token": result.get("refresh_token", ""), "token_uri": f"https://login.microsoftonline.com/{settings.onedrive_tenant_id}/oauth2/v2.0/token", "client_id": settings.onedrive_client_id, "client_secret": settings.onedrive_client_secret, } else: raise ValueError(f"Unsupported OAuth provider: {provider}") credentials_enc = encrypt_credentials(master_key, str(user_id), credentials) conn = await _upsert_cloud_connection(session, user_id, provider, credentials_enc) await session.flush() await write_audit_log( session, event_type="cloud.connected", user_id=user.id, actor_id=user.id, resource_id=conn.id, ip_address=None, metadata_={"provider": provider}, ) await session.commit() return RedirectResponse( url=f"{frontend_settings}?cloud_connected={provider}", status_code=302, ) except Exception as exc: error_msg = str(exc) return RedirectResponse( url=f"{frontend_settings}?cloud_error={urllib.parse.quote(error_msg)}", status_code=302, ) # ── POST /api/cloud/connections/webdav ──────────────────────────────────────── @router.post("/connections/webdav", status_code=status.HTTP_201_CREATED) async def connect_webdav( body: WebDAVConnectRequest, request: Request, session: AsyncSession = Depends(get_db), current_user: User = Depends(get_regular_user), ) -> dict: """Connect a WebDAV or Nextcloud server. Validates the URL for SSRF, tests the connection with health_check(), encrypts credentials, and upserts a CloudConnection row. Security: - validate_cloud_url() blocks RFC-1918, loopback, and link-local (T-05-05-05) - health_check() requires a successful PROPFIND before storing credentials - credentials_enc never returned in response (CloudConnectionOut whitelist) """ if body.provider not in VALID_WEBDAV_PROVIDERS: raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=f"Unsupported WebDAV provider: {body.provider}. Valid values: {sorted(VALID_WEBDAV_PROVIDERS)}", ) # SSRF validation (T-05-05-05) try: validate_cloud_url(body.server_url) except ValueError as exc: raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=f"Invalid server URL: {exc}", ) from exc # Instantiate backend and run health_check try: if body.provider == "nextcloud": from storage.nextcloud_backend import NextcloudBackend # lazy import backend = NextcloudBackend(body.server_url, body.username, body.password) else: from storage.webdav_backend import WebDAVBackend # lazy import backend = WebDAVBackend(body.server_url, body.username, body.password) except ValueError as exc: raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=f"Invalid server URL: {exc}", ) from exc try: ok = await backend.health_check() if not ok: raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="Connection test failed — check server URL and credentials", ) except HTTPException: raise except Exception as exc: raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=f"Connection test failed — check server URL and credentials: {exc}", ) from exc credentials = { "server_url": body.server_url, "username": body.username, "password": body.password, } master_key = settings.cloud_creds_key.encode() credentials_enc = encrypt_credentials(master_key, str(current_user.id), credentials) conn = await _upsert_cloud_connection(session, current_user.id, body.provider, credentials_enc) await session.flush() _ip = request.headers.get("X-Forwarded-For") or (request.client.host if request.client else None) await write_audit_log( session, event_type="cloud.connected", user_id=current_user.id, actor_id=current_user.id, resource_id=conn.id, ip_address=_ip, metadata_={"provider": body.provider}, ) await session.commit() await session.refresh(conn) return CloudConnectionOut.model_validate(conn).model_dump() # ── GET /api/cloud/connections ──────────────────────────────────────────────── @router.get("/connections") async def list_connections( session: AsyncSession = Depends(get_db), current_user: User = Depends(get_regular_user), ) -> dict: """List all cloud connections for the current user. Security: - Only connections owned by current_user.id are returned - credentials_enc excluded by CloudConnectionOut whitelist (T-05-05-03) """ result = await session.execute( select(CloudConnection).where(CloudConnection.user_id == current_user.id) ) connections = result.scalars().all() items = [] master_key = settings.cloud_creds_key.encode() for conn in connections: d = CloudConnectionOut.model_validate(conn).model_dump() if conn.provider in ("nextcloud", "webdav"): try: creds = decrypt_credentials(master_key, str(conn.user_id), conn.credentials_enc) d["server_url"] = creds.get("server_url") d["connection_username"] = creds.get("username") except Exception: pass items.append(d) return {"items": items} # ── GET /api/cloud/connections/{connection_id}/config ──────────────────────── @router.get("/connections/{connection_id}/config") async def get_connection_config( connection_id: uuid.UUID, session: AsyncSession = Depends(get_db), current_user: User = Depends(get_regular_user), ) -> dict: """Return non-secret configuration fields for a WebDAV/Nextcloud connection. Returns server_url and connection_username (not password) so the frontend can pre-populate the Edit modal without exposing credentials. Only applicable to WebDAV / Nextcloud connections (not OAuth providers). Returns 404 for wrong-owner or unknown connections (prevents ID enumeration). Returns 400 for OAuth providers (no non-secret config to return). Security: - Only connection owned by current_user.id is returned (T-05-05-04) - password is never included in the response (D-18) - Returns 404 for wrong-owner connections (prevents ID enumeration) """ conn = await session.get(CloudConnection, connection_id) if conn is None or conn.user_id != current_user.id: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Connection not found") if conn.provider not in VALID_WEBDAV_PROVIDERS: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Connection config is only available for WebDAV/Nextcloud connections", ) master_key = settings.cloud_creds_key.encode() try: credentials = decrypt_credentials(master_key, str(current_user.id), conn.credentials_enc) except Exception: raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Failed to decrypt connection credentials", ) # Return non-secret fields only — never expose the password return { "id": str(conn.id), "provider": conn.provider, "server_url": credentials.get("server_url", ""), "connection_username": credentials.get("username", ""), } # ── DELETE /api/cloud/connections/{connection_id} ───────────────────────────── @router.delete("/connections/{connection_id}", status_code=status.HTTP_204_NO_CONTENT) async def delete_connection( connection_id: uuid.UUID, request: Request, session: AsyncSession = Depends(get_db), current_user: User = Depends(get_regular_user), ) -> None: """Disconnect a cloud connection and permanently purge credentials_enc. Returns 404 (not 403) for connections owned by other users to prevent connection ID enumeration (T-05-05-04, D-19). On success: connection row is deleted, audit log written, cache invalidated. """ conn = await session.get(CloudConnection, connection_id) # Return 404 for any access failure — prevents ID enumeration (T-05-05-04) if conn is None or conn.user_id != current_user.id: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Connection not found") provider = conn.provider # Invalidate folder listing cache for this provider from services.cloud_cache import invalidate_provider_cache # lazy import invalidate_provider_cache(str(current_user.id), provider) _ip = request.headers.get("X-Forwarded-For") or (request.client.host if request.client else None) await write_audit_log( session, event_type="cloud.disconnected", user_id=current_user.id, actor_id=current_user.id, resource_id=conn.id, ip_address=_ip, metadata_={"provider": provider}, ) await session.delete(conn) await session.commit() # ── GET /api/cloud/folders/{provider}/{folder_id} ───────────────────────────── @router.get("/folders/{provider}/{folder_id:path}") async def list_cloud_folders( provider: str, folder_id: str, session: AsyncSession = Depends(get_db), current_user: User = Depends(get_regular_user), ) -> dict: """List folder contents for a cloud provider (TTL-cached 60 seconds, D-16). Loads the active CloudConnection for current_user + provider, decrypts credentials, builds the appropriate backend, and calls list_folder via the TTL cache. Returns 404 if no active connection found (prevents enumeration). """ all_providers = VALID_OAUTH_PROVIDERS | VALID_WEBDAV_PROVIDERS if provider not in all_providers: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Unsupported provider: {provider}", ) result = await session.execute( select(CloudConnection).where( CloudConnection.user_id == current_user.id, CloudConnection.provider == provider, CloudConnection.status == "ACTIVE", ) ) conn = result.scalar_one_or_none() if conn is None: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="No active cloud connection found for this provider", ) master_key = settings.cloud_creds_key.encode() credentials = decrypt_credentials(master_key, str(current_user.id), conn.credentials_enc) from services.cloud_cache import get_cloud_folders_cached # lazy import if provider == "google_drive": from storage.google_drive_backend import GoogleDriveBackend # lazy import backend = GoogleDriveBackend(credentials) async def fetch_google_drive() -> list: """Fetch Google Drive folder listing via Drive v3 files.list.""" from googleapiclient.discovery import build # lazy import from google.oauth2.credentials import Credentials # lazy import import datetime as _dt expiry_str = credentials.get("expiry") expiry = None if expiry_str: try: expiry = _dt.datetime.fromisoformat(expiry_str) except ValueError: pass creds = Credentials( token=credentials.get("access_token"), refresh_token=credentials.get("refresh_token"), token_uri=credentials.get("token_uri", "https://oauth2.googleapis.com/token"), client_id=credentials.get("client_id"), client_secret=credentials.get("client_secret"), expiry=expiry, ) def _list_files(): service = build("drive", "v3", credentials=creds, cache_discovery=False) query = f"'{folder_id}' in parents and trashed=false" response = service.files().list( q=query, fields="files(id,name,mimeType,size)", pageSize=200, ).execute() files = response.get("files", []) result = [] for f in files: is_dir = f.get("mimeType") == "application/vnd.google-apps.folder" result.append({ "id": f["id"], "name": f["name"], "is_dir": is_dir, "size": int(f.get("size", 0)) if not is_dir else 0, }) return result return await asyncio.to_thread(_list_files) items = await get_cloud_folders_cached( str(current_user.id), provider, folder_id, fetch_google_drive ) elif provider == "onedrive": async def fetch_onedrive() -> list: """Fetch OneDrive folder listing via Microsoft Graph.""" import httpx # lazy import access_token = credentials.get("access_token", "") if folder_id in ("root", ""): url = "https://graph.microsoft.com/v1.0/me/drive/root/children" else: url = f"https://graph.microsoft.com/v1.0/me/drive/items/{folder_id}/children" async with httpx.AsyncClient() as client: resp = await client.get( url, headers={"Authorization": f"Bearer {access_token}"}, timeout=30, ) resp.raise_for_status() data = resp.json() result = [] for item in data.get("value", []): is_dir = "folder" in item result.append({ "id": item["id"], "name": item["name"], "is_dir": is_dir, "size": item.get("size", 0) if not is_dir else 0, }) return result items = await get_cloud_folders_cached( str(current_user.id), provider, folder_id, fetch_onedrive ) elif provider in ("nextcloud", "webdav"): backend = _build_backend(provider, credentials) # "root" is a frontend sentinel meaning the WebDAV root; translate to "" webdav_path = "" if folder_id == "root" else folder_id async def fetch_webdav() -> list: return await backend.list_folder(webdav_path) items = await get_cloud_folders_cached( str(current_user.id), provider, folder_id, fetch_webdav ) else: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Unsupported provider: {provider}", ) return {"items": items} # ── PATCH /api/users/me/default-storage ────────────────────────────────────── @users_router.patch("/me/default-storage") async def update_default_storage( body: DefaultStorageRequest, session: AsyncSession = Depends(get_db), current_user: User = Depends(get_regular_user), ) -> dict: """Update the current user's default storage backend. The backend value is stored as-is (validated by the frontend dropdown). Returns the updated default_storage_backend value. """ user = await session.get(User, current_user.id) if user is None: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found") user.default_storage_backend = body.backend session.add(user) await session.commit() return {"default_storage_backend": user.default_storage_backend}