diff --git a/pyproject.toml b/pyproject.toml index 955211f..af9f43b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,6 +35,12 @@ gdrive = ["google-api-python-client>=2.120.0", "google-auth-oauthlib>=1.2.0"] onedrive = ["msal>=1.28.0"] dropbox = ["dropbox>=12.0.0"] daemon = ["aiofiles>=23.0.0"] +email = [ + "imap-tools>=1.7.0", + "google-api-python-client>=2.120.0", + "google-auth-oauthlib>=1.2.0", + "O365>=2.0.36", +] all-plugins = [ "caldav>=1.3.0", "webdav4>=0.9.0", "vobject>=0.9.6", "matrix-nio>=0.24.0", "aiofiles>=23.0.0", @@ -44,6 +50,8 @@ all-plugins = [ "google-api-python-client>=2.120.0", "google-auth-oauthlib>=1.2.0", "msal>=1.28.0", "dropbox>=12.0.0", + "imap-tools>=1.7.0", + "O365>=2.0.36", ] [project.scripts] diff --git a/src/pyra/daemon/__init__.py b/src/pyra/daemon/__init__.py index 2f10e68..47ed57b 100644 --- a/src/pyra/daemon/__init__.py +++ b/src/pyra/daemon/__init__.py @@ -1,6 +1,7 @@ """Pyra background daemon package.""" from pyra.daemon.core import PluginSupervisor, run_foreground, start_background +from pyra.daemon.events import publish, subscribe_forever from pyra.daemon.ipc import IpcClient, IpcServer, send_command from pyra.daemon.pid import PidFile, PidFileError, resolve_pid_path from pyra.daemon.service import detect_platform, install_service, uninstall_service @@ -9,6 +10,8 @@ __all__ = [ "run_foreground", "start_background", "PluginSupervisor", + "publish", + "subscribe_forever", "IpcClient", "IpcServer", "send_command", diff --git a/src/pyra/daemon/events.py b/src/pyra/daemon/events.py new file mode 100644 index 0000000..5846dc4 --- /dev/null +++ b/src/pyra/daemon/events.py @@ -0,0 +1,46 @@ +"""Async notification bus for inter-plugin communication in the daemon. + +Plugins publish events to a shared asyncio.Queue; other plugins (e.g. messaging +bots) consume them via subscribe_forever(). No direct plugin-to-plugin imports +are needed — both sides just use this module. + +Event shape (by convention): + {"type": "new_email", "priority": int, "from": str, "subject": str, + "summary": str, "uid": str, "folder": str} + {"type": "new_message", "bot": str, "user_id": str, "text": str} +""" +from __future__ import annotations + +import asyncio +from typing import Any, AsyncGenerator + +_queue: asyncio.Queue[dict[str, Any]] | None = None + + +def get_queue() -> asyncio.Queue[dict[str, Any]]: + global _queue + if _queue is None: + _queue = asyncio.Queue(maxsize=200) + return _queue + + +async def publish(event: dict[str, Any]) -> None: + """Emit an event. Drops silently if the queue is full (daemon is overloaded).""" + q = get_queue() + try: + q.put_nowait(event) + except asyncio.QueueFull: + pass + + +async def subscribe_forever() -> AsyncGenerator[dict[str, Any], None]: + """Async generator — yields events as they arrive. Intended for daemon tasks.""" + q = get_queue() + while True: + yield await q.get() + + +def reset() -> None: + """Discard the current queue and create a fresh one. FOR TESTS ONLY.""" + global _queue + _queue = None