import asyncio from pathlib import Path import aiofiles from .base import AbstractStorageBackend class LocalFSBackend(AbstractStorageBackend): """Stores objects as files under //.""" def __init__(self, data_dir: str) -> None: self._root = Path(data_dir) self._root.mkdir(parents=True, exist_ok=True) @property def driver_name(self) -> str: return "local" def _resolve(self, bucket: str, key: str) -> Path: safe_key = key.lstrip("/") if ".." in safe_key.split("/"): raise ValueError(f"Invalid key: {key!r}") return self._root / bucket / safe_key async def put(self, bucket: str, key: str, data: bytes) -> None: dest = self._resolve(bucket, key) await asyncio.to_thread(dest.parent.mkdir, parents=True, exist_ok=True) async with aiofiles.open(dest, "wb") as f: await f.write(data) async def get(self, bucket: str, key: str) -> bytes: path = self._resolve(bucket, key) if not path.exists(): raise KeyError(f"{bucket}/{key}") async with aiofiles.open(path, "rb") as f: return await f.read() async def delete(self, bucket: str, key: str) -> None: path = self._resolve(bucket, key) try: await asyncio.to_thread(path.unlink, missing_ok=True) except OSError: pass async def list_keys(self, bucket: str) -> list[str]: bucket_dir = self._root / bucket if not bucket_dir.exists(): return [] def _scan() -> list[str]: return [ str(p.relative_to(bucket_dir)) for p in bucket_dir.rglob("*") if p.is_file() ] return await asyncio.to_thread(_scan) async def exists(self, bucket: str, key: str) -> bool: return self._resolve(bucket, key).exists() async def test_connection(self) -> None: self._root.mkdir(parents=True, exist_ok=True) probe = self._root / ".health_probe" probe.write_bytes(b"ok") probe.unlink()