18a638bc3a
- Fix: list_plugins imported _REGISTRY as a direct reference to the empty list that existed at import time; register_services() replaces _REGISTRY with a new list so the imported reference was always []. Added get_registry() helper so callers access the live list via the module namespace. GET /api/plugins now correctly returns accessible plugins for the current user. - Fix: switch watchdog from InotifyObserver to PollingObserver. Inotify events from the macOS host are not forwarded through the Docker bind mount, so new files were only detected via the startup scan. PollingObserver (1s default interval) works reliably on all platforms including macOS+Docker bind mounts. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
170 lines
5.9 KiB
Python
170 lines
5.9 KiB
Python
"""
Background health-checker for registered feature services.

Polls each service's health endpoint (``/health`` by default) every
POLL_INTERVAL seconds and stores the result in an in-memory dict. While a
service is healthy, also fetches its /plugin/manifest (when it provides one)
and caches it so the plugin proxy can serve it without per-request network calls.

The REST layer reads only from these in-memory caches — no DB, no blocking
calls on the request path.
"""
|
|
import asyncio
|
|
import logging
|
|
from dataclasses import dataclass
|
|
|
|
import httpx
|
|
|
|
# Logger named after this module's dotted import path.
logger = logging.getLogger(__name__)

# Pause between the end of one polling round and the start of the next, in seconds.
POLL_INTERVAL: int = 30
|
|
|
|
|
|
@dataclass
class ServiceDefinition:
    """Configuration describing one backend feature service.

    Instances are created by register_services(). Field order is part of the
    interface: the generated __init__ accepts these fields positionally.
    """

    # Identifier used as the key in the health and manifest caches.
    id: str
    # Display name, e.g. "Documents".
    name: str
    # Short human-readable summary of the service.
    description: str
    # Base URL the checker calls, e.g. http://doc-service:8001.
    internal_url: str
    # Path appended to internal_url for the health probe.
    health_path: str = "/health"
    # Frontend route for the app; an empty string means no "open" button.
    app_path: str = ""
    # Frontend route of the admin-settings page.
    settings_path: str = ""
|
|
|
|
|
|
# ── Registry ──────────────────────────────────────────────────────────────────

# The services themselves are defined in register_services(), which populates
# the registry at startup from config. Until then it is empty.
_REGISTRY: list[ServiceDefinition] = []

# Last health-check result per service id: True/False, or None = not yet checked.
_health: dict[str, bool | None] = {}

# Cached plugin manifest per service id, or None when the service has no
# manifest or it could not be fetched.
_manifests: dict[str, dict | None] = {}
|
|
|
|
|
|
def register_services(doc_service_url: str, ai_service_url: str) -> None:
    """Populate the service registry from configuration.

    Called once during app startup. Calling it again replaces the registered
    services and resets every service's health and manifest state to
    "not yet checked" (None).

    The module-level containers are updated *in place* rather than rebound.
    Code that did ``from <this module> import _REGISTRY`` (or ``_health`` /
    ``_manifests``) before this call therefore still sees the live data,
    instead of the empty objects that existed at import time.

    Args:
        doc_service_url: Base internal URL of the document service,
            e.g. ``http://doc-service:8001``.
        ai_service_url: Base internal URL of the AI service.
    """
    services = [
        ServiceDefinition(
            id="doc-service",
            name="Documents",
            description="Upload PDF files, extract data, and organise them with categories.",
            internal_url=doc_service_url,
            health_path="/health",
            app_path="/apps/documents",
            settings_path="/apps/documents/settings/admin",
        ),
        ServiceDefinition(
            id="ai-service",
            name="AI Service",
            description="Shared AI provider for all features. Configure model, credentials, and connection.",
            internal_url=ai_service_url,
            health_path="/health",
            app_path="",
            settings_path="/apps/ai/settings/admin",
        ),
    ]

    # Mutate rather than rebind so existing references to these objects stay valid.
    _REGISTRY[:] = services
    _health.clear()
    _health.update({svc.id: None for svc in services})
    _manifests.clear()
    _manifests.update({svc.id: None for svc in services})

    logger.info("Service registry initialised with %d services", len(_REGISTRY))
|
|
|
|
|
|
# ── Health check logic ────────────────────────────────────────────────────────
|
|
|
|
|
|
async def _check_service(svc: ServiceDefinition) -> None:
    """Probe one service's health endpoint and record the outcome.

    The service counts as healthy only if
    ``GET <internal_url><health_path>`` returns status 200 (5 s client
    timeout). Any exception while making the request counts as unhealthy.
    The result is written to ``_health``. A change from the previous value is
    logged, and a healthy service also has its plugin manifest refreshed.
    """
    previous = _health.get(svc.id)
    target = f"{svc.internal_url}{svc.health_path}"

    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            response = await client.get(target)
            is_up = response.status_code == 200
    except Exception as exc:
        logger.debug("Health check failed for %s: %s", svc.id, exc)
        is_up = False

    _health[svc.id] = is_up

    # Only state changes are logged, so steady-state operation stays quiet.
    if is_up != previous:
        if is_up:
            logger.info("Service %s is now HEALTHY", svc.id)
        else:
            logger.warning("Service %s is now UNHEALTHY", svc.id)

    if not is_up:
        return

    # The service is reachable, so refresh its plugin manifest opportunistically.
    await _fetch_manifest(svc)
|
|
|
|
|
|
async def _fetch_manifest(svc: ServiceDefinition) -> None:
    """Try to GET /plugin/manifest from the service and cache the result.

    Stores the decoded JSON object in ``_manifests[svc.id]`` when the response
    is 200 and its body is a JSON object. Every other outcome stores ``None``:
    a non-200 status, a body that is not valid JSON or is not a JSON object, a
    timeout or connection error. Nothing is raised, because services without a
    plugin manifest are normal, not an error.
    """
    url = f"{svc.internal_url}/plugin/manifest"
    manifest: dict | None = None
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            resp = await client.get(url)
            if resp.status_code == 200:
                payload = resp.json()
                # The cache (and get_cached_manifest) promise dict | None;
                # reject other JSON values such as lists or strings.
                if isinstance(payload, dict):
                    manifest = payload
                else:
                    logger.debug(
                        "Ignoring non-object plugin manifest from %s (%s)",
                        svc.id,
                        type(payload).__name__,
                    )
    except Exception as exc:
        # Best-effort fetch: record the reason quietly instead of swallowing it.
        logger.debug("Plugin manifest fetch failed for %s: %s", svc.id, exc)
    _manifests[svc.id] = manifest
|
|
|
|
|
|
async def check_all() -> None:
    """Probe every registered service concurrently and wait for all probes to finish."""
    probes = [_check_service(svc) for svc in _REGISTRY]
    await asyncio.gather(*probes)
|
|
|
|
|
|
async def health_check_loop() -> None:
    """Poll all services forever, pausing POLL_INTERVAL seconds between rounds.

    Each round is guarded by _run_round_safely(), so an ordinary exception in
    one round is logged and the loop carries on with the next one.
    """
    while True:
        await _run_round_safely()
        await asyncio.sleep(POLL_INTERVAL)


async def _run_round_safely() -> None:
    """Run one check_all() round, logging rather than propagating any Exception."""
    try:
        await check_all()
    except Exception:
        logger.exception("Unexpected error during health check round; will retry")
|
|
|
|
|
|
# ── Public read API ───────────────────────────────────────────────────────────
|
|
|
|
|
|
def get_all_statuses() -> list[dict]:
    """Snapshot the health of every registered service for the REST layer.

    Returns one dict per registry entry, in registry order, with the keys
    id, name, description, app_path, settings_path and healthy. ``healthy``
    is always a bool. A service not checked yet (None in the health cache)
    is reported as unhealthy so the UI never sees None.
    """
    statuses: list[dict] = []
    for svc in _REGISTRY:
        last_result = _health.get(svc.id)
        statuses.append(
            {
                "id": svc.id,
                "name": svc.name,
                "description": svc.description,
                "app_path": svc.app_path,
                "settings_path": svc.settings_path,
                "healthy": bool(last_result),
            }
        )
    return statuses
|
|
|
|
|
|
def get_cached_manifest(service_id: str) -> dict | None:
    """Look up the cached plugin manifest for *service_id*.

    Returns the cached value, or None when the id is not in the cache.
    None is also what the cache holds for a service with no manifest.
    """
    try:
        return _manifests[service_id]
    except KeyError:
        return None
|
|
|
|
|
|
def get_service_url(service_id: str) -> str | None:
    """Look up the internal base URL of a registered service.

    Returns the ``internal_url`` of the first registry entry whose id equals
    *service_id*, or None when no such service is registered.
    """
    return next(
        (svc.internal_url for svc in _REGISTRY if svc.id == service_id),
        None,
    )
|
|
|
|
|
|
def get_registry() -> list[ServiceDefinition]:
    """Return the service registry list as it exists at the time of the call.

    Call this each time the registry is needed, rather than keeping the result
    or importing ``_REGISTRY`` directly. A ``from ... import`` binding captures
    whichever list object existed at import time, which may not be the one that
    register_services() later populates.

    The returned list is the module's own object, not a copy. Do not mutate it,
    because changes would affect which services the health loop polls.
    """
    return _REGISTRY
|