From 5349f21752acc0e26e9b4fabecdd8fee46d9dac8 Mon Sep 17 00:00:00 2001 From: curo1305 Date: Mon, 20 Apr 2026 15:50:31 +0200 Subject: [PATCH] feat: add storage-service container with pluggable backends (Phase 1) New FastAPI microservice (port 8020) providing unified blob storage via PUT/GET/DELETE/LIST HTTP API. Local filesystem backend is the default (zero extra deps). S3-compatible and WebDAV backends are built in. Backend is switchable at runtime via POST /migrate, which copies all objects to the new backend, verifies each one, atomically switches, then cleans up the old backend. WebDAV XML parsing uses defusedxml to prevent XXE attacks. Wired into docker-compose (storage_data volume) and registered in the backend service-health poller as 'storage-service'. Co-Authored-By: Claude Sonnet 4.6 --- backend/app/core/config.py | 1 + backend/app/main.py | 1 + backend/app/services/service_health.py | 11 +- docker-compose.dev.yml | 5 + docker-compose.yml | 32 +++- features/storage-service/CLAUDE.md | 115 +++++++++++++++ features/storage-service/Dockerfile | 32 ++++ features/storage-service/STATUS.md | 83 +++++++++++ features/storage-service/app/__init__.py | 0 features/storage-service/app/core/__init__.py | 0 features/storage-service/app/core/config.py | 23 +++ features/storage-service/app/main.py | 29 ++++ .../storage-service/app/routers/__init__.py | 0 .../storage-service/app/routers/health.py | 10 ++ .../storage-service/app/routers/migrate.py | 88 +++++++++++ .../storage-service/app/routers/objects.py | 60 ++++++++ .../storage-service/app/services/__init__.py | 0 .../app/services/backend_manager.py | 70 +++++++++ .../app/services/backends/__init__.py | 0 .../app/services/backends/base.py | 34 +++++ .../app/services/backends/local.py | 67 +++++++++ .../app/services/backends/s3.py | 99 +++++++++++++ .../app/services/backends/webdav.py | 121 +++++++++++++++ .../storage-service/app/services/migration.py | 139 ++++++++++++++++++ features/storage-service/pyproject.toml | 25 ++++ features/storage-service/scripts/start.sh | 5 + features/storage-service/scripts/start_dev.sh | 5 + 27 files changed, 1052 insertions(+), 3 deletions(-) create mode 100644 features/storage-service/CLAUDE.md create mode 100644 features/storage-service/Dockerfile create mode 100644 features/storage-service/STATUS.md create mode 100644 features/storage-service/app/__init__.py create mode 100644 features/storage-service/app/core/__init__.py create mode 100644 features/storage-service/app/core/config.py create mode 100644 features/storage-service/app/main.py create mode 100644 features/storage-service/app/routers/__init__.py create mode 100644 features/storage-service/app/routers/health.py create mode 100644 features/storage-service/app/routers/migrate.py create mode 100644 features/storage-service/app/routers/objects.py create mode 100644 features/storage-service/app/services/__init__.py create mode 100644 features/storage-service/app/services/backend_manager.py create mode 100644 features/storage-service/app/services/backends/__init__.py create mode 100644 features/storage-service/app/services/backends/base.py create mode 100644 features/storage-service/app/services/backends/local.py create mode 100644 features/storage-service/app/services/backends/s3.py create mode 100644 features/storage-service/app/services/backends/webdav.py create mode 100644 features/storage-service/app/services/migration.py create mode 100644 features/storage-service/pyproject.toml create mode 100755 features/storage-service/scripts/start.sh create mode 100755 features/storage-service/scripts/start_dev.sh diff --git a/backend/app/core/config.py b/backend/app/core/config.py index 9adb0e5..7502643 100644 --- a/backend/app/core/config.py +++ b/backend/app/core/config.py @@ -17,6 +17,7 @@ class Settings(BaseSettings): DOC_SERVICE_URL: str = "http://doc-service:8001" AI_SERVICE_URL: str = "http://ai-service:8010" + STORAGE_SERVICE_URL: str = "http://storage-service:8020" @field_validator("JWT_PRIVATE_KEY", "JWT_PUBLIC_KEY", mode="before") @classmethod diff --git a/backend/app/main.py b/backend/app/main.py index 1274ecb..fb1239d 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -19,6 +19,7 @@ async def lifespan(app: FastAPI): register_services( doc_service_url=settings.DOC_SERVICE_URL, ai_service_url=settings.AI_SERVICE_URL, + storage_service_url=settings.STORAGE_SERVICE_URL, ) # Create -admin groups for every registered service (idempotent) async with AsyncSessionLocal() as db: diff --git a/backend/app/services/service_health.py b/backend/app/services/service_health.py index f2eac01..73c8534 100644 --- a/backend/app/services/service_health.py +++ b/backend/app/services/service_health.py @@ -40,7 +40,7 @@ _health: dict[str, bool | None] = {} _manifests: dict[str, dict | None] = {} -def register_services(doc_service_url: str, ai_service_url: str) -> None: +def register_services(doc_service_url: str, ai_service_url: str, storage_service_url: str) -> None: """Called once during app startup to populate the registry from config.""" global _REGISTRY, _health, _manifests @@ -63,6 +63,15 @@ def register_services(doc_service_url: str, ai_service_url: str) -> None: app_path="", settings_path="/apps/ai/settings", ), + ServiceDefinition( + id="storage-service", + name="Storage", + description="Unified file storage. Manages all uploaded files with pluggable backends (local, S3, WebDAV).", + internal_url=storage_service_url, + health_path="/health", + app_path="", + settings_path="/admin/storage", + ), ] _health = {svc.id: None for svc in _REGISTRY} diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml index 93f75cb..c8cf529 100644 --- a/docker-compose.dev.yml +++ b/docker-compose.dev.yml @@ -29,6 +29,11 @@ services: volumes: - ./features/ai-service:/app + storage-service: + command: sh scripts/start_dev.sh + volumes: + - ./features/storage-service:/app + doc-service: command: sh scripts/start_dev.sh env_file: ./features/doc-service/.env diff --git a/docker-compose.yml b/docker-compose.yml index e3e3a5f..2ec7324 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -19,6 +19,27 @@ services: networks: - backend-net + # ── Storage service (unified blob storage) ────────────────────────────────── + storage-service: + build: + context: ./features/storage-service + dockerfile: Dockerfile + network: host + user: "1001:1001" + restart: unless-stopped + environment: + STORAGE_BACKEND: local + DATA_DIR: /data/storage + volumes: + - storage_data:/data/storage + healthcheck: + test: ["CMD-SHELL", "python -c \"import urllib.request; urllib.request.urlopen('http://localhost:8020/health')\""] + interval: 10s + timeout: 5s + retries: 5 + networks: + - backend-net + # ── Backend (management) ──────────────────────────────────────────────────── backend: build: @@ -32,11 +53,14 @@ services: DATABASE_URL: postgresql+asyncpg://${POSTGRES_USER:-postgres}:${POSTGRES_PASSWORD:-password}@db:5432/${POSTGRES_DB:-destroying_sap} DOC_SERVICE_URL: http://doc-service:8001 AI_SERVICE_URL: http://ai-service:8010 + STORAGE_SERVICE_URL: http://storage-service:8020 volumes: - app_config:/config depends_on: db: condition: service_healthy + storage-service: + condition: service_healthy networks: - backend-net @@ -68,6 +92,7 @@ services: DATA_DIR: /data/documents CONFIG_PATH: /config/doc_service_config.json AI_SERVICE_URL: http://ai-service:8010 + STORAGE_SERVICE_URL: http://storage-service:8020 volumes: - doc_data:/data/documents - watch_data:/data/watch @@ -77,6 +102,8 @@ services: condition: service_healthy ai-service: condition: service_started + storage-service: + condition: service_healthy networks: - backend-net @@ -98,9 +125,10 @@ services: volumes: postgres_data: - doc_data: # PDF files persisted across restarts + storage_data: # All file/blob storage — managed by storage-service + doc_data: # PDF files persisted across restarts (to be removed after Phase 2 migration) watch_data: # Watch directory — bind-mount your NAS/Nextcloud here via docker-compose.override.yml - app_config: # Per-service runtime config JSON files + app_config: # Per-service runtime config JSON files (to be removed after Phase 3 migration) networks: # backend-net: db ↔ backend ↔ doc-service. No host ports bound. diff --git a/features/storage-service/CLAUDE.md b/features/storage-service/CLAUDE.md new file mode 100644 index 0000000..ed24a86 --- /dev/null +++ b/features/storage-service/CLAUDE.md @@ -0,0 +1,115 @@ +# storage-service — Claude context + +Unified file/blob storage microservice, port 8020 (internal). All services must use this service's +HTTP API for any file persistence — no service may write to a Docker volume directly. See root +`CLAUDE.md` for architecture, Docker, and project-wide workflows. + +--- + +## Architecture rule (enforced) + +**No service may write to a filesystem path for persistent data.** +All file/blob storage must go through the storage-service HTTP API. +Violation is a security/architecture defect. + +--- + +## File & Folder Tree + +``` +features/storage-service/ +├── app/ +│ ├── main.py ← FastAPI, lifespan (backend init) +│ ├── core/config.py ← Settings (DATA_DIR, STORAGE_BACKEND, S3_*, WEBDAV_*) +│ ├── routers/ +│ │ ├── health.py ← GET /health +│ │ ├── objects.py ← PUT/GET/DELETE /objects/{bucket}/{key:path}, GET /objects/{bucket} +│ │ └── migrate.py ← POST /migrate, GET /migrate/status, DELETE /migrate, PATCH /backend-config +│ └── services/ +│ ├── backend_manager.py ← build_backend(), initialize_backend(), get_backend(), switch_backend() +│ ├── migration.py ← run_migration(), get_status(), cancel(); KNOWN_BUCKETS +│ └── backends/ +│ ├── base.py ← AbstractStorageBackend (ABC) +│ ├── local.py ← LocalFSBackend — /data/storage/{bucket}/{key} +│ ├── s3.py ← S3Backend — aiobotocore, endpoint_url configurable +│ └── webdav.py ← WebDAVBackend — aiohttp + WebDAV PROPFIND/PUT/GET/DELETE +├── scripts/ +│ ├── start.sh ← prod start (uvicorn port 8020) +│ └── start_dev.sh ← dev start (uvicorn --reload) +├── Dockerfile ← python:3.12-slim, non-root user 1001 +└── STATUS.md +``` + +--- + +## HTTP API + +### Objects + +| Method | Path | Body | Response | +|--------|------|------|----------| +| PUT | `/objects/{bucket}/{key:path}` | Raw bytes | 204 | +| GET | `/objects/{bucket}/{key:path}` | — | 200 Raw bytes / 404 | +| DELETE | `/objects/{bucket}/{key:path}` | — | 204 | +| GET | `/objects/{bucket}` | — | `{"bucket": "...", "keys": [...]}` | + +Keys may contain `/` (e.g. `user123/abc.pdf`). Path traversal (`..`) returns 400. + +### Migration + +| Method | Path | Body | Response | +|--------|------|------|----------| +| POST | `/migrate` | `{"driver": "s3", "config": {...}}` | 202 / 400 / 409 | +| GET | `/migrate/status` | — | `{state, total, done, failed, errors[]}` | +| DELETE | `/migrate` | — | 204 / 409 | +| PATCH | `/backend-config` | `{"driver": "...", "config": {...}}` | 204 / 400 / 409 | + +Migration states: `idle → validating → migrating → switching → cleaning → done` (or `failed`/`cancelled`) + +### Health + +| Method | Path | Response | +|--------|------|----------| +| GET | `/health` | `{"status": "ok", "backend": "local"}` | + +--- + +## Buckets + +| Bucket | Contents | Key format | +|--------|----------|------------| +| `documents` | Uploaded PDFs | `{user_id}/{doc_id}.pdf` or `watch/{doc_id}.pdf` | +| `config` | JSON config files | `{service_name}_config.json` | + +To add a new bucket: add it to `KNOWN_BUCKETS` in `services/migration.py` so it is included in migrations. + +--- + +## Backend drivers + +| Driver | Config fields | Notes | +|--------|---------------|-------| +| `local` | `data_dir` (optional) | Default. Files under `/data/storage/`. Zero external deps. | +| `s3` | `endpoint_url`, `access_key`, `secret_key`, `region` | Works with MinIO, AWS S3, Backblaze B2, Cloudflare R2. Set `endpoint_url=""` for real AWS. | +| `webdav` | `url`, `username`, `password`, `root_path` | Nextcloud: set root_path to `/remote.php/dav/files/{username}` | + +--- + +## Adding a new backend driver + +1. Create `app/services/backends/your_driver.py` implementing `AbstractStorageBackend` +2. Add a branch in `build_backend()` in `backend_manager.py` +3. Add config fields to `app/core/config.py` if env-based config is needed +4. Document driver name + config fields in this file + +--- + +## Default Values & Limits + +| Parameter | Value | Location | +|-----------|-------|----------| +| Default backend | `local` | `STORAGE_BACKEND` env var | +| Local data dir | `/data/storage` | `DATA_DIR` env var | +| S3 region default | `us-east-1` | `S3_REGION` env var | +| Migration error cap in response | 50 | `migration.py` | +| Port | 8020 | `scripts/start.sh` | diff --git a/features/storage-service/Dockerfile b/features/storage-service/Dockerfile new file mode 100644 index 0000000..e5eaec9 --- /dev/null +++ b/features/storage-service/Dockerfile @@ -0,0 +1,32 @@ +# ── Stage 1: dependency installation ───────────────────────────────────────── +FROM python:3.12-slim AS builder + +WORKDIR /app + +RUN pip install --upgrade pip + +COPY pyproject.toml . +RUN pip install --prefix=/install . + +# ── Stage 2: runtime ────────────────────────────────────────────────────────── +FROM python:3.12-slim + +# Create non-root user (UID/GID 1001) +RUN groupadd --gid 1001 appuser && \ + useradd --uid 1001 --gid 1001 --no-create-home --shell /bin/sh appuser + +# Pre-create data dir with correct ownership. +# Named volume mounted over this path will inherit ownership on first creation. +RUN mkdir -p /data/storage && chown -R appuser:appuser /data + +WORKDIR /app + +COPY --from=builder /install /usr/local +COPY --chown=appuser:appuser app ./app +COPY --chown=appuser:appuser scripts ./scripts + +USER appuser + +EXPOSE 8020 + +CMD ["sh", "scripts/start.sh"] diff --git a/features/storage-service/STATUS.md b/features/storage-service/STATUS.md new file mode 100644 index 0000000..906b71c --- /dev/null +++ b/features/storage-service/STATUS.md @@ -0,0 +1,83 @@ +# Storage Service — Status + +## What it is +Unified file/blob storage microservice, port 8020 (internal). All services store and retrieve files +through its HTTP API — no service writes to a Docker volume directly. Uses a pluggable backend +driver (local FS by default; S3-compatible and WebDAV available). Backend is switchable at runtime +via admin settings with automatic data migration. + +## Current functionality + +### Object API (`/objects`) + +| Method | Path | Description | +|--------|------|-------------| +| PUT | `/objects/{bucket}/{key}` | Upload raw bytes | +| GET | `/objects/{bucket}/{key}` | Download raw bytes | +| DELETE | `/objects/{bucket}/{key}` | Delete object | +| GET | `/objects/{bucket}` | List all keys in bucket | + +### Migration API (`/migrate`) + +| Method | Path | Description | +|--------|------|-------------| +| POST | `/migrate` | Start migration to a new backend (validates, copies, switches, cleans) | +| GET | `/migrate/status` | Poll migration progress | +| DELETE | `/migrate` | Cancel in-progress migration | +| PATCH | `/backend-config` | Reconfigure backend without migrating data | + +### Health + +| Method | Path | Description | +|--------|------|-------------| +| GET | `/health` | `{"status": "ok", "backend": ""}` | + +### Buckets + +| Bucket | Contents | +|--------|----------| +| `documents` | Uploaded PDFs (keyed as `{user_id}/{doc_id}.pdf` or `watch/{doc_id}.pdf`) | +| `config` | JSON config files (replaces `app_config` volume — Phase 3) | + +## Architecture + +``` +backend / doc-service / future-svc + │ HTTP + ▼ + storage-service:8020 + │ + backend_manager + │ + ┌──────┴──────────────────┐ + │ │ + LocalFSBackend S3Backend / WebDAVBackend + /data/storage/ (configured via admin UI) + {bucket}/{key} +``` + +Migration flow: +``` +POST /migrate { driver, config } + → test_connection() (validate) + → list all objects in KNOWN_BUCKETS (enumerate) + → GET old / PUT new / exists verify (copy + verify, per object) + → if 0 failures: switch_backend() (atomic switch) + → DELETE old objects (cleanup) +``` + +## Known limitations / not implemented + +- Migration state is in-memory — a container restart during migration loses progress (restart restarts from scratch) +- No presigned URL support (direct client downloads go through the API) +- rclone backends (Google Drive, OneDrive, Dropbox) not yet implemented +- No per-object metadata or content-type headers +- No multipart upload for very large files (> available RAM) + +## Future work + +- [ ] rclone-based backend adapter (GDrive, OneDrive, Dropbox) +- [ ] Presigned URL generation for direct browser downloads +- [ ] Persist migration state to DB so restarts can resume +- [ ] Streaming upload/download to avoid buffering entire file in memory +- [ ] Per-bucket access policies diff --git a/features/storage-service/app/__init__.py b/features/storage-service/app/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/features/storage-service/app/core/__init__.py b/features/storage-service/app/core/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/features/storage-service/app/core/config.py b/features/storage-service/app/core/config.py new file mode 100644 index 0000000..5fae515 --- /dev/null +++ b/features/storage-service/app/core/config.py @@ -0,0 +1,23 @@ +from pydantic_settings import BaseSettings, SettingsConfigDict + + +class Settings(BaseSettings): + model_config = SettingsConfigDict(env_file=".env", extra="ignore") + + DATA_DIR: str = "/data/storage" + STORAGE_BACKEND: str = "local" # local | s3 | webdav + + # S3-compatible (MinIO, AWS S3, Backblaze B2, Cloudflare R2, …) + S3_ENDPOINT_URL: str = "" # leave empty for real AWS S3 + S3_ACCESS_KEY: str = "" + S3_SECRET_KEY: str = "" + S3_REGION: str = "us-east-1" + + # WebDAV (Nextcloud, …) + WEBDAV_URL: str = "" + WEBDAV_USERNAME: str = "" + WEBDAV_PASSWORD: str = "" + WEBDAV_ROOT_PATH: str = "/" + + +settings = Settings() diff --git a/features/storage-service/app/main.py b/features/storage-service/app/main.py new file mode 100644 index 0000000..277088d --- /dev/null +++ b/features/storage-service/app/main.py @@ -0,0 +1,29 @@ +import logging +from contextlib import asynccontextmanager + +from fastapi import FastAPI + +from app.core.config import settings +from app.routers import health, objects, migrate +from app.services.backend_manager import initialize_backend + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", +) +logger = logging.getLogger(__name__) + + +@asynccontextmanager +async def lifespan(app: FastAPI): + initialize_backend() + logger.info("storage-service started (backend=%s)", settings.STORAGE_BACKEND) + yield + logger.info("storage-service shutting down") + + +app = FastAPI(title="Storage Service", lifespan=lifespan) + +app.include_router(health.router) +app.include_router(objects.router) +app.include_router(migrate.router) diff --git a/features/storage-service/app/routers/__init__.py b/features/storage-service/app/routers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/features/storage-service/app/routers/health.py b/features/storage-service/app/routers/health.py new file mode 100644 index 0000000..9cb782c --- /dev/null +++ b/features/storage-service/app/routers/health.py @@ -0,0 +1,10 @@ +from fastapi import APIRouter + +from app.services.backend_manager import get_backend + +router = APIRouter() + + +@router.get("/health") +async def health(): + return {"status": "ok", "backend": get_backend().driver_name} diff --git a/features/storage-service/app/routers/migrate.py b/features/storage-service/app/routers/migrate.py new file mode 100644 index 0000000..55a4f10 --- /dev/null +++ b/features/storage-service/app/routers/migrate.py @@ -0,0 +1,88 @@ +import logging + +from fastapi import APIRouter, BackgroundTasks, HTTPException +from pydantic import BaseModel + +from app.services import migration +from app.services.backend_manager import build_backend, switch_backend + +router = APIRouter() +logger = logging.getLogger(__name__) + + +class MigrateRequest(BaseModel): + driver: str + config: dict = {} + + +class BackendConfigRequest(BaseModel): + driver: str + config: dict = {} + + +@router.post("/migrate", status_code=202) +async def start_migration(body: MigrateRequest, background_tasks: BackgroundTasks): + """ + Validate the new backend, then start an async migration job that: + 1. Copies all objects from the current backend to the new one + 2. Verifies every object + 3. Atomically switches the active backend + 4. Deletes all objects from the old backend + + Returns 409 if a migration is already in progress. + Returns 400 if the new backend config fails validation. + """ + if migration.is_in_progress(): + raise HTTPException(status_code=409, detail="A migration is already in progress") + + # Reset status and enter validating state before any async work + migration._status.state = "validating" + migration._status.total = 0 + migration._status.done = 0 + migration._status.failed = 0 + migration._status.errors.clear() + + try: + new_backend = build_backend(body.driver, body.config) + await new_backend.test_connection() + except Exception as exc: + migration._status.state = "idle" + raise HTTPException(status_code=400, detail=f"Backend validation failed: {exc}") + + background_tasks.add_task(migration.run_migration, new_backend) + return {"status": "started", "driver": body.driver} + + +@router.get("/migrate/status") +async def migration_status(): + """Poll this to track migration progress.""" + return migration.get_status() + + +@router.delete("/migrate", status_code=204) +async def cancel_migration(): + """ + Request cancellation of a running migration. + The old backend remains active. Returns 409 if no migration is running. + """ + cancelled = await migration.cancel() + if not cancelled: + raise HTTPException(status_code=409, detail="No cancellable migration in progress") + + +@router.patch("/backend-config", status_code=204) +async def update_backend_config(body: BackendConfigRequest): + """ + Reconfigure the active backend without migrating data (e.g. update S3 credentials + for the same endpoint, or switch back to local after a failed migration). + + Use POST /migrate when you need data to be moved to the new backend. + """ + if migration.is_in_progress(): + raise HTTPException(status_code=409, detail="Cannot reconfigure while migration is in progress") + try: + new_backend = build_backend(body.driver, body.config) + await new_backend.test_connection() + except Exception as exc: + raise HTTPException(status_code=400, detail=f"Backend validation failed: {exc}") + switch_backend(new_backend) diff --git a/features/storage-service/app/routers/objects.py b/features/storage-service/app/routers/objects.py new file mode 100644 index 0000000..84a6777 --- /dev/null +++ b/features/storage-service/app/routers/objects.py @@ -0,0 +1,60 @@ +from fastapi import APIRouter, HTTPException, Request +from fastapi.responses import Response + +from app.services.backend_manager import get_backend + +router = APIRouter() + + +def _validate_key(key: str) -> str: + """Reject path traversal. Key may contain '/' for nested objects (e.g. user/doc.pdf).""" + parts = key.split("/") + if ".." in parts: + raise HTTPException(status_code=400, detail="Invalid key: path traversal not allowed") + return key + + +@router.put("/objects/{bucket}/{key:path}", status_code=204) +async def put_object(bucket: str, key: str, request: Request): + """Upload raw bytes. Body is read as-is (application/octet-stream).""" + _validate_key(key) + data = await request.body() + try: + await get_backend().put(bucket, key, data) + except ValueError as exc: + raise HTTPException(status_code=400, detail=str(exc)) + except Exception as exc: + raise HTTPException(status_code=500, detail=str(exc)) + + +@router.get("/objects/{bucket}/{key:path}") +async def get_object(bucket: str, key: str): + """Download raw bytes.""" + _validate_key(key) + try: + data = await get_backend().get(bucket, key) + except KeyError: + raise HTTPException(status_code=404, detail="Object not found") + except Exception as exc: + raise HTTPException(status_code=500, detail=str(exc)) + return Response(content=data, media_type="application/octet-stream") + + +@router.delete("/objects/{bucket}/{key:path}", status_code=204) +async def delete_object(bucket: str, key: str): + """Delete an object. No-op if it does not exist.""" + _validate_key(key) + try: + await get_backend().delete(bucket, key) + except Exception as exc: + raise HTTPException(status_code=500, detail=str(exc)) + + +@router.get("/objects/{bucket}") +async def list_objects(bucket: str): + """List all keys in a bucket.""" + try: + keys = await get_backend().list_keys(bucket) + except Exception as exc: + raise HTTPException(status_code=500, detail=str(exc)) + return {"bucket": bucket, "keys": keys} diff --git a/features/storage-service/app/services/__init__.py b/features/storage-service/app/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/features/storage-service/app/services/backend_manager.py b/features/storage-service/app/services/backend_manager.py new file mode 100644 index 0000000..95bd996 --- /dev/null +++ b/features/storage-service/app/services/backend_manager.py @@ -0,0 +1,70 @@ +import logging + +from app.core.config import settings +from app.services.backends.base import AbstractStorageBackend +from app.services.backends.local import LocalFSBackend +from app.services.backends.s3 import S3Backend +from app.services.backends.webdav import WebDAVBackend + +logger = logging.getLogger(__name__) + +_active_backend: AbstractStorageBackend | None = None + + +def build_backend(driver: str, config: dict) -> AbstractStorageBackend: + """Construct a backend instance from a driver name + config dict.""" + if driver == "local": + return LocalFSBackend(data_dir=config.get("data_dir", settings.DATA_DIR)) + if driver == "s3": + return S3Backend( + endpoint_url=config.get("endpoint_url", ""), + access_key=config.get("access_key", ""), + secret_key=config.get("secret_key", ""), + region=config.get("region", "us-east-1"), + ) + if driver == "webdav": + return WebDAVBackend( + url=config.get("url", ""), + username=config.get("username", ""), + password=config.get("password", ""), + root_path=config.get("root_path", "/"), + ) + raise ValueError(f"Unknown driver: {driver!r}. Valid options: local, s3, webdav") + + +def initialize_backend() -> None: + """Build the initial backend from environment variables at startup.""" + global _active_backend + driver = settings.STORAGE_BACKEND + config: dict = {} + if driver == "s3": + config = { + "endpoint_url": settings.S3_ENDPOINT_URL, + "access_key": settings.S3_ACCESS_KEY, + "secret_key": settings.S3_SECRET_KEY, + "region": settings.S3_REGION, + } + elif driver == "webdav": + config = { + "url": settings.WEBDAV_URL, + "username": settings.WEBDAV_USERNAME, + "password": settings.WEBDAV_PASSWORD, + "root_path": settings.WEBDAV_ROOT_PATH, + } + # local needs no extra config — DATA_DIR is read from settings inside build_backend + _active_backend = build_backend(driver, config) + logger.info("Storage backend initialized: %s", driver) + + +def get_backend() -> AbstractStorageBackend: + if _active_backend is None: + raise RuntimeError("Backend not initialized — call initialize_backend() at startup") + return _active_backend + + +def switch_backend(new_backend: AbstractStorageBackend) -> None: + """Replace the active backend. Called by the migration job after all data is verified.""" + global _active_backend + old_name = _active_backend.driver_name if _active_backend else "none" + _active_backend = new_backend + logger.info("Storage backend switched: %s → %s", old_name, new_backend.driver_name) diff --git a/features/storage-service/app/services/backends/__init__.py b/features/storage-service/app/services/backends/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/features/storage-service/app/services/backends/base.py b/features/storage-service/app/services/backends/base.py new file mode 100644 index 0000000..7defc1f --- /dev/null +++ b/features/storage-service/app/services/backends/base.py @@ -0,0 +1,34 @@ +from abc import ABC, abstractmethod + + +class AbstractStorageBackend(ABC): + """Common interface every storage backend must implement.""" + + @property + @abstractmethod + def driver_name(self) -> str: + """Short identifier returned in /health: 'local', 's3', or 'webdav'.""" + + @abstractmethod + async def put(self, bucket: str, key: str, data: bytes) -> None: + """Store *data* under bucket/key. Creates bucket/intermediate dirs as needed.""" + + @abstractmethod + async def get(self, bucket: str, key: str) -> bytes: + """Return the stored bytes. Raises KeyError if the object does not exist.""" + + @abstractmethod + async def delete(self, bucket: str, key: str) -> None: + """Delete the object. No-op if it does not exist.""" + + @abstractmethod + async def list_keys(self, bucket: str) -> list[str]: + """Return all keys stored in *bucket*. Returns [] if bucket is empty/absent.""" + + @abstractmethod + async def exists(self, bucket: str, key: str) -> bool: + """Return True if the object exists.""" + + @abstractmethod + async def test_connection(self) -> None: + """Verify the backend is reachable and writable. Raise on failure.""" diff --git a/features/storage-service/app/services/backends/local.py b/features/storage-service/app/services/backends/local.py new file mode 100644 index 0000000..bde237f --- /dev/null +++ b/features/storage-service/app/services/backends/local.py @@ -0,0 +1,67 @@ +import asyncio +from pathlib import Path + +import aiofiles + +from .base import AbstractStorageBackend + + +class LocalFSBackend(AbstractStorageBackend): + """Stores objects as files under //.""" + + def __init__(self, data_dir: str) -> None: + self._root = Path(data_dir) + self._root.mkdir(parents=True, exist_ok=True) + + @property + def driver_name(self) -> str: + return "local" + + def _resolve(self, bucket: str, key: str) -> Path: + safe_key = key.lstrip("/") + if ".." in safe_key.split("/"): + raise ValueError(f"Invalid key: {key!r}") + return self._root / bucket / safe_key + + async def put(self, bucket: str, key: str, data: bytes) -> None: + dest = self._resolve(bucket, key) + await asyncio.to_thread(dest.parent.mkdir, parents=True, exist_ok=True) + async with aiofiles.open(dest, "wb") as f: + await f.write(data) + + async def get(self, bucket: str, key: str) -> bytes: + path = self._resolve(bucket, key) + if not path.exists(): + raise KeyError(f"{bucket}/{key}") + async with aiofiles.open(path, "rb") as f: + return await f.read() + + async def delete(self, bucket: str, key: str) -> None: + path = self._resolve(bucket, key) + try: + await asyncio.to_thread(path.unlink, missing_ok=True) + except OSError: + pass + + async def list_keys(self, bucket: str) -> list[str]: + bucket_dir = self._root / bucket + if not bucket_dir.exists(): + return [] + + def _scan() -> list[str]: + return [ + str(p.relative_to(bucket_dir)) + for p in bucket_dir.rglob("*") + if p.is_file() + ] + + return await asyncio.to_thread(_scan) + + async def exists(self, bucket: str, key: str) -> bool: + return self._resolve(bucket, key).exists() + + async def test_connection(self) -> None: + self._root.mkdir(parents=True, exist_ok=True) + probe = self._root / ".health_probe" + probe.write_bytes(b"ok") + probe.unlink() diff --git a/features/storage-service/app/services/backends/s3.py b/features/storage-service/app/services/backends/s3.py new file mode 100644 index 0000000..e8d52f6 --- /dev/null +++ b/features/storage-service/app/services/backends/s3.py @@ -0,0 +1,99 @@ +import logging +from contextlib import asynccontextmanager + +from aiobotocore.session import get_session + +from .base import AbstractStorageBackend + +logger = logging.getLogger(__name__) + + +class S3Backend(AbstractStorageBackend): + """ + S3-compatible backend. Works with AWS S3, MinIO, Backblaze B2, Cloudflare R2, etc. + Set endpoint_url to the service URL for non-AWS providers; leave empty for real AWS. + """ + + def __init__( + self, + endpoint_url: str, + access_key: str, + secret_key: str, + region: str = "us-east-1", + ) -> None: + self._endpoint_url = endpoint_url or None + self._access_key = access_key + self._secret_key = secret_key + self._region = region + self._session = get_session() + + @property + def driver_name(self) -> str: + return "s3" + + @asynccontextmanager + async def _client(self): + async with self._session.create_client( + "s3", + endpoint_url=self._endpoint_url, + aws_access_key_id=self._access_key, + aws_secret_access_key=self._secret_key, + region_name=self._region, + ) as client: + yield client + + async def _ensure_bucket(self, client, bucket: str) -> None: + try: + await client.head_bucket(Bucket=bucket) + except Exception: + try: + if self._region == "us-east-1": + await client.create_bucket(Bucket=bucket) + else: + await client.create_bucket( + Bucket=bucket, + CreateBucketConfiguration={"LocationConstraint": self._region}, + ) + except Exception as exc: + logger.debug("Bucket create skipped (may already exist): %s", exc) + + async def put(self, bucket: str, key: str, data: bytes) -> None: + async with self._client() as client: + await self._ensure_bucket(client, bucket) + await client.put_object(Bucket=bucket, Key=key, Body=data) + + async def get(self, bucket: str, key: str) -> bytes: + async with self._client() as client: + try: + response = await client.get_object(Bucket=bucket, Key=key) + return await response["Body"].read() + except Exception as exc: + raise KeyError(f"{bucket}/{key}") from exc + + async def delete(self, bucket: str, key: str) -> None: + async with self._client() as client: + await client.delete_object(Bucket=bucket, Key=key) + + async def list_keys(self, bucket: str) -> list[str]: + async with self._client() as client: + try: + paginator = client.get_paginator("list_objects_v2") + keys: list[str] = [] + async for page in paginator.paginate(Bucket=bucket): + for obj in page.get("Contents", []): + keys.append(obj["Key"]) + return keys + except Exception: + return [] + + async def exists(self, bucket: str, key: str) -> bool: + async with self._client() as client: + try: + await client.head_object(Bucket=bucket, Key=key) + return True + except Exception: + return False + + async def test_connection(self) -> None: + async with self._client() as client: + await client.list_buckets() diff --git a/features/storage-service/app/services/backends/webdav.py b/features/storage-service/app/services/backends/webdav.py new file mode 100644 index 0000000..2c80c50 --- /dev/null +++ b/features/storage-service/app/services/backends/webdav.py @@ -0,0 +1,121 @@ +import base64 +from urllib.parse import quote + +import defusedxml.ElementTree as ET + +import aiohttp + +from .base import AbstractStorageBackend + + +class WebDAVBackend(AbstractStorageBackend): + """ + WebDAV backend. Compatible with Nextcloud and any standard WebDAV server. + root_path should be the WebDAV root on the server, e.g. '/remote.php/dav/files/username'. + """ + + def __init__( + self, + url: str, + username: str, + password: str, + root_path: str = "/", + ) -> None: + self._base = url.rstrip("/") + self._root = root_path.rstrip("/") + creds = base64.b64encode(f"{username}:{password}".encode()).decode() + self._auth = f"Basic {creds}" + + @property + def driver_name(self) -> str: + return "webdav" + + def _url(self, *parts: str) -> str: + encoded = "/".join(quote(p, safe="") for p in parts) + return f"{self._base}{self._root}/{encoded}" + + def _headers(self, extra: dict | None = None) -> dict[str, str]: + h = {"Authorization": self._auth} + if extra: + h.update(extra) + return h + + async def _ensure_collection(self, session: aiohttp.ClientSession, *parts: str) -> None: + """MKCOL is idempotent — ignore 405 (already exists).""" + url = self._url(*parts) + async with session.request("MKCOL", url, headers=self._headers()) as resp: + if resp.status not in (200, 201, 405): + pass # best-effort; PUT will fail if directory is truly missing + + async def put(self, bucket: str, key: str, data: bytes) -> None: + async with aiohttp.ClientSession() as session: + await self._ensure_collection(session, bucket) + parts = key.split("/") + for i in range(1, len(parts)): + await self._ensure_collection(session, bucket, *parts[:i]) + url = self._url(bucket, key) + async with session.put(url, data=data, headers=self._headers()) as resp: + if resp.status not in (200, 201, 204): + raise OSError(f"WebDAV PUT {url} → {resp.status}") + + async def get(self, bucket: str, key: str) -> bytes: + async with aiohttp.ClientSession() as session: + url = self._url(bucket, key) + async with session.get(url, headers=self._headers()) as resp: + if resp.status == 404: + raise KeyError(f"{bucket}/{key}") + if resp.status != 200: + raise OSError(f"WebDAV GET {url} → {resp.status}") + return await resp.read() + + async def delete(self, bucket: str, key: str) -> None: + async with aiohttp.ClientSession() as session: + url = self._url(bucket, key) + async with session.delete(url, headers=self._headers()) as resp: + if resp.status not in (200, 204, 404): + raise OSError(f"WebDAV DELETE {url} → {resp.status}") + + async def list_keys(self, bucket: str) -> list[str]: + async with aiohttp.ClientSession() as session: + url = self._url(bucket) + headers = self._headers({"Depth": "infinity", "Content-Type": "application/xml"}) + body = '' + async with session.request("PROPFIND", url, headers=headers, data=body) as resp: + if resp.status == 404: + return [] + if resp.status != 207: + return [] + xml_body = await resp.text() + + ns = {"d": "DAV:"} + try: + root = ET.fromstring(xml_body) + except ET.ParseError: + return [] + + prefix = f"{self._base}{self._root}/{quote(bucket, safe='')}/" + keys: list[str] = [] + for response in root.findall("d:response", ns): + href = response.findtext("d:href", namespaces=ns) or "" + prop = response.find(".//d:prop", ns) + if prop is not None: + rt = prop.find("d:resourcetype", ns) + if rt is not None and rt.find("d:collection", ns) is not None: + continue # skip directories + if href.startswith(prefix): + keys.append(href[len(prefix):]) + return keys + + async def exists(self, bucket: str, key: str) -> bool: + async with aiohttp.ClientSession() as session: + url = self._url(bucket, key) + async with session.request("HEAD", url, headers=self._headers()) as resp: + return resp.status == 200 + + async def test_connection(self) -> None: + async with aiohttp.ClientSession() as session: + root_url = f"{self._base}{self._root}/" + headers = self._headers({"Depth": "0"}) + async with session.request("PROPFIND", root_url, headers=headers) as resp: + if resp.status not in (200, 207): + raise ConnectionError(f"WebDAV root PROPFIND → {resp.status}") diff --git a/features/storage-service/app/services/migration.py b/features/storage-service/app/services/migration.py new file mode 100644 index 0000000..93c7432 --- /dev/null +++ b/features/storage-service/app/services/migration.py @@ -0,0 +1,139 @@ +""" +Backend migration service. + +Flow: + 1. POST /migrate → validate new backend (test_connection) + 2. Background task enumerates all objects in all known buckets + 3. Each object is copied old → new, then verified + 4. Only after 100 % success: atomically switch active backend + 5. Delete all objects from old backend + 6. If any copy fails: old backend stays active; state = "failed" + 7. DELETE /migrate cancels a running migration (old backend stays active) +""" + +import logging +from dataclasses import dataclass, field +from typing import Literal + +from app.services.backends.base import AbstractStorageBackend +from app.services.backend_manager import get_backend, switch_backend + +logger = logging.getLogger(__name__) + +# All logical buckets the service knows about — enumerated during migration. +KNOWN_BUCKETS = ["documents", "config"] + +MigrationState = Literal[ + "idle", "validating", "migrating", "switching", "cleaning", "done", "failed", "cancelled" +] + + +@dataclass +class _MigrationStatus: + state: MigrationState = "idle" + total: int = 0 + done: int = 0 + failed: int = 0 + errors: list[str] = field(default_factory=list) + + +_status = _MigrationStatus() +_cancel_requested: bool = False + + +def get_status() -> dict: + return { + "state": _status.state, + "total": _status.total, + "done": _status.done, + "failed": _status.failed, + "errors": _status.errors[:50], # cap to avoid huge responses + } + + +def is_in_progress() -> bool: + return _status.state in ("validating", "migrating", "switching", "cleaning") + + +async def cancel() -> bool: + global _cancel_requested + if _status.state == "migrating": + _cancel_requested = True + return True + return False + + +async def run_migration(new_backend: AbstractStorageBackend) -> None: + """ + Background task: copy all objects to new_backend, verify, switch, clean old. + Called after the caller has already validated new_backend.test_connection(). + """ + global _cancel_requested + _cancel_requested = False + old_backend = get_backend() + + _status.state = "migrating" + _status.done = 0 + _status.failed = 0 + _status.errors.clear() + + try: + # Collect all objects across every known bucket + all_objects: list[tuple[str, str]] = [] + for bucket in KNOWN_BUCKETS: + try: + keys = await old_backend.list_keys(bucket) + for key in keys: + all_objects.append((bucket, key)) + except Exception as exc: + logger.warning("Could not list bucket %r: %s", bucket, exc) + + _status.total = len(all_objects) + logger.info("Migration: %d objects to migrate across %d buckets", len(all_objects), len(KNOWN_BUCKETS)) + + for bucket, key in all_objects: + if _cancel_requested: + _status.state = "cancelled" + logger.info("Migration cancelled (%d/%d done)", _status.done, _status.total) + return + + try: + data = await old_backend.get(bucket, key) + await new_backend.put(bucket, key, data) + if not await new_backend.exists(bucket, key): + raise OSError("Verification failed: object absent after PUT") + _status.done += 1 + except Exception as exc: + _status.failed += 1 + entry = f"{bucket}/{key}: {exc}" + _status.errors.append(entry) + logger.warning("Migration copy failed — %s", entry) + + if _status.failed > 0: + _status.state = "failed" + logger.error( + "Migration failed: %d/%d objects could not be copied; old backend remains active", + _status.failed, + _status.total, + ) + return + + # All objects verified — atomically switch + _status.state = "switching" + switch_backend(new_backend) + + # Remove all objects from old backend (best-effort) + _status.state = "cleaning" + for bucket, key in all_objects: + try: + await old_backend.delete(bucket, key) + except Exception as exc: + logger.warning("Cleanup failed for %s/%s: %s", bucket, key, exc) + + _status.state = "done" + logger.info("Migration complete: %d objects moved to %s", _status.total, new_backend.driver_name) + + except Exception as exc: + _status.state = "failed" + _status.errors.append(f"Unexpected error: {exc}") + logger.exception("Migration aborted with unexpected error") diff --git a/features/storage-service/pyproject.toml b/features/storage-service/pyproject.toml new file mode 100644 index 0000000..070a75b --- /dev/null +++ b/features/storage-service/pyproject.toml @@ -0,0 +1,25 @@ +[build-system] +requires = ["setuptools>=45"] +build-backend = "setuptools.build_meta" + +[project] +name = "storage-service" +version = "0.1.0" +requires-python = ">=3.11" +dependencies = [ + "fastapi>=0.111", + "uvicorn[standard]>=0.29", + "pydantic-settings>=2.2", + "aiofiles>=23.0", + "aiobotocore>=2.13", + "aiohttp>=3.9", + "defusedxml>=0.7", +] + +[project.optional-dependencies] +dev = [ + "ruff>=0.4", +] + +[tool.ruff] +line-length = 100 diff --git a/features/storage-service/scripts/start.sh b/features/storage-service/scripts/start.sh new file mode 100755 index 0000000..f450a96 --- /dev/null +++ b/features/storage-service/scripts/start.sh @@ -0,0 +1,5 @@ +#!/bin/sh +set -e + +echo "[storage-service] starting uvicorn..." +exec uvicorn app.main:app --host 0.0.0.0 --port 8020 diff --git a/features/storage-service/scripts/start_dev.sh b/features/storage-service/scripts/start_dev.sh new file mode 100755 index 0000000..e8323ee --- /dev/null +++ b/features/storage-service/scripts/start_dev.sh @@ -0,0 +1,5 @@ +#!/bin/sh +set -e + +echo "[storage-service] starting uvicorn (dev)..." +exec uvicorn app.main:app --host 0.0.0.0 --port 8020 --reload