Files
Business-Management/backend/app/services/service_health.py
T
curo1305 18a638bc3a Fix plugin list bug and switch watcher to PollingObserver
- Fix: list_plugins imported _REGISTRY as a direct reference to the
  empty list that existed at import time; register_services() replaces
  _REGISTRY with a new list so the imported reference was always [].
  Added get_registry() helper so callers access the live list via the
  module namespace. GET /api/plugins now correctly returns accessible
  plugins for the current user.

- Fix: switch watchdog from InotifyObserver to PollingObserver. Inotify
  events from the macOS host are not forwarded through the Docker bind
  mount, so new files were only detected via the startup scan. PollingObserver
  (1s default interval) works reliably on all platforms including
  macOS+Docker bind mounts.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-18 02:25:16 +02:00

170 lines
5.9 KiB
Python

"""
Background health-checker for registered feature services.
Polls each service's /health endpoint every POLL_INTERVAL seconds and stores
the result in an in-memory dict. Also fetches /plugin/manifest when available
and caches it so the plugin proxy can serve it without per-request network calls.
The REST layer reads from that dict — no DB, no blocking calls on the request path.
"""
import asyncio
import logging
from dataclasses import dataclass
import httpx
logger = logging.getLogger(__name__)
POLL_INTERVAL = 30 # seconds
@dataclass
class ServiceDefinition:
id: str
name: str
description: str
internal_url: str # e.g. http://doc-service:8001
health_path: str = "/health"
app_path: str = "" # frontend route; empty = no open button
settings_path: str = "" # frontend admin-settings route
# ── Registry ──────────────────────────────────────────────────────────────────
# Add new services here. The internal_url is filled in at startup from settings.
_REGISTRY: list[ServiceDefinition] = []
# id → True/False/None (None = not yet checked)
_health: dict[str, bool | None] = {}
# id → plugin manifest dict, or None if the service has no plugin manifest
_manifests: dict[str, dict | None] = {}
def register_services(doc_service_url: str, ai_service_url: str) -> None:
"""Called once during app startup to populate the registry from config."""
global _REGISTRY, _health, _manifests
_REGISTRY = [
ServiceDefinition(
id="doc-service",
name="Documents",
description="Upload PDF files, extract data, and organise them with categories.",
internal_url=doc_service_url,
health_path="/health",
app_path="/apps/documents",
settings_path="/apps/documents/settings/admin",
),
ServiceDefinition(
id="ai-service",
name="AI Service",
description="Shared AI provider for all features. Configure model, credentials, and connection.",
internal_url=ai_service_url,
health_path="/health",
app_path="",
settings_path="/apps/ai/settings/admin",
),
]
_health = {svc.id: None for svc in _REGISTRY}
_manifests = {svc.id: None for svc in _REGISTRY}
logger.info("Service registry initialised with %d services", len(_REGISTRY))
# ── Health check logic ────────────────────────────────────────────────────────
async def _check_service(svc: ServiceDefinition) -> None:
url = f"{svc.internal_url}{svc.health_path}"
prev = _health.get(svc.id)
try:
async with httpx.AsyncClient(timeout=5.0) as client:
resp = await client.get(url)
healthy = resp.status_code == 200
except Exception as exc:
logger.debug("Health check failed for %s: %s", svc.id, exc)
healthy = False
_health[svc.id] = healthy
# Log only on transitions so the logs stay quiet during normal operation
if prev != healthy:
if healthy:
logger.info("Service %s is now HEALTHY", svc.id)
else:
logger.warning("Service %s is now UNHEALTHY", svc.id)
# Opportunistically fetch plugin manifest when the service is healthy
if healthy:
await _fetch_manifest(svc)
async def _fetch_manifest(svc: ServiceDefinition) -> None:
"""Try to GET /plugin/manifest from the service; cache result (or None)."""
url = f"{svc.internal_url}/plugin/manifest"
try:
async with httpx.AsyncClient(timeout=5.0) as client:
resp = await client.get(url)
if resp.status_code == 200:
_manifests[svc.id] = resp.json()
else:
_manifests[svc.id] = None
except Exception:
# Service doesn't have a plugin manifest — not an error
_manifests[svc.id] = None
async def check_all() -> None:
"""Run health checks for all registered services concurrently."""
await asyncio.gather(*[_check_service(svc) for svc in _REGISTRY])
async def health_check_loop() -> None:
"""Runs forever; polls every POLL_INTERVAL seconds.
Exceptions inside a single polling round are caught so the loop cannot
be killed by a transient error.
"""
while True:
try:
await check_all()
except Exception:
logger.exception("Unexpected error during health check round; will retry")
await asyncio.sleep(POLL_INTERVAL)
# ── Public read API ───────────────────────────────────────────────────────────
def get_all_statuses() -> list[dict]:
"""Return the current health snapshot for all registered services."""
return [
{
"id": svc.id,
"name": svc.name,
"description": svc.description,
"app_path": svc.app_path,
"settings_path": svc.settings_path,
# None means not yet checked; treat as unhealthy for the UI
"healthy": bool(_health.get(svc.id)),
}
for svc in _REGISTRY
]
def get_cached_manifest(service_id: str) -> dict | None:
"""Return the cached plugin manifest for a service, or None if unavailable."""
return _manifests.get(service_id)
def get_service_url(service_id: str) -> str | None:
"""Return the internal URL for a registered service, or None if unknown."""
for svc in _REGISTRY:
if svc.id == service_id:
return svc.internal_url
return None
def get_registry() -> list[ServiceDefinition]:
"""Return the current service registry (always up-to-date after register_services)."""
return _REGISTRY