"""Unit tests for the daemon core — PluginSupervisor and IPC handler dispatch.""" from __future__ import annotations import asyncio import pytest from pyra.daemon.core import PluginSupervisor, _make_ipc_handler # ── Helpers ─────────────────────────────────────────────────────────────────── async def _drain(n: int = 20) -> None: """Yield to the event loop n times to let scheduled tasks run.""" for _ in range(n): await asyncio.sleep(0) # ── PluginSupervisor — lifecycle ────────────────────────────────────────────── async def test_supervisor_empty_starts_and_stops_cleanly() -> None: sup = PluginSupervisor() await sup.start() await sup.stop() assert sup.status() == [] async def test_supervisor_runs_task_to_completion() -> None: done = asyncio.Event() async def task(): done.set() sup = PluginSupervisor() sup._RESTART_DELAY = 0.0 sup.add_task("t", task) await sup.start() await asyncio.wait_for(done.wait(), timeout=1.0) await sup.stop() assert sup._records[0].restart_count == 0 assert sup._records[0].last_error is None async def test_supervisor_restarts_crashed_task() -> None: call_count = 0 completed = asyncio.Event() async def flaky(): nonlocal call_count call_count += 1 if call_count == 1: raise RuntimeError("first call fails") completed.set() sup = PluginSupervisor() sup._RESTART_DELAY = 0.0 sup.add_task("flaky", flaky) await sup.start() await asyncio.wait_for(completed.wait(), timeout=1.0) await sup.stop() assert sup._records[0].restart_count == 1 assert "RuntimeError" in (sup._records[0].last_error or "") async def test_supervisor_gives_up_after_max_restarts() -> None: async def always_fails(): raise ValueError("always") sup = PluginSupervisor() sup._RESTART_DELAY = 0.0 sup._MAX_RESTARTS = 3 sup.add_task("failing", always_fails) await sup.start() # Allow enough iterations for 3 restarts + give-up. for _ in range(200): await asyncio.sleep(0) if sup._records[0].task and sup._records[0].task.done(): break await sup.stop() assert sup._records[0].restart_count == 3 assert sup._records[0].last_error is not None # ── PluginSupervisor — status ───────────────────────────────────────────────── async def test_supervisor_status_returns_correct_shape() -> None: sup = PluginSupervisor() sup._RESTART_DELAY = 0.0 async def noop(): pass sup.add_task("noop", noop) await sup.start() await _drain() statuses = sup.status() assert len(statuses) == 1 s = statuses[0] assert set(s.keys()) == {"name", "alive", "restart_count", "last_error"} assert s["name"] == "noop" assert isinstance(s["alive"], bool) assert isinstance(s["restart_count"], int) await sup.stop() async def test_supervisor_status_empty_when_no_tasks() -> None: sup = PluginSupervisor() await sup.start() assert sup.status() == [] await sup.stop() # ── PluginSupervisor — reload ───────────────────────────────────────────────── async def test_supervisor_reload_restarts_tasks() -> None: call_count = 0 async def counting(): nonlocal call_count call_count += 1 # Hang until cancelled so reload can cancel it. await asyncio.sleep(10) sup = PluginSupervisor() sup._RESTART_DELAY = 0.0 sup.add_task("c", counting) await sup.start() await _drain() assert call_count == 1 await sup.reload() await _drain() # After reload, the task should have been restarted (called a second time). assert call_count == 2 assert sup._records[0].restart_count == 0 # reset by reload await sup.stop() async def test_supervisor_reload_resets_restart_count() -> None: call_count = 0 async def flaky(): nonlocal call_count call_count += 1 if call_count <= 2: raise RuntimeError("crash") await asyncio.sleep(10) sup = PluginSupervisor() sup._RESTART_DELAY = 0.0 sup.add_task("f", flaky) await sup.start() # Wait for 2 crashes to accumulate. for _ in range(200): await asyncio.sleep(0) if sup._records[0].restart_count >= 2: break assert sup._records[0].restart_count == 2 await sup.reload() # Reload must reset the counter. assert sup._records[0].restart_count == 0 await sup.stop() # ── IPC command handler ─────────────────────────────────────────────────────── async def test_ipc_handler_ping() -> None: sup = PluginSupervisor() handler = _make_ipc_handler(sup) resp = await handler({"cmd": "ping"}) assert resp["ok"] is True assert resp["data"]["pong"] is True async def test_ipc_handler_status_shape() -> None: sup = PluginSupervisor() handler = _make_ipc_handler(sup) resp = await handler({"cmd": "status"}) assert resp["ok"] is True assert "uptime" in resp["data"] assert "pid" in resp["data"] assert "tasks" in resp["data"] assert isinstance(resp["data"]["tasks"], list) async def test_ipc_handler_stop_signals_shutdown() -> None: sup = PluginSupervisor() handler = _make_ipc_handler(sup) assert not sup._shutdown.is_set() resp = await handler({"cmd": "stop"}) assert resp["ok"] is True assert sup._shutdown.is_set() async def test_ipc_handler_reload_returns_task_count() -> None: sup = PluginSupervisor() handler = _make_ipc_handler(sup) resp = await handler({"cmd": "reload"}) assert resp["ok"] is True assert resp["data"]["tasks_reloaded"] == 0 async def test_ipc_handler_unknown_command() -> None: sup = PluginSupervisor() handler = _make_ipc_handler(sup) resp = await handler({"cmd": "bogus"}) assert resp["ok"] is False assert "error" in resp["data"] assert "bogus" in resp["data"]["error"]