"""Unit tests for the email plugin — pure-logic helpers, no network calls.""" from __future__ import annotations import json from unittest.mock import MagicMock, patch import pytest # Import helpers directly — they depend only on stdlib from pyra.bundled_plugins.email.plugin import ( EmailMessage, FilterRule, _build_imap_search, _decode_header, _gmail_action_summary, _gmail_criteria_summary, _normalize_to_gmail, _normalize_to_outlook, _outlook_actions_summary, _parse_raw_message, _strip_html, ) # ── _strip_html ──────────────────────────────────────────────────────────────── def test_strip_html_removes_tags(): result = _strip_html("

Hello world

") assert "<" not in result assert "Hello" in result assert "world" in result def test_strip_html_decodes_entities(): result = _strip_html("<script> & "test"") assert "

Keep this

" result = _strip_html(html) assert "color" not in result assert "alert" not in result assert "Keep this" in result def test_strip_html_plain_text_unchanged(): result = _strip_html("Hello, world!") assert result == "Hello, world!" # ── _decode_header ───────────────────────────────────────────────────────────── def test_decode_header_plain(): assert _decode_header("Hello") == "Hello" def test_decode_header_encoded(): # RFC 2047 base64-encoded UTF-8 encoded = "=?utf-8?b?SGVsbG8gV29ybGQ=?=" assert _decode_header(encoded) == "Hello World" def test_decode_header_empty(): assert _decode_header("") == "" # ── _parse_raw_message ───────────────────────────────────────────────────────── def _make_raw_email( from_addr: str = "sender@example.com", to_addr: str = "recipient@example.com", subject: str = "Test Subject", body: str = "Hello from test.", message_id: str = "", ) -> bytes: return ( f"From: {from_addr}\r\n" f"To: {to_addr}\r\n" f"Subject: {subject}\r\n" f"Date: Mon, 01 Jan 2024 12:00:00 +0000\r\n" f"Message-ID: {message_id}\r\n" f"MIME-Version: 1.0\r\n" f"Content-Type: text/plain; charset=utf-8\r\n" f"\r\n" f"{body}\r\n" ).encode() def test_parse_raw_message_basic_fields(): raw = _make_raw_email() msg = _parse_raw_message(raw, uid="42", folder="INBOX", is_read=False) assert msg.uid == "42" assert msg.folder == "INBOX" assert msg.from_addr == "sender@example.com" assert "recipient@example.com" in msg.to_addrs assert msg.subject == "Test Subject" assert msg.body_text == "Hello from test." assert msg.is_read is False assert msg.has_attachments is False assert msg.attachments == [] assert msg.message_id == "" def test_parse_raw_message_snippet_truncated(): long_body = "A" * 500 raw = _make_raw_email(body=long_body) msg = _parse_raw_message(raw, uid="1", folder="INBOX", is_read=True) assert len(msg.snippet) <= 200 def test_parse_raw_message_body_truncated_at_8000(): huge_body = "x" * 10000 raw = _make_raw_email(body=huge_body) msg = _parse_raw_message(raw, uid="1", folder="INBOX", is_read=False) assert len(msg.body_text) <= 8030 # 8000 + "[...truncated]" assert "truncated" in msg.body_text def test_parse_raw_message_html_stripped(): raw = _make_raw_email(body="

Plain text content

") # Create HTML part manually html_raw = ( "From: a@b.com\r\nTo: c@d.com\r\nSubject: Test\r\n" "MIME-Version: 1.0\r\nContent-Type: text/html; charset=utf-8\r\n\r\n" "

Plain text content

\r\n" ).encode() msg = _parse_raw_message(html_raw, uid="1", folder="INBOX", is_read=False) assert "<" not in msg.body_text assert "Plain text content" in msg.body_text # ── _build_imap_search ───────────────────────────────────────────────────────── def test_build_imap_search_unread(): from imap_tools import AND criteria = _build_imap_search("unread invoices") # Should produce an AND with seen=False assert criteria is not None def test_build_imap_search_from(): criteria = _build_imap_search("from:boss@company.com") assert criteria is not None def test_build_imap_search_subject(): criteria = _build_imap_search("subject: meeting notes") assert criteria is not None def test_build_imap_search_fallback(): criteria = _build_imap_search("random search terms") assert criteria is not None # ── Gmail rule normalisation ─────────────────────────────────────────────────── def test_normalize_to_gmail_from_condition(): criteria, action = _normalize_to_gmail({"from": "boss@company.com"}, {"mark_read": True}) assert criteria.get("from") == "boss@company.com" assert "UNREAD" in action.get("removeLabelIds", []) def test_normalize_to_gmail_move_to(): criteria, action = _normalize_to_gmail({"subject": "invoice"}, {"move_to": "Bills"}) assert criteria.get("subject") == "invoice" assert "Bills" in action.get("addLabelIds", []) assert "INBOX" in action.get("removeLabelIds", []) def test_normalize_to_gmail_mark_important(): _, action = _normalize_to_gmail({}, {"mark_important": True}) assert "IMPORTANT" in action.get("addLabelIds", []) def test_normalize_to_gmail_forward(): _, action = _normalize_to_gmail({}, {"forward_to": "archive@example.com"}) assert action.get("forward") == "archive@example.com" def test_gmail_criteria_summary_empty(): assert _gmail_criteria_summary({}) == "(any)" def test_gmail_criteria_summary_from(): assert "from=boss" in _gmail_criteria_summary({"from": "boss@company.com"}) def test_gmail_action_summary_empty(): assert _gmail_action_summary({}) == "(no action)" # ── Outlook rule normalisation ───────────────────────────────────────────────── def test_normalize_to_outlook_from(): body = _normalize_to_outlook({"from": "a@b.com"}, {"move_to": "Work"}) from_addrs = body["conditions"].get("fromAddresses", []) assert any("a@b.com" in str(a) for a in from_addrs) assert body["actions"].get("moveToFolder") == "Work" def test_normalize_to_outlook_subject_contains(): body = _normalize_to_outlook({"subject": "invoice"}, {"mark_read": True}) assert "invoice" in body["conditions"].get("subjectContains", []) assert body["actions"].get("markAsRead") is True def test_normalize_to_outlook_mark_important(): body = _normalize_to_outlook({}, {"mark_important": True}) assert body["actions"].get("markImportance") == "high" def test_normalize_to_outlook_delete(): body = _normalize_to_outlook({}, {"delete": True}) assert body["actions"].get("delete") is True # ── email_move folder-not-found path ────────────────────────────────────────── def test_email_move_returns_error_when_folder_missing(tmp_pyra_home): from pyra.bundled_plugins.email.plugin import EmailPlugin plugin = EmailPlugin() # Inject a mock provider with known folders mock_provider = MagicMock() mock_provider.list_folders.return_value = ["INBOX", "Sent", "Trash"] plugin._provider_instance = mock_provider result = plugin._tool_move("uid123", "NonExistent", "INBOX") assert "does not exist" in result.lower() assert "email_create_folder" in result mock_provider.move_message.assert_not_called() def test_email_move_succeeds_when_folder_exists(tmp_pyra_home): from pyra.bundled_plugins.email.plugin import EmailPlugin plugin = EmailPlugin() mock_provider = MagicMock() mock_provider.list_folders.return_value = ["INBOX", "Work", "Newsletters"] plugin._provider_instance = mock_provider result = plugin._tool_move("uid456", "Work", "INBOX") assert "moved" in result.lower() mock_provider.move_message.assert_called_once_with("uid456", "INBOX", "Work") # ── email_list_rules not-supported path ─────────────────────────────────────── def test_email_list_rules_not_supported(tmp_pyra_home): from pyra.bundled_plugins.email.plugin import EmailPlugin plugin = EmailPlugin() mock_provider = MagicMock() mock_provider.list_rules.side_effect = NotImplementedError plugin._provider_instance = mock_provider result = plugin._tool_list_rules() assert "not supported" in result.lower() # ── daemon/events integration ───────────────────────────────────────────────── @pytest.mark.asyncio async def test_events_publish_and_subscribe(): from pyra.daemon import events events.reset() await events.publish({"type": "new_email", "subject": "Test"}) received = [] async for event in events.subscribe_forever(): received.append(event) break # only need one assert received[0]["type"] == "new_email" assert received[0]["subject"] == "Test" events.reset() @pytest.mark.asyncio async def test_events_queue_full_drops_silently(): from pyra.daemon import events events.reset() # Fill the queue for i in range(200): await events.publish({"n": i}) # This should not raise even though queue is full await events.publish({"n": 999}) events.reset() # ── ProtonMail Bridge connectivity check (mocked) ───────────────────────────── def test_protonmail_setup_aborts_when_bridge_unreachable(tmp_pyra_home): """_setup_protonmail should abort gracefully when Bridge is not running.""" import socket from unittest.mock import patch, MagicMock from pyra.bundled_plugins.email.plugin import EmailPlugin plugin = EmailPlugin() console = MagicMock() vault_writer = MagicMock() with patch("socket.create_connection", side_effect=ConnectionRefusedError): plugin._setup_protonmail(console, vault_writer, "user@proton.me") # Should not store any vault key if Bridge is unreachable vault_writer.assert_not_called() # ── messaging bot recommendation ────────────────────────────────────────────── def test_check_messaging_bot_warns_when_no_bot(tmp_pyra_home): from pyra.bundled_plugins.email.plugin import EmailPlugin from unittest.mock import MagicMock, patch from pyra.config.schema import PyraConfig, ProviderConfig, PluginConfig plugin = EmailPlugin() console = MagicMock() cfg = PyraConfig(ai=ProviderConfig(provider_id="lmstudio", model="test")) cfg.plugins = PluginConfig(enabled=[]) # no bots with patch("pyra.bundled_plugins.email.plugin.EmailPlugin._load_settings", return_value={}), \ patch("pyra.config.manager.load_config", return_value=cfg): plugin._check_messaging_bot(console) # Should have printed something (Panel) recommending a bot console.print.assert_called() # ── Tool list completeness ───────────────────────────────────────────────────── def test_plugin_exposes_16_tools(): from pyra.bundled_plugins.email.plugin import EmailPlugin plugin = EmailPlugin() # on_load with no-op vault reader plugin.on_load(lambda _: None) tools = plugin.tools() tool_names = [t.name for t in tools] assert len(tools) == 16 expected = { "email_list_folder", "email_read", "email_send", "email_reply", "email_forward", "email_move", "email_delete", "email_mark_read", "email_search", "email_list_folders", "email_create_folder", "email_inbox_summary", "email_list_rules", "email_create_rule", "email_delete_rule", "email_bulk_action", } assert set(tool_names) == expected def test_write_tools_require_approval(): from pyra.bundled_plugins.email.plugin import EmailPlugin plugin = EmailPlugin() plugin.on_load(lambda _: None) tools = {t.name: t for t in plugin.tools()} for name in ["email_send", "email_reply", "email_forward", "email_move", "email_delete", "email_create_folder", "email_create_rule", "email_delete_rule", "email_bulk_action"]: assert tools[name].requires_approval, f"{name} should require approval" def test_read_tools_no_approval(): from pyra.bundled_plugins.email.plugin import EmailPlugin plugin = EmailPlugin() plugin.on_load(lambda _: None) tools = {t.name: t for t in plugin.tools()} for name in ["email_list_folder", "email_read", "email_mark_read", "email_search", "email_list_folders", "email_inbox_summary", "email_list_rules"]: assert not tools[name].requires_approval, f"{name} should NOT require approval"