Files
kite/backend/storage/minio_backend.py
T
curo1305 b6bab5a230 feat(phase-4): Alembic migration 0004 (pdf_open_mode, GIN FTS index, audit-logs bucket) + MinIOBackend.put_object_raw()
- Add users.pdf_open_mode column via batch_alter_table (server_default='in_app')
- Create GIN expression index ix_documents_fts on documents.extracted_text via raw SQL (Alembic #1390)
- Create audit-logs MinIO bucket gated on MINIO_ENDPOINT env var
- Add MinIOBackend.put_object_raw() for caller-supplied bucket+key uploads (audit CSV export)
2026-05-25 18:30:28 +02:00

172 lines
5.7 KiB
Python

"""
MinIOBackend — synchronous Minio SDK wrapped in asyncio.to_thread().

Every call to the synchronous Minio SDK is offloaded to a thread pool via
asyncio.to_thread() so the FastAPI event loop is never blocked.

Object key schema (STORE-02 / D-06):
    {user_id}/{document_id}/{uuid4()}{ext}

The human-readable filename is NEVER passed to put_object() — only the
file extension (derived by the caller from Path(original_name).suffix.lower())
reaches it. put_object_raw() is the one exception to the key schema: its
caller supplies the complete bucket and key (audit-logs CSV export).
"""
from __future__ import annotations
import asyncio
import io
import uuid
from datetime import timedelta
from typing import Optional
from minio import Minio
from storage.base import StorageBackend
class MinIOBackend(StorageBackend):
    """StorageBackend implementation backed by a MinIO server.

    The Minio SDK is synchronous, so each SDK call made by the async methods
    below is dispatched through asyncio.to_thread() instead of running on the
    FastAPI event loop (RESEARCH.md Pattern 3).
    """

    def __init__(
        self,
        endpoint: str,
        access_key: str,
        secret_key: str,
        bucket: str,
        secure: bool = False,
        public_endpoint: Optional[str] = None,
    ) -> None:
        """Create the internal and the public Minio clients.

        Args:
            endpoint: Host[:port] used for all data operations.
            access_key: Access key shared by both clients.
            secret_key: Secret key shared by both clients.
            bucket: Default bucket used by every method except
                put_object_raw().
            secure: Forwarded unchanged to both Minio clients.
            public_endpoint: Host[:port] used only when signing URLs; falls
                back to ``endpoint`` when None or empty.
        """
        self._bucket = bucket

        # Settings common to both clients.
        shared = {
            "access_key": access_key,
            "secret_key": secret_key,
            "secure": secure,
        }

        self._client = Minio(endpoint=endpoint, **shared)

        # The public client signs URLs against the browser-resolvable hostname
        # (localhost:9000) so presigned URLs contain a host the browser can
        # reach (T-03-10). Pinning region="us-east-1" avoids a region-discovery
        # GET to the public endpoint, which is unreachable from inside the
        # backend Docker container.
        self._public_client = Minio(
            endpoint=(public_endpoint or endpoint),
            region="us-east-1",
            **shared,
        )

    async def put_object(
        self,
        user_id: str,
        document_id: str,
        file_bytes: bytes,
        extension: str,
        content_type: str,
    ) -> str:
        """Upload ``file_bytes`` to the default bucket under a generated key.

        The key follows ``{user_id}/{document_id}/{uuid4()}{extension}``.
        The original filename is deliberately not a parameter (STORE-02).

        Returns:
            The object key that was generated for the upload.
        """
        generated_key = f"{user_id}/{document_id}/{uuid.uuid4()}{extension}"

        stream = io.BytesIO(file_bytes)
        # Defensive rewind; a freshly built BytesIO already sits at offset 0.
        stream.seek(0)

        await asyncio.to_thread(
            self._client.put_object,
            self._bucket,
            generated_key,
            stream,
            length=len(file_bytes),
            content_type=content_type,
        )
        return generated_key

    async def put_object_raw(
        self,
        bucket: str,
        key: str,
        data: io.BytesIO,
        length: int,
        content_type: str,
    ) -> None:
        """Upload a stream to a caller-chosen bucket and key.

        Used for the audit-logs CSV export. No key schema is applied here and
        the default documents bucket is not consulted: ``bucket`` and ``key``
        are passed to the SDK exactly as given.
        """
        upload = self._client.put_object
        await asyncio.to_thread(
            upload,
            bucket,
            key,
            data,
            length=length,
            content_type=content_type,
        )

    async def get_object(self, object_key: str) -> bytes:
        """Download the object stored under ``object_key`` and return its bytes."""

        def _read_all() -> bytes:
            resp = self._client.get_object(self._bucket, object_key)
            try:
                body = resp.read()
            finally:
                # Always close the response and release its connection,
                # whether or not the read succeeded.
                resp.close()
                resp.release_conn()
            return body

        return await asyncio.to_thread(_read_all)

    async def delete_object(self, object_key: str) -> None:
        """Remove the object stored under ``object_key`` from the default bucket."""
        remove = self._client.remove_object
        await asyncio.to_thread(remove, self._bucket, object_key)

    async def presigned_get_url(
        self, object_key: str, expires_minutes: int = 60
    ) -> str:
        """Return a time-limited download URL signed by the public client.

        The URL is valid for ``expires_minutes`` minutes and carries the
        browser-resolvable hostname.
        """
        lifetime = timedelta(minutes=expires_minutes)
        return await asyncio.to_thread(
            self._public_client.presigned_get_object,
            self._bucket,
            object_key,
            lifetime,
        )

    async def health_check(self) -> bool:
        """Report whether the default bucket can be seen.

        Returns the result of the SDK's bucket_exists() call, or False if that
        call raises any exception.
        """
        try:
            exists = await asyncio.to_thread(
                self._client.bucket_exists, self._bucket
            )
        except Exception:
            # Best-effort probe: any failure is reported as "not healthy".
            return False
        return exists

    async def generate_presigned_put_url(
        self, object_key: str, expires_minutes: int = 15
    ) -> str:
        """Return a time-limited upload (PUT) URL signed by the public client.

        The public client (localhost:9000) is used so the browser can resolve
        the host. The MinIO Python SDK builds presigned URLs client-side, so
        MINIO_SERVER_URL on the container does NOT rewrite them
        (T-03-10 / Finding 3).
        """
        lifetime = timedelta(minutes=expires_minutes)
        return await asyncio.to_thread(
            self._public_client.presigned_put_object,
            self._bucket,
            object_key,
            lifetime,
        )

    async def stat_object(self, object_key: str) -> int:
        """Return the size in bytes that MinIO reports for ``object_key``.

        Queries the internal client's stat_object() and returns its ``.size``
        (RESEARCH.md Finding 5 treats this value as authoritative).

        Raises:
            minio.error.S3Error: with code 'NoSuchKey' when the object does
                not exist.
        """
        stat = await asyncio.to_thread(
            self._client.stat_object, self._bucket, object_key
        )
        return stat.size