2 Commits

Author SHA1 Message Date
curo1305 1cbb40ac93 chore(chat): tighten tool descriptions to reduce AI selection confusion
- plan_and_execute: restrict to 3+ step tasks; prevents over-triggering on simple requests
- memory_read: hint to call memory_lookup first to find the correct path

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-19 23:50:21 +02:00
curo1305 8d4917f7ca feat(daemon): add async event bus for inter-plugin notifications
Adds daemon/events.py — a lightweight asyncio.Queue-based publish/subscribe
bus that lets daemon tasks communicate without direct imports between plugins.
Email plugin publishes new_email events; messaging bots consume via
subscribe_forever(). Also adds email optional-dependency group to pyproject.toml
(imap-tools, google-api-python-client, google-auth-oauthlib, O365).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-19 23:12:17 +02:00
4 changed files with 61 additions and 3 deletions
+8
View File
@@ -35,6 +35,12 @@ gdrive = ["google-api-python-client>=2.120.0", "google-auth-oauthlib>=1.2.0"]
onedrive = ["msal>=1.28.0"] onedrive = ["msal>=1.28.0"]
dropbox = ["dropbox>=12.0.0"] dropbox = ["dropbox>=12.0.0"]
daemon = ["aiofiles>=23.0.0"] daemon = ["aiofiles>=23.0.0"]
email = [
"imap-tools>=1.7.0",
"google-api-python-client>=2.120.0",
"google-auth-oauthlib>=1.2.0",
"O365>=2.0.36",
]
all-plugins = [ all-plugins = [
"caldav>=1.3.0", "webdav4>=0.9.0", "vobject>=0.9.6", "caldav>=1.3.0", "webdav4>=0.9.0", "vobject>=0.9.6",
"matrix-nio>=0.24.0", "aiofiles>=23.0.0", "matrix-nio>=0.24.0", "aiofiles>=23.0.0",
@@ -44,6 +50,8 @@ all-plugins = [
"google-api-python-client>=2.120.0", "google-auth-oauthlib>=1.2.0", "google-api-python-client>=2.120.0", "google-auth-oauthlib>=1.2.0",
"msal>=1.28.0", "msal>=1.28.0",
"dropbox>=12.0.0", "dropbox>=12.0.0",
"imap-tools>=1.7.0",
"O365>=2.0.36",
] ]
[project.scripts] [project.scripts]
+4 -3
View File
@@ -81,8 +81,9 @@ def start_chat() -> None:
name="plan_and_execute", name="plan_and_execute",
description=( description=(
"Decompose a multi-step task into sequential steps and execute each with " "Decompose a multi-step task into sequential steps and execute each with "
"a focused sub-agent. Use when the request has multiple distinct phases. " "a focused sub-agent. Use only for long tasks with 3 or more sequential "
"Specify 'agent' per step to route to a specialized agent." "steps that need verification between them — for simple 2-step tasks, do "
"them directly. Specify 'agent' per step to route to a specialized agent."
), ),
parameters={ parameters={
"type": "object", "type": "object",
@@ -125,7 +126,7 @@ def start_chat() -> None:
)) ))
registry.register_builtin(Tool( registry.register_builtin(Tool(
name="memory_read", name="memory_read",
description="Read the full content of a memory file by its relative path (e.g. 'user/profile.md').", description="Read the full content of a memory file by its relative path (e.g. 'user/profile.md'). Use memory_lookup first to find the correct path.",
parameters={ parameters={
"type": "object", "type": "object",
"properties": { "properties": {
+3
View File
@@ -1,6 +1,7 @@
"""Pyra background daemon package.""" """Pyra background daemon package."""
from pyra.daemon.core import PluginSupervisor, run_foreground, start_background from pyra.daemon.core import PluginSupervisor, run_foreground, start_background
from pyra.daemon.events import publish, subscribe_forever
from pyra.daemon.ipc import IpcClient, IpcServer, send_command from pyra.daemon.ipc import IpcClient, IpcServer, send_command
from pyra.daemon.pid import PidFile, PidFileError, resolve_pid_path from pyra.daemon.pid import PidFile, PidFileError, resolve_pid_path
from pyra.daemon.service import detect_platform, install_service, uninstall_service from pyra.daemon.service import detect_platform, install_service, uninstall_service
@@ -9,6 +10,8 @@ __all__ = [
"run_foreground", "run_foreground",
"start_background", "start_background",
"PluginSupervisor", "PluginSupervisor",
"publish",
"subscribe_forever",
"IpcClient", "IpcClient",
"IpcServer", "IpcServer",
"send_command", "send_command",
+46
View File
@@ -0,0 +1,46 @@
"""Async notification bus for inter-plugin communication in the daemon.
Plugins publish events to a shared asyncio.Queue; other plugins (e.g. messaging
bots) consume them via subscribe_forever(). No direct plugin-to-plugin imports
are needed — both sides just use this module.
Event shape (by convention):
{"type": "new_email", "priority": int, "from": str, "subject": str,
"summary": str, "uid": str, "folder": str}
{"type": "new_message", "bot": str, "user_id": str, "text": str}
"""
from __future__ import annotations
import asyncio
from typing import Any, AsyncGenerator
_queue: asyncio.Queue[dict[str, Any]] | None = None
def get_queue() -> asyncio.Queue[dict[str, Any]]:
global _queue
if _queue is None:
_queue = asyncio.Queue(maxsize=200)
return _queue
async def publish(event: dict[str, Any]) -> None:
"""Emit an event. Drops silently if the queue is full (daemon is overloaded)."""
q = get_queue()
try:
q.put_nowait(event)
except asyncio.QueueFull:
pass
async def subscribe_forever() -> AsyncGenerator[dict[str, Any], None]:
"""Async generator — yields events as they arrive. Intended for daemon tasks."""
q = get_queue()
while True:
yield await q.get()
def reset() -> None:
"""Discard the current queue and create a fresh one. FOR TESTS ONLY."""
global _queue
_queue = None