""" Background health-checker for registered feature services. Polls each service's /health endpoint every POLL_INTERVAL seconds and stores the result in an in-memory dict. Also fetches /plugin/manifest when available and caches it so the plugin proxy can serve it without per-request network calls. The REST layer reads from that dict — no DB, no blocking calls on the request path. """ import asyncio import logging from dataclasses import dataclass import httpx logger = logging.getLogger(__name__) POLL_INTERVAL = 30 # seconds @dataclass class ServiceDefinition: id: str name: str description: str internal_url: str # e.g. http://doc-service:8001 health_path: str = "/health" app_path: str = "" # frontend route; empty = no open button settings_path: str = "" # frontend admin-settings route # ── Registry ────────────────────────────────────────────────────────────────── # Add new services here. The internal_url is filled in at startup from settings. _REGISTRY: list[ServiceDefinition] = [] # id → True/False/None (None = not yet checked) _health: dict[str, bool | None] = {} # id → plugin manifest dict, or None if the service has no plugin manifest _manifests: dict[str, dict | None] = {} def register_services(doc_service_url: str, ai_service_url: str) -> None: """Called once during app startup to populate the registry from config.""" global _REGISTRY, _health, _manifests _REGISTRY = [ ServiceDefinition( id="doc-service", name="Documents", description="Upload PDF files, extract data, and organise them with categories.", internal_url=doc_service_url, health_path="/health", app_path="/apps/documents", settings_path="/apps/documents/settings", ), ServiceDefinition( id="ai-service", name="AI Service", description="Shared AI provider for all features. Configure model, credentials, and connection.", internal_url=ai_service_url, health_path="/health", app_path="", settings_path="/apps/ai/settings", ), ] _health = {svc.id: None for svc in _REGISTRY} _manifests = {svc.id: None for svc in _REGISTRY} logger.info("Service registry initialised with %d services", len(_REGISTRY)) # ── Health check logic ──────────────────────────────────────────────────────── async def _check_service(svc: ServiceDefinition) -> None: url = f"{svc.internal_url}{svc.health_path}" prev = _health.get(svc.id) try: async with httpx.AsyncClient(timeout=5.0) as client: resp = await client.get(url) healthy = resp.status_code == 200 except Exception as exc: logger.debug("Health check failed for %s: %s", svc.id, exc) healthy = False _health[svc.id] = healthy # Log only on transitions so the logs stay quiet during normal operation if prev != healthy: if healthy: logger.info("Service %s is now HEALTHY", svc.id) else: logger.warning("Service %s is now UNHEALTHY", svc.id) # Opportunistically fetch plugin manifest when the service is healthy if healthy: await _fetch_manifest(svc) async def _fetch_manifest(svc: ServiceDefinition) -> None: """Try to GET /plugin/manifest from the service; cache result (or None).""" url = f"{svc.internal_url}/plugin/manifest" try: async with httpx.AsyncClient(timeout=5.0) as client: resp = await client.get(url) if resp.status_code == 200: _manifests[svc.id] = resp.json() else: _manifests[svc.id] = None except Exception: # Service doesn't have a plugin manifest — not an error _manifests[svc.id] = None async def check_all() -> None: """Run health checks for all registered services concurrently.""" await asyncio.gather(*[_check_service(svc) for svc in _REGISTRY]) async def health_check_loop() -> None: """Runs forever; polls every POLL_INTERVAL seconds. Exceptions inside a single polling round are caught so the loop cannot be killed by a transient error. """ while True: try: await check_all() except Exception: logger.exception("Unexpected error during health check round; will retry") await asyncio.sleep(POLL_INTERVAL) # ── Public read API ─────────────────────────────────────────────────────────── def get_all_statuses() -> list[dict]: """Return the current health snapshot for all registered services.""" return [ { "id": svc.id, "name": svc.name, "description": svc.description, "app_path": svc.app_path, "settings_path": svc.settings_path, # None means not yet checked; treat as unhealthy for the UI "healthy": bool(_health.get(svc.id)), } for svc in _REGISTRY ] def get_cached_manifest(service_id: str) -> dict | None: """Return the cached plugin manifest for a service, or None if unavailable.""" return _manifests.get(service_id) def get_service_url(service_id: str) -> str | None: """Return the internal URL for a registered service, or None if unknown.""" for svc in _REGISTRY: if svc.id == service_id: return svc.internal_url return None def get_registry() -> list[ServiceDefinition]: """Return the current service registry (always up-to-date after register_services).""" return _REGISTRY