Files
kite/backend/storage/minio_backend.py
T
curo1305 eaf86a832a feat(01-04): add StorageBackend ABC + MinIOBackend + factory
- backend/storage/base.py: StorageBackend ABC with 5 abstract methods mirroring ai/base.py
- backend/storage/minio_backend.py: MinIOBackend wrapping all sync Minio SDK calls in asyncio.to_thread(); STORE-02 key schema: {user_id}/{document_id}/{uuid4()}{ext}
- backend/storage/__init__.py: get_storage_backend() factory mirroring ai/__init__.py
- backend/tests/test_storage.py: remove xfail markers (plan 04 implements the module)
2026-05-22 09:36:24 +02:00

107 lines
3.3 KiB
Python

"""
MinIOBackend — synchronous Minio SDK wrapped in asyncio.to_thread().
Every call to the synchronous Minio SDK is offloaded to a thread pool via
asyncio.to_thread() so the FastAPI event loop is never blocked.
Object key schema (STORE-02 / D-06):
{user_id}/{document_id}/{uuid4()}{ext}
The human-readable filename is NEVER passed into this module — only the
file extension (derived by the caller from Path(original_name).suffix.lower())
reaches here.
"""
import asyncio
import io
import uuid
from datetime import timedelta
from minio import Minio
from storage.base import StorageBackend
class MinIOBackend(StorageBackend):
"""MinIO implementation of StorageBackend.
All synchronous Minio SDK calls are wrapped in asyncio.to_thread() to
avoid blocking the FastAPI event loop (RESEARCH.md Pattern 3).
"""
def __init__(
self,
endpoint: str,
access_key: str,
secret_key: str,
bucket: str,
secure: bool = False,
) -> None:
self._bucket = bucket
self._client = Minio(
endpoint=endpoint,
access_key=access_key,
secret_key=secret_key,
secure=secure, # False for Docker internal HTTP traffic between containers
)
async def put_object(
self,
user_id: str,
document_id: str,
file_bytes: bytes,
extension: str,
content_type: str,
) -> str:
"""Store bytes in MinIO and return the generated object key.
Key schema: {user_id}/{document_id}/{uuid4()}{extension}
The filename is NOT a parameter — STORE-02 compliance.
"""
object_key = f"{user_id}/{document_id}/{uuid.uuid4()}{extension}"
data = io.BytesIO(file_bytes)
data.seek(0) # belt-and-braces: BytesIO constructor leaves pointer at 0 already
await asyncio.to_thread(
self._client.put_object,
self._bucket,
object_key,
data,
length=len(file_bytes),
content_type=content_type,
)
return object_key
async def get_object(self, object_key: str) -> bytes:
"""Fetch object bytes from MinIO by key."""
def _fetch() -> bytes:
response = self._client.get_object(self._bucket, object_key)
try:
return response.read()
finally:
response.close()
response.release_conn()
return await asyncio.to_thread(_fetch)
async def delete_object(self, object_key: str) -> None:
"""Delete an object from MinIO by key."""
await asyncio.to_thread(self._client.remove_object, self._bucket, object_key)
async def presigned_get_url(
self, object_key: str, expires_minutes: int = 60
) -> str:
"""Return a time-limited pre-signed download URL."""
return await asyncio.to_thread(
self._client.presigned_get_object,
self._bucket,
object_key,
timedelta(minutes=expires_minutes),
)
async def health_check(self) -> bool:
"""Return True when the configured bucket is reachable; False on any exception."""
try:
return await asyncio.to_thread(self._client.bucket_exists, self._bucket)
except Exception:
return False