8 Commits

Author SHA1 Message Date
curo1305 1cbb40ac93 chore(chat): tighten tool descriptions to reduce AI selection confusion
- plan_and_execute: restrict to 3+ step tasks; prevents over-triggering on simple requests
- memory_read: hint to call memory_lookup first to find the correct path

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-19 23:50:21 +02:00
curo1305 8d4917f7ca feat(daemon): add async event bus for inter-plugin notifications
Adds daemon/events.py — a lightweight asyncio.Queue-based publish/subscribe
bus that lets daemon tasks communicate without direct imports between plugins.
Email plugin publishes new_email events; messaging bots consume via
subscribe_forever(). Also adds email optional-dependency group to pyproject.toml
(imap-tools, google-api-python-client, google-auth-oauthlib, O365).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-19 23:12:17 +02:00
curo1305 bde0856979 feat(daemon): Stage 6 daemon infrastructure
Always-on asyncio daemon with IPC socket, OS service install/uninstall
(launchd/systemd/schtasks), and plugin task supervisor.

- daemon/pid.py: atomic PID file, stale detection (POSIX + Windows)
- daemon/ipc.py: Unix socket (chmod 600, UID-checked) on Linux/macOS;
  TCP loopback + port file on Windows; newline-delimited JSON protocol
- daemon/service.py: launchd plist, systemd user unit, schtasks XML;
  auto-detects platform; finds pyra executable via shutil.which
- daemon/core.py: asyncio event loop, PluginSupervisor (per-task
  restart up to 10x with 5s back-off, reload), IPC command dispatch,
  SIGTERM/SIGHUP signal handling via get_running_loop()
- cli.py: all 7 daemon stubs replaced with real commands
- 376 tests passing (13 new supervisor + IPC handler tests)
2026-05-19 16:14:51 +02:00
curo1305 4744cf819b docs: update CLAUDE.md for Stage 6 daemon infrastructure
- Current Status: add Stage 6 daemon infrastructure in progress
- Architecture table: expand daemon/__init__.py stub to all 5 daemon modules
- Code Inventory: add daemon.core, daemon.pid, daemon.ipc, daemon.service
  sections with function signatures and purposes
- Internal classes: add PluginSupervisor and PidFile; expand DaemonConfig

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-19 16:10:49 +02:00
curo1305 1d5d0387d9 test(daemon): add supervisor and IPC handler tests
13 async tests covering: supervisor lifecycle (start/stop), task
completion, crash-and-restart, max-restart enforcement, status shape,
reload (task restart + counter reset), and IPC handler dispatch for all
4 commands plus unknown commands.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-19 15:57:49 +02:00
curo1305 db6ca6ee57 feat(daemon): implement reload, fix PID race condition
- PluginSupervisor.reload(): cancels all running plugin tasks, resets
  restart counters, and re-creates them with fresh coroutines
- IPC reload command now calls supervisor.reload() instead of being a stub
- run_foreground(): wrap PID file acquisition in try/except PidFileError
  to produce a clean error if two daemon starts race on the PID file

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-19 15:54:56 +02:00
curo1305 cc24257ab0 fix(daemon): close discarded coroutines in get_daemon_task_factories
The initial daemon_tasks() call to count tasks created coroutines that
were immediately discarded, triggering RuntimeWarning "coroutine never
awaited". Explicitly close them after counting.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-19 15:53:06 +02:00
curo1305 68f9007ef0 fix(daemon): install signal handlers inside running event loop
_install_signal_handlers() was called before asyncio.run(), registering
handlers on a throwaway loop that asyncio.get_event_loop() created — so
SIGTERM would never reach the supervisor. Move the call into _run_daemon()
and switch to asyncio.get_running_loop() so handlers are registered on the
actual running loop.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-19 15:52:11 +02:00
8 changed files with 370 additions and 20 deletions
+46 -5
View File
@@ -8,7 +8,8 @@ a plugin/integration system (Stage 2+) and an encrypted vault (Stage 3+).
## Current Status ## Current Status
**Stage 3 — Memory Database: complete** (2026-05-18) **Stage 3 — Memory Database: complete** (2026-05-18)
Next: Stage 4Vault Encryption **Stage 6Daemon infrastructure: in progress** (`feat/daemon` branch)
Next: Stage 4 — Vault Encryption (skipped for now); messaging bots (Stage 6 remainder)
## Project Roadmap ## Project Roadmap
@@ -19,11 +20,11 @@ memory in `~/.pyra/memory/`, and hard security boundaries around the vault.
### Stage 2 — Plugin Framework ✅ COMPLETE ### Stage 2 — Plugin Framework ✅ COMPLETE
- `src/pyra/plugins/` package: `base.py`, `loader.py`, `registry.py`, `executor.py`, `install.py` - `src/pyra/plugins/` package: `base.py`, `loader.py`, `registry.py`, `executor.py`, `install.py`
- `src/pyra/bundled_plugins/` — ships bundled plugin scripts with pyra - `src/pyra/bundled_plugins/` — ships bundled plugin scripts with pyra
- `src/pyra/daemon/` stub (CLI surface only) - `src/pyra/daemon/` stub (CLI surface only; daemon itself is Stage 6)
- Config: `PluginConfig` + `DaemonConfig` added to `PyraConfig` - Config: `PluginConfig` + `DaemonConfig` added to `PyraConfig`
- Bootstrap: `~/.pyra/plugins/` and `~/.pyra/logs/` created on startup - Bootstrap: `~/.pyra/plugins/` and `~/.pyra/logs/` created on startup
- Chat session: AI tool-use loop (up to 10 iterations), approval gate, plugin slash commands - Chat session: AI tool-use loop (up to 10 iterations), approval gate, plugin slash commands
- CLI: `pyra plugin list/install/enable/disable/setup`, `pyra daemon *` stubs - CLI: `pyra plugin list/install/enable/disable/setup`, `pyra daemon *` (stubs at Stage 2; implemented in Stage 6)
### Stage 3 — Memory Database ✅ COMPLETE ### Stage 3 — Memory Database ✅ COMPLETE
- `src/pyra/memory/database.py`: SQLite + FTS5 via `memory_meta` + `memory_fts` tables - `src/pyra/memory/database.py`: SQLite + FTS5 via `memory_meta` + `memory_fts` tables
@@ -117,7 +118,11 @@ the vault under namespaced keys (`plugin:{name}:{key}`).
| `plugins/executor.py` | Approval gate: scan args → prompt → execute → scan result → log | | `plugins/executor.py` | Approval gate: scan args → prompt → execute → scan result → log |
| `plugins/install.py` | Copies bundled plugins to `~/.pyra/plugins/` | | `plugins/install.py` | Copies bundled plugins to `~/.pyra/plugins/` |
| `bundled_plugins/` | Standalone plugin scripts shipped with pyra (installed on demand) | | `bundled_plugins/` | Standalone plugin scripts shipped with pyra (installed on demand) |
| `daemon/__init__.py` | Daemon package stub (implementation in Stage 2.4) | | `daemon/pid.py` | Atomic PID file — write, read, stale detection (POSIX + Windows), context manager |
| `daemon/ipc.py` | IPC transport — Unix socket chmod 600 + UID-check (Linux/macOS) or TCP loopback + port file (Windows); newline-delimited JSON protocol |
| `daemon/service.py` | OS service file generation + install/uninstall — launchd plist (macOS), systemd user unit (Linux), schtasks XML (Windows) |
| `daemon/core.py` | asyncio event loop entry point, `PluginSupervisor` (per-task restart, max 10×, 5s back-off, reload), IPC command dispatch, signal handling |
| `daemon/__init__.py` | Public daemon API exports |
### Runtime: `~/.pyra/` ### Runtime: `~/.pyra/`
@@ -493,6 +498,40 @@ Dataclass: `MemoryFile(name, path, category, size_bytes, modified)`
| `list_bundled_plugins` | `(bundled_dir: Path) -> list[str]` | Names of all bundled plugins that have a `manifest.json` | | `list_bundled_plugins` | `(bundled_dir: Path) -> list[str]` | Names of all bundled plugins that have a `manifest.json` |
| `read_manifest` | `(plugin_dir: Path) -> dict` | Reads `manifest.json`; returns `{}` if missing | | `read_manifest` | `(plugin_dir: Path) -> dict` | Reads `manifest.json`; returns `{}` if missing |
#### `daemon.core`
| Function | Signature | Purpose |
|----------|-----------|---------|
| `run_foreground` | `() -> None` | Entry point for `pyra daemon run` — loads config + plugins, writes PID file, runs asyncio loop |
| `start_background` | `() -> None` | Spawns `pyra daemon run` as a detached subprocess (`start_new_session` on POSIX, `DETACHED_PROCESS` on Windows) |
#### `daemon.pid`
| Function | Signature | Purpose |
|----------|-----------|---------|
| `resolve_pid_path` | `(cfg_path: str) -> Path` | Expand `~` and resolve to absolute Path |
#### `daemon.ipc`
| Function | Signature | Purpose |
|----------|-----------|---------|
| `send_command` | `(address, msg, timeout=5.0) -> IpcResponse` | Synchronous CLI helper — `asyncio.run(IpcClient.send(...))` |
| `get_socket_path` | `(cfg: str) -> Path` | Expand `~` and return Unix socket path |
| `is_unix_socket` | `() -> bool` | True on Linux/macOS (`sys.platform != 'nt'`) |
| `get_port_file_path` | `() -> Path` | Path to `~/.pyra/daemon.port` (Windows TCP port file) |
#### `daemon.service`
| Function | Signature | Purpose |
|----------|-----------|---------|
| `detect_platform` | `() -> Literal["macos","linux","windows"]` | Detect current OS |
| `find_pyra_executable` | `() -> str` | `shutil.which("pyra")` → sibling fallback → `sys.executable -m pyra` |
| `install_service` | `() -> None` | Generate + register OS service (reads config for log/pid paths) |
| `uninstall_service` | `() -> None` | Deregister OS service |
| `render_launchd_plist` | `(exe, log_file, pid_file) -> str` | macOS plist template |
| `render_systemd_unit` | `(exe, log_file) -> str` | Linux systemd unit template |
| `render_schtasks_xml` | `(exe) -> str` | Windows Task Scheduler XML template (write as UTF-16) |
#### `chat.renderer` — rendering functions and shared `console` #### `chat.renderer` — rendering functions and shared `console`
Import `console` from here; do not create a second `rich.Console()` in new code. Import `console` from here; do not create a second `rich.Console()` in new code.
@@ -515,7 +554,7 @@ Import `console` from here; do not create a second `rich.Console()` in new code.
| `GeneralConfig` | `config.schema` | `general:` block — `user_name`, `assistant_name` | | `GeneralConfig` | `config.schema` | `general:` block — `user_name`, `assistant_name` |
| `ProviderConfig` | `config.schema` | `ai:` block — `provider_id`, `model`, `base_url` | | `ProviderConfig` | `config.schema` | `ai:` block — `provider_id`, `model`, `base_url` |
| `PluginConfig` | `config.schema` | `plugins:` block — `enabled`, `require_approval`, `log_executions` | | `PluginConfig` | `config.schema` | `plugins:` block — `enabled`, `require_approval`, `log_executions` |
| `DaemonConfig` | `config.schema` | `daemon:` block | | `DaemonConfig` | `config.schema` | `daemon:` block — `enabled`, `socket_path`, `log_file`, `pid_file`, `ipc_port` |
| `MemoryConfig` | `config.schema` | `memory:` block — `max_tokens_in_context`, `auto_load` | | `MemoryConfig` | `config.schema` | `memory:` block — `max_tokens_in_context`, `auto_load` |
| `SecurityConfig` | `config.schema` | `security:` block — `injection_detection`, `log_injections` | | `SecurityConfig` | `config.schema` | `security:` block — `injection_detection`, `log_injections` |
| `ConversationHistory` | `chat.history` | Holds message list; builds API payload via `build_for_api()`; trims to token budget | | `ConversationHistory` | `chat.history` | Holds message list; builds API payload via `build_for_api()`; trims to token budget |
@@ -526,3 +565,5 @@ Import `console` from here; do not create a second `rich.Console()` in new code.
| `PyraPlugin` | `plugins.base` | `@runtime_checkable` Protocol — the plugin interface | | `PyraPlugin` | `plugins.base` | `@runtime_checkable` Protocol — the plugin interface |
| `BasePlugin` | `plugins.base` | Concrete base with no-op defaults; plugins should inherit this | | `BasePlugin` | `plugins.base` | Concrete base with no-op defaults; plugins should inherit this |
| `TaskPlanner` | `chat.planner` | Multi-step plan runner; `make_tool_handler()` returns the callable wired into the chat session; presents plan for user approval, executes each step via litellm with up to 5 tool-use iterations, verifies output before proceeding | | `TaskPlanner` | `chat.planner` | Multi-step plan runner; `make_tool_handler()` returns the callable wired into the chat session; presents plan for user approval, executes each step via litellm with up to 5 tool-use iterations, verifies output before proceeding |
| `PluginSupervisor` | `daemon.core` | asyncio supervisor — `add_task(name, factory)`, `start()`, `stop()`, `reload()`, `status()`; restarts crashed tasks up to 10× with 5s back-off |
| `PidFile` | `daemon.pid` | `write()` (atomic), `read()`, `is_stale()`, `remove()`, context manager; `PidFileError(OSError)` raised when live PID already exists |
+8
View File
@@ -35,6 +35,12 @@ gdrive = ["google-api-python-client>=2.120.0", "google-auth-oauthlib>=1.2.0"]
onedrive = ["msal>=1.28.0"] onedrive = ["msal>=1.28.0"]
dropbox = ["dropbox>=12.0.0"] dropbox = ["dropbox>=12.0.0"]
daemon = ["aiofiles>=23.0.0"] daemon = ["aiofiles>=23.0.0"]
email = [
"imap-tools>=1.7.0",
"google-api-python-client>=2.120.0",
"google-auth-oauthlib>=1.2.0",
"O365>=2.0.36",
]
all-plugins = [ all-plugins = [
"caldav>=1.3.0", "webdav4>=0.9.0", "vobject>=0.9.6", "caldav>=1.3.0", "webdav4>=0.9.0", "vobject>=0.9.6",
"matrix-nio>=0.24.0", "aiofiles>=23.0.0", "matrix-nio>=0.24.0", "aiofiles>=23.0.0",
@@ -44,6 +50,8 @@ all-plugins = [
"google-api-python-client>=2.120.0", "google-auth-oauthlib>=1.2.0", "google-api-python-client>=2.120.0", "google-auth-oauthlib>=1.2.0",
"msal>=1.28.0", "msal>=1.28.0",
"dropbox>=12.0.0", "dropbox>=12.0.0",
"imap-tools>=1.7.0",
"O365>=2.0.36",
] ]
[project.scripts] [project.scripts]
+4 -3
View File
@@ -81,8 +81,9 @@ def start_chat() -> None:
name="plan_and_execute", name="plan_and_execute",
description=( description=(
"Decompose a multi-step task into sequential steps and execute each with " "Decompose a multi-step task into sequential steps and execute each with "
"a focused sub-agent. Use when the request has multiple distinct phases. " "a focused sub-agent. Use only for long tasks with 3 or more sequential "
"Specify 'agent' per step to route to a specialized agent." "steps that need verification between them — for simple 2-step tasks, do "
"them directly. Specify 'agent' per step to route to a specialized agent."
), ),
parameters={ parameters={
"type": "object", "type": "object",
@@ -125,7 +126,7 @@ def start_chat() -> None:
)) ))
registry.register_builtin(Tool( registry.register_builtin(Tool(
name="memory_read", name="memory_read",
description="Read the full content of a memory file by its relative path (e.g. 'user/profile.md').", description="Read the full content of a memory file by its relative path (e.g. 'user/profile.md'). Use memory_lookup first to find the correct path.",
parameters={ parameters={
"type": "object", "type": "object",
"properties": { "properties": {
+3
View File
@@ -1,6 +1,7 @@
"""Pyra background daemon package.""" """Pyra background daemon package."""
from pyra.daemon.core import PluginSupervisor, run_foreground, start_background from pyra.daemon.core import PluginSupervisor, run_foreground, start_background
from pyra.daemon.events import publish, subscribe_forever
from pyra.daemon.ipc import IpcClient, IpcServer, send_command from pyra.daemon.ipc import IpcClient, IpcServer, send_command
from pyra.daemon.pid import PidFile, PidFileError, resolve_pid_path from pyra.daemon.pid import PidFile, PidFileError, resolve_pid_path
from pyra.daemon.service import detect_platform, install_service, uninstall_service from pyra.daemon.service import detect_platform, install_service, uninstall_service
@@ -9,6 +10,8 @@ __all__ = [
"run_foreground", "run_foreground",
"start_background", "start_background",
"PluginSupervisor", "PluginSupervisor",
"publish",
"subscribe_forever",
"IpcClient", "IpcClient",
"IpcServer", "IpcServer",
"send_command", "send_command",
+26 -4
View File
@@ -68,6 +68,22 @@ class PluginSupervisor:
if tasks: if tasks:
await asyncio.gather(*tasks, return_exceptions=True) await asyncio.gather(*tasks, return_exceptions=True)
async def reload(self) -> None:
"""Cancel all running tasks and restart them with fresh coroutines."""
for record in self._records:
if record.task and not record.task.done():
record.task.cancel()
try:
await record.task
except (asyncio.CancelledError, Exception):
pass
record.restart_count = 0
record.last_error = None
record.task = asyncio.create_task(
self._supervise(record), name=record.name
)
_log.info("Reloaded %d plugin task(s).", len(self._records))
def status(self) -> list[dict]: def status(self) -> list[dict]:
return [ return [
{ {
@@ -131,8 +147,8 @@ def _make_ipc_handler(supervisor: PluginSupervisor):
supervisor.request_shutdown() supervisor.request_shutdown()
return {"ok": True, "data": {}} return {"ok": True, "data": {}}
case "reload": case "reload":
_log.info("Reload requested via IPC.") await supervisor.reload()
return {"ok": True, "data": {}} return {"ok": True, "data": {"tasks_reloaded": len(supervisor._records)}}
case _: case _:
return {"ok": False, "data": {"error": f"unknown command: {cmd}"}} return {"ok": False, "data": {"error": f"unknown command: {cmd}"}}
@@ -144,6 +160,9 @@ def _make_ipc_handler(supervisor: PluginSupervisor):
async def _run_daemon(cfg, supervisor: PluginSupervisor) -> None: async def _run_daemon(cfg, supervisor: PluginSupervisor) -> None:
from pyra.daemon.ipc import IpcServer, get_socket_path, is_unix_socket from pyra.daemon.ipc import IpcServer, get_socket_path, is_unix_socket
# Install signal handlers now that the event loop is running.
_install_signal_handlers(supervisor)
if is_unix_socket(): if is_unix_socket():
address = get_socket_path(cfg.daemon.socket_path) address = get_socket_path(cfg.daemon.socket_path)
else: else:
@@ -193,14 +212,17 @@ def run_foreground() -> None:
_start_time = time.monotonic() _start_time = time.monotonic()
try:
with pid_file: with pid_file:
_install_signal_handlers(supervisor)
_log.info("Pyra daemon starting (PID %d).", os.getpid()) _log.info("Pyra daemon starting (PID %d).", os.getpid())
try: try:
asyncio.run(_run_daemon(cfg, supervisor)) asyncio.run(_run_daemon(cfg, supervisor))
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass
_log.info("Pyra daemon stopped.") _log.info("Pyra daemon stopped.")
except PidFileError as exc:
_log.error("Could not acquire PID file: %s", exc)
sys.exit(1)
# ── Background spawn (pyra daemon start) ───────────────────────────────────── # ── Background spawn (pyra daemon start) ─────────────────────────────────────
@@ -266,7 +288,7 @@ def _install_signal_handlers(supervisor: PluginSupervisor) -> None:
signal.signal(signal.SIGTERM, lambda *_: supervisor.request_shutdown()) signal.signal(signal.SIGTERM, lambda *_: supervisor.request_shutdown())
return return
loop = asyncio.get_event_loop() loop = asyncio.get_running_loop()
loop.add_signal_handler(signal.SIGTERM, supervisor.request_shutdown) loop.add_signal_handler(signal.SIGTERM, supervisor.request_shutdown)
loop.add_signal_handler(signal.SIGHUP, supervisor.request_shutdown) loop.add_signal_handler(signal.SIGHUP, supervisor.request_shutdown)
+46
View File
@@ -0,0 +1,46 @@
"""Async notification bus for inter-plugin communication in the daemon.
Plugins publish events to a shared asyncio.Queue; other plugins (e.g. messaging
bots) consume them via subscribe_forever(). No direct plugin-to-plugin imports
are needed — both sides just use this module.
Event shape (by convention):
{"type": "new_email", "priority": int, "from": str, "subject": str,
"summary": str, "uid": str, "folder": str}
{"type": "new_message", "bot": str, "user_id": str, "text": str}
"""
from __future__ import annotations
import asyncio
from typing import Any, AsyncGenerator
_queue: asyncio.Queue[dict[str, Any]] | None = None
def get_queue() -> asyncio.Queue[dict[str, Any]]:
global _queue
if _queue is None:
_queue = asyncio.Queue(maxsize=200)
return _queue
async def publish(event: dict[str, Any]) -> None:
"""Emit an event. Drops silently if the queue is full (daemon is overloaded)."""
q = get_queue()
try:
q.put_nowait(event)
except asyncio.QueueFull:
pass
async def subscribe_forever() -> AsyncGenerator[dict[str, Any], None]:
"""Async generator — yields events as they arrive. Intended for daemon tasks."""
q = get_queue()
while True:
yield await q.get()
def reset() -> None:
"""Discard the current queue and create a fresh one. FOR TESTS ONLY."""
global _queue
_queue = None
+4 -1
View File
@@ -87,7 +87,10 @@ class PluginRegistry:
factories: list[tuple[str, Callable[[], Coroutine]]] = [] # type: ignore[type-arg] factories: list[tuple[str, Callable[[], Coroutine]]] = [] # type: ignore[type-arg]
for plugin in self._plugins.values(): for plugin in self._plugins.values():
try: try:
n_tasks = len(plugin.daemon_tasks()) initial = plugin.daemon_tasks()
n_tasks = len(initial)
for c in initial:
c.close() # prevent "coroutine never awaited" RuntimeWarning
except Exception: except Exception:
continue continue
for i in range(n_tasks): for i in range(n_tasks):
+226
View File
@@ -0,0 +1,226 @@
"""Unit tests for the daemon core — PluginSupervisor and IPC handler dispatch."""
from __future__ import annotations
import asyncio
import pytest
from pyra.daemon.core import PluginSupervisor, _make_ipc_handler
# ── Helpers ───────────────────────────────────────────────────────────────────
async def _drain(n: int = 20) -> None:
"""Yield to the event loop n times to let scheduled tasks run."""
for _ in range(n):
await asyncio.sleep(0)
# ── PluginSupervisor — lifecycle ──────────────────────────────────────────────
async def test_supervisor_empty_starts_and_stops_cleanly() -> None:
sup = PluginSupervisor()
await sup.start()
await sup.stop()
assert sup.status() == []
async def test_supervisor_runs_task_to_completion() -> None:
done = asyncio.Event()
async def task():
done.set()
sup = PluginSupervisor()
sup._RESTART_DELAY = 0.0
sup.add_task("t", task)
await sup.start()
await asyncio.wait_for(done.wait(), timeout=1.0)
await sup.stop()
assert sup._records[0].restart_count == 0
assert sup._records[0].last_error is None
async def test_supervisor_restarts_crashed_task() -> None:
call_count = 0
completed = asyncio.Event()
async def flaky():
nonlocal call_count
call_count += 1
if call_count == 1:
raise RuntimeError("first call fails")
completed.set()
sup = PluginSupervisor()
sup._RESTART_DELAY = 0.0
sup.add_task("flaky", flaky)
await sup.start()
await asyncio.wait_for(completed.wait(), timeout=1.0)
await sup.stop()
assert sup._records[0].restart_count == 1
assert "RuntimeError" in (sup._records[0].last_error or "")
async def test_supervisor_gives_up_after_max_restarts() -> None:
async def always_fails():
raise ValueError("always")
sup = PluginSupervisor()
sup._RESTART_DELAY = 0.0
sup._MAX_RESTARTS = 3
sup.add_task("failing", always_fails)
await sup.start()
# Allow enough iterations for 3 restarts + give-up.
for _ in range(200):
await asyncio.sleep(0)
if sup._records[0].task and sup._records[0].task.done():
break
await sup.stop()
assert sup._records[0].restart_count == 3
assert sup._records[0].last_error is not None
# ── PluginSupervisor — status ─────────────────────────────────────────────────
async def test_supervisor_status_returns_correct_shape() -> None:
sup = PluginSupervisor()
sup._RESTART_DELAY = 0.0
async def noop():
pass
sup.add_task("noop", noop)
await sup.start()
await _drain()
statuses = sup.status()
assert len(statuses) == 1
s = statuses[0]
assert set(s.keys()) == {"name", "alive", "restart_count", "last_error"}
assert s["name"] == "noop"
assert isinstance(s["alive"], bool)
assert isinstance(s["restart_count"], int)
await sup.stop()
async def test_supervisor_status_empty_when_no_tasks() -> None:
sup = PluginSupervisor()
await sup.start()
assert sup.status() == []
await sup.stop()
# ── PluginSupervisor — reload ─────────────────────────────────────────────────
async def test_supervisor_reload_restarts_tasks() -> None:
call_count = 0
async def counting():
nonlocal call_count
call_count += 1
# Hang until cancelled so reload can cancel it.
await asyncio.sleep(10)
sup = PluginSupervisor()
sup._RESTART_DELAY = 0.0
sup.add_task("c", counting)
await sup.start()
await _drain()
assert call_count == 1
await sup.reload()
await _drain()
# After reload, the task should have been restarted (called a second time).
assert call_count == 2
assert sup._records[0].restart_count == 0 # reset by reload
await sup.stop()
async def test_supervisor_reload_resets_restart_count() -> None:
call_count = 0
async def flaky():
nonlocal call_count
call_count += 1
if call_count <= 2:
raise RuntimeError("crash")
await asyncio.sleep(10)
sup = PluginSupervisor()
sup._RESTART_DELAY = 0.0
sup.add_task("f", flaky)
await sup.start()
# Wait for 2 crashes to accumulate.
for _ in range(200):
await asyncio.sleep(0)
if sup._records[0].restart_count >= 2:
break
assert sup._records[0].restart_count == 2
await sup.reload()
# Reload must reset the counter.
assert sup._records[0].restart_count == 0
await sup.stop()
# ── IPC command handler ───────────────────────────────────────────────────────
async def test_ipc_handler_ping() -> None:
sup = PluginSupervisor()
handler = _make_ipc_handler(sup)
resp = await handler({"cmd": "ping"})
assert resp["ok"] is True
assert resp["data"]["pong"] is True
async def test_ipc_handler_status_shape() -> None:
sup = PluginSupervisor()
handler = _make_ipc_handler(sup)
resp = await handler({"cmd": "status"})
assert resp["ok"] is True
assert "uptime" in resp["data"]
assert "pid" in resp["data"]
assert "tasks" in resp["data"]
assert isinstance(resp["data"]["tasks"], list)
async def test_ipc_handler_stop_signals_shutdown() -> None:
sup = PluginSupervisor()
handler = _make_ipc_handler(sup)
assert not sup._shutdown.is_set()
resp = await handler({"cmd": "stop"})
assert resp["ok"] is True
assert sup._shutdown.is_set()
async def test_ipc_handler_reload_returns_task_count() -> None:
sup = PluginSupervisor()
handler = _make_ipc_handler(sup)
resp = await handler({"cmd": "reload"})
assert resp["ok"] is True
assert resp["data"]["tasks_reloaded"] == 0
async def test_ipc_handler_unknown_command() -> None:
sup = PluginSupervisor()
handler = _make_ipc_handler(sup)
resp = await handler({"cmd": "bogus"})
assert resp["ok"] is False
assert "error" in resp["data"]
assert "bogus" in resp["data"]["error"]