diff --git a/CLAUDE.md b/CLAUDE.md index 0345fc0..2501c82 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -6,17 +6,19 @@ This file provides permanent, authoritative guidance to Claude Code for every se - `frontend/CLAUDE.md` — routes, components, API client patterns, XSS prevention - `features/ai-service/CLAUDE.md` — /chat, /health, /queue endpoints; queue service - `features/doc-service/CLAUDE.md` — document/category/share endpoints; DB models; PDF limits; file watcher +- `features/storage-service/CLAUDE.md` — storage API, pluggable backend drivers (local/S3/WebDAV), migration --- ## Merge checklist -Before merging any feature branch into `main`, every test relevant to the changed area in `tests/ALL_TESTS.md` (and the relevant service-specific file) must be marked passing. The test suite covers all 19 feature areas across four service files: +Before merging any feature branch into `main`, every test relevant to the changed area in `tests/ALL_TESTS.md` (and the relevant service-specific file) must be marked passing. The test suite covers all 20 feature areas across five service files: - `tests/backend_tests.md` — §1–9, §18 - `tests/frontend_tests.md` — §19 - `tests/doc-service_tests.md` — §10–16 - `tests/ai-service_tests.md` — §17 +- `tests/storage-service_tests.md` — §20 Do not merge without it. @@ -35,7 +37,7 @@ Do not merge without it. - New Docker service, volume, network, or env var → update **Docker Infrastructure** in this file - Stack version changed → update **Stack** in this file -- New feature or endpoint added → add test rows to **both** `tests/ALL_TESTS.md` (in the relevant section) **and** the matching service-specific file (`tests/backend_tests.md`, `tests/frontend_tests.md`, `tests/doc-service_tests.md`, or `tests/ai-service_tests.md`). Use the same test number and format as existing rows. +- New feature or endpoint added → add test rows to **both** `tests/ALL_TESTS.md` (in the relevant section) **and** the matching service-specific file (`tests/backend_tests.md`, `tests/frontend_tests.md`, `tests/doc-service_tests.md`, `tests/ai-service_tests.md`, or `tests/storage-service_tests.md`). Use the same test number and format as existing rows. This check is mandatory — treat it the same as updating STATUS.md. @@ -143,7 +145,17 @@ These standards are **non-negotiable**. Every change must comply. Implementation - `backend-net`: all containers except frontend; not reachable from host in prod. - `frontend-net`: only frontend; single host port (80 prod / 5173 dev). -- DB, backend, doc-service, ai-service have **no** host port bindings in prod. +- DB, backend, doc-service, ai-service, storage-service have **no** host port bindings in prod. + +### Storage rule (non-negotiable) + +**No service may write to a filesystem path for persistent data.** All file/blob storage must go through the storage-service HTTP API (`PUT/GET/DELETE /objects/{bucket}/{key}`). Config JSON files must be stored in the `config` bucket. Uploaded files must be stored in the `documents` bucket. Violation is a security and architecture defect. + +The only two persistent storage mechanisms in the project are: +1. **PostgreSQL** — structured/relational data +2. **storage-service** — all file/blob/config data (local filesystem by default; switchable to S3-compatible or WebDAV) + +New services and features must follow this pattern. See `features/storage-service/CLAUDE.md` for the API reference. ### Pre-commit security hook @@ -179,9 +191,10 @@ All other per-service defaults are in the relevant sub-CLAUDE.md file. | Service | Image base | Internal port | User | Volumes | Network | |---------|-----------|---------------|------|---------|---------| | `db` | postgres:16-alpine | 5432 | 70:70 | `postgres_data` | backend-net | -| `backend` | python:3.12-slim | 8000 | 1001:1001 | `app_config` | backend-net | -| `ai-service` | python:3.12-slim | 8010 | 1001:1001 | `app_config` | backend-net | -| `doc-service` | python:3.12-slim | 8001 | 1001:1001 | `doc_data`, `watch_data`, `app_config` | backend-net | +| `backend` | python:3.12-slim | 8000 | 1001:1001 | — | backend-net | +| `ai-service` | python:3.12-slim | 8010 | 1001:1001 | — | backend-net | +| `doc-service` | python:3.12-slim | 8001 | 1001:1001 | `watch_data` | backend-net | +| `storage-service` | python:3.12-slim | 8020 | 1001:1001 | `storage_data` | backend-net | | `frontend` | nginx-unprivileged:alpine | 8080 | 1001:1001 | — | backend-net, frontend-net | ### Volumes @@ -189,15 +202,14 @@ All other per-service defaults are in the relevant sub-CLAUDE.md file. | Volume | Mount path | Contains | |--------|-----------|---------| | `postgres_data` | `/var/lib/postgresql/data` | PostgreSQL data | -| `doc_data` | `/data/documents` | Uploaded PDF files | +| `storage_data` | `/data/storage` | All file/blob storage: PDFs (`documents/`) and config JSONs (`config/`) | | `watch_data` | `/data/watch` | Watch directory (bind-mount NAS/Nextcloud via docker-compose.override.yml) | -| `app_config` | `/config` | Per-service runtime config JSON files | ### Networks | Network | Host-accessible | Members | |---------|----------------|---------| -| `backend-net` | No (no host ports in prod) | db, backend, ai-service, doc-service, frontend | +| `backend-net` | No (no host ports in prod) | db, backend, ai-service, doc-service, storage-service, frontend | | `frontend-net` | Yes (port 80 → frontend:8080) | frontend | ### Environment variables (required in `backend/.env`) @@ -213,6 +225,7 @@ Injected by docker-compose (not in `.env`): ``` DOC_SERVICE_URL=http://doc-service:8001 AI_SERVICE_URL=http://ai-service:8010 +STORAGE_SERVICE_URL=http://storage-service:8020 ``` --- @@ -316,23 +329,18 @@ git checkout -b feat/ # e.g. feat/user-profile-avatar-uploa ``` #### 2 — Spin up an isolated Docker stack for the feature -A dedicated compose stack runs alongside the main dev stack so both can be tested independently. +The feature stack always uses port `5173` (same as the main dev stack). Stop the main stack before starting a feature stack, and restart it when done. -**Find the next free port** (main dev stack owns 5173): +**Stop the main dev stack first:** ```bash -for port in $(seq 5174 5200); do - lsof -iTCP:$port -sTCP:LISTEN -t &>/dev/null || { echo "$port"; break; } -done +docker compose -f docker-compose.yml -f docker-compose.dev.yml down ``` -Use the first free port returned (call it `$PORT`). **Create a per-feature override file** at `docker-compose.feat-.yml` (gitignored): ```yaml # docker-compose.feat-.yml — feature test stack, never committed to main services: frontend: - ports: - - "$PORT:8080" # e.g. 5174:8080 container_name: frontend- backend: container_name: backend- @@ -358,8 +366,7 @@ docker compose -f docker-compose.yml \ --project-name up --build ``` -The feature frontend is now reachable at `http://localhost:$PORT`. -The main dev stack continues running unaffected on `:5173`. +The feature frontend is now reachable at `http://localhost:5173`. #### 3 — Develop on the feature branch All code changes happen on `feat/`. Commit and push normally: @@ -370,7 +377,7 @@ git push -u origin feat/ ``` #### 4 — Confirm functionality -Before merging, verify all of the following on `http://localhost:$PORT`: +Before merging, verify all of the following on `http://localhost:5173`: - [ ] Login and registration work end-to-end - [ ] The specific feature works as intended - [ ] No regressions visible in the UI @@ -387,13 +394,16 @@ git branch -d feat/ git push origin --delete feat/ ``` -#### 6 — Tear down the feature stack +#### 6 — Tear down the feature stack and restart main dev stack ```bash docker compose -f docker-compose.yml \ -f docker-compose.dev.yml \ -f docker-compose.feat-.yml \ --project-name down --volumes --remove-orphans rm docker-compose.feat-.yml + +# Restart the main dev stack on :5173 +docker compose -f docker-compose.yml -f docker-compose.dev.yml up --build -d ``` --- diff --git a/backend/CLAUDE.md b/backend/CLAUDE.md index 49ba78a..0da5456 100644 --- a/backend/CLAUDE.md +++ b/backend/CLAUDE.md @@ -36,7 +36,8 @@ backend/ │ │ ├── config.py ← All settings via pydantic-settings (reads .env) │ │ ├── security.py ← JWT sign/verify (RS256), bcrypt hash/verify │ │ ├── sanitize.py ← Input sanitization helpers (see Security Standards) -│ │ └── app_config.py ← Per-service config load/save to /config volume; theme files in /config/themes/ +│ │ ├── app_config.py ← Per-service config load/save via storage-service; theme files in config/themes/{id}.json +│ │ └── config_storage.py ← Thin async HTTP helpers: read_json/write_json/delete_key/list_keys → storage-service config bucket │ ├── models/ │ │ ├── __init__.py ← Imports all models (required for Alembic autogenerate) │ │ ├── user.py ← User model @@ -56,7 +57,8 @@ backend/ │ │ ├── services.py ← GET /services (health status) │ │ ├── plugins.py ← Generic plugin proxy (GET/PATCH /api/plugins/*) │ │ ├── categories_proxy.py ← Transparent proxy → doc-service /categories/* -│ │ └── documents_proxy.py ← Transparent proxy → doc-service /documents/* +│ │ ├── documents_proxy.py ← Transparent proxy → doc-service /documents/* +│ │ └── storage_config.py ← Admin proxy → storage-service config + migration endpoints │ └── services/ │ ├── service_health.py ← Background 30s health-check loop; caches /plugin/manifest per service │ └── group_bootstrap.py ← Ensures {service-id}-admin group exists for every registered service at startup @@ -216,6 +218,16 @@ Unique constraint: `(group_id, user_id)` Auth: is_superuser OR member of group listed in manifest `required_groups`. Returns 404 (not 403) to hide existence. +### Admin — Storage (`/api/admin`) — admin-only + +| Method | Path | Description | +|--------|------|-------------| +| GET | `/api/admin/storage-config` | Current backend driver + health → proxied from storage-service `/health` | +| PATCH | `/api/admin/storage-config` | Reconfigure backend without data migration (same-backend credential update) | +| POST | `/api/admin/storage-config/migrate` | Start async migration to a new backend (copy → verify → switch → cleanup) | +| GET | `/api/admin/storage-config/migrate/status` | Poll migration progress: `{state, total, done, failed, errors[]}` | +| DELETE | `/api/admin/storage-config/migrate` | Cancel a running migration; old backend remains active | + ### Documents and Categories — proxied `/api/documents/*` and `/api/documents/categories/*` are transparently proxied to `doc-service:8001`. The backend injects `x-user-id`, `x-user-groups`, and `x-user-is-admin` headers. See `features/doc-service/CLAUDE.md` for the internal endpoint list. diff --git a/backend/STATUS.md b/backend/STATUS.md index 963a870..808e3e3 100644 --- a/backend/STATUS.md +++ b/backend/STATUS.md @@ -75,10 +75,20 @@ A background task (`service_health.py`) polls each service's `/health` endpoint | `GET` | `/api/settings/system-prompts` | All editable system prompts — superuser OR `ai-service-admin` member | | `PATCH` | `/api/settings/system-prompts/{id}` | Update system prompt — same access | -Settings are persisted to JSON files on the `app_config` Docker named volume and read by the respective feature services. +Settings are persisted to the `config` bucket of `storage-service:8020` via `core/config_storage.py`. All config I/O is async HTTP; no filesystem volumes are used. Access to service-specific settings endpoints is enforced by `get_service_admin(service_id)` in `deps.py` — grants access to superusers OR members of the `{service_id}-admin` group. +### Storage config (`/api/admin`) + +| Method | Path | Description | +|--------|------|-------------| +| `GET` | `/api/admin/storage-config` | Current backend driver + health (proxied from storage-service) | +| `PATCH` | `/api/admin/storage-config` | Reconfigure backend without migration | +| `POST` | `/api/admin/storage-config/migrate` | Start async migration to a new backend | +| `GET` | `/api/admin/storage-config/migrate/status` | Poll migration progress | +| `DELETE` | `/api/admin/storage-config/migrate` | Cancel running migration | + ### Feature proxies All `/api/documents/*` and `/api/documents/categories/*` requests are transparently proxied to `doc-service:8001` via `httpx.AsyncClient`. The proxy: @@ -129,8 +139,11 @@ Browser (port 5173 dev / 80 prod) ┌───────────┼────────────┬──────────────┐ /auth /settings /documents/* /services /users (JSON │ │ - /admin volume) └── proxy → health-check loop - /profile doc-service:8001 (30s poll) + /admin /storage- └── proxy → health-check loop + /profile config doc-service:8001 (30s poll) + (proxy) + │ + storage-service:8020 ``` --- diff --git a/backend/app/core/app_config.py b/backend/app/core/app_config.py index bd7b462..1b6faa5 100644 --- a/backend/app/core/app_config.py +++ b/backend/app/core/app_config.py @@ -1,21 +1,25 @@ """ Per-service runtime config helpers. -Config files live on the shared `app_config` Docker volume at /config/. -Each service has its own JSON file. +All config files are stored in the 'config' bucket of the storage-service. +Every function is async — callers must await them. -Atomic write pattern: write to .tmp in same dir, then os.replace() so -services never read a partial file. +Key layout in the config bucket: + ai_service_config.json + doc_service_config.json + appearance_config.json + themes/{id}.json """ import copy -import json -import os +import logging import re -from pathlib import Path +from copy import deepcopy from pydantic import BaseModel -_CONFIG_DIR = Path(os.environ.get("APP_CONFIG_DIR", "/config")) +from app.core import config_storage + +logger = logging.getLogger(__name__) # ── AI service config schemas ────────────────────────────────────────────────── @@ -108,59 +112,50 @@ def _mask_ai_config(data: dict) -> dict: # ── Load / Save ──────────────────────────────────────────────────────────────── -def _config_path(service: str) -> Path: - return _CONFIG_DIR / f"{service}_config.json" - - -def load_service_config(service: str) -> dict: - path = _config_path(service) - if not path.exists(): +async def load_service_config(service: str) -> dict: + data = await config_storage.read_json(f"{service}_config.json") + if data is None: if service == "ai_service": return AIServiceConfig().model_dump() if service == "doc_service": return DocServiceConfig().model_dump() return {} - with path.open() as f: - return json.load(f) + return data -def save_service_config(service: str, data: dict) -> None: - path = _config_path(service) - path.parent.mkdir(parents=True, exist_ok=True) - tmp = path.with_suffix(".tmp") - tmp.write_text(json.dumps(data, indent=2)) - os.replace(tmp, path) +async def save_service_config(service: str, data: dict) -> None: + await config_storage.write_json(f"{service}_config.json", data) # AI service helpers -def load_ai_service_config() -> AIServiceConfig: - raw = load_service_config("ai_service") +async def load_ai_service_config() -> AIServiceConfig: + raw = await load_service_config("ai_service") return AIServiceConfig.model_validate(raw) -def save_ai_service_config(config: AIServiceConfig) -> None: - save_service_config("ai_service", config.model_dump()) +async def save_ai_service_config(config: AIServiceConfig) -> None: + await save_service_config("ai_service", config.model_dump()) -def load_ai_service_config_masked() -> dict: - raw = load_service_config("ai_service") +async def load_ai_service_config_masked() -> dict: + raw = await load_service_config("ai_service") return _mask_ai_config(raw) # Doc service helpers -def load_doc_service_config() -> DocServiceConfig: - raw = load_service_config("doc_service") +async def load_doc_service_config() -> DocServiceConfig: + raw = await load_service_config("doc_service") return DocServiceConfig.model_validate(raw) -def save_doc_service_config(config: DocServiceConfig) -> None: - save_service_config("doc_service", config.model_dump()) +async def save_doc_service_config(config: DocServiceConfig) -> None: + await save_service_config("doc_service", config.model_dump()) -def load_doc_service_config_masked() -> dict: - return load_service_config("doc_service") +async def load_doc_service_config_masked() -> dict: + return await load_service_config("doc_service") def _merge_api_key(new_key: str, existing_key: str) -> str: @@ -172,18 +167,16 @@ def _merge_api_key(new_key: str, existing_key: str) -> str: # ── System prompts helpers ───────────────────────────────────────────────────── -# Registry of all services that have editable system prompts. -# key = service identifier, value = human-readable label SYSTEM_PROMPT_SERVICES: dict[str, str] = { "doc_service": "Document Service", } -def load_all_system_prompts() -> dict: +async def load_all_system_prompts() -> dict: """Return {service_id: {label, system, user_template, default_system, default_user_template}}.""" result: dict = {} for service_id, label in SYSTEM_PROMPT_SERVICES.items(): - config = load_service_config(service_id) + config = await load_service_config(service_id) prompts = config.get("system_prompts", {}) defaults = _get_service_prompt_defaults(service_id) result[service_id] = { @@ -196,15 +189,14 @@ def load_all_system_prompts() -> dict: return result -def save_service_system_prompts(service_id: str, system: str, user_template: str) -> None: - """Persist updated system prompts into the service's config file.""" +async def save_service_system_prompts(service_id: str, system: str, user_template: str) -> None: if service_id not in SYSTEM_PROMPT_SERVICES: raise ValueError(f"Unknown service: {service_id!r}") - config = load_service_config(service_id) + config = await load_service_config(service_id) config.setdefault("system_prompts", {}) config["system_prompts"]["system"] = system config["system_prompts"]["user_template"] = user_template - save_service_config(service_id, config) + await save_service_config(service_id, config) def _get_service_prompt_defaults(service_id: str) -> dict: @@ -221,26 +213,19 @@ class AppearanceConfig(BaseModel): default_mode: str = "system" -def load_appearance_config() -> AppearanceConfig: - path = _CONFIG_DIR / "appearance_config.json" - if not path.exists(): +async def load_appearance_config() -> AppearanceConfig: + data = await config_storage.read_json("appearance_config.json") + if data is None: return AppearanceConfig() - with path.open() as f: - return AppearanceConfig.model_validate(json.load(f)) + return AppearanceConfig.model_validate(data) -def save_appearance_config(config: AppearanceConfig) -> None: - path = _CONFIG_DIR / "appearance_config.json" - path.parent.mkdir(parents=True, exist_ok=True) - tmp = path.with_suffix(".tmp") - tmp.write_text(json.dumps(config.model_dump(), indent=2)) - os.replace(tmp, path) +async def save_appearance_config(config: AppearanceConfig) -> None: + await config_storage.write_json("appearance_config.json", config.model_dump()) # ── Theme file management ────────────────────────────────────────────────────── -_THEMES_DIR = _CONFIG_DIR / "themes" - # 9 required colour tokens per mode _REQUIRED_TOKENS = frozenset({ "primary", "primary_hover", "accent", "accent_hover", @@ -361,36 +346,57 @@ _BUILTIN_THEMES: list[dict] = [ ] -def seed_builtin_themes() -> None: - """Create /config/themes/ and write built-in theme files if missing.""" - _THEMES_DIR.mkdir(parents=True, exist_ok=True) +async def seed_builtin_themes() -> None: + """Write built-in theme files to storage-service if they are not already there.""" + existing_keys = await config_storage.list_keys(prefix="themes/") + existing_ids = {k.removeprefix("themes/").removesuffix(".json") for k in existing_keys} for theme in _BUILTIN_THEMES: - path = _THEMES_DIR / f"{theme['id']}.json" - if not path.exists(): - path.write_text(json.dumps(theme, indent=2)) + if theme["id"] not in existing_ids: + await config_storage.write_json(f"themes/{theme['id']}.json", theme) + logger.info("Built-in themes seeded (%d themes)", len(_BUILTIN_THEMES)) -def load_all_themes() -> list[dict]: - """Return all themes from /config/themes/*.json, built-ins first.""" - if not _THEMES_DIR.exists(): - seed_builtin_themes() - themes = [] - for f in sorted(_THEMES_DIR.glob("*.json")): - try: - themes.append(json.loads(f.read_text())) - except (json.JSONDecodeError, OSError): - pass - # Sort: built-ins first (preserving their original order), then custom by label +async def load_all_themes() -> list[dict]: + """Return all themes from storage-service, built-ins first then custom by label.""" + keys = await config_storage.list_keys(prefix="themes/") + themes: list[dict] = [] + for key in keys: + data = await config_storage.read_json(key) + if data: + themes.append(data) + builtin_ids = [t["id"] for t in _BUILTIN_THEMES] + def sort_key(t: dict) -> tuple: tid = t.get("id", "") try: return (0, builtin_ids.index(tid)) except ValueError: return (1, t.get("label", tid).lower()) + return sorted(themes, key=sort_key) +async def load_theme_by_id(theme_id: str) -> dict | None: + """Return a single theme dict, or None if not found.""" + return await config_storage.read_json(f"themes/{theme_id}.json") + + +async def save_theme(theme: dict) -> None: + """Write a theme to storage-service.""" + await config_storage.write_json(f"themes/{theme['id']}.json", theme) + + +async def delete_theme(theme_id: str) -> None: + """Delete a custom theme. Raises ValueError for built-ins, KeyError if not found.""" + data = await config_storage.read_json(f"themes/{theme_id}.json") + if data is None: + raise FileNotFoundError(theme_id) + if data.get("builtin"): + raise ValueError("Cannot delete a built-in theme") + await config_storage.delete_key(f"themes/{theme_id}.json") + + def validate_theme_tokens(colors: dict) -> list[str]: """Return a list of validation error messages, empty if valid.""" errors = [] @@ -401,23 +407,3 @@ def validate_theme_tokens(colors: dict) -> list[str]: if key in _REQUIRED_TOKENS and not _RGB_RE.match(str(val)): errors.append(f"Token '{key}' must be an RGB triplet like '37 99 235', got: {val!r}") return errors - - -def save_theme(theme: dict) -> None: - """Write a theme file atomically.""" - _THEMES_DIR.mkdir(parents=True, exist_ok=True) - path = _THEMES_DIR / f"{theme['id']}.json" - tmp = path.with_suffix(".tmp") - tmp.write_text(json.dumps(theme, indent=2)) - os.replace(tmp, path) - - -def delete_theme(theme_id: str) -> None: - """Delete a custom theme file. Raises ValueError for built-ins, FileNotFoundError if missing.""" - path = _THEMES_DIR / f"{theme_id}.json" - if not path.exists(): - raise FileNotFoundError(theme_id) - data = json.loads(path.read_text()) - if data.get("builtin"): - raise ValueError("Cannot delete a built-in theme") - path.unlink() diff --git a/backend/app/core/config.py b/backend/app/core/config.py index 9adb0e5..7502643 100644 --- a/backend/app/core/config.py +++ b/backend/app/core/config.py @@ -17,6 +17,7 @@ class Settings(BaseSettings): DOC_SERVICE_URL: str = "http://doc-service:8001" AI_SERVICE_URL: str = "http://ai-service:8010" + STORAGE_SERVICE_URL: str = "http://storage-service:8020" @field_validator("JWT_PRIVATE_KEY", "JWT_PUBLIC_KEY", mode="before") @classmethod diff --git a/backend/app/core/config_storage.py b/backend/app/core/config_storage.py new file mode 100644 index 0000000..8a9b8e2 --- /dev/null +++ b/backend/app/core/config_storage.py @@ -0,0 +1,63 @@ +""" +Async HTTP client for the 'config' bucket in storage-service. + +All JSON config files (AI settings, doc settings, appearance, themes, …) are stored +in the 'config' bucket under the storage-service. This module provides thin +async helpers so app_config.py does not depend on the filesystem at all. +""" +import json +import logging + +import httpx + +from app.core.config import settings + +logger = logging.getLogger(__name__) + +_BUCKET = "config" +_TIMEOUT = 10.0 + + +def _url(key: str) -> str: + return f"{settings.STORAGE_SERVICE_URL}/objects/{_BUCKET}/{key}" + + +async def read_json(key: str) -> dict | None: + """Return parsed JSON from the config bucket, or None if the key does not exist.""" + async with httpx.AsyncClient(timeout=_TIMEOUT) as client: + resp = await client.get(_url(key)) + if resp.status_code == 404: + return None + resp.raise_for_status() + return resp.json() + + +async def write_json(key: str, data: dict) -> None: + """Serialise *data* to JSON and PUT it into the config bucket.""" + payload = json.dumps(data, indent=2).encode() + async with httpx.AsyncClient(timeout=_TIMEOUT) as client: + resp = await client.put( + _url(key), + content=payload, + headers={"Content-Type": "application/octet-stream"}, + ) + resp.raise_for_status() + + +async def delete_key(key: str) -> None: + """Delete a key from the config bucket. No-op if it does not exist.""" + async with httpx.AsyncClient(timeout=_TIMEOUT) as client: + resp = await client.delete(_url(key)) + if resp.status_code not in (204, 404): + resp.raise_for_status() + + +async def list_keys(prefix: str = "") -> list[str]: + """List all keys in the config bucket, optionally filtered by *prefix*.""" + async with httpx.AsyncClient(timeout=_TIMEOUT) as client: + resp = await client.get(f"{settings.STORAGE_SERVICE_URL}/objects/{_BUCKET}") + resp.raise_for_status() + keys: list[str] = resp.json().get("keys", []) + if prefix: + keys = [k for k in keys if k.startswith(prefix)] + return keys diff --git a/backend/app/main.py b/backend/app/main.py index 1274ecb..95e99dd 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -9,16 +9,18 @@ from app.core.config import settings from app.database import AsyncSessionLocal from app.routers import admin, auth, categories_proxy, documents_proxy, groups, plugins, profile, services, users from app.routers import settings as settings_router +from app.routers import storage_config from app.services.group_bootstrap import ensure_service_admin_groups from app.services.service_health import check_all, health_check_loop, register_services @asynccontextmanager async def lifespan(app: FastAPI): - await asyncio.to_thread(seed_builtin_themes) + await seed_builtin_themes() register_services( doc_service_url=settings.DOC_SERVICE_URL, ai_service_url=settings.AI_SERVICE_URL, + storage_service_url=settings.STORAGE_SERVICE_URL, ) # Create -admin groups for every registered service (idempotent) async with AsyncSessionLocal() as db: @@ -51,6 +53,7 @@ app.include_router(admin.router, prefix="/api/admin", tags=["admin"]) app.include_router(groups.router, prefix="/api/admin/groups", tags=["admin"]) app.include_router(settings_router.router, prefix="/api/settings", tags=["settings"]) app.include_router(services.router, prefix="/api/services", tags=["services"]) +app.include_router(storage_config.router, prefix="/api/admin", tags=["admin"]) app.include_router(plugins.router, prefix="/api/plugins", tags=["plugins"]) # categories_proxy MUST be registered before documents_proxy — # otherwise /api/documents/{path:path} swallows /api/documents/categories/* diff --git a/backend/app/routers/settings.py b/backend/app/routers/settings.py index b8b6bdc..069fe08 100644 --- a/backend/app/routers/settings.py +++ b/backend/app/routers/settings.py @@ -2,10 +2,9 @@ Admin-only settings API for per-service runtime configuration. All endpoints require the caller to be an admin (Depends(get_current_admin)). -Config files live on the shared app_config volume (/config/). +Config files are stored in the 'config' bucket of storage-service. """ -import asyncio -import json +import re as _re import httpx from fastapi import APIRouter, Depends, HTTPException @@ -21,10 +20,11 @@ from app.core.app_config import ( load_all_system_prompts, load_all_themes, load_appearance_config, - save_appearance_config, load_doc_service_config, load_doc_service_config_masked, + load_theme_by_id, save_ai_service_config, + save_appearance_config, save_doc_service_config, save_service_system_prompts, save_theme, @@ -36,6 +36,8 @@ from app.models.user import User router = APIRouter() +_THEME_ID_RE = _re.compile(r"^[a-z0-9_-]{1,64}$") + # ── Pydantic request bodies ──────────────────────────────────────────────────── @@ -98,7 +100,7 @@ class ThemeUpdate(BaseModel): async def get_ai_settings( _: User = Depends(get_service_admin("ai-service")), ) -> dict: - return load_ai_service_config_masked() + return await load_ai_service_config_masked() @router.patch("/ai") @@ -110,7 +112,7 @@ async def update_ai_settings( if body.provider not in valid_providers: raise HTTPException(status_code=422, detail=f"provider must be one of {valid_providers}") - config = load_ai_service_config() + config = await load_ai_service_config() config.provider = body.provider # Anthropic @@ -139,8 +141,8 @@ async def update_ai_settings( body.lmstudio_api_key, config.lmstudio.api_key ) - await asyncio.to_thread(save_ai_service_config, config) - return load_ai_service_config_masked() + await save_ai_service_config(config) + return await load_ai_service_config_masked() @router.post("/ai/test") @@ -173,7 +175,7 @@ async def test_ai_connection( async def get_documents_limits( _: User = Depends(get_service_admin("doc-service")), ) -> dict: - return load_doc_service_config_masked() + return await load_doc_service_config_masked() @router.patch("/documents/limits") @@ -184,10 +186,10 @@ async def update_documents_limits( if body.max_pdf_mb < 1 or body.max_pdf_mb > 200: raise HTTPException(status_code=422, detail="max_pdf_mb must be between 1 and 200") - config = load_doc_service_config() + config = await load_doc_service_config() config.documents.max_pdf_bytes = body.max_pdf_mb * 1024 * 1024 - await asyncio.to_thread(save_doc_service_config, config) - return load_doc_service_config_masked() + await save_doc_service_config(config) + return await load_doc_service_config_masked() # ── System prompts ───────────────────────────────────────────────────────────── @@ -197,8 +199,7 @@ async def update_documents_limits( async def get_system_prompts( _: User = Depends(get_service_admin("ai-service")), ) -> dict: - """Return all editable system prompts, keyed by service id.""" - return await asyncio.to_thread(load_all_system_prompts) + return await load_all_system_prompts() @router.patch("/system-prompts/{service_id}") @@ -207,26 +208,20 @@ async def update_system_prompt( body: SystemPromptUpdate, _: User = Depends(get_service_admin("ai-service")), ) -> dict: - """Update the system prompts for a single service.""" if service_id not in SYSTEM_PROMPT_SERVICES: raise HTTPException(status_code=404, detail=f"No system prompts registered for {service_id!r}") - await asyncio.to_thread( - save_service_system_prompts, service_id, body.system, body.user_template - ) - return await asyncio.to_thread(load_all_system_prompts) + await save_service_system_prompts(service_id, body.system, body.user_template) + return await load_all_system_prompts() # ── Appearance (global default — auth read, admin write) ─────────────────────── -import re as _re -_THEME_ID_RE = _re.compile(r"^[a-z0-9_-]{1,64}$") - @router.get("/appearance") async def get_appearance( _: User = Depends(get_current_user), ) -> dict: - config = await asyncio.to_thread(load_appearance_config) + config = await load_appearance_config() return config.model_dump() @@ -237,12 +232,12 @@ async def update_appearance( ) -> dict: if body.default_mode not in ("light", "dark", "system"): raise HTTPException(status_code=422, detail="default_mode must be 'light', 'dark', or 'system'") - themes = await asyncio.to_thread(load_all_themes) + themes = await load_all_themes() theme_ids = {t["id"] for t in themes} if body.theme not in theme_ids: raise HTTPException(status_code=422, detail=f"Unknown theme: {body.theme!r}") config = AppearanceConfig(theme=body.theme, default_mode=body.default_mode) - await asyncio.to_thread(save_appearance_config, config) + await save_appearance_config(config) return config.model_dump() @@ -253,7 +248,7 @@ async def update_appearance( async def list_themes( _: User = Depends(get_current_user), ) -> list: - return await asyncio.to_thread(load_all_themes) + return await load_all_themes() @router.post("/themes", status_code=201) @@ -263,7 +258,7 @@ async def create_theme( ) -> dict: if not _THEME_ID_RE.match(body.id): raise HTTPException(status_code=422, detail="Theme ID must match [a-z0-9_-]{1,64}") - existing = {t["id"] for t in await asyncio.to_thread(load_all_themes)} + existing = {t["id"] for t in await load_all_themes()} if body.id in existing: raise HTTPException(status_code=400, detail=f"Theme ID already in use: {body.id!r}") light = body.light.model_dump() @@ -273,7 +268,7 @@ async def create_theme( if errors: raise HTTPException(status_code=422, detail=f"{mode}: {'; '.join(errors)}") theme = {"id": body.id, "label": body.label, "builtin": False, "light": light, "dark": dark} - await asyncio.to_thread(save_theme, theme) + await save_theme(theme) return theme @@ -283,11 +278,9 @@ async def update_theme( body: ThemeUpdate, _: User = Depends(get_current_admin), ) -> dict: - from app.core.app_config import _THEMES_DIR - path = _THEMES_DIR / f"{theme_id}.json" - if not path.exists(): + theme = await load_theme_by_id(theme_id) + if theme is None: raise HTTPException(status_code=404, detail="Theme not found") - theme = json.loads(path.read_text()) if theme.get("builtin"): raise HTTPException(status_code=400, detail="Cannot edit a built-in theme") if body.label is not None: @@ -304,7 +297,7 @@ async def update_theme( if errors: raise HTTPException(status_code=422, detail=f"dark: {'; '.join(errors)}") theme["dark"] = dark - await asyncio.to_thread(save_theme, theme) + await save_theme(theme) return theme @@ -314,7 +307,7 @@ async def remove_theme( _: User = Depends(get_current_admin), ) -> None: try: - await asyncio.to_thread(delete_theme, theme_id) + await delete_theme(theme_id) except FileNotFoundError: raise HTTPException(status_code=404, detail="Theme not found") except ValueError as exc: diff --git a/backend/app/routers/storage_config.py b/backend/app/routers/storage_config.py new file mode 100644 index 0000000..bfd1728 --- /dev/null +++ b/backend/app/routers/storage_config.py @@ -0,0 +1,126 @@ +""" +Admin-only endpoints for storage-service backend configuration. + +GET /admin/storage-config — current backend driver + health +PATCH /admin/storage-config — update backend config (no data migration) +POST /admin/storage-config/migrate — start migration to a new backend +GET /admin/storage-config/migrate/status — poll migration progress +DELETE /admin/storage-config/migrate — cancel in-progress migration + +All endpoints proxy to storage-service:8020. +""" +import logging + +import httpx +from fastapi import APIRouter, Depends, HTTPException +from pydantic import BaseModel + +from app.core.config import settings +from app.deps import get_current_admin +from app.models.user import User + +router = APIRouter() +logger = logging.getLogger(__name__) + +_STORAGE_BASE = settings.STORAGE_SERVICE_URL + + +class BackendConfigUpdate(BaseModel): + driver: str + config: dict = {} + + +class MigrateRequest(BaseModel): + driver: str + config: dict = {} + + +def _storage_url(path: str) -> str: + return f"{_STORAGE_BASE}{path}" + + +async def _proxy_get(path: str) -> dict: + async with httpx.AsyncClient(timeout=15.0) as client: + resp = await client.get(_storage_url(path)) + if resp.status_code == 404: + raise HTTPException(status_code=404, detail="Not found") + resp.raise_for_status() + return resp.json() + + +@router.get("/storage-config") +async def get_storage_config( + _: User = Depends(get_current_admin), +) -> dict: + """Return current backend driver and health status.""" + async with httpx.AsyncClient(timeout=10.0) as client: + resp = await client.get(_storage_url("/health")) + resp.raise_for_status() + return resp.json() + + +@router.patch("/storage-config", status_code=204) +async def update_storage_config( + body: BackendConfigUpdate, + _: User = Depends(get_current_admin), +) -> None: + """ + Reconfigure the active backend without migrating data. + Use when changing credentials for the same backend type, or reverting to local. + To move data to a new backend, use POST /admin/storage-config/migrate instead. + """ + async with httpx.AsyncClient(timeout=15.0) as client: + resp = await client.patch( + _storage_url("/backend-config"), + json={"driver": body.driver, "config": body.config}, + ) + if resp.status_code == 400: + raise HTTPException(status_code=400, detail=resp.json().get("detail", "Validation failed")) + if resp.status_code == 409: + raise HTTPException(status_code=409, detail="Migration in progress — cannot reconfigure now") + resp.raise_for_status() + + +@router.post("/storage-config/migrate", status_code=202) +async def start_migration( + body: MigrateRequest, + _: User = Depends(get_current_admin), +) -> dict: + """ + Start an async migration to a new backend. + + Flow: validate new backend → copy all objects → verify → switch → delete old objects. + The old backend stays active until 100% of objects are verified on the new one. + Poll GET /admin/storage-config/migrate/status to track progress. + """ + async with httpx.AsyncClient(timeout=30.0) as client: + resp = await client.post( + _storage_url("/migrate"), + json={"driver": body.driver, "config": body.config}, + ) + if resp.status_code == 400: + raise HTTPException(status_code=400, detail=resp.json().get("detail", "Validation failed")) + if resp.status_code == 409: + raise HTTPException(status_code=409, detail="A migration is already in progress") + resp.raise_for_status() + return resp.json() + + +@router.get("/storage-config/migrate/status") +async def migration_status( + _: User = Depends(get_current_admin), +) -> dict: + """Poll migration progress. State: idle → validating → migrating → switching → cleaning → done.""" + return await _proxy_get("/migrate/status") + + +@router.delete("/storage-config/migrate", status_code=204) +async def cancel_migration( + _: User = Depends(get_current_admin), +) -> None: + """Cancel a running migration. The old backend remains active.""" + async with httpx.AsyncClient(timeout=10.0) as client: + resp = await client.delete(_storage_url("/migrate")) + if resp.status_code == 409: + raise HTTPException(status_code=409, detail="No cancellable migration in progress") + resp.raise_for_status() diff --git a/backend/app/services/service_health.py b/backend/app/services/service_health.py index f2eac01..73c8534 100644 --- a/backend/app/services/service_health.py +++ b/backend/app/services/service_health.py @@ -40,7 +40,7 @@ _health: dict[str, bool | None] = {} _manifests: dict[str, dict | None] = {} -def register_services(doc_service_url: str, ai_service_url: str) -> None: +def register_services(doc_service_url: str, ai_service_url: str, storage_service_url: str) -> None: """Called once during app startup to populate the registry from config.""" global _REGISTRY, _health, _manifests @@ -63,6 +63,15 @@ def register_services(doc_service_url: str, ai_service_url: str) -> None: app_path="", settings_path="/apps/ai/settings", ), + ServiceDefinition( + id="storage-service", + name="Storage", + description="Unified file storage. Manages all uploaded files with pluggable backends (local, S3, WebDAV).", + internal_url=storage_service_url, + health_path="/health", + app_path="", + settings_path="/admin/storage", + ), ] _health = {svc.id: None for svc in _REGISTRY} diff --git a/changelog/2026-04-20_storage-service.md b/changelog/2026-04-20_storage-service.md new file mode 100644 index 0000000..e57c501 --- /dev/null +++ b/changelog/2026-04-20_storage-service.md @@ -0,0 +1,60 @@ +# 2026-04-20 — Dedicated storage-service with pluggable backends + +**Timestamp:** 2026-04-20T00:00:00Z + +## Summary + +Introduced a dedicated `storage-service` container (port 8020) as the single file/blob persistence layer for the entire stack. All services now route file and config I/O through this service's HTTP API. The service supports pluggable storage backends (local filesystem by default; S3-compatible and WebDAV built in) with a zero-data-loss migration flow. The `doc_data` and `app_config` Docker volumes were removed. + +## Files Added + +- `features/storage-service/app/main.py` — FastAPI app, lifespan (backend init) +- `features/storage-service/app/core/config.py` — Settings (DATA_DIR, STORAGE_BACKEND, S3_*, WEBDAV_*) +- `features/storage-service/app/routers/health.py` — GET /health +- `features/storage-service/app/routers/objects.py` — PUT/GET/DELETE /objects/{bucket}/{key:path}, GET /objects/{bucket} +- `features/storage-service/app/routers/migrate.py` — POST/GET/DELETE /migrate, PATCH /backend-config +- `features/storage-service/app/services/backend_manager.py` — Driver factory, singleton, atomic switch +- `features/storage-service/app/services/migration.py` — Async migration: copy → verify → switch → cleanup +- `features/storage-service/app/services/backends/base.py` — AbstractStorageBackend ABC +- `features/storage-service/app/services/backends/local.py` — LocalFSBackend (path traversal guard) +- `features/storage-service/app/services/backends/s3.py` — S3Backend (aiobotocore, endpoint_url configurable) +- `features/storage-service/app/services/backends/webdav.py` — WebDAVBackend (aiohttp + defusedxml) +- `features/storage-service/scripts/start.sh` — prod uvicorn start +- `features/storage-service/scripts/start_dev.sh` — dev uvicorn --reload start +- `features/storage-service/pyproject.toml` — Dependencies +- `features/storage-service/Dockerfile` — python:3.12-slim, non-root user 1001, port 8020 +- `features/storage-service/CLAUDE.md` — API reference, bucket docs, driver docs +- `features/storage-service/STATUS.md` — Service status +- `backend/app/core/config_storage.py` — Thin async helpers: read_json/write_json/delete_key/list_keys +- `backend/app/routers/storage_config.py` — Admin proxy endpoints for storage config + migration +- `features/doc-service/alembic/versions/0008_rename_file_path_to_storage_key.py` — DB migration +- `frontend/src/pages/StorageAdminPage.tsx` — Admin UI: backend status, driver form, migration progress +- `tests/storage-service_tests.md` — §20 storage-service test suite + +## Files Modified + +- `docker-compose.yml` — Added storage-service, storage_data volume; removed doc_data, app_config; added depends_on service_healthy +- `docker-compose.dev.yml` — Added storage-service dev override +- `backend/app/core/config.py` — Added STORAGE_SERVICE_URL +- `backend/app/core/app_config.py` — Full async rewrite using config_storage HTTP helpers (no filesystem) +- `backend/app/routers/settings.py` — Removed all asyncio.to_thread wrappers; direct await calls +- `backend/app/main.py` — Register storage_config router; update register_services call +- `backend/app/services/service_health.py` — Register storage-service +- `features/doc-service/app/core/config.py` — Added STORAGE_SERVICE_URL +- `features/doc-service/app/models/document.py` — file_path → storage_key +- `features/doc-service/app/services/storage.py` — Complete rewrite: HTTP client calls to storage-service +- `features/doc-service/app/services/config_reader.py` — Complete rewrite: reads/writes via storage-service config bucket +- `features/doc-service/app/services/file_watcher.py` — Uses save_upload() → storage-service +- `features/doc-service/app/routers/documents.py` — storage_key refs, pdfplumber(io.BytesIO), streaming from storage-service +- `features/ai-service/app/core/config.py` — Added STORAGE_SERVICE_URL; removed CONFIG_PATH +- `features/ai-service/app/services/config_reader.py` — Complete rewrite: reads/writes via storage-service config bucket +- `frontend/src/api/client.ts` — Added StorageStatus, MigrationStatus, StorageBackendConfig interfaces + 5 API functions +- `frontend/src/App.tsx` — Added /admin/storage route (AdminRoute → StorageAdminPage) +- `tests/ALL_TESTS.md` — Updated to 20 feature areas; added §20 storage-service tests +- `CLAUDE.md` — Added storage-service to Services/Volumes/Networks tables; storage enforcement rule; §20 test file +- `backend/CLAUDE.md` — Added config_storage.py, storage_config.py to tree; added admin storage endpoints +- `frontend/CLAUDE.md` — Added StorageAdminPage to tree; added /admin/storage route +- `features/doc-service/CLAUDE.md` — Updated storage.py description; file_path → storage_key; added migration 0008 +- `features/ai-service/CLAUDE.md` — Added config_reader.py description +- `backend/STATUS.md` — Added storage-config endpoints; updated settings persistence note +- `frontend/STATUS.md` — Added /admin/storage route; added StorageAdminPage description diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml index 93f75cb..c8cf529 100644 --- a/docker-compose.dev.yml +++ b/docker-compose.dev.yml @@ -29,6 +29,11 @@ services: volumes: - ./features/ai-service:/app + storage-service: + command: sh scripts/start_dev.sh + volumes: + - ./features/storage-service:/app + doc-service: command: sh scripts/start_dev.sh env_file: ./features/doc-service/.env diff --git a/docker-compose.yml b/docker-compose.yml index e3e3a5f..dec204b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -19,6 +19,27 @@ services: networks: - backend-net + # ── Storage service (unified blob storage) ────────────────────────────────── + storage-service: + build: + context: ./features/storage-service + dockerfile: Dockerfile + network: host + user: "1001:1001" + restart: unless-stopped + environment: + STORAGE_BACKEND: local + DATA_DIR: /data/storage + volumes: + - storage_data:/data/storage + healthcheck: + test: ["CMD-SHELL", "python -c \"import urllib.request; urllib.request.urlopen('http://localhost:8020/health')\""] + interval: 10s + timeout: 5s + retries: 5 + networks: + - backend-net + # ── Backend (management) ──────────────────────────────────────────────────── backend: build: @@ -32,11 +53,12 @@ services: DATABASE_URL: postgresql+asyncpg://${POSTGRES_USER:-postgres}:${POSTGRES_PASSWORD:-password}@db:5432/${POSTGRES_DB:-destroying_sap} DOC_SERVICE_URL: http://doc-service:8001 AI_SERVICE_URL: http://ai-service:8010 - volumes: - - app_config:/config + STORAGE_SERVICE_URL: http://storage-service:8020 depends_on: db: condition: service_healthy + storage-service: + condition: service_healthy networks: - backend-net @@ -49,9 +71,10 @@ services: user: "1001:1001" restart: unless-stopped environment: - CONFIG_PATH: /config/ai_service_config.json - volumes: - - app_config:/config + STORAGE_SERVICE_URL: http://storage-service:8020 + depends_on: + storage-service: + condition: service_healthy networks: - backend-net @@ -65,18 +88,17 @@ services: restart: unless-stopped environment: DATABASE_URL: postgresql+asyncpg://${POSTGRES_USER:-postgres}:${POSTGRES_PASSWORD:-password}@db:5432/${POSTGRES_DB:-destroying_sap} - DATA_DIR: /data/documents - CONFIG_PATH: /config/doc_service_config.json AI_SERVICE_URL: http://ai-service:8010 + STORAGE_SERVICE_URL: http://storage-service:8020 volumes: - - doc_data:/data/documents - watch_data:/data/watch - - app_config:/config depends_on: db: condition: service_healthy ai-service: condition: service_started + storage-service: + condition: service_healthy networks: - backend-net @@ -98,9 +120,8 @@ services: volumes: postgres_data: - doc_data: # PDF files persisted across restarts + storage_data: # All file/blob storage — managed by storage-service (documents + config) watch_data: # Watch directory — bind-mount your NAS/Nextcloud here via docker-compose.override.yml - app_config: # Per-service runtime config JSON files networks: # backend-net: db ↔ backend ↔ doc-service. No host ports bound. diff --git a/features/ai-service/CLAUDE.md b/features/ai-service/CLAUDE.md index 75582ed..26734cf 100644 --- a/features/ai-service/CLAUDE.md +++ b/features/ai-service/CLAUDE.md @@ -22,7 +22,8 @@ features/ai-service/ │ │ ├── queue.py ← GET /queue/status, /pause, /resume, /cancel/{id} │ │ └── plugin.py ← GET /plugin/manifest (access rules for ai-service-admin group) │ └── services/ -│ └── queue.py ← Priority queue (CRITICAL > HIGH > NORMAL) +│ ├── queue.py ← Priority queue (CRITICAL > HIGH > NORMAL) +│ └── config_reader.py ← Reads ai_service_config.json from storage-service config bucket (30 s TTL cache) ├── Dockerfile ← python:3.12-slim, non-root user 1001 └── STATUS.md ``` diff --git a/features/ai-service/app/core/config.py b/features/ai-service/app/core/config.py index bbeb4d0..7f1f52a 100644 --- a/features/ai-service/app/core/config.py +++ b/features/ai-service/app/core/config.py @@ -3,7 +3,7 @@ from pydantic_settings import BaseSettings class Settings(BaseSettings): PROJECT_NAME: str = "ai-service" - CONFIG_PATH: str = "/config/ai_service_config.json" + STORAGE_SERVICE_URL: str = "http://storage-service:8020" model_config = {"env_file": ".env", "extra": "ignore"} diff --git a/features/ai-service/app/services/config_reader.py b/features/ai-service/app/services/config_reader.py index 4caad3a..c5bed16 100644 --- a/features/ai-service/app/services/config_reader.py +++ b/features/ai-service/app/services/config_reader.py @@ -1,5 +1,5 @@ """ -Reads ai_service_config.json from the shared config volume. +Reads ai_service_config.json from the storage-service config bucket. 30-second TTL cache + env var overrides (dev credentials stay out of git). Env var overrides (all optional): @@ -8,15 +8,17 @@ Env var overrides (all optional): OLLAMA_BASE_URL, OLLAMA_MODEL, OLLAMA_API_KEY ANTHROPIC_API_KEY, ANTHROPIC_MODEL """ -import asyncio import json import os import time from copy import deepcopy -from pathlib import Path + +import httpx from app.core.config import settings +_CONFIG_KEY = "ai_service_config.json" + _DEFAULT_CONFIG: dict = { "provider": "lmstudio", "timeout_seconds": 60, @@ -31,12 +33,18 @@ _cache_at: float = 0.0 _CACHE_TTL = 30.0 -def _read_config_sync() -> dict: - path = Path(settings.CONFIG_PATH) - if not path.exists(): - return _apply_env_overrides(deepcopy(_DEFAULT_CONFIG)) - with open(path) as f: - return _apply_env_overrides(json.load(f)) +def _storage_url() -> str: + return f"{settings.STORAGE_SERVICE_URL}/objects/config/{_CONFIG_KEY}" + + +async def _fetch_config() -> dict: + """Fetch config from storage-service. Returns defaults if not found.""" + async with httpx.AsyncClient(timeout=10.0) as client: + resp = await client.get(_storage_url()) + if resp.status_code == 404: + return deepcopy(_DEFAULT_CONFIG) + resp.raise_for_status() + return resp.json() def _apply_env_overrides(config: dict) -> dict: @@ -75,7 +83,8 @@ async def load_ai_config() -> dict: now = time.monotonic() if _cache is not None and (now - _cache_at) < _CACHE_TTL: return _cache - data = await asyncio.to_thread(_read_config_sync) + raw = await _fetch_config() + data = _apply_env_overrides(raw) _cache = data _cache_at = now return data diff --git a/features/doc-service/CLAUDE.md b/features/doc-service/CLAUDE.md index 3dc60a3..d8d82a5 100644 --- a/features/doc-service/CLAUDE.md +++ b/features/doc-service/CLAUDE.md @@ -1,6 +1,6 @@ # doc-service — Claude context -PDF extraction microservice, port 8001 (internal). Shares the same PostgreSQL instance as the backend. Receives proxied requests from `backend:8000`, which injects `x-user-id` and `x-user-groups` headers — doc-service trusts these headers directly. Calls `ai-service:8010` for document classification. See root `CLAUDE.md` for architecture, Docker, and project-wide workflows. +PDF extraction microservice, port 8001 (internal). Shares the same PostgreSQL instance as the backend. Receives proxied requests from `backend:8000`, which injects `x-user-id` and `x-user-groups` headers — doc-service trusts these headers directly. Calls `ai-service:8010` for document classification. All file/blob storage goes through `storage-service:8020` — no files are written directly to the filesystem. See root `CLAUDE.md` for architecture, Docker, and project-wide workflows. --- @@ -38,13 +38,14 @@ features/doc-service/ │ │ ├── categories.py ← Category CRUD (includes watch-owned categories) │ │ └── plugin.py ← GET /plugin/manifest, GET+PATCH /plugin/settings │ └── services/ -│ ├── storage.py ← File I/O +│ ├── storage.py ← Storage client: save_upload/download_file/delete_file → storage-service:8020 documents bucket │ ├── ai_client.py ← classify_document() → ai-service:8010/chat -│ ├── config_reader.py ← Config load/save including storage/watch settings +│ ├── config_reader.py ← Config load/save via storage-service config bucket (doc_service_config.json) │ └── file_watcher.py ← watchdog-based PDF watcher + startup scan + ingestion ├── alembic/versions/ ← Migration chain │ ├── 0003_add_watch_columns.py ← source, watch_path, suggested_folder, suggested_filename -│ └── 0004_add_document_shares.py ← document_shares table (group-based sharing) +│ ├── 0004_add_document_shares.py ← document_shares table (group-based sharing) +│ └── 0008_rename_file_path_to_storage_key.py ← file_path → storage_key; strips /data/documents/ prefix from existing rows ├── Dockerfile ← python:3.12-slim, non-root user 1001 └── STATUS.md ``` @@ -60,7 +61,7 @@ features/doc-service/ | `id` | String | PK, UUID | | | `user_id` | String | indexed | not FK — trusts x-user-id header | | `filename` | String | NOT NULL | | -| `file_path` | String | NOT NULL | absolute path under /data/documents | +| `storage_key` | String | NOT NULL | storage-service key: `{user_id}/{doc_id}.pdf` (documents bucket) | | `file_size` | Integer | NOT NULL | bytes | | `status` | String | default="pending" | pending / processing / done / failed | | `title` | String(500) | nullable | AI-extracted | @@ -118,6 +119,7 @@ Unique constraint: `(document_id, group_id)` | `0005` | `add_share_can_delete` | | `0006` | `add_category_scope` | | `0007` | `capitalize_system_category_names` | +| `0008` | `rename_file_path_to_storage_key` | --- diff --git a/features/doc-service/alembic/versions/0008_rename_file_path_to_storage_key.py b/features/doc-service/alembic/versions/0008_rename_file_path_to_storage_key.py new file mode 100644 index 0000000..5228a30 --- /dev/null +++ b/features/doc-service/alembic/versions/0008_rename_file_path_to_storage_key.py @@ -0,0 +1,56 @@ +"""rename file_path to storage_key and strip filesystem prefix from existing rows + +Revision ID: 0008 +Revises: 0007 +Create Date: 2026-04-20 + +Renames the documents.file_path column to storage_key. +Existing rows have paths like '/data/documents/{user_id}/{doc_id}.pdf' or +'/data/documents/watch/{doc_id}.pdf'. The migration strips the leading +'/data/documents/' prefix so the value becomes a plain storage key +(e.g. '{user_id}/{doc_id}.pdf') that the storage-service uses as the object key. +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + +revision: str = "0008" +down_revision: Union[str, None] = "0007" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + with op.batch_alter_table("documents") as batch_op: + batch_op.alter_column( + "file_path", + new_column_name="storage_key", + existing_type=sa.String(), + existing_nullable=False, + ) + + # Strip the '/data/documents/' filesystem prefix from pre-migration rows. + op.execute( + sa.text( + "UPDATE documents SET storage_key = REPLACE(storage_key, '/data/documents/', '')" + " WHERE storage_key LIKE '/data/documents/%'" + ) + ) + + +def downgrade() -> None: + # Restore the filesystem prefix so old code can still find the files. + op.execute( + sa.text( + "UPDATE documents SET storage_key = '/data/documents/' || storage_key" + " WHERE storage_key NOT LIKE '/data/documents/%'" + ) + ) + with op.batch_alter_table("documents") as batch_op: + batch_op.alter_column( + "storage_key", + new_column_name="file_path", + existing_type=sa.String(), + existing_nullable=False, + ) diff --git a/features/doc-service/app/core/config.py b/features/doc-service/app/core/config.py index 8af5112..978c4db 100644 --- a/features/doc-service/app/core/config.py +++ b/features/doc-service/app/core/config.py @@ -7,6 +7,7 @@ class Settings(BaseSettings): DATA_DIR: str = "/data/documents" CONFIG_PATH: str = "/config/doc_service_config.json" AI_SERVICE_URL: str = "http://ai-service:8010" + STORAGE_SERVICE_URL: str = "http://storage-service:8020" class Config: env_file = ".env" diff --git a/features/doc-service/app/models/document.py b/features/doc-service/app/models/document.py index 6339c3e..7aaa0b9 100644 --- a/features/doc-service/app/models/document.py +++ b/features/doc-service/app/models/document.py @@ -13,7 +13,7 @@ class Document(Base): id: Mapped[str] = mapped_column(String, primary_key=True, default=lambda: str(uuid.uuid4())) user_id: Mapped[str] = mapped_column(String, nullable=False, index=True) filename: Mapped[str] = mapped_column(String, nullable=False) - file_path: Mapped[str] = mapped_column(String, nullable=False) + storage_key: Mapped[str] = mapped_column(String, nullable=False) file_size: Mapped[int] = mapped_column(Integer, nullable=False) status: Mapped[str] = mapped_column(String, nullable=False, default="pending") title: Mapped[str | None] = mapped_column(String(500), nullable=True) diff --git a/features/doc-service/app/routers/documents.py b/features/doc-service/app/routers/documents.py index 942c997..9e4f797 100644 --- a/features/doc-service/app/routers/documents.py +++ b/features/doc-service/app/routers/documents.py @@ -1,10 +1,10 @@ import asyncio +import io import json import math import uuid from datetime import datetime, timezone -import aiofiles import pdfplumber from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Query, UploadFile from fastapi.responses import StreamingResponse @@ -29,7 +29,7 @@ from app.schemas.document import ( from app.schemas.share import DocumentShareCreate, DocumentShareOut, SharedDocumentOut from app.services.ai_client import AIServiceError, classify_document from app.services.config_reader import load_doc_config -from app.services.storage import delete_file, get_upload_path, save_upload +from app.services.storage import delete_file, download_file as storage_download, save_upload router = APIRouter() @@ -118,10 +118,10 @@ def _doc_with_categories( ) -def _extract_pdf_text(file_path: str) -> str: +def _extract_pdf_text(pdf_bytes: bytes) -> str: """Synchronous — must be called via asyncio.to_thread.""" text_parts = [] - with pdfplumber.open(file_path) as pdf: + with pdfplumber.open(io.BytesIO(pdf_bytes)) as pdf: for page in pdf.pages: page_text = page.extract_text() if page_text: @@ -146,7 +146,8 @@ async def process_document(doc_id: str) -> None: await db.commit() try: - text = await asyncio.to_thread(_extract_pdf_text, doc.file_path) + pdf_bytes = await storage_download(doc.storage_key) + text = await asyncio.to_thread(_extract_pdf_text, pdf_bytes) result = await classify_document(text) doc.raw_text = text[:500_000] # cap stored text at 500k chars @@ -187,13 +188,13 @@ async def upload_document( ) doc_id = str(uuid.uuid4()) - dest = await save_upload(file_data, user_id, doc_id) + storage_key = await save_upload(file_data, user_id, doc_id) doc = Document( id=doc_id, user_id=user_id, filename=file.filename or "upload.pdf", - file_path=str(dest), + storage_key=storage_key, file_size=len(file_data), status="pending", ) @@ -578,7 +579,7 @@ async def delete_document( if not can_delete_via_share and not can_delete_as_group_admin: raise HTTPException(status_code=403, detail="Not allowed to delete this document") - delete_file(doc.file_path) + await delete_file(doc.storage_key) await db.delete(doc) await db.commit() @@ -609,13 +610,13 @@ async def download_file( if doc is None: raise HTTPException(status_code=404, detail="Document not found") - async def file_generator(): - async with aiofiles.open(doc.file_path, "rb") as f: - while chunk := await f.read(64 * 1024): - yield chunk + try: + pdf_bytes = await storage_download(doc.storage_key) + except FileNotFoundError: + raise HTTPException(status_code=404, detail="File not found in storage") return StreamingResponse( - file_generator(), + iter([pdf_bytes]), media_type="application/pdf", headers={"Content-Disposition": f'inline; filename="{doc.filename}"'}, ) diff --git a/features/doc-service/app/services/config_reader.py b/features/doc-service/app/services/config_reader.py index a3db812..da2dd3d 100644 --- a/features/doc-service/app/services/config_reader.py +++ b/features/doc-service/app/services/config_reader.py @@ -1,19 +1,20 @@ """ -Reads doc_service_config.json from the shared config volume. +Reads doc_service_config.json from the storage-service config bucket. 30-second TTL cache + env var overrides. Env var overrides (all optional): DOC_MAX_PDF_MB — max upload size in megabytes (e.g. "50") """ -import asyncio -import json import os import time from copy import deepcopy -from pathlib import Path + +import httpx from app.core.config import settings +_CONFIG_KEY = "doc_service_config.json" + _DEFAULT_STORAGE_CONFIG: dict = { "watch_enabled": False, "watch_path": "/data/watch", @@ -63,33 +64,30 @@ _cache_at: float = 0.0 _CACHE_TTL = 30.0 -def _read_config_sync() -> dict: - path = Path(settings.CONFIG_PATH) - if not path.exists(): - base = deepcopy(_DEFAULT_CONFIG) - else: - with open(path) as f: - base = json.load(f) - return _apply_env_overrides(base) +def _storage_url() -> str: + return f"{settings.STORAGE_SERVICE_URL}/objects/config/{_CONFIG_KEY}" -def _read_config_sync_raw() -> dict: - """Read without env overrides — used when we need to write back to disk.""" - path = Path(settings.CONFIG_PATH) - if not path.exists(): - return deepcopy(_DEFAULT_CONFIG) - with open(path) as f: - return json.load(f) +async def _fetch_config() -> dict: + """Fetch config from storage-service. Returns defaults if not found.""" + async with httpx.AsyncClient(timeout=10.0) as client: + resp = await client.get(_storage_url()) + if resp.status_code == 404: + return deepcopy(_DEFAULT_CONFIG) + resp.raise_for_status() + return resp.json() -def _write_config_sync(config: dict) -> None: - """Atomically write config JSON to disk.""" - path = Path(settings.CONFIG_PATH) - tmp = path.with_suffix(".tmp") - tmp.parent.mkdir(parents=True, exist_ok=True) - with open(tmp, "w") as f: - json.dump(config, f, indent=2) - os.replace(tmp, path) +async def _write_config(data: dict) -> None: + import json + payload = json.dumps(data, indent=2).encode() + async with httpx.AsyncClient(timeout=10.0) as client: + resp = await client.put( + _storage_url(), + content=payload, + headers={"Content-Type": "application/octet-stream"}, + ) + resp.raise_for_status() def _apply_env_overrides(config: dict) -> dict: @@ -108,7 +106,8 @@ async def load_doc_config() -> dict: now = time.monotonic() if _cache is not None and (now - _cache_at) < _CACHE_TTL: return _cache - data = await asyncio.to_thread(_read_config_sync) + raw = await _fetch_config() + data = _apply_env_overrides(raw) _cache = data _cache_at = now return data @@ -123,11 +122,10 @@ async def get_storage_config() -> dict: async def save_storage_config(data: dict) -> None: - """Merge data into the storage config block and persist to disk.""" + """Merge data into the storage config block and persist to storage-service.""" global _cache, _cache_at - raw = await asyncio.to_thread(_read_config_sync_raw) + raw = await _fetch_config() raw.setdefault("storage", {}).update(data) - await asyncio.to_thread(_write_config_sync, raw) - # Invalidate cache so next read picks up the new values + await _write_config(raw) _cache = None _cache_at = 0.0 diff --git a/features/doc-service/app/services/file_watcher.py b/features/doc-service/app/services/file_watcher.py index adb06f7..99e04fe 100644 --- a/features/doc-service/app/services/file_watcher.py +++ b/features/doc-service/app/services/file_watcher.py @@ -3,7 +3,7 @@ File-system watcher for the watch directory. Uses the watchdog library to monitor a configured directory for new PDF files. When a PDF is detected, it is automatically ingested into the document service -(copied to /data/documents, a DB record is created, and the AI pipeline runs). +(uploaded to storage-service, a DB record is created, and the AI pipeline runs). Key design decisions: - No-remove policy: on_deleted and on_moved events are intentionally ignored. @@ -82,13 +82,13 @@ async def ingest_file(path_str: str, watch_root: Path, config: dict) -> None: logger.warning("[watcher] Cannot read %s: %s", path_str, exc) return - # Save a copy to /data/documents/watch/{doc_id}.pdf + # Upload to storage-service under documents/watch/{doc_id}.pdf doc_id = existing.id if existing is not None else str(uuid.uuid4()) - dest = await save_upload(file_data, WATCH_USER_ID, doc_id) + storage_key = await save_upload(file_data, WATCH_USER_ID, doc_id) if existing is not None: # Re-ingest a previously failed document - existing.file_path = str(dest) + existing.storage_key = storage_key existing.file_size = len(file_data) existing.status = "pending" existing.error_message = None @@ -100,7 +100,7 @@ async def ingest_file(path_str: str, watch_root: Path, config: dict) -> None: source="watch", watch_path=path_str, filename=path.name, - file_path=str(dest), + storage_key=storage_key, file_size=len(file_data), status="pending", ) diff --git a/features/doc-service/app/services/storage.py b/features/doc-service/app/services/storage.py index 45ea418..c0fb45d 100644 --- a/features/doc-service/app/services/storage.py +++ b/features/doc-service/app/services/storage.py @@ -1,27 +1,61 @@ -import asyncio -from pathlib import Path +""" +Storage client for the storage-service HTTP API. -import aiofiles +All persistent file I/O goes through storage-service:8020. +The bucket for all document PDFs is 'documents'. +Keys follow the pattern: + uploaded: {user_id}/{doc_id}.pdf + watch-ingested: watch/{doc_id}.pdf +""" +import logging + +import httpx from app.core.config import settings +logger = logging.getLogger(__name__) -def get_upload_path(user_id: str, doc_id: str) -> Path: - """Return /data/documents/{user_id}/{doc_id}.pdf, creating the directory if needed.""" - user_dir = Path(settings.DATA_DIR) / user_id - user_dir.mkdir(parents=True, exist_ok=True) - return user_dir / f"{doc_id}.pdf" +_BUCKET = "documents" -async def save_upload(file_data: bytes, user_id: str, doc_id: str) -> Path: - dest = get_upload_path(user_id, doc_id) - async with aiofiles.open(dest, "wb") as f: - await f.write(file_data) - return dest +def _storage_url(key: str) -> str: + return f"{settings.STORAGE_SERVICE_URL}/objects/{_BUCKET}/{key}" -def delete_file(file_path: str) -> None: +def build_storage_key(user_id: str, doc_id: str) -> str: + """Return the canonical storage key for a document.""" + return f"{user_id}/{doc_id}.pdf" + + +async def save_upload(file_data: bytes, user_id: str, doc_id: str) -> str: + """Upload bytes to storage-service. Returns the storage key.""" + key = build_storage_key(user_id, doc_id) + async with httpx.AsyncClient(timeout=30.0) as client: + resp = await client.put( + _storage_url(key), + content=file_data, + headers={"Content-Type": "application/octet-stream"}, + ) + resp.raise_for_status() + return key + + +async def download_file(storage_key: str) -> bytes: + """Download bytes from storage-service by storage key.""" + async with httpx.AsyncClient(timeout=60.0) as client: + resp = await client.get(_storage_url(storage_key)) + if resp.status_code == 404: + raise FileNotFoundError(f"Object not found: {storage_key}") + resp.raise_for_status() + return resp.content + + +async def delete_file(storage_key: str) -> None: + """Delete an object from storage-service. Swallows errors — deletion failure must not 500.""" try: - Path(file_path).unlink(missing_ok=True) - except OSError: - pass # log but do not raise — deletion failure must not 500 + async with httpx.AsyncClient(timeout=10.0) as client: + resp = await client.delete(_storage_url(storage_key)) + if resp.status_code not in (204, 404): + logger.warning("storage-service DELETE returned %s for key %s", resp.status_code, storage_key) + except Exception as exc: + logger.warning("Could not delete %s from storage-service: %s", storage_key, exc) diff --git a/features/storage-service/CLAUDE.md b/features/storage-service/CLAUDE.md new file mode 100644 index 0000000..ed24a86 --- /dev/null +++ b/features/storage-service/CLAUDE.md @@ -0,0 +1,115 @@ +# storage-service — Claude context + +Unified file/blob storage microservice, port 8020 (internal). All services must use this service's +HTTP API for any file persistence — no service may write to a Docker volume directly. See root +`CLAUDE.md` for architecture, Docker, and project-wide workflows. + +--- + +## Architecture rule (enforced) + +**No service may write to a filesystem path for persistent data.** +All file/blob storage must go through the storage-service HTTP API. +Violation is a security/architecture defect. + +--- + +## File & Folder Tree + +``` +features/storage-service/ +├── app/ +│ ├── main.py ← FastAPI, lifespan (backend init) +│ ├── core/config.py ← Settings (DATA_DIR, STORAGE_BACKEND, S3_*, WEBDAV_*) +│ ├── routers/ +│ │ ├── health.py ← GET /health +│ │ ├── objects.py ← PUT/GET/DELETE /objects/{bucket}/{key:path}, GET /objects/{bucket} +│ │ └── migrate.py ← POST /migrate, GET /migrate/status, DELETE /migrate, PATCH /backend-config +│ └── services/ +│ ├── backend_manager.py ← build_backend(), initialize_backend(), get_backend(), switch_backend() +│ ├── migration.py ← run_migration(), get_status(), cancel(); KNOWN_BUCKETS +│ └── backends/ +│ ├── base.py ← AbstractStorageBackend (ABC) +│ ├── local.py ← LocalFSBackend — /data/storage/{bucket}/{key} +│ ├── s3.py ← S3Backend — aiobotocore, endpoint_url configurable +│ └── webdav.py ← WebDAVBackend — aiohttp + WebDAV PROPFIND/PUT/GET/DELETE +├── scripts/ +│ ├── start.sh ← prod start (uvicorn port 8020) +│ └── start_dev.sh ← dev start (uvicorn --reload) +├── Dockerfile ← python:3.12-slim, non-root user 1001 +└── STATUS.md +``` + +--- + +## HTTP API + +### Objects + +| Method | Path | Body | Response | +|--------|------|------|----------| +| PUT | `/objects/{bucket}/{key:path}` | Raw bytes | 204 | +| GET | `/objects/{bucket}/{key:path}` | — | 200 Raw bytes / 404 | +| DELETE | `/objects/{bucket}/{key:path}` | — | 204 | +| GET | `/objects/{bucket}` | — | `{"bucket": "...", "keys": [...]}` | + +Keys may contain `/` (e.g. `user123/abc.pdf`). Path traversal (`..`) returns 400. + +### Migration + +| Method | Path | Body | Response | +|--------|------|------|----------| +| POST | `/migrate` | `{"driver": "s3", "config": {...}}` | 202 / 400 / 409 | +| GET | `/migrate/status` | — | `{state, total, done, failed, errors[]}` | +| DELETE | `/migrate` | — | 204 / 409 | +| PATCH | `/backend-config` | `{"driver": "...", "config": {...}}` | 204 / 400 / 409 | + +Migration states: `idle → validating → migrating → switching → cleaning → done` (or `failed`/`cancelled`) + +### Health + +| Method | Path | Response | +|--------|------|----------| +| GET | `/health` | `{"status": "ok", "backend": "local"}` | + +--- + +## Buckets + +| Bucket | Contents | Key format | +|--------|----------|------------| +| `documents` | Uploaded PDFs | `{user_id}/{doc_id}.pdf` or `watch/{doc_id}.pdf` | +| `config` | JSON config files | `{service_name}_config.json` | + +To add a new bucket: add it to `KNOWN_BUCKETS` in `services/migration.py` so it is included in migrations. + +--- + +## Backend drivers + +| Driver | Config fields | Notes | +|--------|---------------|-------| +| `local` | `data_dir` (optional) | Default. Files under `/data/storage/`. Zero external deps. | +| `s3` | `endpoint_url`, `access_key`, `secret_key`, `region` | Works with MinIO, AWS S3, Backblaze B2, Cloudflare R2. Set `endpoint_url=""` for real AWS. | +| `webdav` | `url`, `username`, `password`, `root_path` | Nextcloud: set root_path to `/remote.php/dav/files/{username}` | + +--- + +## Adding a new backend driver + +1. Create `app/services/backends/your_driver.py` implementing `AbstractStorageBackend` +2. Add a branch in `build_backend()` in `backend_manager.py` +3. Add config fields to `app/core/config.py` if env-based config is needed +4. Document driver name + config fields in this file + +--- + +## Default Values & Limits + +| Parameter | Value | Location | +|-----------|-------|----------| +| Default backend | `local` | `STORAGE_BACKEND` env var | +| Local data dir | `/data/storage` | `DATA_DIR` env var | +| S3 region default | `us-east-1` | `S3_REGION` env var | +| Migration error cap in response | 50 | `migration.py` | +| Port | 8020 | `scripts/start.sh` | diff --git a/features/storage-service/Dockerfile b/features/storage-service/Dockerfile new file mode 100644 index 0000000..e5eaec9 --- /dev/null +++ b/features/storage-service/Dockerfile @@ -0,0 +1,32 @@ +# ── Stage 1: dependency installation ───────────────────────────────────────── +FROM python:3.12-slim AS builder + +WORKDIR /app + +RUN pip install --upgrade pip + +COPY pyproject.toml . +RUN pip install --prefix=/install . + +# ── Stage 2: runtime ────────────────────────────────────────────────────────── +FROM python:3.12-slim + +# Create non-root user (UID/GID 1001) +RUN groupadd --gid 1001 appuser && \ + useradd --uid 1001 --gid 1001 --no-create-home --shell /bin/sh appuser + +# Pre-create data dir with correct ownership. +# Named volume mounted over this path will inherit ownership on first creation. +RUN mkdir -p /data/storage && chown -R appuser:appuser /data + +WORKDIR /app + +COPY --from=builder /install /usr/local +COPY --chown=appuser:appuser app ./app +COPY --chown=appuser:appuser scripts ./scripts + +USER appuser + +EXPOSE 8020 + +CMD ["sh", "scripts/start.sh"] diff --git a/features/storage-service/STATUS.md b/features/storage-service/STATUS.md new file mode 100644 index 0000000..906b71c --- /dev/null +++ b/features/storage-service/STATUS.md @@ -0,0 +1,83 @@ +# Storage Service — Status + +## What it is +Unified file/blob storage microservice, port 8020 (internal). All services store and retrieve files +through its HTTP API — no service writes to a Docker volume directly. Uses a pluggable backend +driver (local FS by default; S3-compatible and WebDAV available). Backend is switchable at runtime +via admin settings with automatic data migration. + +## Current functionality + +### Object API (`/objects`) + +| Method | Path | Description | +|--------|------|-------------| +| PUT | `/objects/{bucket}/{key}` | Upload raw bytes | +| GET | `/objects/{bucket}/{key}` | Download raw bytes | +| DELETE | `/objects/{bucket}/{key}` | Delete object | +| GET | `/objects/{bucket}` | List all keys in bucket | + +### Migration API (`/migrate`) + +| Method | Path | Description | +|--------|------|-------------| +| POST | `/migrate` | Start migration to a new backend (validates, copies, switches, cleans) | +| GET | `/migrate/status` | Poll migration progress | +| DELETE | `/migrate` | Cancel in-progress migration | +| PATCH | `/backend-config` | Reconfigure backend without migrating data | + +### Health + +| Method | Path | Description | +|--------|------|-------------| +| GET | `/health` | `{"status": "ok", "backend": ""}` | + +### Buckets + +| Bucket | Contents | +|--------|----------| +| `documents` | Uploaded PDFs (keyed as `{user_id}/{doc_id}.pdf` or `watch/{doc_id}.pdf`) | +| `config` | JSON config files (replaces `app_config` volume — Phase 3) | + +## Architecture + +``` +backend / doc-service / future-svc + │ HTTP + ▼ + storage-service:8020 + │ + backend_manager + │ + ┌──────┴──────────────────┐ + │ │ + LocalFSBackend S3Backend / WebDAVBackend + /data/storage/ (configured via admin UI) + {bucket}/{key} +``` + +Migration flow: +``` +POST /migrate { driver, config } + → test_connection() (validate) + → list all objects in KNOWN_BUCKETS (enumerate) + → GET old / PUT new / exists verify (copy + verify, per object) + → if 0 failures: switch_backend() (atomic switch) + → DELETE old objects (cleanup) +``` + +## Known limitations / not implemented + +- Migration state is in-memory — a container restart during migration loses progress (restart restarts from scratch) +- No presigned URL support (direct client downloads go through the API) +- rclone backends (Google Drive, OneDrive, Dropbox) not yet implemented +- No per-object metadata or content-type headers +- No multipart upload for very large files (> available RAM) + +## Future work + +- [ ] rclone-based backend adapter (GDrive, OneDrive, Dropbox) +- [ ] Presigned URL generation for direct browser downloads +- [ ] Persist migration state to DB so restarts can resume +- [ ] Streaming upload/download to avoid buffering entire file in memory +- [ ] Per-bucket access policies diff --git a/features/storage-service/app/__init__.py b/features/storage-service/app/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/features/storage-service/app/core/__init__.py b/features/storage-service/app/core/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/features/storage-service/app/core/config.py b/features/storage-service/app/core/config.py new file mode 100644 index 0000000..5fae515 --- /dev/null +++ b/features/storage-service/app/core/config.py @@ -0,0 +1,23 @@ +from pydantic_settings import BaseSettings, SettingsConfigDict + + +class Settings(BaseSettings): + model_config = SettingsConfigDict(env_file=".env", extra="ignore") + + DATA_DIR: str = "/data/storage" + STORAGE_BACKEND: str = "local" # local | s3 | webdav + + # S3-compatible (MinIO, AWS S3, Backblaze B2, Cloudflare R2, …) + S3_ENDPOINT_URL: str = "" # leave empty for real AWS S3 + S3_ACCESS_KEY: str = "" + S3_SECRET_KEY: str = "" + S3_REGION: str = "us-east-1" + + # WebDAV (Nextcloud, …) + WEBDAV_URL: str = "" + WEBDAV_USERNAME: str = "" + WEBDAV_PASSWORD: str = "" + WEBDAV_ROOT_PATH: str = "/" + + +settings = Settings() diff --git a/features/storage-service/app/main.py b/features/storage-service/app/main.py new file mode 100644 index 0000000..277088d --- /dev/null +++ b/features/storage-service/app/main.py @@ -0,0 +1,29 @@ +import logging +from contextlib import asynccontextmanager + +from fastapi import FastAPI + +from app.core.config import settings +from app.routers import health, objects, migrate +from app.services.backend_manager import initialize_backend + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", +) +logger = logging.getLogger(__name__) + + +@asynccontextmanager +async def lifespan(app: FastAPI): + initialize_backend() + logger.info("storage-service started (backend=%s)", settings.STORAGE_BACKEND) + yield + logger.info("storage-service shutting down") + + +app = FastAPI(title="Storage Service", lifespan=lifespan) + +app.include_router(health.router) +app.include_router(objects.router) +app.include_router(migrate.router) diff --git a/features/storage-service/app/routers/__init__.py b/features/storage-service/app/routers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/features/storage-service/app/routers/health.py b/features/storage-service/app/routers/health.py new file mode 100644 index 0000000..9cb782c --- /dev/null +++ b/features/storage-service/app/routers/health.py @@ -0,0 +1,10 @@ +from fastapi import APIRouter + +from app.services.backend_manager import get_backend + +router = APIRouter() + + +@router.get("/health") +async def health(): + return {"status": "ok", "backend": get_backend().driver_name} diff --git a/features/storage-service/app/routers/migrate.py b/features/storage-service/app/routers/migrate.py new file mode 100644 index 0000000..55a4f10 --- /dev/null +++ b/features/storage-service/app/routers/migrate.py @@ -0,0 +1,88 @@ +import logging + +from fastapi import APIRouter, BackgroundTasks, HTTPException +from pydantic import BaseModel + +from app.services import migration +from app.services.backend_manager import build_backend, switch_backend + +router = APIRouter() +logger = logging.getLogger(__name__) + + +class MigrateRequest(BaseModel): + driver: str + config: dict = {} + + +class BackendConfigRequest(BaseModel): + driver: str + config: dict = {} + + +@router.post("/migrate", status_code=202) +async def start_migration(body: MigrateRequest, background_tasks: BackgroundTasks): + """ + Validate the new backend, then start an async migration job that: + 1. Copies all objects from the current backend to the new one + 2. Verifies every object + 3. Atomically switches the active backend + 4. Deletes all objects from the old backend + + Returns 409 if a migration is already in progress. + Returns 400 if the new backend config fails validation. + """ + if migration.is_in_progress(): + raise HTTPException(status_code=409, detail="A migration is already in progress") + + # Reset status and enter validating state before any async work + migration._status.state = "validating" + migration._status.total = 0 + migration._status.done = 0 + migration._status.failed = 0 + migration._status.errors.clear() + + try: + new_backend = build_backend(body.driver, body.config) + await new_backend.test_connection() + except Exception as exc: + migration._status.state = "idle" + raise HTTPException(status_code=400, detail=f"Backend validation failed: {exc}") + + background_tasks.add_task(migration.run_migration, new_backend) + return {"status": "started", "driver": body.driver} + + +@router.get("/migrate/status") +async def migration_status(): + """Poll this to track migration progress.""" + return migration.get_status() + + +@router.delete("/migrate", status_code=204) +async def cancel_migration(): + """ + Request cancellation of a running migration. + The old backend remains active. Returns 409 if no migration is running. + """ + cancelled = await migration.cancel() + if not cancelled: + raise HTTPException(status_code=409, detail="No cancellable migration in progress") + + +@router.patch("/backend-config", status_code=204) +async def update_backend_config(body: BackendConfigRequest): + """ + Reconfigure the active backend without migrating data (e.g. update S3 credentials + for the same endpoint, or switch back to local after a failed migration). + + Use POST /migrate when you need data to be moved to the new backend. + """ + if migration.is_in_progress(): + raise HTTPException(status_code=409, detail="Cannot reconfigure while migration is in progress") + try: + new_backend = build_backend(body.driver, body.config) + await new_backend.test_connection() + except Exception as exc: + raise HTTPException(status_code=400, detail=f"Backend validation failed: {exc}") + switch_backend(new_backend) diff --git a/features/storage-service/app/routers/objects.py b/features/storage-service/app/routers/objects.py new file mode 100644 index 0000000..84a6777 --- /dev/null +++ b/features/storage-service/app/routers/objects.py @@ -0,0 +1,60 @@ +from fastapi import APIRouter, HTTPException, Request +from fastapi.responses import Response + +from app.services.backend_manager import get_backend + +router = APIRouter() + + +def _validate_key(key: str) -> str: + """Reject path traversal. Key may contain '/' for nested objects (e.g. user/doc.pdf).""" + parts = key.split("/") + if ".." in parts: + raise HTTPException(status_code=400, detail="Invalid key: path traversal not allowed") + return key + + +@router.put("/objects/{bucket}/{key:path}", status_code=204) +async def put_object(bucket: str, key: str, request: Request): + """Upload raw bytes. Body is read as-is (application/octet-stream).""" + _validate_key(key) + data = await request.body() + try: + await get_backend().put(bucket, key, data) + except ValueError as exc: + raise HTTPException(status_code=400, detail=str(exc)) + except Exception as exc: + raise HTTPException(status_code=500, detail=str(exc)) + + +@router.get("/objects/{bucket}/{key:path}") +async def get_object(bucket: str, key: str): + """Download raw bytes.""" + _validate_key(key) + try: + data = await get_backend().get(bucket, key) + except KeyError: + raise HTTPException(status_code=404, detail="Object not found") + except Exception as exc: + raise HTTPException(status_code=500, detail=str(exc)) + return Response(content=data, media_type="application/octet-stream") + + +@router.delete("/objects/{bucket}/{key:path}", status_code=204) +async def delete_object(bucket: str, key: str): + """Delete an object. No-op if it does not exist.""" + _validate_key(key) + try: + await get_backend().delete(bucket, key) + except Exception as exc: + raise HTTPException(status_code=500, detail=str(exc)) + + +@router.get("/objects/{bucket}") +async def list_objects(bucket: str): + """List all keys in a bucket.""" + try: + keys = await get_backend().list_keys(bucket) + except Exception as exc: + raise HTTPException(status_code=500, detail=str(exc)) + return {"bucket": bucket, "keys": keys} diff --git a/features/storage-service/app/services/__init__.py b/features/storage-service/app/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/features/storage-service/app/services/backend_manager.py b/features/storage-service/app/services/backend_manager.py new file mode 100644 index 0000000..95bd996 --- /dev/null +++ b/features/storage-service/app/services/backend_manager.py @@ -0,0 +1,70 @@ +import logging + +from app.core.config import settings +from app.services.backends.base import AbstractStorageBackend +from app.services.backends.local import LocalFSBackend +from app.services.backends.s3 import S3Backend +from app.services.backends.webdav import WebDAVBackend + +logger = logging.getLogger(__name__) + +_active_backend: AbstractStorageBackend | None = None + + +def build_backend(driver: str, config: dict) -> AbstractStorageBackend: + """Construct a backend instance from a driver name + config dict.""" + if driver == "local": + return LocalFSBackend(data_dir=config.get("data_dir", settings.DATA_DIR)) + if driver == "s3": + return S3Backend( + endpoint_url=config.get("endpoint_url", ""), + access_key=config.get("access_key", ""), + secret_key=config.get("secret_key", ""), + region=config.get("region", "us-east-1"), + ) + if driver == "webdav": + return WebDAVBackend( + url=config.get("url", ""), + username=config.get("username", ""), + password=config.get("password", ""), + root_path=config.get("root_path", "/"), + ) + raise ValueError(f"Unknown driver: {driver!r}. Valid options: local, s3, webdav") + + +def initialize_backend() -> None: + """Build the initial backend from environment variables at startup.""" + global _active_backend + driver = settings.STORAGE_BACKEND + config: dict = {} + if driver == "s3": + config = { + "endpoint_url": settings.S3_ENDPOINT_URL, + "access_key": settings.S3_ACCESS_KEY, + "secret_key": settings.S3_SECRET_KEY, + "region": settings.S3_REGION, + } + elif driver == "webdav": + config = { + "url": settings.WEBDAV_URL, + "username": settings.WEBDAV_USERNAME, + "password": settings.WEBDAV_PASSWORD, + "root_path": settings.WEBDAV_ROOT_PATH, + } + # local needs no extra config — DATA_DIR is read from settings inside build_backend + _active_backend = build_backend(driver, config) + logger.info("Storage backend initialized: %s", driver) + + +def get_backend() -> AbstractStorageBackend: + if _active_backend is None: + raise RuntimeError("Backend not initialized — call initialize_backend() at startup") + return _active_backend + + +def switch_backend(new_backend: AbstractStorageBackend) -> None: + """Replace the active backend. Called by the migration job after all data is verified.""" + global _active_backend + old_name = _active_backend.driver_name if _active_backend else "none" + _active_backend = new_backend + logger.info("Storage backend switched: %s → %s", old_name, new_backend.driver_name) diff --git a/features/storage-service/app/services/backends/__init__.py b/features/storage-service/app/services/backends/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/features/storage-service/app/services/backends/base.py b/features/storage-service/app/services/backends/base.py new file mode 100644 index 0000000..7defc1f --- /dev/null +++ b/features/storage-service/app/services/backends/base.py @@ -0,0 +1,34 @@ +from abc import ABC, abstractmethod + + +class AbstractStorageBackend(ABC): + """Common interface every storage backend must implement.""" + + @property + @abstractmethod + def driver_name(self) -> str: + """Short identifier returned in /health: 'local', 's3', or 'webdav'.""" + + @abstractmethod + async def put(self, bucket: str, key: str, data: bytes) -> None: + """Store *data* under bucket/key. Creates bucket/intermediate dirs as needed.""" + + @abstractmethod + async def get(self, bucket: str, key: str) -> bytes: + """Return the stored bytes. Raises KeyError if the object does not exist.""" + + @abstractmethod + async def delete(self, bucket: str, key: str) -> None: + """Delete the object. No-op if it does not exist.""" + + @abstractmethod + async def list_keys(self, bucket: str) -> list[str]: + """Return all keys stored in *bucket*. Returns [] if bucket is empty/absent.""" + + @abstractmethod + async def exists(self, bucket: str, key: str) -> bool: + """Return True if the object exists.""" + + @abstractmethod + async def test_connection(self) -> None: + """Verify the backend is reachable and writable. Raise on failure.""" diff --git a/features/storage-service/app/services/backends/local.py b/features/storage-service/app/services/backends/local.py new file mode 100644 index 0000000..bde237f --- /dev/null +++ b/features/storage-service/app/services/backends/local.py @@ -0,0 +1,67 @@ +import asyncio +from pathlib import Path + +import aiofiles + +from .base import AbstractStorageBackend + + +class LocalFSBackend(AbstractStorageBackend): + """Stores objects as files under //.""" + + def __init__(self, data_dir: str) -> None: + self._root = Path(data_dir) + self._root.mkdir(parents=True, exist_ok=True) + + @property + def driver_name(self) -> str: + return "local" + + def _resolve(self, bucket: str, key: str) -> Path: + safe_key = key.lstrip("/") + if ".." in safe_key.split("/"): + raise ValueError(f"Invalid key: {key!r}") + return self._root / bucket / safe_key + + async def put(self, bucket: str, key: str, data: bytes) -> None: + dest = self._resolve(bucket, key) + await asyncio.to_thread(dest.parent.mkdir, parents=True, exist_ok=True) + async with aiofiles.open(dest, "wb") as f: + await f.write(data) + + async def get(self, bucket: str, key: str) -> bytes: + path = self._resolve(bucket, key) + if not path.exists(): + raise KeyError(f"{bucket}/{key}") + async with aiofiles.open(path, "rb") as f: + return await f.read() + + async def delete(self, bucket: str, key: str) -> None: + path = self._resolve(bucket, key) + try: + await asyncio.to_thread(path.unlink, missing_ok=True) + except OSError: + pass + + async def list_keys(self, bucket: str) -> list[str]: + bucket_dir = self._root / bucket + if not bucket_dir.exists(): + return [] + + def _scan() -> list[str]: + return [ + str(p.relative_to(bucket_dir)) + for p in bucket_dir.rglob("*") + if p.is_file() + ] + + return await asyncio.to_thread(_scan) + + async def exists(self, bucket: str, key: str) -> bool: + return self._resolve(bucket, key).exists() + + async def test_connection(self) -> None: + self._root.mkdir(parents=True, exist_ok=True) + probe = self._root / ".health_probe" + probe.write_bytes(b"ok") + probe.unlink() diff --git a/features/storage-service/app/services/backends/s3.py b/features/storage-service/app/services/backends/s3.py new file mode 100644 index 0000000..e8d52f6 --- /dev/null +++ b/features/storage-service/app/services/backends/s3.py @@ -0,0 +1,99 @@ +import logging +from contextlib import asynccontextmanager + +from aiobotocore.session import get_session + +from .base import AbstractStorageBackend + +logger = logging.getLogger(__name__) + + +class S3Backend(AbstractStorageBackend): + """ + S3-compatible backend. Works with AWS S3, MinIO, Backblaze B2, Cloudflare R2, etc. + Set endpoint_url to the service URL for non-AWS providers; leave empty for real AWS. + """ + + def __init__( + self, + endpoint_url: str, + access_key: str, + secret_key: str, + region: str = "us-east-1", + ) -> None: + self._endpoint_url = endpoint_url or None + self._access_key = access_key + self._secret_key = secret_key + self._region = region + self._session = get_session() + + @property + def driver_name(self) -> str: + return "s3" + + @asynccontextmanager + async def _client(self): + async with self._session.create_client( + "s3", + endpoint_url=self._endpoint_url, + aws_access_key_id=self._access_key, + aws_secret_access_key=self._secret_key, + region_name=self._region, + ) as client: + yield client + + async def _ensure_bucket(self, client, bucket: str) -> None: + try: + await client.head_bucket(Bucket=bucket) + except Exception: + try: + if self._region == "us-east-1": + await client.create_bucket(Bucket=bucket) + else: + await client.create_bucket( + Bucket=bucket, + CreateBucketConfiguration={"LocationConstraint": self._region}, + ) + except Exception as exc: + logger.debug("Bucket create skipped (may already exist): %s", exc) + + async def put(self, bucket: str, key: str, data: bytes) -> None: + async with self._client() as client: + await self._ensure_bucket(client, bucket) + await client.put_object(Bucket=bucket, Key=key, Body=data) + + async def get(self, bucket: str, key: str) -> bytes: + async with self._client() as client: + try: + response = await client.get_object(Bucket=bucket, Key=key) + return await response["Body"].read() + except Exception as exc: + raise KeyError(f"{bucket}/{key}") from exc + + async def delete(self, bucket: str, key: str) -> None: + async with self._client() as client: + await client.delete_object(Bucket=bucket, Key=key) + + async def list_keys(self, bucket: str) -> list[str]: + async with self._client() as client: + try: + paginator = client.get_paginator("list_objects_v2") + keys: list[str] = [] + async for page in paginator.paginate(Bucket=bucket): + for obj in page.get("Contents", []): + keys.append(obj["Key"]) + return keys + except Exception: + return [] + + async def exists(self, bucket: str, key: str) -> bool: + async with self._client() as client: + try: + await client.head_object(Bucket=bucket, Key=key) + return True + except Exception: + return False + + async def test_connection(self) -> None: + async with self._client() as client: + await client.list_buckets() diff --git a/features/storage-service/app/services/backends/webdav.py b/features/storage-service/app/services/backends/webdav.py new file mode 100644 index 0000000..2c80c50 --- /dev/null +++ b/features/storage-service/app/services/backends/webdav.py @@ -0,0 +1,121 @@ +import base64 +from urllib.parse import quote + +import defusedxml.ElementTree as ET + +import aiohttp + +from .base import AbstractStorageBackend + + +class WebDAVBackend(AbstractStorageBackend): + """ + WebDAV backend. Compatible with Nextcloud and any standard WebDAV server. + root_path should be the WebDAV root on the server, e.g. '/remote.php/dav/files/username'. + """ + + def __init__( + self, + url: str, + username: str, + password: str, + root_path: str = "/", + ) -> None: + self._base = url.rstrip("/") + self._root = root_path.rstrip("/") + creds = base64.b64encode(f"{username}:{password}".encode()).decode() + self._auth = f"Basic {creds}" + + @property + def driver_name(self) -> str: + return "webdav" + + def _url(self, *parts: str) -> str: + encoded = "/".join(quote(p, safe="") for p in parts) + return f"{self._base}{self._root}/{encoded}" + + def _headers(self, extra: dict | None = None) -> dict[str, str]: + h = {"Authorization": self._auth} + if extra: + h.update(extra) + return h + + async def _ensure_collection(self, session: aiohttp.ClientSession, *parts: str) -> None: + """MKCOL is idempotent — ignore 405 (already exists).""" + url = self._url(*parts) + async with session.request("MKCOL", url, headers=self._headers()) as resp: + if resp.status not in (200, 201, 405): + pass # best-effort; PUT will fail if directory is truly missing + + async def put(self, bucket: str, key: str, data: bytes) -> None: + async with aiohttp.ClientSession() as session: + await self._ensure_collection(session, bucket) + parts = key.split("/") + for i in range(1, len(parts)): + await self._ensure_collection(session, bucket, *parts[:i]) + url = self._url(bucket, key) + async with session.put(url, data=data, headers=self._headers()) as resp: + if resp.status not in (200, 201, 204): + raise OSError(f"WebDAV PUT {url} → {resp.status}") + + async def get(self, bucket: str, key: str) -> bytes: + async with aiohttp.ClientSession() as session: + url = self._url(bucket, key) + async with session.get(url, headers=self._headers()) as resp: + if resp.status == 404: + raise KeyError(f"{bucket}/{key}") + if resp.status != 200: + raise OSError(f"WebDAV GET {url} → {resp.status}") + return await resp.read() + + async def delete(self, bucket: str, key: str) -> None: + async with aiohttp.ClientSession() as session: + url = self._url(bucket, key) + async with session.delete(url, headers=self._headers()) as resp: + if resp.status not in (200, 204, 404): + raise OSError(f"WebDAV DELETE {url} → {resp.status}") + + async def list_keys(self, bucket: str) -> list[str]: + async with aiohttp.ClientSession() as session: + url = self._url(bucket) + headers = self._headers({"Depth": "infinity", "Content-Type": "application/xml"}) + body = '' + async with session.request("PROPFIND", url, headers=headers, data=body) as resp: + if resp.status == 404: + return [] + if resp.status != 207: + return [] + xml_body = await resp.text() + + ns = {"d": "DAV:"} + try: + root = ET.fromstring(xml_body) + except ET.ParseError: + return [] + + prefix = f"{self._base}{self._root}/{quote(bucket, safe='')}/" + keys: list[str] = [] + for response in root.findall("d:response", ns): + href = response.findtext("d:href", namespaces=ns) or "" + prop = response.find(".//d:prop", ns) + if prop is not None: + rt = prop.find("d:resourcetype", ns) + if rt is not None and rt.find("d:collection", ns) is not None: + continue # skip directories + if href.startswith(prefix): + keys.append(href[len(prefix):]) + return keys + + async def exists(self, bucket: str, key: str) -> bool: + async with aiohttp.ClientSession() as session: + url = self._url(bucket, key) + async with session.request("HEAD", url, headers=self._headers()) as resp: + return resp.status == 200 + + async def test_connection(self) -> None: + async with aiohttp.ClientSession() as session: + root_url = f"{self._base}{self._root}/" + headers = self._headers({"Depth": "0"}) + async with session.request("PROPFIND", root_url, headers=headers) as resp: + if resp.status not in (200, 207): + raise ConnectionError(f"WebDAV root PROPFIND → {resp.status}") diff --git a/features/storage-service/app/services/migration.py b/features/storage-service/app/services/migration.py new file mode 100644 index 0000000..93c7432 --- /dev/null +++ b/features/storage-service/app/services/migration.py @@ -0,0 +1,139 @@ +""" +Backend migration service. + +Flow: + 1. POST /migrate → validate new backend (test_connection) + 2. Background task enumerates all objects in all known buckets + 3. Each object is copied old → new, then verified + 4. Only after 100 % success: atomically switch active backend + 5. Delete all objects from old backend + 6. If any copy fails: old backend stays active; state = "failed" + 7. DELETE /migrate cancels a running migration (old backend stays active) +""" + +import logging +from dataclasses import dataclass, field +from typing import Literal + +from app.services.backends.base import AbstractStorageBackend +from app.services.backend_manager import get_backend, switch_backend + +logger = logging.getLogger(__name__) + +# All logical buckets the service knows about — enumerated during migration. +KNOWN_BUCKETS = ["documents", "config"] + +MigrationState = Literal[ + "idle", "validating", "migrating", "switching", "cleaning", "done", "failed", "cancelled" +] + + +@dataclass +class _MigrationStatus: + state: MigrationState = "idle" + total: int = 0 + done: int = 0 + failed: int = 0 + errors: list[str] = field(default_factory=list) + + +_status = _MigrationStatus() +_cancel_requested: bool = False + + +def get_status() -> dict: + return { + "state": _status.state, + "total": _status.total, + "done": _status.done, + "failed": _status.failed, + "errors": _status.errors[:50], # cap to avoid huge responses + } + + +def is_in_progress() -> bool: + return _status.state in ("validating", "migrating", "switching", "cleaning") + + +async def cancel() -> bool: + global _cancel_requested + if _status.state == "migrating": + _cancel_requested = True + return True + return False + + +async def run_migration(new_backend: AbstractStorageBackend) -> None: + """ + Background task: copy all objects to new_backend, verify, switch, clean old. + Called after the caller has already validated new_backend.test_connection(). + """ + global _cancel_requested + _cancel_requested = False + old_backend = get_backend() + + _status.state = "migrating" + _status.done = 0 + _status.failed = 0 + _status.errors.clear() + + try: + # Collect all objects across every known bucket + all_objects: list[tuple[str, str]] = [] + for bucket in KNOWN_BUCKETS: + try: + keys = await old_backend.list_keys(bucket) + for key in keys: + all_objects.append((bucket, key)) + except Exception as exc: + logger.warning("Could not list bucket %r: %s", bucket, exc) + + _status.total = len(all_objects) + logger.info("Migration: %d objects to migrate across %d buckets", len(all_objects), len(KNOWN_BUCKETS)) + + for bucket, key in all_objects: + if _cancel_requested: + _status.state = "cancelled" + logger.info("Migration cancelled (%d/%d done)", _status.done, _status.total) + return + + try: + data = await old_backend.get(bucket, key) + await new_backend.put(bucket, key, data) + if not await new_backend.exists(bucket, key): + raise OSError("Verification failed: object absent after PUT") + _status.done += 1 + except Exception as exc: + _status.failed += 1 + entry = f"{bucket}/{key}: {exc}" + _status.errors.append(entry) + logger.warning("Migration copy failed — %s", entry) + + if _status.failed > 0: + _status.state = "failed" + logger.error( + "Migration failed: %d/%d objects could not be copied; old backend remains active", + _status.failed, + _status.total, + ) + return + + # All objects verified — atomically switch + _status.state = "switching" + switch_backend(new_backend) + + # Remove all objects from old backend (best-effort) + _status.state = "cleaning" + for bucket, key in all_objects: + try: + await old_backend.delete(bucket, key) + except Exception as exc: + logger.warning("Cleanup failed for %s/%s: %s", bucket, key, exc) + + _status.state = "done" + logger.info("Migration complete: %d objects moved to %s", _status.total, new_backend.driver_name) + + except Exception as exc: + _status.state = "failed" + _status.errors.append(f"Unexpected error: {exc}") + logger.exception("Migration aborted with unexpected error") diff --git a/features/storage-service/pyproject.toml b/features/storage-service/pyproject.toml new file mode 100644 index 0000000..070a75b --- /dev/null +++ b/features/storage-service/pyproject.toml @@ -0,0 +1,25 @@ +[build-system] +requires = ["setuptools>=45"] +build-backend = "setuptools.build_meta" + +[project] +name = "storage-service" +version = "0.1.0" +requires-python = ">=3.11" +dependencies = [ + "fastapi>=0.111", + "uvicorn[standard]>=0.29", + "pydantic-settings>=2.2", + "aiofiles>=23.0", + "aiobotocore>=2.13", + "aiohttp>=3.9", + "defusedxml>=0.7", +] + +[project.optional-dependencies] +dev = [ + "ruff>=0.4", +] + +[tool.ruff] +line-length = 100 diff --git a/features/storage-service/scripts/start.sh b/features/storage-service/scripts/start.sh new file mode 100755 index 0000000..f450a96 --- /dev/null +++ b/features/storage-service/scripts/start.sh @@ -0,0 +1,5 @@ +#!/bin/sh +set -e + +echo "[storage-service] starting uvicorn..." +exec uvicorn app.main:app --host 0.0.0.0 --port 8020 diff --git a/features/storage-service/scripts/start_dev.sh b/features/storage-service/scripts/start_dev.sh new file mode 100755 index 0000000..e8323ee --- /dev/null +++ b/features/storage-service/scripts/start_dev.sh @@ -0,0 +1,5 @@ +#!/bin/sh +set -e + +echo "[storage-service] starting uvicorn (dev)..." +exec uvicorn app.main:app --host 0.0.0.0 --port 8020 --reload diff --git a/frontend/CLAUDE.md b/frontend/CLAUDE.md index ec69e2f..874b586 100644 --- a/frontend/CLAUDE.md +++ b/frontend/CLAUDE.md @@ -37,7 +37,8 @@ frontend/ │ │ └── ui/ ← shadcn/ui components (Button, Input, …) │ ├── pages/ ← One file per route │ │ ├── DocServiceSettingsPage.tsx ← Combined doc-service settings: upload limits + watch directory -│ │ └── PluginSettingsPage.tsx ← Generic plugin settings page driven by manifest +│ │ ├── PluginSettingsPage.tsx ← Generic plugin settings page driven by manifest +│ │ └── StorageAdminPage.tsx ← Admin storage backend config + live migration progress │ ├── lib/utils.ts ← cn() = clsx + tailwind-merge │ └── styles/theme.css ← CSS custom properties, Tailwind setup ├── vite.config.ts ← /api/* proxied to backend:8000 @@ -66,6 +67,7 @@ frontend/ | `/admin/users` | `AdminUsersPage` | AdminRoute | | `/admin/groups` | `AdminGroupsPage` | AdminRoute | | `/admin/appearance` | `AdminAppearancePage` | AdminRoute | +| `/admin/storage` | `StorageAdminPage` | AdminRoute | | `*` | redirect to `/` | — | `PrivateRoute` — checks `token` from `useAuth`, redirects to `/login` if absent. diff --git a/frontend/STATUS.md b/frontend/STATUS.md index 5facebb..2c23acb 100644 --- a/frontend/STATUS.md +++ b/frontend/STATUS.md @@ -21,6 +21,7 @@ All API calls go through `src/api/client.ts` (single Axios instance, JWT injecte | `/admin` | `AdminPage` (redirects to `/admin/users`) | Admin only | | `/admin/users` | `AdminUsersPage` | Admin only | | `/admin/groups` | `AdminGroupsPage` | Admin only | +| `/admin/storage` | `StorageAdminPage` | Admin only | | `/profile` | `ProfilePage` | Required | | `/settings` | `SettingsPage` (placeholder) | Required | | `/settings/plugins/:id` | `PluginSettingsPage` | Required (per-plugin access control) | @@ -114,6 +115,10 @@ Provider selector, per-provider fields, Test Connection, Save. Upload limits + watch directory config. +### Admin — Storage page (`/admin/storage`) + +Current backend status (green/red health dot). Driver selector (local/S3/WebDAV) with conditional credential fields. "Test & Migrate" button triggers an async migration that copies all objects to the new backend, verifies, then switches atomically. Live progress bar with 2s polling (states: validating → migrating → switching → cleaning → done). Cancel button during in-progress migrations. + ### Admin — Users page (`/admin/users`) User list, toggle active, create user, delete user. @@ -202,6 +207,7 @@ Key document-related functions: - [x] AI suggestion confirm/reject UI (folder + filename) - [x] Groups admin UI - [x] Replace Axios with native fetch; add global 401 → `/login` redirect for expired sessions +- [x] Admin storage page with live migration progress bar - [ ] Toast notification system - [ ] Loading skeletons - [ ] Cmd+K global search (`CommandDialog`) diff --git a/frontend/src/App.tsx b/frontend/src/App.tsx index 561c405..a24e6a6 100644 --- a/frontend/src/App.tsx +++ b/frontend/src/App.tsx @@ -16,6 +16,7 @@ import DocServiceSettingsPage from "./pages/DocServiceSettingsPage"; import AIAdminSettingsPage from "./pages/AIAdminSettingsPage"; import SettingsPage from "./pages/SettingsPage"; import PluginSettingsPage from "./pages/PluginSettingsPage"; +import StorageAdminPage from "./pages/StorageAdminPage"; function PrivateRoute({ children }: { children: React.ReactNode }) { const { token } = useAuth(); @@ -102,6 +103,7 @@ export default function App() { } /> } /> } /> + } /> {/* Catch-all */} } /> diff --git a/frontend/src/api/client.ts b/frontend/src/api/client.ts index 3205c11..742b24f 100644 --- a/frontend/src/api/client.ts +++ b/frontend/src/api/client.ts @@ -585,6 +585,49 @@ export interface PluginManifest { }; } +// ── Storage admin ────────────────────────────────────────────────────────────── + +export interface StorageStatus { + status: string; + backend: string; +} + +export interface MigrationStatus { + state: + | "idle" + | "validating" + | "migrating" + | "switching" + | "cleaning" + | "done" + | "failed" + | "cancelled"; + total: number; + done: number; + failed: number; + errors: string[]; +} + +export interface StorageBackendConfig { + driver: string; + config: Record; +} + +export const getStorageConfig = () => api.get("/admin/storage-config"); + +export const updateStorageConfig = (body: StorageBackendConfig) => + api.patch("/admin/storage-config", body); + +export const startStorageMigration = (body: StorageBackendConfig) => + api.post<{ status: string; driver: string }>("/admin/storage-config/migrate", body); + +export const getMigrationStatus = () => + api.get("/admin/storage-config/migrate/status"); + +export const cancelMigration = () => api.delete("/admin/storage-config/migrate"); + +// ── Plugins ──────────────────────────────────────────────────────────────────── + export const getPlugins = () => api.get("/plugins"); export const getPluginManifest = (id: string) => diff --git a/frontend/src/pages/StorageAdminPage.tsx b/frontend/src/pages/StorageAdminPage.tsx new file mode 100644 index 0000000..d27231a --- /dev/null +++ b/frontend/src/pages/StorageAdminPage.tsx @@ -0,0 +1,381 @@ +import { useState } from "react"; +import { useQuery, useMutation, useQueryClient } from "@tanstack/react-query"; +import { + getStorageConfig, + getMigrationStatus, + startStorageMigration, + cancelMigration, + type StorageBackendConfig, + type MigrationStatus, +} from "../api/client"; + +type Driver = "local" | "s3" | "webdav"; + +function Section({ title, children }: { title: string; children: React.ReactNode }) { + return ( +
+

{title}

+ {children} +
+ ); +} + +function Field({ label, children }: { label: string; children: React.ReactNode }) { + return ( +
+ + {children} +
+ ); +} + +function inputStyle(disabled = false): React.CSSProperties { + return { + width: "100%", + padding: "6px 10px", + border: "1px solid var(--color-border)", + borderRadius: 6, + fontSize: 14, + background: disabled ? "var(--color-surface)" : "var(--color-background)", + color: "var(--color-text-primary)", + opacity: disabled ? 0.7 : 1, + }; +} + +function MigrationProgressBar({ status }: { status: MigrationStatus }) { + const pct = status.total > 0 ? Math.round((status.done / status.total) * 100) : 0; + const isBusy = ["validating", "migrating", "switching", "cleaning"].includes(status.state); + + const stateLabel: Record = { + idle: "Idle", + validating: "Validating new backend…", + migrating: `Migrating — ${status.done} / ${status.total} objects`, + switching: "Switching active backend…", + cleaning: "Cleaning up old backend…", + done: "Migration complete", + failed: "Migration failed", + cancelled: "Migration cancelled", + }; + + return ( +
+
+ + {stateLabel[status.state] ?? status.state} + + {isBusy && {pct}%} +
+ {(isBusy || status.state === "done") && ( +
+
+
+ )} + {status.errors.length > 0 && ( +
+ {status.errors.slice(0, 10).map((e, i) => ( +
+ {e} +
+ ))} + {status.errors.length > 10 && ( +
…and {status.errors.length - 10} more
+ )} +
+ )} +
+ ); +} + +export default function StorageAdminPage() { + const queryClient = useQueryClient(); + + const { data: storageStatus } = useQuery({ + queryKey: ["storage-config"], + queryFn: getStorageConfig, + refetchInterval: 10_000, + }); + + const { data: migStatus, refetch: refetchMig } = useQuery({ + queryKey: ["migration-status"], + queryFn: getMigrationStatus, + refetchInterval: (query) => { + const state = query.state.data?.state; + return state && ["validating", "migrating", "switching", "cleaning"].includes(state) + ? 2_000 + : false; + }, + }); + + const isMigrating = + migStatus && + ["validating", "migrating", "switching", "cleaning"].includes(migStatus.state); + + // ── New backend form ───────────────────────────────────────────────────────── + const [driver, setDriver] = useState("local"); + const [s3EndpointUrl, setS3EndpointUrl] = useState(""); + const [s3AccessKey, setS3AccessKey] = useState(""); + const [s3SecretKey, setS3SecretKey] = useState(""); + const [s3Region, setS3Region] = useState("us-east-1"); + const [webdavUrl, setWebdavUrl] = useState(""); + const [webdavUsername, setWebdavUsername] = useState(""); + const [webdavPassword, setWebdavPassword] = useState(""); + const [webdavRootPath, setWebdavRootPath] = useState("/"); + const [error, setError] = useState(""); + + function buildConfig(): StorageBackendConfig { + if (driver === "s3") { + return { + driver, + config: { + endpoint_url: s3EndpointUrl, + access_key: s3AccessKey, + secret_key: s3SecretKey, + region: s3Region, + }, + }; + } + if (driver === "webdav") { + return { + driver, + config: { + url: webdavUrl, + username: webdavUsername, + password: webdavPassword, + root_path: webdavRootPath, + }, + }; + } + return { driver: "local", config: {} }; + } + + const migrateMutation = useMutation({ + mutationFn: startStorageMigration, + onSuccess: () => { + setError(""); + refetchMig(); + }, + onError: (e: Error) => setError(e.message), + }); + + const cancelMutation = useMutation({ + mutationFn: cancelMigration, + onSuccess: () => { + queryClient.invalidateQueries({ queryKey: ["migration-status"] }); + queryClient.invalidateQueries({ queryKey: ["storage-config"] }); + }, + onError: (e: Error) => setError(e.message), + }); + + const currentDriver = storageStatus?.backend ?? "—"; + + return ( +
+

Storage

+

+ All uploaded files are stored through the storage-service. Switch between local filesystem, + S3-compatible cloud storage, or WebDAV (Nextcloud). +

+ +
+
+ + {currentDriver} + {storageStatus?.status === "ok" ? " — healthy" : " — unreachable"} +
+
+ +
+

+ When you click Test & Migrate, all existing files will be copied to the + new backend, verified, and the switch will happen only after every file is confirmed. The + old backend is cleaned up automatically. +

+ + + + + + {driver === "s3" && ( + <> + + setS3EndpointUrl(e.target.value)} + placeholder="http://minio:9000" + disabled={!!isMigrating} + style={inputStyle(!!isMigrating)} + /> + + + setS3AccessKey(e.target.value)} + disabled={!!isMigrating} + style={inputStyle(!!isMigrating)} + /> + + + setS3SecretKey(e.target.value)} + disabled={!!isMigrating} + style={inputStyle(!!isMigrating)} + /> + + + setS3Region(e.target.value)} + placeholder="us-east-1" + disabled={!!isMigrating} + style={inputStyle(!!isMigrating)} + /> + + + )} + + {driver === "webdav" && ( + <> + + setWebdavUrl(e.target.value)} + placeholder="https://nextcloud.example.com" + disabled={!!isMigrating} + style={inputStyle(!!isMigrating)} + /> + + + setWebdavUsername(e.target.value)} + disabled={!!isMigrating} + style={inputStyle(!!isMigrating)} + /> + + + setWebdavPassword(e.target.value)} + disabled={!!isMigrating} + style={inputStyle(!!isMigrating)} + /> + + + setWebdavRootPath(e.target.value)} + placeholder="/remote.php/dav/files/username" + disabled={!!isMigrating} + style={inputStyle(!!isMigrating)} + /> + + + )} + + {error && ( +

{error}

+ )} + +
+ + + {isMigrating && ( + + )} +
+ + {migStatus && migStatus.state !== "idle" && ( + + )} +
+
+ ); +} diff --git a/tests/ALL_TESTS.md b/tests/ALL_TESTS.md index 3f439fe..a56c3f4 100644 --- a/tests/ALL_TESTS.md +++ b/tests/ALL_TESTS.md @@ -1,11 +1,12 @@ # ALL_TESTS — Full Test Suite -Complete test suite covering all 19 feature areas. Run tests relevant to the changed area before merging any feature branch into `main`. Service-specific subsets live in separate files: +Complete test suite covering all 20 feature areas. Run tests relevant to the changed area before merging any feature branch into `main`. Service-specific subsets live in separate files: - `tests/backend_tests.md` — §1–9, §18 (auth, users, admin, groups, appearance, service health, plugins, AI/doc settings, infra/security) - `tests/frontend_tests.md` — §19 (UI & routing) - `tests/doc-service_tests.md` — §10–16 (upload/processing, list/filtering, slide-over, sharing, categories, bulk actions, watch directory) - `tests/ai-service_tests.md` — §17 (AI queue & providers) +- `tests/storage-service_tests.md` — §20 (storage-service: objects, backend switching, migration) Every test describes the exact UI action or API call to perform and the expected outcome. @@ -351,3 +352,30 @@ Mark each row before opening the PR. | 19.9 | TanStack Query cache | Navigate away from docs → back | List loads from cache instantly; background refetch runs | | 19.10 | 30s service poll | Leave `/apps` open for 30s | `GET /api/services` fires again in network tab | | 19.11 | Three-dots menu not clipped | Scroll document table → open three-dot actions on any row | Dropdown renders above the table's overflow-hidden container; not cut off | + +--- + +## 20. Storage Service + +| # | Test | Steps | Expected | +|---|------|-------|----------| +| 20.1 | Upload object | `PUT /objects/documents/test/file.pdf` with binary body | 204; object stored | +| 20.2 | Download object | `GET /objects/documents/test/file.pdf` after 20.1 | 200; binary content matches upload | +| 20.3 | Delete object | `DELETE /objects/documents/test/file.pdf` | 204; subsequent GET returns 404 | +| 20.4 | List bucket | `GET /objects/documents` | 200; JSON array of keys includes `test/file.pdf` | +| 20.5 | Health endpoint | `GET /health` | `{"status":"ok","backend":"local"}` | +| 20.6 | Path traversal rejected | `PUT /objects/documents/../etc/passwd` | 400 | +| 20.7 | PDF upload via UI | Upload a PDF document | File stored in storage-service under `documents/{user_id}/{doc_id}.pdf`; `doc_data` volume absent | +| 20.8 | PDF download via UI | Download a previously uploaded PDF | File streams correctly from storage-service | +| 20.9 | Document delete via UI | Delete a document | `DELETE /objects/documents/{key}` called; storage-service key gone | +| 20.10 | Config persistence | Restart all containers | `doc_service_config.json` and AI config survive restart in storage-service config bucket | +| 20.11 | Admin storage page | Navigate to `/admin/storage` as admin | Page loads; current backend shows "local — healthy" | +| 20.12 | Non-admin storage page blocked | Navigate to `/admin/storage` as non-admin | Redirected to `/login` | +| 20.13 | Start migration — local to local | Select "Local filesystem" and click "Test & Migrate" | 400 or migration completes instantly; no data loss | +| 20.14 | Migration progress poll | Start a migration | Status badge updates every ~2 s: validating → migrating → done | +| 20.15 | Cancel migration | Start migration; immediately click Cancel | Migration state becomes "cancelled"; old backend remains active | +| 20.16 | Migration conflict | Start a migration while one is running | 409 "A migration is already in progress" | +| 20.17 | Migration — switch to S3 | Configure MinIO credentials; click "Test & Migrate" | All objects copied to S3 bucket; `GET /health` reports `backend: s3`; old local files gone | +| 20.18 | No doc_data volume | `docker volume ls` after full stack up | `doc_data` volume absent | +| 20.19 | No app_config volume | `docker volume ls` after full stack up | `app_config` volume absent | +| 20.20 | Only storage_data volume | Verify `storage_data` volume exists | `docker volume ls` shows `storage_data`; all config and documents in it | diff --git a/tests/storage-service_tests.md b/tests/storage-service_tests.md new file mode 100644 index 0000000..926c76f --- /dev/null +++ b/tests/storage-service_tests.md @@ -0,0 +1,32 @@ +# Storage Service Tests — §20 + +Storage-service tests. Run these before merging any change that touches `features/storage-service/`, `docker-compose.yml` storage volumes, or storage-related backend/doc-service code. + +See `tests/ALL_TESTS.md` for the full suite and legend. + +--- + +## 20. Storage Service + +| # | Test | Steps | Expected | +|---|------|-------|----------| +| 20.1 | Upload object | `PUT /objects/documents/test/file.pdf` with binary body | 204; object stored | +| 20.2 | Download object | `GET /objects/documents/test/file.pdf` after 20.1 | 200; binary content matches upload | +| 20.3 | Delete object | `DELETE /objects/documents/test/file.pdf` | 204; subsequent GET returns 404 | +| 20.4 | List bucket | `GET /objects/documents` | 200; JSON array of keys includes `test/file.pdf` | +| 20.5 | Health endpoint | `GET /health` | `{"status":"ok","backend":"local"}` | +| 20.6 | Path traversal rejected | `PUT /objects/documents/../etc/passwd` | 400 | +| 20.7 | PDF upload via UI | Upload a PDF document | File stored in storage-service under `documents/{user_id}/{doc_id}.pdf`; `doc_data` volume absent | +| 20.8 | PDF download via UI | Download a previously uploaded PDF | File streams correctly from storage-service | +| 20.9 | Document delete via UI | Delete a document | `DELETE /objects/documents/{key}` called; storage-service key gone | +| 20.10 | Config persistence | Restart all containers | `doc_service_config.json` and AI config survive restart in storage-service config bucket | +| 20.11 | Admin storage page | Navigate to `/admin/storage` as admin | Page loads; current backend shows "local — healthy" | +| 20.12 | Non-admin storage page blocked | Navigate to `/admin/storage` as non-admin | Redirected to `/login` | +| 20.13 | Start migration — local to local | Select "Local filesystem" and click "Test & Migrate" | 400 or migration completes instantly; no data loss | +| 20.14 | Migration progress poll | Start a migration | Status badge updates every ~2 s: validating → migrating → done | +| 20.15 | Cancel migration | Start migration; immediately click Cancel | Migration state becomes "cancelled"; old backend remains active | +| 20.16 | Migration conflict | Start a migration while one is running | 409 "A migration is already in progress" | +| 20.17 | Migration — switch to S3 | Configure MinIO credentials; click "Test & Migrate" | All objects copied to S3 bucket; `GET /health` reports `backend: s3`; old local files gone | +| 20.18 | No doc_data volume | `docker volume ls` after full stack up | `doc_data` volume absent | +| 20.19 | No app_config volume | `docker volume ls` after full stack up | `app_config` volume absent | +| 20.20 | Only storage_data volume | Verify `storage_data` volume exists | `docker volume ls` shows `storage_data`; all config and documents in it |