diff --git a/backend/app/core/config.py b/backend/app/core/config.py index 9adb0e5..7502643 100644 --- a/backend/app/core/config.py +++ b/backend/app/core/config.py @@ -17,6 +17,7 @@ class Settings(BaseSettings): DOC_SERVICE_URL: str = "http://doc-service:8001" AI_SERVICE_URL: str = "http://ai-service:8010" + STORAGE_SERVICE_URL: str = "http://storage-service:8020" @field_validator("JWT_PRIVATE_KEY", "JWT_PUBLIC_KEY", mode="before") @classmethod diff --git a/backend/app/main.py b/backend/app/main.py index 1274ecb..fb1239d 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -19,6 +19,7 @@ async def lifespan(app: FastAPI): register_services( doc_service_url=settings.DOC_SERVICE_URL, ai_service_url=settings.AI_SERVICE_URL, + storage_service_url=settings.STORAGE_SERVICE_URL, ) # Create -admin groups for every registered service (idempotent) async with AsyncSessionLocal() as db: diff --git a/backend/app/services/service_health.py b/backend/app/services/service_health.py index f2eac01..73c8534 100644 --- a/backend/app/services/service_health.py +++ b/backend/app/services/service_health.py @@ -40,7 +40,7 @@ _health: dict[str, bool | None] = {} _manifests: dict[str, dict | None] = {} -def register_services(doc_service_url: str, ai_service_url: str) -> None: +def register_services(doc_service_url: str, ai_service_url: str, storage_service_url: str) -> None: """Called once during app startup to populate the registry from config.""" global _REGISTRY, _health, _manifests @@ -63,6 +63,15 @@ def register_services(doc_service_url: str, ai_service_url: str) -> None: app_path="", settings_path="/apps/ai/settings", ), + ServiceDefinition( + id="storage-service", + name="Storage", + description="Unified file storage. Manages all uploaded files with pluggable backends (local, S3, WebDAV).", + internal_url=storage_service_url, + health_path="/health", + app_path="", + settings_path="/admin/storage", + ), ] _health = {svc.id: None for svc in _REGISTRY} diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml index 93f75cb..c8cf529 100644 --- a/docker-compose.dev.yml +++ b/docker-compose.dev.yml @@ -29,6 +29,11 @@ services: volumes: - ./features/ai-service:/app + storage-service: + command: sh scripts/start_dev.sh + volumes: + - ./features/storage-service:/app + doc-service: command: sh scripts/start_dev.sh env_file: ./features/doc-service/.env diff --git a/docker-compose.yml b/docker-compose.yml index e3e3a5f..2ec7324 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -19,6 +19,27 @@ services: networks: - backend-net + # ── Storage service (unified blob storage) ────────────────────────────────── + storage-service: + build: + context: ./features/storage-service + dockerfile: Dockerfile + network: host + user: "1001:1001" + restart: unless-stopped + environment: + STORAGE_BACKEND: local + DATA_DIR: /data/storage + volumes: + - storage_data:/data/storage + healthcheck: + test: ["CMD-SHELL", "python -c \"import urllib.request; urllib.request.urlopen('http://localhost:8020/health')\""] + interval: 10s + timeout: 5s + retries: 5 + networks: + - backend-net + # ── Backend (management) ──────────────────────────────────────────────────── backend: build: @@ -32,11 +53,14 @@ services: DATABASE_URL: postgresql+asyncpg://${POSTGRES_USER:-postgres}:${POSTGRES_PASSWORD:-password}@db:5432/${POSTGRES_DB:-destroying_sap} DOC_SERVICE_URL: http://doc-service:8001 AI_SERVICE_URL: http://ai-service:8010 + STORAGE_SERVICE_URL: http://storage-service:8020 volumes: - app_config:/config depends_on: db: condition: service_healthy + storage-service: + condition: service_healthy networks: - backend-net @@ -68,6 +92,7 @@ services: DATA_DIR: /data/documents CONFIG_PATH: /config/doc_service_config.json AI_SERVICE_URL: http://ai-service:8010 + STORAGE_SERVICE_URL: http://storage-service:8020 volumes: - doc_data:/data/documents - watch_data:/data/watch @@ -77,6 +102,8 @@ services: condition: service_healthy ai-service: condition: service_started + storage-service: + condition: service_healthy networks: - backend-net @@ -98,9 +125,10 @@ services: volumes: postgres_data: - doc_data: # PDF files persisted across restarts + storage_data: # All file/blob storage — managed by storage-service + doc_data: # PDF files persisted across restarts (to be removed after Phase 2 migration) watch_data: # Watch directory — bind-mount your NAS/Nextcloud here via docker-compose.override.yml - app_config: # Per-service runtime config JSON files + app_config: # Per-service runtime config JSON files (to be removed after Phase 3 migration) networks: # backend-net: db ↔ backend ↔ doc-service. No host ports bound. diff --git a/features/storage-service/CLAUDE.md b/features/storage-service/CLAUDE.md new file mode 100644 index 0000000..ed24a86 --- /dev/null +++ b/features/storage-service/CLAUDE.md @@ -0,0 +1,115 @@ +# storage-service — Claude context + +Unified file/blob storage microservice, port 8020 (internal). All services must use this service's +HTTP API for any file persistence — no service may write to a Docker volume directly. See root +`CLAUDE.md` for architecture, Docker, and project-wide workflows. + +--- + +## Architecture rule (enforced) + +**No service may write to a filesystem path for persistent data.** +All file/blob storage must go through the storage-service HTTP API. +Violation is a security/architecture defect. + +--- + +## File & Folder Tree + +``` +features/storage-service/ +├── app/ +│ ├── main.py ← FastAPI, lifespan (backend init) +│ ├── core/config.py ← Settings (DATA_DIR, STORAGE_BACKEND, S3_*, WEBDAV_*) +│ ├── routers/ +│ │ ├── health.py ← GET /health +│ │ ├── objects.py ← PUT/GET/DELETE /objects/{bucket}/{key:path}, GET /objects/{bucket} +│ │ └── migrate.py ← POST /migrate, GET /migrate/status, DELETE /migrate, PATCH /backend-config +│ └── services/ +│ ├── backend_manager.py ← build_backend(), initialize_backend(), get_backend(), switch_backend() +│ ├── migration.py ← run_migration(), get_status(), cancel(); KNOWN_BUCKETS +│ └── backends/ +│ ├── base.py ← AbstractStorageBackend (ABC) +│ ├── local.py ← LocalFSBackend — /data/storage/{bucket}/{key} +│ ├── s3.py ← S3Backend — aiobotocore, endpoint_url configurable +│ └── webdav.py ← WebDAVBackend — aiohttp + WebDAV PROPFIND/PUT/GET/DELETE +├── scripts/ +│ ├── start.sh ← prod start (uvicorn port 8020) +│ └── start_dev.sh ← dev start (uvicorn --reload) +├── Dockerfile ← python:3.12-slim, non-root user 1001 +└── STATUS.md +``` + +--- + +## HTTP API + +### Objects + +| Method | Path | Body | Response | +|--------|------|------|----------| +| PUT | `/objects/{bucket}/{key:path}` | Raw bytes | 204 | +| GET | `/objects/{bucket}/{key:path}` | — | 200 Raw bytes / 404 | +| DELETE | `/objects/{bucket}/{key:path}` | — | 204 | +| GET | `/objects/{bucket}` | — | `{"bucket": "...", "keys": [...]}` | + +Keys may contain `/` (e.g. `user123/abc.pdf`). Path traversal (`..`) returns 400. + +### Migration + +| Method | Path | Body | Response | +|--------|------|------|----------| +| POST | `/migrate` | `{"driver": "s3", "config": {...}}` | 202 / 400 / 409 | +| GET | `/migrate/status` | — | `{state, total, done, failed, errors[]}` | +| DELETE | `/migrate` | — | 204 / 409 | +| PATCH | `/backend-config` | `{"driver": "...", "config": {...}}` | 204 / 400 / 409 | + +Migration states: `idle → validating → migrating → switching → cleaning → done` (or `failed`/`cancelled`) + +### Health + +| Method | Path | Response | +|--------|------|----------| +| GET | `/health` | `{"status": "ok", "backend": "local"}` | + +--- + +## Buckets + +| Bucket | Contents | Key format | +|--------|----------|------------| +| `documents` | Uploaded PDFs | `{user_id}/{doc_id}.pdf` or `watch/{doc_id}.pdf` | +| `config` | JSON config files | `{service_name}_config.json` | + +To add a new bucket: add it to `KNOWN_BUCKETS` in `services/migration.py` so it is included in migrations. + +--- + +## Backend drivers + +| Driver | Config fields | Notes | +|--------|---------------|-------| +| `local` | `data_dir` (optional) | Default. Files under `/data/storage/`. Zero external deps. | +| `s3` | `endpoint_url`, `access_key`, `secret_key`, `region` | Works with MinIO, AWS S3, Backblaze B2, Cloudflare R2. Set `endpoint_url=""` for real AWS. | +| `webdav` | `url`, `username`, `password`, `root_path` | Nextcloud: set root_path to `/remote.php/dav/files/{username}` | + +--- + +## Adding a new backend driver + +1. Create `app/services/backends/your_driver.py` implementing `AbstractStorageBackend` +2. Add a branch in `build_backend()` in `backend_manager.py` +3. Add config fields to `app/core/config.py` if env-based config is needed +4. Document driver name + config fields in this file + +--- + +## Default Values & Limits + +| Parameter | Value | Location | +|-----------|-------|----------| +| Default backend | `local` | `STORAGE_BACKEND` env var | +| Local data dir | `/data/storage` | `DATA_DIR` env var | +| S3 region default | `us-east-1` | `S3_REGION` env var | +| Migration error cap in response | 50 | `migration.py` | +| Port | 8020 | `scripts/start.sh` | diff --git a/features/storage-service/Dockerfile b/features/storage-service/Dockerfile new file mode 100644 index 0000000..e5eaec9 --- /dev/null +++ b/features/storage-service/Dockerfile @@ -0,0 +1,32 @@ +# ── Stage 1: dependency installation ───────────────────────────────────────── +FROM python:3.12-slim AS builder + +WORKDIR /app + +RUN pip install --upgrade pip + +COPY pyproject.toml . +RUN pip install --prefix=/install . + +# ── Stage 2: runtime ────────────────────────────────────────────────────────── +FROM python:3.12-slim + +# Create non-root user (UID/GID 1001) +RUN groupadd --gid 1001 appuser && \ + useradd --uid 1001 --gid 1001 --no-create-home --shell /bin/sh appuser + +# Pre-create data dir with correct ownership. +# Named volume mounted over this path will inherit ownership on first creation. +RUN mkdir -p /data/storage && chown -R appuser:appuser /data + +WORKDIR /app + +COPY --from=builder /install /usr/local +COPY --chown=appuser:appuser app ./app +COPY --chown=appuser:appuser scripts ./scripts + +USER appuser + +EXPOSE 8020 + +CMD ["sh", "scripts/start.sh"] diff --git a/features/storage-service/STATUS.md b/features/storage-service/STATUS.md new file mode 100644 index 0000000..906b71c --- /dev/null +++ b/features/storage-service/STATUS.md @@ -0,0 +1,83 @@ +# Storage Service — Status + +## What it is +Unified file/blob storage microservice, port 8020 (internal). All services store and retrieve files +through its HTTP API — no service writes to a Docker volume directly. Uses a pluggable backend +driver (local FS by default; S3-compatible and WebDAV available). Backend is switchable at runtime +via admin settings with automatic data migration. + +## Current functionality + +### Object API (`/objects`) + +| Method | Path | Description | +|--------|------|-------------| +| PUT | `/objects/{bucket}/{key}` | Upload raw bytes | +| GET | `/objects/{bucket}/{key}` | Download raw bytes | +| DELETE | `/objects/{bucket}/{key}` | Delete object | +| GET | `/objects/{bucket}` | List all keys in bucket | + +### Migration API (`/migrate`) + +| Method | Path | Description | +|--------|------|-------------| +| POST | `/migrate` | Start migration to a new backend (validates, copies, switches, cleans) | +| GET | `/migrate/status` | Poll migration progress | +| DELETE | `/migrate` | Cancel in-progress migration | +| PATCH | `/backend-config` | Reconfigure backend without migrating data | + +### Health + +| Method | Path | Description | +|--------|------|-------------| +| GET | `/health` | `{"status": "ok", "backend": ""}` | + +### Buckets + +| Bucket | Contents | +|--------|----------| +| `documents` | Uploaded PDFs (keyed as `{user_id}/{doc_id}.pdf` or `watch/{doc_id}.pdf`) | +| `config` | JSON config files (replaces `app_config` volume — Phase 3) | + +## Architecture + +``` +backend / doc-service / future-svc + │ HTTP + ▼ + storage-service:8020 + │ + backend_manager + │ + ┌──────┴──────────────────┐ + │ │ + LocalFSBackend S3Backend / WebDAVBackend + /data/storage/ (configured via admin UI) + {bucket}/{key} +``` + +Migration flow: +``` +POST /migrate { driver, config } + → test_connection() (validate) + → list all objects in KNOWN_BUCKETS (enumerate) + → GET old / PUT new / exists verify (copy + verify, per object) + → if 0 failures: switch_backend() (atomic switch) + → DELETE old objects (cleanup) +``` + +## Known limitations / not implemented + +- Migration state is in-memory — a container restart during migration loses progress (restart restarts from scratch) +- No presigned URL support (direct client downloads go through the API) +- rclone backends (Google Drive, OneDrive, Dropbox) not yet implemented +- No per-object metadata or content-type headers +- No multipart upload for very large files (> available RAM) + +## Future work + +- [ ] rclone-based backend adapter (GDrive, OneDrive, Dropbox) +- [ ] Presigned URL generation for direct browser downloads +- [ ] Persist migration state to DB so restarts can resume +- [ ] Streaming upload/download to avoid buffering entire file in memory +- [ ] Per-bucket access policies diff --git a/features/storage-service/app/__init__.py b/features/storage-service/app/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/features/storage-service/app/core/__init__.py b/features/storage-service/app/core/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/features/storage-service/app/core/config.py b/features/storage-service/app/core/config.py new file mode 100644 index 0000000..5fae515 --- /dev/null +++ b/features/storage-service/app/core/config.py @@ -0,0 +1,23 @@ +from pydantic_settings import BaseSettings, SettingsConfigDict + + +class Settings(BaseSettings): + model_config = SettingsConfigDict(env_file=".env", extra="ignore") + + DATA_DIR: str = "/data/storage" + STORAGE_BACKEND: str = "local" # local | s3 | webdav + + # S3-compatible (MinIO, AWS S3, Backblaze B2, Cloudflare R2, …) + S3_ENDPOINT_URL: str = "" # leave empty for real AWS S3 + S3_ACCESS_KEY: str = "" + S3_SECRET_KEY: str = "" + S3_REGION: str = "us-east-1" + + # WebDAV (Nextcloud, …) + WEBDAV_URL: str = "" + WEBDAV_USERNAME: str = "" + WEBDAV_PASSWORD: str = "" + WEBDAV_ROOT_PATH: str = "/" + + +settings = Settings() diff --git a/features/storage-service/app/main.py b/features/storage-service/app/main.py new file mode 100644 index 0000000..277088d --- /dev/null +++ b/features/storage-service/app/main.py @@ -0,0 +1,29 @@ +import logging +from contextlib import asynccontextmanager + +from fastapi import FastAPI + +from app.core.config import settings +from app.routers import health, objects, migrate +from app.services.backend_manager import initialize_backend + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", +) +logger = logging.getLogger(__name__) + + +@asynccontextmanager +async def lifespan(app: FastAPI): + initialize_backend() + logger.info("storage-service started (backend=%s)", settings.STORAGE_BACKEND) + yield + logger.info("storage-service shutting down") + + +app = FastAPI(title="Storage Service", lifespan=lifespan) + +app.include_router(health.router) +app.include_router(objects.router) +app.include_router(migrate.router) diff --git a/features/storage-service/app/routers/__init__.py b/features/storage-service/app/routers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/features/storage-service/app/routers/health.py b/features/storage-service/app/routers/health.py new file mode 100644 index 0000000..9cb782c --- /dev/null +++ b/features/storage-service/app/routers/health.py @@ -0,0 +1,10 @@ +from fastapi import APIRouter + +from app.services.backend_manager import get_backend + +router = APIRouter() + + +@router.get("/health") +async def health(): + return {"status": "ok", "backend": get_backend().driver_name} diff --git a/features/storage-service/app/routers/migrate.py b/features/storage-service/app/routers/migrate.py new file mode 100644 index 0000000..55a4f10 --- /dev/null +++ b/features/storage-service/app/routers/migrate.py @@ -0,0 +1,88 @@ +import logging + +from fastapi import APIRouter, BackgroundTasks, HTTPException +from pydantic import BaseModel + +from app.services import migration +from app.services.backend_manager import build_backend, switch_backend + +router = APIRouter() +logger = logging.getLogger(__name__) + + +class MigrateRequest(BaseModel): + driver: str + config: dict = {} + + +class BackendConfigRequest(BaseModel): + driver: str + config: dict = {} + + +@router.post("/migrate", status_code=202) +async def start_migration(body: MigrateRequest, background_tasks: BackgroundTasks): + """ + Validate the new backend, then start an async migration job that: + 1. Copies all objects from the current backend to the new one + 2. Verifies every object + 3. Atomically switches the active backend + 4. Deletes all objects from the old backend + + Returns 409 if a migration is already in progress. + Returns 400 if the new backend config fails validation. + """ + if migration.is_in_progress(): + raise HTTPException(status_code=409, detail="A migration is already in progress") + + # Reset status and enter validating state before any async work + migration._status.state = "validating" + migration._status.total = 0 + migration._status.done = 0 + migration._status.failed = 0 + migration._status.errors.clear() + + try: + new_backend = build_backend(body.driver, body.config) + await new_backend.test_connection() + except Exception as exc: + migration._status.state = "idle" + raise HTTPException(status_code=400, detail=f"Backend validation failed: {exc}") + + background_tasks.add_task(migration.run_migration, new_backend) + return {"status": "started", "driver": body.driver} + + +@router.get("/migrate/status") +async def migration_status(): + """Poll this to track migration progress.""" + return migration.get_status() + + +@router.delete("/migrate", status_code=204) +async def cancel_migration(): + """ + Request cancellation of a running migration. + The old backend remains active. Returns 409 if no migration is running. + """ + cancelled = await migration.cancel() + if not cancelled: + raise HTTPException(status_code=409, detail="No cancellable migration in progress") + + +@router.patch("/backend-config", status_code=204) +async def update_backend_config(body: BackendConfigRequest): + """ + Reconfigure the active backend without migrating data (e.g. update S3 credentials + for the same endpoint, or switch back to local after a failed migration). + + Use POST /migrate when you need data to be moved to the new backend. + """ + if migration.is_in_progress(): + raise HTTPException(status_code=409, detail="Cannot reconfigure while migration is in progress") + try: + new_backend = build_backend(body.driver, body.config) + await new_backend.test_connection() + except Exception as exc: + raise HTTPException(status_code=400, detail=f"Backend validation failed: {exc}") + switch_backend(new_backend) diff --git a/features/storage-service/app/routers/objects.py b/features/storage-service/app/routers/objects.py new file mode 100644 index 0000000..84a6777 --- /dev/null +++ b/features/storage-service/app/routers/objects.py @@ -0,0 +1,60 @@ +from fastapi import APIRouter, HTTPException, Request +from fastapi.responses import Response + +from app.services.backend_manager import get_backend + +router = APIRouter() + + +def _validate_key(key: str) -> str: + """Reject path traversal. Key may contain '/' for nested objects (e.g. user/doc.pdf).""" + parts = key.split("/") + if ".." in parts: + raise HTTPException(status_code=400, detail="Invalid key: path traversal not allowed") + return key + + +@router.put("/objects/{bucket}/{key:path}", status_code=204) +async def put_object(bucket: str, key: str, request: Request): + """Upload raw bytes. Body is read as-is (application/octet-stream).""" + _validate_key(key) + data = await request.body() + try: + await get_backend().put(bucket, key, data) + except ValueError as exc: + raise HTTPException(status_code=400, detail=str(exc)) + except Exception as exc: + raise HTTPException(status_code=500, detail=str(exc)) + + +@router.get("/objects/{bucket}/{key:path}") +async def get_object(bucket: str, key: str): + """Download raw bytes.""" + _validate_key(key) + try: + data = await get_backend().get(bucket, key) + except KeyError: + raise HTTPException(status_code=404, detail="Object not found") + except Exception as exc: + raise HTTPException(status_code=500, detail=str(exc)) + return Response(content=data, media_type="application/octet-stream") + + +@router.delete("/objects/{bucket}/{key:path}", status_code=204) +async def delete_object(bucket: str, key: str): + """Delete an object. No-op if it does not exist.""" + _validate_key(key) + try: + await get_backend().delete(bucket, key) + except Exception as exc: + raise HTTPException(status_code=500, detail=str(exc)) + + +@router.get("/objects/{bucket}") +async def list_objects(bucket: str): + """List all keys in a bucket.""" + try: + keys = await get_backend().list_keys(bucket) + except Exception as exc: + raise HTTPException(status_code=500, detail=str(exc)) + return {"bucket": bucket, "keys": keys} diff --git a/features/storage-service/app/services/__init__.py b/features/storage-service/app/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/features/storage-service/app/services/backend_manager.py b/features/storage-service/app/services/backend_manager.py new file mode 100644 index 0000000..95bd996 --- /dev/null +++ b/features/storage-service/app/services/backend_manager.py @@ -0,0 +1,70 @@ +import logging + +from app.core.config import settings +from app.services.backends.base import AbstractStorageBackend +from app.services.backends.local import LocalFSBackend +from app.services.backends.s3 import S3Backend +from app.services.backends.webdav import WebDAVBackend + +logger = logging.getLogger(__name__) + +_active_backend: AbstractStorageBackend | None = None + + +def build_backend(driver: str, config: dict) -> AbstractStorageBackend: + """Construct a backend instance from a driver name + config dict.""" + if driver == "local": + return LocalFSBackend(data_dir=config.get("data_dir", settings.DATA_DIR)) + if driver == "s3": + return S3Backend( + endpoint_url=config.get("endpoint_url", ""), + access_key=config.get("access_key", ""), + secret_key=config.get("secret_key", ""), + region=config.get("region", "us-east-1"), + ) + if driver == "webdav": + return WebDAVBackend( + url=config.get("url", ""), + username=config.get("username", ""), + password=config.get("password", ""), + root_path=config.get("root_path", "/"), + ) + raise ValueError(f"Unknown driver: {driver!r}. Valid options: local, s3, webdav") + + +def initialize_backend() -> None: + """Build the initial backend from environment variables at startup.""" + global _active_backend + driver = settings.STORAGE_BACKEND + config: dict = {} + if driver == "s3": + config = { + "endpoint_url": settings.S3_ENDPOINT_URL, + "access_key": settings.S3_ACCESS_KEY, + "secret_key": settings.S3_SECRET_KEY, + "region": settings.S3_REGION, + } + elif driver == "webdav": + config = { + "url": settings.WEBDAV_URL, + "username": settings.WEBDAV_USERNAME, + "password": settings.WEBDAV_PASSWORD, + "root_path": settings.WEBDAV_ROOT_PATH, + } + # local needs no extra config — DATA_DIR is read from settings inside build_backend + _active_backend = build_backend(driver, config) + logger.info("Storage backend initialized: %s", driver) + + +def get_backend() -> AbstractStorageBackend: + if _active_backend is None: + raise RuntimeError("Backend not initialized — call initialize_backend() at startup") + return _active_backend + + +def switch_backend(new_backend: AbstractStorageBackend) -> None: + """Replace the active backend. Called by the migration job after all data is verified.""" + global _active_backend + old_name = _active_backend.driver_name if _active_backend else "none" + _active_backend = new_backend + logger.info("Storage backend switched: %s → %s", old_name, new_backend.driver_name) diff --git a/features/storage-service/app/services/backends/__init__.py b/features/storage-service/app/services/backends/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/features/storage-service/app/services/backends/base.py b/features/storage-service/app/services/backends/base.py new file mode 100644 index 0000000..7defc1f --- /dev/null +++ b/features/storage-service/app/services/backends/base.py @@ -0,0 +1,34 @@ +from abc import ABC, abstractmethod + + +class AbstractStorageBackend(ABC): + """Common interface every storage backend must implement.""" + + @property + @abstractmethod + def driver_name(self) -> str: + """Short identifier returned in /health: 'local', 's3', or 'webdav'.""" + + @abstractmethod + async def put(self, bucket: str, key: str, data: bytes) -> None: + """Store *data* under bucket/key. Creates bucket/intermediate dirs as needed.""" + + @abstractmethod + async def get(self, bucket: str, key: str) -> bytes: + """Return the stored bytes. Raises KeyError if the object does not exist.""" + + @abstractmethod + async def delete(self, bucket: str, key: str) -> None: + """Delete the object. No-op if it does not exist.""" + + @abstractmethod + async def list_keys(self, bucket: str) -> list[str]: + """Return all keys stored in *bucket*. Returns [] if bucket is empty/absent.""" + + @abstractmethod + async def exists(self, bucket: str, key: str) -> bool: + """Return True if the object exists.""" + + @abstractmethod + async def test_connection(self) -> None: + """Verify the backend is reachable and writable. Raise on failure.""" diff --git a/features/storage-service/app/services/backends/local.py b/features/storage-service/app/services/backends/local.py new file mode 100644 index 0000000..bde237f --- /dev/null +++ b/features/storage-service/app/services/backends/local.py @@ -0,0 +1,67 @@ +import asyncio +from pathlib import Path + +import aiofiles + +from .base import AbstractStorageBackend + + +class LocalFSBackend(AbstractStorageBackend): + """Stores objects as files under //.""" + + def __init__(self, data_dir: str) -> None: + self._root = Path(data_dir) + self._root.mkdir(parents=True, exist_ok=True) + + @property + def driver_name(self) -> str: + return "local" + + def _resolve(self, bucket: str, key: str) -> Path: + safe_key = key.lstrip("/") + if ".." in safe_key.split("/"): + raise ValueError(f"Invalid key: {key!r}") + return self._root / bucket / safe_key + + async def put(self, bucket: str, key: str, data: bytes) -> None: + dest = self._resolve(bucket, key) + await asyncio.to_thread(dest.parent.mkdir, parents=True, exist_ok=True) + async with aiofiles.open(dest, "wb") as f: + await f.write(data) + + async def get(self, bucket: str, key: str) -> bytes: + path = self._resolve(bucket, key) + if not path.exists(): + raise KeyError(f"{bucket}/{key}") + async with aiofiles.open(path, "rb") as f: + return await f.read() + + async def delete(self, bucket: str, key: str) -> None: + path = self._resolve(bucket, key) + try: + await asyncio.to_thread(path.unlink, missing_ok=True) + except OSError: + pass + + async def list_keys(self, bucket: str) -> list[str]: + bucket_dir = self._root / bucket + if not bucket_dir.exists(): + return [] + + def _scan() -> list[str]: + return [ + str(p.relative_to(bucket_dir)) + for p in bucket_dir.rglob("*") + if p.is_file() + ] + + return await asyncio.to_thread(_scan) + + async def exists(self, bucket: str, key: str) -> bool: + return self._resolve(bucket, key).exists() + + async def test_connection(self) -> None: + self._root.mkdir(parents=True, exist_ok=True) + probe = self._root / ".health_probe" + probe.write_bytes(b"ok") + probe.unlink() diff --git a/features/storage-service/app/services/backends/s3.py b/features/storage-service/app/services/backends/s3.py new file mode 100644 index 0000000..e8d52f6 --- /dev/null +++ b/features/storage-service/app/services/backends/s3.py @@ -0,0 +1,99 @@ +import logging +from contextlib import asynccontextmanager + +from aiobotocore.session import get_session + +from .base import AbstractStorageBackend + +logger = logging.getLogger(__name__) + + +class S3Backend(AbstractStorageBackend): + """ + S3-compatible backend. Works with AWS S3, MinIO, Backblaze B2, Cloudflare R2, etc. + Set endpoint_url to the service URL for non-AWS providers; leave empty for real AWS. + """ + + def __init__( + self, + endpoint_url: str, + access_key: str, + secret_key: str, + region: str = "us-east-1", + ) -> None: + self._endpoint_url = endpoint_url or None + self._access_key = access_key + self._secret_key = secret_key + self._region = region + self._session = get_session() + + @property + def driver_name(self) -> str: + return "s3" + + @asynccontextmanager + async def _client(self): + async with self._session.create_client( + "s3", + endpoint_url=self._endpoint_url, + aws_access_key_id=self._access_key, + aws_secret_access_key=self._secret_key, + region_name=self._region, + ) as client: + yield client + + async def _ensure_bucket(self, client, bucket: str) -> None: + try: + await client.head_bucket(Bucket=bucket) + except Exception: + try: + if self._region == "us-east-1": + await client.create_bucket(Bucket=bucket) + else: + await client.create_bucket( + Bucket=bucket, + CreateBucketConfiguration={"LocationConstraint": self._region}, + ) + except Exception as exc: + logger.debug("Bucket create skipped (may already exist): %s", exc) + + async def put(self, bucket: str, key: str, data: bytes) -> None: + async with self._client() as client: + await self._ensure_bucket(client, bucket) + await client.put_object(Bucket=bucket, Key=key, Body=data) + + async def get(self, bucket: str, key: str) -> bytes: + async with self._client() as client: + try: + response = await client.get_object(Bucket=bucket, Key=key) + return await response["Body"].read() + except Exception as exc: + raise KeyError(f"{bucket}/{key}") from exc + + async def delete(self, bucket: str, key: str) -> None: + async with self._client() as client: + await client.delete_object(Bucket=bucket, Key=key) + + async def list_keys(self, bucket: str) -> list[str]: + async with self._client() as client: + try: + paginator = client.get_paginator("list_objects_v2") + keys: list[str] = [] + async for page in paginator.paginate(Bucket=bucket): + for obj in page.get("Contents", []): + keys.append(obj["Key"]) + return keys + except Exception: + return [] + + async def exists(self, bucket: str, key: str) -> bool: + async with self._client() as client: + try: + await client.head_object(Bucket=bucket, Key=key) + return True + except Exception: + return False + + async def test_connection(self) -> None: + async with self._client() as client: + await client.list_buckets() diff --git a/features/storage-service/app/services/backends/webdav.py b/features/storage-service/app/services/backends/webdav.py new file mode 100644 index 0000000..2c80c50 --- /dev/null +++ b/features/storage-service/app/services/backends/webdav.py @@ -0,0 +1,121 @@ +import base64 +from urllib.parse import quote + +import defusedxml.ElementTree as ET + +import aiohttp + +from .base import AbstractStorageBackend + + +class WebDAVBackend(AbstractStorageBackend): + """ + WebDAV backend. Compatible with Nextcloud and any standard WebDAV server. + root_path should be the WebDAV root on the server, e.g. '/remote.php/dav/files/username'. + """ + + def __init__( + self, + url: str, + username: str, + password: str, + root_path: str = "/", + ) -> None: + self._base = url.rstrip("/") + self._root = root_path.rstrip("/") + creds = base64.b64encode(f"{username}:{password}".encode()).decode() + self._auth = f"Basic {creds}" + + @property + def driver_name(self) -> str: + return "webdav" + + def _url(self, *parts: str) -> str: + encoded = "/".join(quote(p, safe="") for p in parts) + return f"{self._base}{self._root}/{encoded}" + + def _headers(self, extra: dict | None = None) -> dict[str, str]: + h = {"Authorization": self._auth} + if extra: + h.update(extra) + return h + + async def _ensure_collection(self, session: aiohttp.ClientSession, *parts: str) -> None: + """MKCOL is idempotent — ignore 405 (already exists).""" + url = self._url(*parts) + async with session.request("MKCOL", url, headers=self._headers()) as resp: + if resp.status not in (200, 201, 405): + pass # best-effort; PUT will fail if directory is truly missing + + async def put(self, bucket: str, key: str, data: bytes) -> None: + async with aiohttp.ClientSession() as session: + await self._ensure_collection(session, bucket) + parts = key.split("/") + for i in range(1, len(parts)): + await self._ensure_collection(session, bucket, *parts[:i]) + url = self._url(bucket, key) + async with session.put(url, data=data, headers=self._headers()) as resp: + if resp.status not in (200, 201, 204): + raise OSError(f"WebDAV PUT {url} → {resp.status}") + + async def get(self, bucket: str, key: str) -> bytes: + async with aiohttp.ClientSession() as session: + url = self._url(bucket, key) + async with session.get(url, headers=self._headers()) as resp: + if resp.status == 404: + raise KeyError(f"{bucket}/{key}") + if resp.status != 200: + raise OSError(f"WebDAV GET {url} → {resp.status}") + return await resp.read() + + async def delete(self, bucket: str, key: str) -> None: + async with aiohttp.ClientSession() as session: + url = self._url(bucket, key) + async with session.delete(url, headers=self._headers()) as resp: + if resp.status not in (200, 204, 404): + raise OSError(f"WebDAV DELETE {url} → {resp.status}") + + async def list_keys(self, bucket: str) -> list[str]: + async with aiohttp.ClientSession() as session: + url = self._url(bucket) + headers = self._headers({"Depth": "infinity", "Content-Type": "application/xml"}) + body = '' + async with session.request("PROPFIND", url, headers=headers, data=body) as resp: + if resp.status == 404: + return [] + if resp.status != 207: + return [] + xml_body = await resp.text() + + ns = {"d": "DAV:"} + try: + root = ET.fromstring(xml_body) + except ET.ParseError: + return [] + + prefix = f"{self._base}{self._root}/{quote(bucket, safe='')}/" + keys: list[str] = [] + for response in root.findall("d:response", ns): + href = response.findtext("d:href", namespaces=ns) or "" + prop = response.find(".//d:prop", ns) + if prop is not None: + rt = prop.find("d:resourcetype", ns) + if rt is not None and rt.find("d:collection", ns) is not None: + continue # skip directories + if href.startswith(prefix): + keys.append(href[len(prefix):]) + return keys + + async def exists(self, bucket: str, key: str) -> bool: + async with aiohttp.ClientSession() as session: + url = self._url(bucket, key) + async with session.request("HEAD", url, headers=self._headers()) as resp: + return resp.status == 200 + + async def test_connection(self) -> None: + async with aiohttp.ClientSession() as session: + root_url = f"{self._base}{self._root}/" + headers = self._headers({"Depth": "0"}) + async with session.request("PROPFIND", root_url, headers=headers) as resp: + if resp.status not in (200, 207): + raise ConnectionError(f"WebDAV root PROPFIND → {resp.status}") diff --git a/features/storage-service/app/services/migration.py b/features/storage-service/app/services/migration.py new file mode 100644 index 0000000..93c7432 --- /dev/null +++ b/features/storage-service/app/services/migration.py @@ -0,0 +1,139 @@ +""" +Backend migration service. + +Flow: + 1. POST /migrate → validate new backend (test_connection) + 2. Background task enumerates all objects in all known buckets + 3. Each object is copied old → new, then verified + 4. Only after 100 % success: atomically switch active backend + 5. Delete all objects from old backend + 6. If any copy fails: old backend stays active; state = "failed" + 7. DELETE /migrate cancels a running migration (old backend stays active) +""" + +import logging +from dataclasses import dataclass, field +from typing import Literal + +from app.services.backends.base import AbstractStorageBackend +from app.services.backend_manager import get_backend, switch_backend + +logger = logging.getLogger(__name__) + +# All logical buckets the service knows about — enumerated during migration. +KNOWN_BUCKETS = ["documents", "config"] + +MigrationState = Literal[ + "idle", "validating", "migrating", "switching", "cleaning", "done", "failed", "cancelled" +] + + +@dataclass +class _MigrationStatus: + state: MigrationState = "idle" + total: int = 0 + done: int = 0 + failed: int = 0 + errors: list[str] = field(default_factory=list) + + +_status = _MigrationStatus() +_cancel_requested: bool = False + + +def get_status() -> dict: + return { + "state": _status.state, + "total": _status.total, + "done": _status.done, + "failed": _status.failed, + "errors": _status.errors[:50], # cap to avoid huge responses + } + + +def is_in_progress() -> bool: + return _status.state in ("validating", "migrating", "switching", "cleaning") + + +async def cancel() -> bool: + global _cancel_requested + if _status.state == "migrating": + _cancel_requested = True + return True + return False + + +async def run_migration(new_backend: AbstractStorageBackend) -> None: + """ + Background task: copy all objects to new_backend, verify, switch, clean old. + Called after the caller has already validated new_backend.test_connection(). + """ + global _cancel_requested + _cancel_requested = False + old_backend = get_backend() + + _status.state = "migrating" + _status.done = 0 + _status.failed = 0 + _status.errors.clear() + + try: + # Collect all objects across every known bucket + all_objects: list[tuple[str, str]] = [] + for bucket in KNOWN_BUCKETS: + try: + keys = await old_backend.list_keys(bucket) + for key in keys: + all_objects.append((bucket, key)) + except Exception as exc: + logger.warning("Could not list bucket %r: %s", bucket, exc) + + _status.total = len(all_objects) + logger.info("Migration: %d objects to migrate across %d buckets", len(all_objects), len(KNOWN_BUCKETS)) + + for bucket, key in all_objects: + if _cancel_requested: + _status.state = "cancelled" + logger.info("Migration cancelled (%d/%d done)", _status.done, _status.total) + return + + try: + data = await old_backend.get(bucket, key) + await new_backend.put(bucket, key, data) + if not await new_backend.exists(bucket, key): + raise OSError("Verification failed: object absent after PUT") + _status.done += 1 + except Exception as exc: + _status.failed += 1 + entry = f"{bucket}/{key}: {exc}" + _status.errors.append(entry) + logger.warning("Migration copy failed — %s", entry) + + if _status.failed > 0: + _status.state = "failed" + logger.error( + "Migration failed: %d/%d objects could not be copied; old backend remains active", + _status.failed, + _status.total, + ) + return + + # All objects verified — atomically switch + _status.state = "switching" + switch_backend(new_backend) + + # Remove all objects from old backend (best-effort) + _status.state = "cleaning" + for bucket, key in all_objects: + try: + await old_backend.delete(bucket, key) + except Exception as exc: + logger.warning("Cleanup failed for %s/%s: %s", bucket, key, exc) + + _status.state = "done" + logger.info("Migration complete: %d objects moved to %s", _status.total, new_backend.driver_name) + + except Exception as exc: + _status.state = "failed" + _status.errors.append(f"Unexpected error: {exc}") + logger.exception("Migration aborted with unexpected error") diff --git a/features/storage-service/pyproject.toml b/features/storage-service/pyproject.toml new file mode 100644 index 0000000..070a75b --- /dev/null +++ b/features/storage-service/pyproject.toml @@ -0,0 +1,25 @@ +[build-system] +requires = ["setuptools>=45"] +build-backend = "setuptools.build_meta" + +[project] +name = "storage-service" +version = "0.1.0" +requires-python = ">=3.11" +dependencies = [ + "fastapi>=0.111", + "uvicorn[standard]>=0.29", + "pydantic-settings>=2.2", + "aiofiles>=23.0", + "aiobotocore>=2.13", + "aiohttp>=3.9", + "defusedxml>=0.7", +] + +[project.optional-dependencies] +dev = [ + "ruff>=0.4", +] + +[tool.ruff] +line-length = 100 diff --git a/features/storage-service/scripts/start.sh b/features/storage-service/scripts/start.sh new file mode 100755 index 0000000..f450a96 --- /dev/null +++ b/features/storage-service/scripts/start.sh @@ -0,0 +1,5 @@ +#!/bin/sh +set -e + +echo "[storage-service] starting uvicorn..." +exec uvicorn app.main:app --host 0.0.0.0 --port 8020 diff --git a/features/storage-service/scripts/start_dev.sh b/features/storage-service/scripts/start_dev.sh new file mode 100755 index 0000000..e8323ee --- /dev/null +++ b/features/storage-service/scripts/start_dev.sh @@ -0,0 +1,5 @@ +#!/bin/sh +set -e + +echo "[storage-service] starting uvicorn (dev)..." +exec uvicorn app.main:app --host 0.0.0.0 --port 8020 --reload