import logging from contextlib import asynccontextmanager from aiobotocore.session import get_session from .base import AbstractStorageBackend logger = logging.getLogger(__name__) class S3Backend(AbstractStorageBackend): """ S3-compatible backend. Works with AWS S3, MinIO, Backblaze B2, Cloudflare R2, etc. Set endpoint_url to the service URL for non-AWS providers; leave empty for real AWS. """ def __init__( self, endpoint_url: str, access_key: str, secret_key: str, region: str = "us-east-1", ) -> None: self._endpoint_url = endpoint_url or None self._access_key = access_key self._secret_key = secret_key self._region = region self._session = get_session() @property def driver_name(self) -> str: return "s3" @asynccontextmanager async def _client(self): async with self._session.create_client( "s3", endpoint_url=self._endpoint_url, aws_access_key_id=self._access_key, aws_secret_access_key=self._secret_key, region_name=self._region, ) as client: yield client async def _ensure_bucket(self, client, bucket: str) -> None: try: await client.head_bucket(Bucket=bucket) except Exception: try: if self._region == "us-east-1": await client.create_bucket(Bucket=bucket) else: await client.create_bucket( Bucket=bucket, CreateBucketConfiguration={"LocationConstraint": self._region}, ) except Exception as exc: logger.debug("Bucket create skipped (may already exist): %s", exc) async def put(self, bucket: str, key: str, data: bytes) -> None: async with self._client() as client: await self._ensure_bucket(client, bucket) await client.put_object(Bucket=bucket, Key=key, Body=data) async def get(self, bucket: str, key: str) -> bytes: async with self._client() as client: try: response = await client.get_object(Bucket=bucket, Key=key) return await response["Body"].read() except Exception as exc: raise KeyError(f"{bucket}/{key}") from exc async def delete(self, bucket: str, key: str) -> None: async with self._client() as client: await client.delete_object(Bucket=bucket, Key=key) async def list_keys(self, bucket: str) -> list[str]: async with self._client() as client: try: paginator = client.get_paginator("list_objects_v2") keys: list[str] = [] async for page in paginator.paginate(Bucket=bucket): for obj in page.get("Contents", []): keys.append(obj["Key"]) return keys except Exception: return [] async def exists(self, bucket: str, key: str) -> bool: async with self._client() as client: try: await client.head_object(Bucket=bucket, Key=key) return True except Exception: return False async def test_connection(self) -> None: async with self._client() as client: await client.list_buckets()