From 1d5d0387d909637489186d2d0f0e86d1ac202c77 Mon Sep 17 00:00:00 2001 From: curo1305 Date: Tue, 19 May 2026 15:57:49 +0200 Subject: [PATCH] test(daemon): add supervisor and IPC handler tests 13 async tests covering: supervisor lifecycle (start/stop), task completion, crash-and-restart, max-restart enforcement, status shape, reload (task restart + counter reset), and IPC handler dispatch for all 4 commands plus unknown commands. Co-Authored-By: Claude Sonnet 4.6 --- tests/unit/test_daemon_core.py | 226 +++++++++++++++++++++++++++++++++ 1 file changed, 226 insertions(+) create mode 100644 tests/unit/test_daemon_core.py diff --git a/tests/unit/test_daemon_core.py b/tests/unit/test_daemon_core.py new file mode 100644 index 0000000..2525c64 --- /dev/null +++ b/tests/unit/test_daemon_core.py @@ -0,0 +1,226 @@ +"""Unit tests for the daemon core — PluginSupervisor and IPC handler dispatch.""" + +from __future__ import annotations + +import asyncio + +import pytest + +from pyra.daemon.core import PluginSupervisor, _make_ipc_handler + + +# ── Helpers ─────────────────────────────────────────────────────────────────── + +async def _drain(n: int = 20) -> None: + """Yield to the event loop n times to let scheduled tasks run.""" + for _ in range(n): + await asyncio.sleep(0) + + +# ── PluginSupervisor — lifecycle ────────────────────────────────────────────── + +async def test_supervisor_empty_starts_and_stops_cleanly() -> None: + sup = PluginSupervisor() + await sup.start() + await sup.stop() + assert sup.status() == [] + + +async def test_supervisor_runs_task_to_completion() -> None: + done = asyncio.Event() + + async def task(): + done.set() + + sup = PluginSupervisor() + sup._RESTART_DELAY = 0.0 + sup.add_task("t", task) + await sup.start() + + await asyncio.wait_for(done.wait(), timeout=1.0) + await sup.stop() + + assert sup._records[0].restart_count == 0 + assert sup._records[0].last_error is None + + +async def test_supervisor_restarts_crashed_task() -> None: + call_count = 0 + completed = asyncio.Event() + + async def flaky(): + nonlocal call_count + call_count += 1 + if call_count == 1: + raise RuntimeError("first call fails") + completed.set() + + sup = PluginSupervisor() + sup._RESTART_DELAY = 0.0 + sup.add_task("flaky", flaky) + await sup.start() + + await asyncio.wait_for(completed.wait(), timeout=1.0) + await sup.stop() + + assert sup._records[0].restart_count == 1 + assert "RuntimeError" in (sup._records[0].last_error or "") + + +async def test_supervisor_gives_up_after_max_restarts() -> None: + async def always_fails(): + raise ValueError("always") + + sup = PluginSupervisor() + sup._RESTART_DELAY = 0.0 + sup._MAX_RESTARTS = 3 + sup.add_task("failing", always_fails) + await sup.start() + + # Allow enough iterations for 3 restarts + give-up. + for _ in range(200): + await asyncio.sleep(0) + if sup._records[0].task and sup._records[0].task.done(): + break + + await sup.stop() + + assert sup._records[0].restart_count == 3 + assert sup._records[0].last_error is not None + + +# ── PluginSupervisor — status ───────────────────────────────────────────────── + +async def test_supervisor_status_returns_correct_shape() -> None: + sup = PluginSupervisor() + sup._RESTART_DELAY = 0.0 + + async def noop(): + pass + + sup.add_task("noop", noop) + await sup.start() + await _drain() + + statuses = sup.status() + assert len(statuses) == 1 + s = statuses[0] + assert set(s.keys()) == {"name", "alive", "restart_count", "last_error"} + assert s["name"] == "noop" + assert isinstance(s["alive"], bool) + assert isinstance(s["restart_count"], int) + + await sup.stop() + + +async def test_supervisor_status_empty_when_no_tasks() -> None: + sup = PluginSupervisor() + await sup.start() + assert sup.status() == [] + await sup.stop() + + +# ── PluginSupervisor — reload ───────────────────────────────────────────────── + +async def test_supervisor_reload_restarts_tasks() -> None: + call_count = 0 + + async def counting(): + nonlocal call_count + call_count += 1 + # Hang until cancelled so reload can cancel it. + await asyncio.sleep(10) + + sup = PluginSupervisor() + sup._RESTART_DELAY = 0.0 + sup.add_task("c", counting) + await sup.start() + + await _drain() + assert call_count == 1 + + await sup.reload() + await _drain() + + # After reload, the task should have been restarted (called a second time). + assert call_count == 2 + assert sup._records[0].restart_count == 0 # reset by reload + + await sup.stop() + + +async def test_supervisor_reload_resets_restart_count() -> None: + call_count = 0 + + async def flaky(): + nonlocal call_count + call_count += 1 + if call_count <= 2: + raise RuntimeError("crash") + await asyncio.sleep(10) + + sup = PluginSupervisor() + sup._RESTART_DELAY = 0.0 + sup.add_task("f", flaky) + await sup.start() + + # Wait for 2 crashes to accumulate. + for _ in range(200): + await asyncio.sleep(0) + if sup._records[0].restart_count >= 2: + break + + assert sup._records[0].restart_count == 2 + + await sup.reload() + # Reload must reset the counter. + assert sup._records[0].restart_count == 0 + + await sup.stop() + + +# ── IPC command handler ─────────────────────────────────────────────────────── + +async def test_ipc_handler_ping() -> None: + sup = PluginSupervisor() + handler = _make_ipc_handler(sup) + resp = await handler({"cmd": "ping"}) + assert resp["ok"] is True + assert resp["data"]["pong"] is True + + +async def test_ipc_handler_status_shape() -> None: + sup = PluginSupervisor() + handler = _make_ipc_handler(sup) + resp = await handler({"cmd": "status"}) + assert resp["ok"] is True + assert "uptime" in resp["data"] + assert "pid" in resp["data"] + assert "tasks" in resp["data"] + assert isinstance(resp["data"]["tasks"], list) + + +async def test_ipc_handler_stop_signals_shutdown() -> None: + sup = PluginSupervisor() + handler = _make_ipc_handler(sup) + assert not sup._shutdown.is_set() + resp = await handler({"cmd": "stop"}) + assert resp["ok"] is True + assert sup._shutdown.is_set() + + +async def test_ipc_handler_reload_returns_task_count() -> None: + sup = PluginSupervisor() + handler = _make_ipc_handler(sup) + resp = await handler({"cmd": "reload"}) + assert resp["ok"] is True + assert resp["data"]["tasks_reloaded"] == 0 + + +async def test_ipc_handler_unknown_command() -> None: + sup = PluginSupervisor() + handler = _make_ipc_handler(sup) + resp = await handler({"cmd": "bogus"}) + assert resp["ok"] is False + assert "error" in resp["data"] + assert "bogus" in resp["data"]["error"]