"""
Background health-checker for registered feature services.

Polls each service's /health endpoint every POLL_INTERVAL seconds and stores
the result in an in-memory dict. The REST layer reads from that dict — no DB,
no blocking calls on the request path.
"""

import asyncio
import logging
from dataclasses import dataclass, field

import httpx

logger = logging.getLogger(__name__)

POLL_INTERVAL = 30  # seconds
REQUEST_TIMEOUT = 5.0  # seconds; per-request timeout for a single health probe


@dataclass
class ServiceDefinition:
    """Static description of one feature service that is health-checked."""

    id: str
    name: str
    description: str
    internal_url: str  # e.g. http://doc-service:8001
    health_path: str = "/health"
    app_path: str = ""  # frontend route; empty = no open button
    settings_path: str = ""  # frontend admin-settings route


# ── Registry ──────────────────────────────────────────────────────────────────
# Add new services here. The internal_url is filled in at startup from settings.

_REGISTRY: list[ServiceDefinition] = []

# id → True/False/None  (None = not yet checked)
_health: dict[str, bool | None] = {}


def register_services(doc_service_url: str, ai_service_url: str) -> None:
    """Called once during app startup to populate the registry from config.

    Replaces the module-level registry and resets every service's health
    status to ``None`` (not yet checked).

    Args:
        doc_service_url: Base URL of the document service.
        ai_service_url: Base URL of the AI service.
    """
    global _REGISTRY, _health
    _REGISTRY = [
        ServiceDefinition(
            id="doc-service",
            name="Documents",
            description="Upload PDF files, extract data, and organise them with categories.",
            internal_url=doc_service_url,
            health_path="/health",
            app_path="/apps/documents",
            settings_path="/apps/documents/settings/admin",
        ),
        ServiceDefinition(
            id="ai-service",
            name="AI Service",
            description=(
                "Shared AI provider for all features. "
                "Configure model, credentials, and connection."
            ),
            internal_url=ai_service_url,
            health_path="/health",
            app_path="",
            settings_path="/apps/ai/settings/admin",
        ),
    ]
    _health = {svc.id: None for svc in _REGISTRY}


# ── Health check logic ────────────────────────────────────────────────────────


def _health_url(svc: ServiceDefinition) -> str:
    """Join the service base URL and health path with exactly one slash.

    Tolerates a trailing slash on ``internal_url`` and a missing leading slash
    on ``health_path``. An empty ``health_path`` yields the bare base URL.
    """
    base = svc.internal_url.rstrip("/")
    path = svc.health_path.lstrip("/")
    return f"{base}/{path}" if path else base


def _record_status(svc: ServiceDefinition, healthy: bool, detail: str) -> None:
    """Store the probe result and log only when the status changes.

    Logging on transitions keeps a service that stays down from producing a
    log line on every poll.
    """
    previous = _health.get(svc.id)
    _health[svc.id] = healthy
    if previous is healthy:
        return
    if healthy:
        logger.info("Service %s is healthy (%s)", svc.id, detail)
    else:
        logger.warning("Service %s is unhealthy (%s)", svc.id, detail)


async def _check_service(svc: ServiceDefinition) -> None:
    """Probe one service and record whether it answered with HTTP 200.

    Any exception raised while making the request marks the service unhealthy;
    the probe itself never raises ``Exception`` to its caller.
    """
    url = _health_url(svc)
    try:
        async with httpx.AsyncClient(timeout=REQUEST_TIMEOUT) as client:
            resp = await client.get(url)
    except Exception as exc:
        # Deliberately broad: whatever went wrong, the service is not usable.
        _record_status(svc, False, f"GET {url} failed: {exc!r}")
        return
    _record_status(
        svc,
        resp.status_code == 200,
        f"GET {url} returned HTTP {resp.status_code}",
    )


async def check_all() -> None:
    """Run health checks for all registered services concurrently."""
    await asyncio.gather(*(_check_service(svc) for svc in _REGISTRY))


async def health_check_loop() -> None:
    """Runs forever; polls every POLL_INTERVAL seconds."""
    while True:
        await check_all()
        await asyncio.sleep(POLL_INTERVAL)


# ── Public read API ───────────────────────────────────────────────────────────


def get_all_statuses() -> list[dict]:
    """Return the current health snapshot for all registered services.

    Returns:
        One dict per registered service, in registry order, with the keys
        ``id``, ``name``, ``description``, ``app_path``, ``settings_path``
        and ``healthy``.
    """
    return [
        {
            "id": svc.id,
            "name": svc.name,
            "description": svc.description,
            "app_path": svc.app_path,
            "settings_path": svc.settings_path,
            # None means not yet checked; treat as unhealthy for the UI
            "healthy": bool(_health.get(svc.id)),
        }
        for svc in _REGISTRY
    ]