d42b8b4a47
Newline-delimited JSON over Unix socket (macOS/Linux, chmod 600, UID-checked via SO_PEERCRED/getpeereid) with TCP loopback fallback on Windows. Port written to ~/.pyra/daemon.port for Windows clients. Sync send_command() wrapper for CLI. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
163 lines
5.1 KiB
Python
163 lines
5.1 KiB
Python
"""Unit tests for the IPC layer."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import os
|
|
import sys
|
|
import tempfile
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from pyra.daemon.ipc import (
|
|
IpcClient,
|
|
IpcMessage,
|
|
IpcResponse,
|
|
IpcServer,
|
|
decode_message,
|
|
encode_message,
|
|
is_unix_socket,
|
|
)
|
|
|
|
|
|
@pytest.fixture
|
|
def sock_path():
|
|
"""Short socket path that fits within macOS's 104-char AF_UNIX limit."""
|
|
with tempfile.TemporaryDirectory(dir="/tmp") as d:
|
|
yield Path(d) / "t.sock"
|
|
|
|
|
|
# ── Protocol encode / decode ──────────────────────────────────────────────────
|
|
|
|
def test_encode_appends_newline() -> None:
|
|
data = encode_message({"cmd": "ping"})
|
|
assert data.endswith(b"\n")
|
|
|
|
|
|
def test_encode_is_valid_json() -> None:
|
|
import json
|
|
data = encode_message({"cmd": "status", "extra": 42})
|
|
assert json.loads(data) == {"cmd": "status", "extra": 42}
|
|
|
|
|
|
def test_decode_roundtrip() -> None:
|
|
msg: IpcMessage = {"cmd": "stop"}
|
|
assert decode_message(encode_message(msg)) == msg
|
|
|
|
|
|
def test_decode_strips_newline() -> None:
|
|
assert decode_message(b'{"cmd": "stop"}\n')["cmd"] == "stop"
|
|
|
|
|
|
def test_decode_raises_on_bad_json() -> None:
|
|
with pytest.raises(ValueError, match="Invalid IPC message"):
|
|
decode_message(b"not json\n")
|
|
|
|
|
|
def test_decode_raises_on_empty_line() -> None:
|
|
with pytest.raises(ValueError):
|
|
decode_message(b"\n")
|
|
|
|
|
|
# ── is_unix_socket ────────────────────────────────────────────────────────────
|
|
|
|
def test_is_unix_socket_matches_platform() -> None:
|
|
if sys.platform == "win32":
|
|
assert not is_unix_socket()
|
|
else:
|
|
assert is_unix_socket()
|
|
|
|
|
|
# ── Server + client roundtrip (Unix only) ─────────────────────────────────────
|
|
|
|
@pytest.mark.skipif(sys.platform == "win32", reason="Unix socket test")
|
|
async def test_server_client_ping(sock_path: Path) -> None:
|
|
async def handler(msg: IpcMessage) -> IpcResponse:
|
|
return {"ok": True, "data": {"pong": True}}
|
|
|
|
server = IpcServer(sock_path, handler)
|
|
await server.start()
|
|
try:
|
|
resp = await IpcClient(sock_path).send({"cmd": "ping"})
|
|
assert resp["ok"] is True
|
|
assert resp["data"]["pong"] is True
|
|
finally:
|
|
await server.stop()
|
|
|
|
|
|
@pytest.mark.skipif(sys.platform == "win32", reason="Unix socket test")
|
|
async def test_server_echoes_error_for_bad_json(sock_path: Path) -> None:
|
|
async def handler(msg: IpcMessage) -> IpcResponse:
|
|
return {"ok": True, "data": {}}
|
|
|
|
server = IpcServer(sock_path, handler)
|
|
await server.start()
|
|
try:
|
|
reader, writer = await asyncio.open_unix_connection(str(sock_path))
|
|
writer.write(b"not valid json\n")
|
|
await writer.drain()
|
|
line = await asyncio.wait_for(reader.readline(), timeout=3.0)
|
|
resp = decode_message(line)
|
|
assert resp["ok"] is False
|
|
assert "error" in resp["data"]
|
|
finally:
|
|
try:
|
|
writer.close()
|
|
except Exception:
|
|
pass
|
|
await server.stop()
|
|
|
|
|
|
@pytest.mark.skipif(sys.platform == "win32", reason="Unix socket test")
|
|
async def test_handler_response_returned_to_client(sock_path: Path) -> None:
|
|
async def handler(msg: IpcMessage) -> IpcResponse:
|
|
if msg.get("cmd") == "status":
|
|
return {"ok": True, "data": {"uptime": 99.0}}
|
|
return {"ok": False, "data": {"error": "unknown"}}
|
|
|
|
server = IpcServer(sock_path, handler)
|
|
await server.start()
|
|
try:
|
|
resp = await IpcClient(sock_path).send({"cmd": "status"})
|
|
assert resp["ok"] is True
|
|
assert resp["data"]["uptime"] == 99.0
|
|
|
|
resp2 = await IpcClient(sock_path).send({"cmd": "bogus"})
|
|
assert resp2["ok"] is False
|
|
finally:
|
|
await server.stop()
|
|
|
|
|
|
@pytest.mark.skipif(sys.platform == "win32", reason="Unix socket test")
|
|
async def test_client_raises_when_no_server(sock_path: Path) -> None:
|
|
client = IpcClient(sock_path)
|
|
with pytest.raises((ConnectionRefusedError, FileNotFoundError, OSError)):
|
|
await client.send({"cmd": "ping"})
|
|
|
|
|
|
@pytest.mark.skipif(sys.platform == "win32", reason="Unix socket test")
|
|
async def test_socket_file_chmod_600(sock_path: Path) -> None:
|
|
async def handler(msg: IpcMessage) -> IpcResponse:
|
|
return {"ok": True, "data": {}}
|
|
|
|
server = IpcServer(sock_path, handler)
|
|
await server.start()
|
|
try:
|
|
mode = oct(sock_path.stat().st_mode & 0o777)
|
|
assert mode == oct(0o600), f"Expected 0o600, got {mode}"
|
|
finally:
|
|
await server.stop()
|
|
|
|
|
|
@pytest.mark.skipif(sys.platform == "win32", reason="Unix socket test")
|
|
async def test_stop_removes_socket_file(sock_path: Path) -> None:
|
|
async def handler(msg: IpcMessage) -> IpcResponse:
|
|
return {"ok": True, "data": {}}
|
|
|
|
server = IpcServer(sock_path, handler)
|
|
await server.start()
|
|
assert sock_path.exists()
|
|
await server.stop()
|
|
assert not sock_path.exists()
|