feat: add storage-service container with pluggable backends (Phase 1)

New FastAPI microservice (port 8020) providing unified blob storage via
PUT/GET/DELETE/LIST HTTP API. Local filesystem backend is the default (zero
extra deps). S3-compatible and WebDAV backends are built in. Backend is
switchable at runtime via POST /migrate, which copies all objects to the new
backend, verifies each one, atomically switches, then cleans up the old backend.

WebDAV XML parsing uses defusedxml to prevent XXE attacks.

Wired into docker-compose (storage_data volume) and registered in the backend
service-health poller as 'storage-service'.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
curo1305
2026-04-20 15:50:31 +02:00
parent 50d2348b36
commit 5349f21752
27 changed files with 1052 additions and 3 deletions
@@ -0,0 +1,70 @@
import logging
from app.core.config import settings
from app.services.backends.base import AbstractStorageBackend
from app.services.backends.local import LocalFSBackend
from app.services.backends.s3 import S3Backend
from app.services.backends.webdav import WebDAVBackend
logger = logging.getLogger(__name__)
_active_backend: AbstractStorageBackend | None = None
def build_backend(driver: str, config: dict) -> AbstractStorageBackend:
"""Construct a backend instance from a driver name + config dict."""
if driver == "local":
return LocalFSBackend(data_dir=config.get("data_dir", settings.DATA_DIR))
if driver == "s3":
return S3Backend(
endpoint_url=config.get("endpoint_url", ""),
access_key=config.get("access_key", ""),
secret_key=config.get("secret_key", ""),
region=config.get("region", "us-east-1"),
)
if driver == "webdav":
return WebDAVBackend(
url=config.get("url", ""),
username=config.get("username", ""),
password=config.get("password", ""),
root_path=config.get("root_path", "/"),
)
raise ValueError(f"Unknown driver: {driver!r}. Valid options: local, s3, webdav")
def initialize_backend() -> None:
"""Build the initial backend from environment variables at startup."""
global _active_backend
driver = settings.STORAGE_BACKEND
config: dict = {}
if driver == "s3":
config = {
"endpoint_url": settings.S3_ENDPOINT_URL,
"access_key": settings.S3_ACCESS_KEY,
"secret_key": settings.S3_SECRET_KEY,
"region": settings.S3_REGION,
}
elif driver == "webdav":
config = {
"url": settings.WEBDAV_URL,
"username": settings.WEBDAV_USERNAME,
"password": settings.WEBDAV_PASSWORD,
"root_path": settings.WEBDAV_ROOT_PATH,
}
# local needs no extra config — DATA_DIR is read from settings inside build_backend
_active_backend = build_backend(driver, config)
logger.info("Storage backend initialized: %s", driver)
def get_backend() -> AbstractStorageBackend:
if _active_backend is None:
raise RuntimeError("Backend not initialized — call initialize_backend() at startup")
return _active_backend
def switch_backend(new_backend: AbstractStorageBackend) -> None:
"""Replace the active backend. Called by the migration job after all data is verified."""
global _active_backend
old_name = _active_backend.driver_name if _active_backend else "none"
_active_backend = new_backend
logger.info("Storage backend switched: %s%s", old_name, new_backend.driver_name)
@@ -0,0 +1,34 @@
from abc import ABC, abstractmethod
class AbstractStorageBackend(ABC):
"""Common interface every storage backend must implement."""
@property
@abstractmethod
def driver_name(self) -> str:
"""Short identifier returned in /health: 'local', 's3', or 'webdav'."""
@abstractmethod
async def put(self, bucket: str, key: str, data: bytes) -> None:
"""Store *data* under bucket/key. Creates bucket/intermediate dirs as needed."""
@abstractmethod
async def get(self, bucket: str, key: str) -> bytes:
"""Return the stored bytes. Raises KeyError if the object does not exist."""
@abstractmethod
async def delete(self, bucket: str, key: str) -> None:
"""Delete the object. No-op if it does not exist."""
@abstractmethod
async def list_keys(self, bucket: str) -> list[str]:
"""Return all keys stored in *bucket*. Returns [] if bucket is empty/absent."""
@abstractmethod
async def exists(self, bucket: str, key: str) -> bool:
"""Return True if the object exists."""
@abstractmethod
async def test_connection(self) -> None:
"""Verify the backend is reachable and writable. Raise on failure."""
@@ -0,0 +1,67 @@
import asyncio
from pathlib import Path
import aiofiles
from .base import AbstractStorageBackend
class LocalFSBackend(AbstractStorageBackend):
"""Stores objects as files under <data_dir>/<bucket>/<key>."""
def __init__(self, data_dir: str) -> None:
self._root = Path(data_dir)
self._root.mkdir(parents=True, exist_ok=True)
@property
def driver_name(self) -> str:
return "local"
def _resolve(self, bucket: str, key: str) -> Path:
safe_key = key.lstrip("/")
if ".." in safe_key.split("/"):
raise ValueError(f"Invalid key: {key!r}")
return self._root / bucket / safe_key
async def put(self, bucket: str, key: str, data: bytes) -> None:
dest = self._resolve(bucket, key)
await asyncio.to_thread(dest.parent.mkdir, parents=True, exist_ok=True)
async with aiofiles.open(dest, "wb") as f:
await f.write(data)
async def get(self, bucket: str, key: str) -> bytes:
path = self._resolve(bucket, key)
if not path.exists():
raise KeyError(f"{bucket}/{key}")
async with aiofiles.open(path, "rb") as f:
return await f.read()
async def delete(self, bucket: str, key: str) -> None:
path = self._resolve(bucket, key)
try:
await asyncio.to_thread(path.unlink, missing_ok=True)
except OSError:
pass
async def list_keys(self, bucket: str) -> list[str]:
bucket_dir = self._root / bucket
if not bucket_dir.exists():
return []
def _scan() -> list[str]:
return [
str(p.relative_to(bucket_dir))
for p in bucket_dir.rglob("*")
if p.is_file()
]
return await asyncio.to_thread(_scan)
async def exists(self, bucket: str, key: str) -> bool:
return self._resolve(bucket, key).exists()
async def test_connection(self) -> None:
self._root.mkdir(parents=True, exist_ok=True)
probe = self._root / ".health_probe"
probe.write_bytes(b"ok")
probe.unlink()
@@ -0,0 +1,99 @@
import logging
from contextlib import asynccontextmanager
from aiobotocore.session import get_session
from .base import AbstractStorageBackend
logger = logging.getLogger(__name__)
class S3Backend(AbstractStorageBackend):
"""
S3-compatible backend. Works with AWS S3, MinIO, Backblaze B2, Cloudflare R2, etc.
Set endpoint_url to the service URL for non-AWS providers; leave empty for real AWS.
"""
def __init__(
self,
endpoint_url: str,
access_key: str,
secret_key: str,
region: str = "us-east-1",
) -> None:
self._endpoint_url = endpoint_url or None
self._access_key = access_key
self._secret_key = secret_key
self._region = region
self._session = get_session()
@property
def driver_name(self) -> str:
return "s3"
@asynccontextmanager
async def _client(self):
async with self._session.create_client(
"s3",
endpoint_url=self._endpoint_url,
aws_access_key_id=self._access_key,
aws_secret_access_key=self._secret_key,
region_name=self._region,
) as client:
yield client
async def _ensure_bucket(self, client, bucket: str) -> None:
try:
await client.head_bucket(Bucket=bucket)
except Exception:
try:
if self._region == "us-east-1":
await client.create_bucket(Bucket=bucket)
else:
await client.create_bucket(
Bucket=bucket,
CreateBucketConfiguration={"LocationConstraint": self._region},
)
except Exception as exc:
logger.debug("Bucket create skipped (may already exist): %s", exc)
async def put(self, bucket: str, key: str, data: bytes) -> None:
async with self._client() as client:
await self._ensure_bucket(client, bucket)
await client.put_object(Bucket=bucket, Key=key, Body=data)
async def get(self, bucket: str, key: str) -> bytes:
async with self._client() as client:
try:
response = await client.get_object(Bucket=bucket, Key=key)
return await response["Body"].read()
except Exception as exc:
raise KeyError(f"{bucket}/{key}") from exc
async def delete(self, bucket: str, key: str) -> None:
async with self._client() as client:
await client.delete_object(Bucket=bucket, Key=key)
async def list_keys(self, bucket: str) -> list[str]:
async with self._client() as client:
try:
paginator = client.get_paginator("list_objects_v2")
keys: list[str] = []
async for page in paginator.paginate(Bucket=bucket):
for obj in page.get("Contents", []):
keys.append(obj["Key"])
return keys
except Exception:
return []
async def exists(self, bucket: str, key: str) -> bool:
async with self._client() as client:
try:
await client.head_object(Bucket=bucket, Key=key)
return True
except Exception:
return False
async def test_connection(self) -> None:
async with self._client() as client:
await client.list_buckets()
@@ -0,0 +1,121 @@
import base64
from urllib.parse import quote
import defusedxml.ElementTree as ET
import aiohttp
from .base import AbstractStorageBackend
class WebDAVBackend(AbstractStorageBackend):
"""
WebDAV backend. Compatible with Nextcloud and any standard WebDAV server.
root_path should be the WebDAV root on the server, e.g. '/remote.php/dav/files/username'.
"""
def __init__(
self,
url: str,
username: str,
password: str,
root_path: str = "/",
) -> None:
self._base = url.rstrip("/")
self._root = root_path.rstrip("/")
creds = base64.b64encode(f"{username}:{password}".encode()).decode()
self._auth = f"Basic {creds}"
@property
def driver_name(self) -> str:
return "webdav"
def _url(self, *parts: str) -> str:
encoded = "/".join(quote(p, safe="") for p in parts)
return f"{self._base}{self._root}/{encoded}"
def _headers(self, extra: dict | None = None) -> dict[str, str]:
h = {"Authorization": self._auth}
if extra:
h.update(extra)
return h
async def _ensure_collection(self, session: aiohttp.ClientSession, *parts: str) -> None:
"""MKCOL is idempotent — ignore 405 (already exists)."""
url = self._url(*parts)
async with session.request("MKCOL", url, headers=self._headers()) as resp:
if resp.status not in (200, 201, 405):
pass # best-effort; PUT will fail if directory is truly missing
async def put(self, bucket: str, key: str, data: bytes) -> None:
async with aiohttp.ClientSession() as session:
await self._ensure_collection(session, bucket)
parts = key.split("/")
for i in range(1, len(parts)):
await self._ensure_collection(session, bucket, *parts[:i])
url = self._url(bucket, key)
async with session.put(url, data=data, headers=self._headers()) as resp:
if resp.status not in (200, 201, 204):
raise OSError(f"WebDAV PUT {url}{resp.status}")
async def get(self, bucket: str, key: str) -> bytes:
async with aiohttp.ClientSession() as session:
url = self._url(bucket, key)
async with session.get(url, headers=self._headers()) as resp:
if resp.status == 404:
raise KeyError(f"{bucket}/{key}")
if resp.status != 200:
raise OSError(f"WebDAV GET {url}{resp.status}")
return await resp.read()
async def delete(self, bucket: str, key: str) -> None:
async with aiohttp.ClientSession() as session:
url = self._url(bucket, key)
async with session.delete(url, headers=self._headers()) as resp:
if resp.status not in (200, 204, 404):
raise OSError(f"WebDAV DELETE {url}{resp.status}")
async def list_keys(self, bucket: str) -> list[str]:
async with aiohttp.ClientSession() as session:
url = self._url(bucket)
headers = self._headers({"Depth": "infinity", "Content-Type": "application/xml"})
body = '<?xml version="1.0"?><d:propfind xmlns:d="DAV:"><d:prop><d:resourcetype/></d:prop></d:propfind>'
async with session.request("PROPFIND", url, headers=headers, data=body) as resp:
if resp.status == 404:
return []
if resp.status != 207:
return []
xml_body = await resp.text()
ns = {"d": "DAV:"}
try:
root = ET.fromstring(xml_body)
except ET.ParseError:
return []
prefix = f"{self._base}{self._root}/{quote(bucket, safe='')}/"
keys: list[str] = []
for response in root.findall("d:response", ns):
href = response.findtext("d:href", namespaces=ns) or ""
prop = response.find(".//d:prop", ns)
if prop is not None:
rt = prop.find("d:resourcetype", ns)
if rt is not None and rt.find("d:collection", ns) is not None:
continue # skip directories
if href.startswith(prefix):
keys.append(href[len(prefix):])
return keys
async def exists(self, bucket: str, key: str) -> bool:
async with aiohttp.ClientSession() as session:
url = self._url(bucket, key)
async with session.request("HEAD", url, headers=self._headers()) as resp:
return resp.status == 200
async def test_connection(self) -> None:
async with aiohttp.ClientSession() as session:
root_url = f"{self._base}{self._root}/"
headers = self._headers({"Depth": "0"})
async with session.request("PROPFIND", root_url, headers=headers) as resp:
if resp.status not in (200, 207):
raise ConnectionError(f"WebDAV root PROPFIND → {resp.status}")
@@ -0,0 +1,139 @@
"""
Backend migration service.
Flow:
1. POST /migrate → validate new backend (test_connection)
2. Background task enumerates all objects in all known buckets
3. Each object is copied old → new, then verified
4. Only after 100 % success: atomically switch active backend
5. Delete all objects from old backend
6. If any copy fails: old backend stays active; state = "failed"
7. DELETE /migrate cancels a running migration (old backend stays active)
"""
import logging
from dataclasses import dataclass, field
from typing import Literal
from app.services.backends.base import AbstractStorageBackend
from app.services.backend_manager import get_backend, switch_backend
logger = logging.getLogger(__name__)
# All logical buckets the service knows about — enumerated during migration.
KNOWN_BUCKETS = ["documents", "config"]
MigrationState = Literal[
"idle", "validating", "migrating", "switching", "cleaning", "done", "failed", "cancelled"
]
@dataclass
class _MigrationStatus:
state: MigrationState = "idle"
total: int = 0
done: int = 0
failed: int = 0
errors: list[str] = field(default_factory=list)
_status = _MigrationStatus()
_cancel_requested: bool = False
def get_status() -> dict:
return {
"state": _status.state,
"total": _status.total,
"done": _status.done,
"failed": _status.failed,
"errors": _status.errors[:50], # cap to avoid huge responses
}
def is_in_progress() -> bool:
return _status.state in ("validating", "migrating", "switching", "cleaning")
async def cancel() -> bool:
global _cancel_requested
if _status.state == "migrating":
_cancel_requested = True
return True
return False
async def run_migration(new_backend: AbstractStorageBackend) -> None:
"""
Background task: copy all objects to new_backend, verify, switch, clean old.
Called after the caller has already validated new_backend.test_connection().
"""
global _cancel_requested
_cancel_requested = False
old_backend = get_backend()
_status.state = "migrating"
_status.done = 0
_status.failed = 0
_status.errors.clear()
try:
# Collect all objects across every known bucket
all_objects: list[tuple[str, str]] = []
for bucket in KNOWN_BUCKETS:
try:
keys = await old_backend.list_keys(bucket)
for key in keys:
all_objects.append((bucket, key))
except Exception as exc:
logger.warning("Could not list bucket %r: %s", bucket, exc)
_status.total = len(all_objects)
logger.info("Migration: %d objects to migrate across %d buckets", len(all_objects), len(KNOWN_BUCKETS))
for bucket, key in all_objects:
if _cancel_requested:
_status.state = "cancelled"
logger.info("Migration cancelled (%d/%d done)", _status.done, _status.total)
return
try:
data = await old_backend.get(bucket, key)
await new_backend.put(bucket, key, data)
if not await new_backend.exists(bucket, key):
raise OSError("Verification failed: object absent after PUT")
_status.done += 1
except Exception as exc:
_status.failed += 1
entry = f"{bucket}/{key}: {exc}"
_status.errors.append(entry)
logger.warning("Migration copy failed — %s", entry)
if _status.failed > 0:
_status.state = "failed"
logger.error(
"Migration failed: %d/%d objects could not be copied; old backend remains active",
_status.failed,
_status.total,
)
return
# All objects verified — atomically switch
_status.state = "switching"
switch_backend(new_backend)
# Remove all objects from old backend (best-effort)
_status.state = "cleaning"
for bucket, key in all_objects:
try:
await old_backend.delete(bucket, key)
except Exception as exc:
logger.warning("Cleanup failed for %s/%s: %s", bucket, key, exc)
_status.state = "done"
logger.info("Migration complete: %d objects moved to %s", _status.total, new_backend.driver_name)
except Exception as exc:
_status.state = "failed"
_status.errors.append(f"Unexpected error: {exc}")
logger.exception("Migration aborted with unexpected error")