Files
curo1305 5349f21752 feat: add storage-service container with pluggable backends (Phase 1)
New FastAPI microservice (port 8020) providing unified blob storage via
PUT/GET/DELETE/LIST HTTP API. Local filesystem backend is the default (zero
extra deps). S3-compatible and WebDAV backends are built in. Backend is
switchable at runtime via POST /migrate, which copies all objects to the new
backend, verifies each one, atomically switches, then cleans up the old backend.

WebDAV XML parsing uses defusedxml to prevent XXE attacks.

Wired into docker-compose (storage_data volume) and registered in the backend
service-health poller as 'storage-service'.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-20 15:50:31 +02:00

179 lines
6.2 KiB
Python

"""
Background health-checker for registered feature services.
Polls each service's /health endpoint every POLL_INTERVAL seconds and stores
the result in an in-memory dict. Also fetches /plugin/manifest when available
and caches it so the plugin proxy can serve it without per-request network calls.
The REST layer reads from that dict — no DB, no blocking calls on the request path.
"""
import asyncio
import logging
from dataclasses import dataclass
import httpx
logger = logging.getLogger(__name__)
POLL_INTERVAL = 30 # seconds
@dataclass
class ServiceDefinition:
id: str
name: str
description: str
internal_url: str # e.g. http://doc-service:8001
health_path: str = "/health"
app_path: str = "" # frontend route; empty = no open button
settings_path: str = "" # frontend admin-settings route
# ── Registry ──────────────────────────────────────────────────────────────────
# Add new services here. The internal_url is filled in at startup from settings.
_REGISTRY: list[ServiceDefinition] = []
# id → True/False/None (None = not yet checked)
_health: dict[str, bool | None] = {}
# id → plugin manifest dict, or None if the service has no plugin manifest
_manifests: dict[str, dict | None] = {}
def register_services(doc_service_url: str, ai_service_url: str, storage_service_url: str) -> None:
"""Called once during app startup to populate the registry from config."""
global _REGISTRY, _health, _manifests
_REGISTRY = [
ServiceDefinition(
id="doc-service",
name="Documents",
description="Upload PDF files, extract data, and organise them with categories.",
internal_url=doc_service_url,
health_path="/health",
app_path="/apps/documents",
settings_path="/apps/documents/settings",
),
ServiceDefinition(
id="ai-service",
name="AI Service",
description="Shared AI provider for all features. Configure model, credentials, and connection.",
internal_url=ai_service_url,
health_path="/health",
app_path="",
settings_path="/apps/ai/settings",
),
ServiceDefinition(
id="storage-service",
name="Storage",
description="Unified file storage. Manages all uploaded files with pluggable backends (local, S3, WebDAV).",
internal_url=storage_service_url,
health_path="/health",
app_path="",
settings_path="/admin/storage",
),
]
_health = {svc.id: None for svc in _REGISTRY}
_manifests = {svc.id: None for svc in _REGISTRY}
logger.info("Service registry initialised with %d services", len(_REGISTRY))
# ── Health check logic ────────────────────────────────────────────────────────
async def _check_service(svc: ServiceDefinition) -> None:
url = f"{svc.internal_url}{svc.health_path}"
prev = _health.get(svc.id)
try:
async with httpx.AsyncClient(timeout=5.0) as client:
resp = await client.get(url)
healthy = resp.status_code == 200
except Exception as exc:
logger.debug("Health check failed for %s: %s", svc.id, exc)
healthy = False
_health[svc.id] = healthy
# Log only on transitions so the logs stay quiet during normal operation
if prev != healthy:
if healthy:
logger.info("Service %s is now HEALTHY", svc.id)
else:
logger.warning("Service %s is now UNHEALTHY", svc.id)
# Opportunistically fetch plugin manifest when the service is healthy
if healthy:
await _fetch_manifest(svc)
async def _fetch_manifest(svc: ServiceDefinition) -> None:
"""Try to GET /plugin/manifest from the service; cache result (or None)."""
url = f"{svc.internal_url}/plugin/manifest"
try:
async with httpx.AsyncClient(timeout=5.0) as client:
resp = await client.get(url)
if resp.status_code == 200:
_manifests[svc.id] = resp.json()
else:
_manifests[svc.id] = None
except Exception:
# Service doesn't have a plugin manifest — not an error
_manifests[svc.id] = None
async def check_all() -> None:
"""Run health checks for all registered services concurrently."""
await asyncio.gather(*[_check_service(svc) for svc in _REGISTRY])
async def health_check_loop() -> None:
"""Runs forever; polls every POLL_INTERVAL seconds.
Exceptions inside a single polling round are caught so the loop cannot
be killed by a transient error.
"""
while True:
try:
await check_all()
except Exception:
logger.exception("Unexpected error during health check round; will retry")
await asyncio.sleep(POLL_INTERVAL)
# ── Public read API ───────────────────────────────────────────────────────────
def get_all_statuses() -> list[dict]:
"""Return the current health snapshot for all registered services."""
return [
{
"id": svc.id,
"name": svc.name,
"description": svc.description,
"app_path": svc.app_path,
"settings_path": svc.settings_path,
# None means not yet checked; treat as unhealthy for the UI
"healthy": bool(_health.get(svc.id)),
}
for svc in _REGISTRY
]
def get_cached_manifest(service_id: str) -> dict | None:
"""Return the cached plugin manifest for a service, or None if unavailable."""
return _manifests.get(service_id)
def get_service_url(service_id: str) -> str | None:
"""Return the internal URL for a registered service, or None if unknown."""
for svc in _REGISTRY:
if svc.id == service_id:
return svc.internal_url
return None
def get_registry() -> list[ServiceDefinition]:
"""Return the current service registry (always up-to-date after register_services)."""
return _REGISTRY