test: add comprehensive coverage for cli, chat, renderer, dirs, install, paths
56 new tests covering previously untested modules: - test_cli.py: memory write/read/append/list + plugin enable/disable + daemon stubs (via CliRunner) - test_chat_history.py: ConversationHistory build_for_api, add_*/clear, _trim_to_budget - test_chat_renderer.py: render_text_response return values, void render_* functions - test_config_dirs.py: bootstrap idempotency, directory/template/vault/db creation - test_plugin_install.py: list_bundled_plugins, read_manifest, install_bundled_plugin - test_utils_paths.py: ensure_dir (nested, idempotent), safe_chmod Total: 171 → 227 passing tests. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,90 @@
|
||||
import pytest
|
||||
|
||||
|
||||
def _make_config():
|
||||
from pyra.config.schema import PyraConfig, ProviderConfig
|
||||
return PyraConfig(ai=ProviderConfig(provider_id="lmstudio", model="gemma"))
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def history(tmp_pyra_home, monkeypatch):
|
||||
monkeypatch.setattr("pyra.chat.history.load_context_for_session", lambda: "")
|
||||
from pyra.chat.history import ConversationHistory
|
||||
return ConversationHistory(_make_config())
|
||||
|
||||
|
||||
def test_build_for_api_first_message_is_system(history):
|
||||
msgs = history.build_for_api()
|
||||
assert msgs[0]["role"] == "system"
|
||||
|
||||
|
||||
def test_build_for_api_system_contains_pyra(history):
|
||||
msgs = history.build_for_api()
|
||||
assert "Pyra" in msgs[0]["content"]
|
||||
|
||||
|
||||
def test_build_for_api_includes_memory_context(tmp_pyra_home, monkeypatch):
|
||||
monkeypatch.setattr(
|
||||
"pyra.chat.history.load_context_for_session",
|
||||
lambda: "## Long-term Memory\n\nSome remembered facts.",
|
||||
)
|
||||
from pyra.chat.history import ConversationHistory
|
||||
h = ConversationHistory(_make_config())
|
||||
msgs = h.build_for_api()
|
||||
assert "Long-term Memory" in msgs[0]["content"]
|
||||
|
||||
|
||||
def test_add_user_appears_in_api_payload(history):
|
||||
history.add_user("hello from user")
|
||||
msgs = history.build_for_api()
|
||||
user_msgs = [m for m in msgs if m["role"] == "user"]
|
||||
assert any(m["content"] == "hello from user" for m in user_msgs)
|
||||
|
||||
|
||||
def test_add_assistant_appears_in_api_payload(history):
|
||||
history.add_assistant("hello from assistant")
|
||||
msgs = history.build_for_api()
|
||||
asst_msgs = [m for m in msgs if m["role"] == "assistant"]
|
||||
assert any(m["content"] == "hello from assistant" for m in asst_msgs)
|
||||
|
||||
|
||||
def test_add_tool_result(history):
|
||||
history.add_tool_result("call_abc", "tool output")
|
||||
msgs = history.build_for_api()
|
||||
tool_msgs = [m for m in msgs if m.get("role") == "tool"]
|
||||
assert len(tool_msgs) == 1
|
||||
assert tool_msgs[0]["tool_call_id"] == "call_abc"
|
||||
assert tool_msgs[0]["content"] == "tool output"
|
||||
|
||||
|
||||
def test_clear_removes_messages(history):
|
||||
history.add_user("hello")
|
||||
history.add_assistant("hi")
|
||||
history.clear()
|
||||
msgs = history.build_for_api()
|
||||
assert all(m["role"] == "system" for m in msgs)
|
||||
|
||||
|
||||
def test_trim_to_budget_drops_old_messages():
|
||||
from pyra.chat.history import _trim_to_budget
|
||||
msgs = [
|
||||
{"role": "user", "content": "a" * 4000},
|
||||
{"role": "assistant", "content": "b" * 4000},
|
||||
{"role": "user", "content": "new message"},
|
||||
]
|
||||
trimmed = _trim_to_budget(list(msgs), 100)
|
||||
assert len(trimmed) < 3
|
||||
assert any(m["content"] == "new message" for m in trimmed)
|
||||
|
||||
|
||||
def test_trim_to_budget_no_trim_when_under_budget():
|
||||
from pyra.chat.history import _trim_to_budget
|
||||
msgs = [{"role": "user", "content": "short"}]
|
||||
result = _trim_to_budget(list(msgs), 10000)
|
||||
assert len(result) == 1
|
||||
assert result[0]["content"] == "short"
|
||||
|
||||
|
||||
def test_trim_to_budget_empty_list():
|
||||
from pyra.chat.history import _trim_to_budget
|
||||
assert _trim_to_budget([], 1000) == []
|
||||
@@ -0,0 +1,57 @@
|
||||
from pyra.security.injection import InjectionWarning
|
||||
|
||||
|
||||
def test_render_text_response_passthrough():
|
||||
from pyra.chat.renderer import render_text_response
|
||||
result = render_text_response("Hello world")
|
||||
assert result == "Hello world"
|
||||
|
||||
|
||||
def test_render_text_response_redacts_api_key():
|
||||
from pyra.chat.renderer import render_text_response
|
||||
# anthropic-style key
|
||||
result = render_text_response("key: sk-ant-api03-abcdefghijklmnopqrstuvwxyz123456")
|
||||
assert "sk-ant" not in result
|
||||
assert "[REDACTED]" in result
|
||||
|
||||
|
||||
def test_render_text_response_empty_string():
|
||||
from pyra.chat.renderer import render_text_response
|
||||
result = render_text_response("")
|
||||
assert result == ""
|
||||
|
||||
|
||||
def test_render_text_response_whitespace_only():
|
||||
from pyra.chat.renderer import render_text_response
|
||||
result = render_text_response(" ")
|
||||
assert result == " "
|
||||
|
||||
|
||||
def test_render_error_no_exception():
|
||||
from pyra.chat.renderer import render_error
|
||||
render_error("Something went wrong")
|
||||
|
||||
|
||||
def test_render_info_no_exception():
|
||||
from pyra.chat.renderer import render_info
|
||||
render_info("Informational message")
|
||||
|
||||
|
||||
def test_render_system_no_exception():
|
||||
from pyra.chat.renderer import render_system
|
||||
render_system("System message")
|
||||
|
||||
|
||||
def test_render_injection_warning_no_exception():
|
||||
from pyra.chat.renderer import render_injection_warning
|
||||
warnings = [InjectionWarning(pattern_label="instruction-override", matched_text="ignore all")]
|
||||
render_injection_warning(warnings)
|
||||
|
||||
|
||||
def test_render_injection_warning_multiple():
|
||||
from pyra.chat.renderer import render_injection_warning
|
||||
warnings = [
|
||||
InjectionWarning(pattern_label="instruction-override", matched_text="ignore"),
|
||||
InjectionWarning(pattern_label="jailbreak", matched_text="DAN mode"),
|
||||
]
|
||||
render_injection_warning(warnings)
|
||||
@@ -0,0 +1,109 @@
|
||||
import pytest
|
||||
from click.testing import CliRunner
|
||||
|
||||
from pyra.cli import main
|
||||
|
||||
|
||||
def test_memory_write_creates_file(tmp_pyra_home):
|
||||
runner = CliRunner()
|
||||
result = runner.invoke(main, ["memory", "write", "user/note.md", "Hello world"])
|
||||
assert result.exit_code == 0
|
||||
assert (tmp_pyra_home / "memory" / "user" / "note.md").exists()
|
||||
assert "Hello world" in (tmp_pyra_home / "memory" / "user" / "note.md").read_text()
|
||||
|
||||
|
||||
def test_memory_write_updates_db(tmp_pyra_home):
|
||||
runner = CliRunner()
|
||||
runner.invoke(main, ["memory", "write", "user/note.md", "DB test content"])
|
||||
from pyra.memory import database
|
||||
rows = database.list_all()
|
||||
assert any(r["path"] == "user/note.md" for r in rows)
|
||||
|
||||
|
||||
def test_memory_append_adds_content(tmp_pyra_home):
|
||||
runner = CliRunner()
|
||||
runner.invoke(main, ["memory", "write", "user/note.md", "First line"])
|
||||
runner.invoke(main, ["memory", "append", "user/note.md", "Second line"])
|
||||
content = (tmp_pyra_home / "memory" / "user" / "note.md").read_text()
|
||||
assert "First line" in content
|
||||
assert "Second line" in content
|
||||
|
||||
|
||||
def test_memory_read_existing(tmp_pyra_home):
|
||||
from pyra.memory.writer import write_memory
|
||||
write_memory("user/note.md", "Readable content")
|
||||
runner = CliRunner()
|
||||
result = runner.invoke(main, ["memory", "read", "user/note.md"])
|
||||
assert result.exit_code == 0
|
||||
|
||||
|
||||
def test_memory_read_missing_exits_cleanly(tmp_pyra_home):
|
||||
runner = CliRunner()
|
||||
result = runner.invoke(main, ["memory", "read", "user/does_not_exist.md"])
|
||||
assert result.exit_code == 0
|
||||
|
||||
|
||||
def test_memory_read_blocked_path_exits_cleanly(tmp_pyra_home):
|
||||
runner = CliRunner()
|
||||
result = runner.invoke(main, ["memory", "read", "../../../../vault/secrets/api_keys.json"])
|
||||
assert result.exit_code == 0
|
||||
|
||||
|
||||
def test_memory_list_empty(tmp_pyra_home):
|
||||
runner = CliRunner()
|
||||
result = runner.invoke(main, ["memory", "list"])
|
||||
assert result.exit_code == 0
|
||||
|
||||
|
||||
def test_memory_list_populated(tmp_pyra_home):
|
||||
from pyra.memory.writer import write_memory
|
||||
write_memory("user/profile.md", "# Profile")
|
||||
runner = CliRunner()
|
||||
result = runner.invoke(main, ["memory", "list"])
|
||||
assert result.exit_code == 0
|
||||
|
||||
|
||||
def test_plugin_list_no_config(tmp_pyra_home):
|
||||
runner = CliRunner()
|
||||
result = runner.invoke(main, ["plugin", "list"])
|
||||
assert result.exit_code == 0
|
||||
|
||||
|
||||
def _make_config():
|
||||
from pyra.config.schema import PyraConfig, ProviderConfig
|
||||
return PyraConfig(ai=ProviderConfig(provider_id="lmstudio", model="gemma"))
|
||||
|
||||
|
||||
def test_plugin_enable_not_installed(tmp_pyra_home):
|
||||
from pyra.config.manager import save_config
|
||||
save_config(_make_config())
|
||||
runner = CliRunner()
|
||||
result = runner.invoke(main, ["plugin", "enable", "nonexistent"])
|
||||
assert result.exit_code == 0
|
||||
|
||||
|
||||
def test_plugin_enable_and_disable(tmp_pyra_home):
|
||||
from pyra.config.manager import load_config, save_config
|
||||
|
||||
save_config(_make_config())
|
||||
|
||||
plugin_dir = tmp_pyra_home / "plugins" / "myplugin"
|
||||
plugin_dir.mkdir(parents=True, exist_ok=True)
|
||||
(plugin_dir / "manifest.json").write_text('{"name": "myplugin", "version": "1.0"}')
|
||||
|
||||
runner = CliRunner()
|
||||
|
||||
runner.invoke(main, ["plugin", "enable", "myplugin"])
|
||||
cfg = load_config()
|
||||
assert "myplugin" in cfg.plugins.enabled
|
||||
|
||||
runner.invoke(main, ["plugin", "disable", "myplugin"])
|
||||
cfg = load_config()
|
||||
assert "myplugin" not in cfg.plugins.enabled
|
||||
|
||||
|
||||
def test_daemon_commands_exit_cleanly(tmp_pyra_home):
|
||||
runner = CliRunner()
|
||||
for cmd in ["start", "stop", "status", "restart"]:
|
||||
result = runner.invoke(main, ["daemon", cmd])
|
||||
assert result.exit_code == 0, f"daemon {cmd} exited with {result.exit_code}"
|
||||
@@ -0,0 +1,50 @@
|
||||
import os
|
||||
|
||||
|
||||
def test_bootstrap_idempotent(tmp_pyra_home):
|
||||
from pyra.config.dirs import bootstrap
|
||||
bootstrap() # already called by fixture; should not raise
|
||||
|
||||
|
||||
def test_bootstrap_creates_all_directories(tmp_pyra_home):
|
||||
assert (tmp_pyra_home / "memory" / "user").is_dir()
|
||||
assert (tmp_pyra_home / "memory" / "context").is_dir()
|
||||
assert (tmp_pyra_home / "memory" / "knowledge").is_dir()
|
||||
assert (tmp_pyra_home / "vault" / "secrets").is_dir()
|
||||
assert (tmp_pyra_home / "plugins").is_dir()
|
||||
assert (tmp_pyra_home / "logs").is_dir()
|
||||
|
||||
|
||||
def test_bootstrap_creates_template_files(tmp_pyra_home):
|
||||
from pyra.config.dirs import bootstrap
|
||||
bootstrap()
|
||||
assert (tmp_pyra_home / "memory" / "MEMORY_INDEX.md").exists()
|
||||
assert (tmp_pyra_home / "memory" / "user" / "profile.md").exists()
|
||||
|
||||
|
||||
def test_bootstrap_template_content(tmp_pyra_home):
|
||||
from pyra.config.dirs import bootstrap
|
||||
bootstrap()
|
||||
profile = (tmp_pyra_home / "memory" / "user" / "profile.md").read_text()
|
||||
assert "User Profile" in profile
|
||||
|
||||
|
||||
def test_bootstrap_initializes_db(tmp_pyra_home):
|
||||
from pyra.memory import database
|
||||
assert database._DB_PATH.exists()
|
||||
|
||||
|
||||
def test_bootstrap_creates_vault_lock(tmp_pyra_home):
|
||||
assert (tmp_pyra_home / "vault" / ".vault_lock").exists()
|
||||
|
||||
|
||||
def test_bootstrap_sets_config_permissions(tmp_pyra_home):
|
||||
from pyra.config.manager import save_config
|
||||
from pyra.config.schema import PyraConfig, ProviderConfig
|
||||
from pyra.config.dirs import bootstrap
|
||||
save_config(PyraConfig(ai=ProviderConfig(provider_id="lmstudio", model="gemma")))
|
||||
bootstrap()
|
||||
config = tmp_pyra_home / "config.yaml"
|
||||
assert config.exists()
|
||||
if os.name != "nt":
|
||||
assert oct(config.stat().st_mode)[-3:] == "600"
|
||||
@@ -0,0 +1,121 @@
|
||||
import json
|
||||
import os
|
||||
import pytest
|
||||
|
||||
from pyra.plugins.install import (
|
||||
get_bundled_plugins_dir,
|
||||
install_bundled_plugin,
|
||||
list_bundled_plugins,
|
||||
read_manifest,
|
||||
)
|
||||
|
||||
|
||||
def test_get_bundled_plugins_dir_name():
|
||||
d = get_bundled_plugins_dir()
|
||||
assert d.name == "bundled_plugins"
|
||||
assert d.parent.name == "pyra"
|
||||
|
||||
|
||||
def test_list_bundled_plugins_empty_dir(tmp_path):
|
||||
bundled = tmp_path / "bundled"
|
||||
bundled.mkdir()
|
||||
assert list_bundled_plugins(bundled) == []
|
||||
|
||||
|
||||
def test_list_bundled_plugins_missing_dir(tmp_path):
|
||||
assert list_bundled_plugins(tmp_path / "nonexistent") == []
|
||||
|
||||
|
||||
def test_list_bundled_plugins_with_manifest(tmp_path):
|
||||
bundled = tmp_path / "bundled"
|
||||
plugin = bundled / "myplugin"
|
||||
plugin.mkdir(parents=True)
|
||||
(plugin / "manifest.json").write_text('{"name": "myplugin", "version": "1.0"}')
|
||||
assert list_bundled_plugins(bundled) == ["myplugin"]
|
||||
|
||||
|
||||
def test_list_bundled_plugins_without_manifest(tmp_path):
|
||||
bundled = tmp_path / "bundled"
|
||||
(bundled / "myplugin").mkdir(parents=True)
|
||||
assert list_bundled_plugins(bundled) == []
|
||||
|
||||
|
||||
def test_list_bundled_plugins_sorted(tmp_path):
|
||||
bundled = tmp_path / "bundled"
|
||||
for name in ["zebra", "alpha", "mango"]:
|
||||
p = bundled / name
|
||||
p.mkdir(parents=True)
|
||||
(p / "manifest.json").write_text("{}")
|
||||
result = list_bundled_plugins(bundled)
|
||||
assert result == sorted(result)
|
||||
|
||||
|
||||
def test_read_manifest_valid(tmp_path):
|
||||
plugin_dir = tmp_path / "myplugin"
|
||||
plugin_dir.mkdir()
|
||||
(plugin_dir / "manifest.json").write_text(
|
||||
json.dumps({"name": "myplugin", "version": "1.0.0", "description": "Test plugin"})
|
||||
)
|
||||
manifest = read_manifest(plugin_dir)
|
||||
assert manifest["name"] == "myplugin"
|
||||
assert manifest["version"] == "1.0.0"
|
||||
assert manifest["description"] == "Test plugin"
|
||||
|
||||
|
||||
def test_read_manifest_missing(tmp_path):
|
||||
plugin_dir = tmp_path / "myplugin"
|
||||
plugin_dir.mkdir()
|
||||
assert read_manifest(plugin_dir) == {}
|
||||
|
||||
|
||||
def test_install_bundled_plugin_not_found(tmp_path):
|
||||
bundled = tmp_path / "bundled"
|
||||
bundled.mkdir()
|
||||
plugins = tmp_path / "plugins"
|
||||
plugins.mkdir()
|
||||
with pytest.raises(FileNotFoundError):
|
||||
install_bundled_plugin("nonexistent", bundled, plugins)
|
||||
|
||||
|
||||
def test_install_bundled_plugin_missing_manifest(tmp_path):
|
||||
bundled = tmp_path / "bundled"
|
||||
(bundled / "myplugin").mkdir(parents=True)
|
||||
(bundled / "myplugin" / "plugin.py").write_text("# stub")
|
||||
plugins = tmp_path / "plugins"
|
||||
plugins.mkdir()
|
||||
with pytest.raises(FileNotFoundError):
|
||||
install_bundled_plugin("myplugin", bundled, plugins)
|
||||
|
||||
|
||||
def test_install_bundled_plugin_success(tmp_path):
|
||||
bundled = tmp_path / "bundled"
|
||||
src = bundled / "myplugin"
|
||||
src.mkdir(parents=True)
|
||||
(src / "manifest.json").write_text('{"name": "myplugin", "version": "1.0"}')
|
||||
(src / "plugin.py").write_text("# stub plugin")
|
||||
|
||||
plugins = tmp_path / "plugins"
|
||||
plugins.mkdir()
|
||||
|
||||
install_bundled_plugin("myplugin", bundled, plugins)
|
||||
|
||||
dest = plugins / "myplugin"
|
||||
assert dest.is_dir()
|
||||
assert (dest / "manifest.json").exists()
|
||||
assert (dest / "plugin.py").exists()
|
||||
if os.name != "nt":
|
||||
assert oct((dest / "plugin.py").stat().st_mode)[-3:] == "600"
|
||||
|
||||
|
||||
def test_install_bundled_plugin_overwrites(tmp_path):
|
||||
bundled = tmp_path / "bundled"
|
||||
src = bundled / "myplugin"
|
||||
src.mkdir(parents=True)
|
||||
(src / "manifest.json").write_text('{"name": "myplugin", "version": "1.0"}')
|
||||
|
||||
plugins = tmp_path / "plugins"
|
||||
plugins.mkdir()
|
||||
|
||||
install_bundled_plugin("myplugin", bundled, plugins)
|
||||
install_bundled_plugin("myplugin", bundled, plugins) # second install should not raise
|
||||
assert (plugins / "myplugin").is_dir()
|
||||
@@ -0,0 +1,49 @@
|
||||
import os
|
||||
|
||||
|
||||
def test_ensure_dir_creates_directory(tmp_path):
|
||||
from pyra.utils.paths import ensure_dir
|
||||
target = tmp_path / "new_dir"
|
||||
result = ensure_dir(target, 0o700)
|
||||
assert target.exists()
|
||||
assert target.is_dir()
|
||||
|
||||
|
||||
def test_ensure_dir_returns_path(tmp_path):
|
||||
from pyra.utils.paths import ensure_dir
|
||||
target = tmp_path / "new_dir"
|
||||
result = ensure_dir(target)
|
||||
assert result == target
|
||||
|
||||
|
||||
def test_ensure_dir_idempotent(tmp_path):
|
||||
from pyra.utils.paths import ensure_dir
|
||||
target = tmp_path / "existing_dir"
|
||||
ensure_dir(target, 0o700)
|
||||
ensure_dir(target, 0o700) # should not raise
|
||||
assert target.is_dir()
|
||||
|
||||
|
||||
def test_ensure_dir_creates_nested(tmp_path):
|
||||
from pyra.utils.paths import ensure_dir
|
||||
target = tmp_path / "a" / "b" / "c"
|
||||
ensure_dir(target, 0o700)
|
||||
assert target.exists()
|
||||
|
||||
|
||||
def test_safe_chmod_sets_permissions(tmp_path):
|
||||
from pyra.utils.paths import safe_chmod
|
||||
f = tmp_path / "test.txt"
|
||||
f.write_text("content")
|
||||
safe_chmod(f, 0o600)
|
||||
if os.name != "nt":
|
||||
assert oct(f.stat().st_mode)[-3:] == "600"
|
||||
|
||||
|
||||
def test_safe_chmod_different_modes(tmp_path):
|
||||
from pyra.utils.paths import safe_chmod
|
||||
f = tmp_path / "test.txt"
|
||||
f.write_text("content")
|
||||
safe_chmod(f, 0o644)
|
||||
if os.name != "nt":
|
||||
assert oct(f.stat().st_mode)[-3:] == "644"
|
||||
Reference in New Issue
Block a user