"""Unit tests for the IPC layer.""" from __future__ import annotations import asyncio import os import sys import tempfile from pathlib import Path import pytest from pyra.daemon.ipc import ( IpcClient, IpcMessage, IpcResponse, IpcServer, decode_message, encode_message, is_unix_socket, ) @pytest.fixture def sock_path(): """Short socket path that fits within macOS's 104-char AF_UNIX limit.""" with tempfile.TemporaryDirectory(dir="/tmp") as d: yield Path(d) / "t.sock" # ── Protocol encode / decode ────────────────────────────────────────────────── def test_encode_appends_newline() -> None: data = encode_message({"cmd": "ping"}) assert data.endswith(b"\n") def test_encode_is_valid_json() -> None: import json data = encode_message({"cmd": "status", "extra": 42}) assert json.loads(data) == {"cmd": "status", "extra": 42} def test_decode_roundtrip() -> None: msg: IpcMessage = {"cmd": "stop"} assert decode_message(encode_message(msg)) == msg def test_decode_strips_newline() -> None: assert decode_message(b'{"cmd": "stop"}\n')["cmd"] == "stop" def test_decode_raises_on_bad_json() -> None: with pytest.raises(ValueError, match="Invalid IPC message"): decode_message(b"not json\n") def test_decode_raises_on_empty_line() -> None: with pytest.raises(ValueError): decode_message(b"\n") # ── is_unix_socket ──────────────────────────────────────────────────────────── def test_is_unix_socket_matches_platform() -> None: if sys.platform == "win32": assert not is_unix_socket() else: assert is_unix_socket() # ── Server + client roundtrip (Unix only) ───────────────────────────────────── @pytest.mark.skipif(sys.platform == "win32", reason="Unix socket test") async def test_server_client_ping(sock_path: Path) -> None: async def handler(msg: IpcMessage) -> IpcResponse: return {"ok": True, "data": {"pong": True}} server = IpcServer(sock_path, handler) await server.start() try: resp = await IpcClient(sock_path).send({"cmd": "ping"}) assert resp["ok"] is True assert resp["data"]["pong"] is True finally: await server.stop() @pytest.mark.skipif(sys.platform == "win32", reason="Unix socket test") async def test_server_echoes_error_for_bad_json(sock_path: Path) -> None: async def handler(msg: IpcMessage) -> IpcResponse: return {"ok": True, "data": {}} server = IpcServer(sock_path, handler) await server.start() try: reader, writer = await asyncio.open_unix_connection(str(sock_path)) writer.write(b"not valid json\n") await writer.drain() line = await asyncio.wait_for(reader.readline(), timeout=3.0) resp = decode_message(line) assert resp["ok"] is False assert "error" in resp["data"] finally: try: writer.close() except Exception: pass await server.stop() @pytest.mark.skipif(sys.platform == "win32", reason="Unix socket test") async def test_handler_response_returned_to_client(sock_path: Path) -> None: async def handler(msg: IpcMessage) -> IpcResponse: if msg.get("cmd") == "status": return {"ok": True, "data": {"uptime": 99.0}} return {"ok": False, "data": {"error": "unknown"}} server = IpcServer(sock_path, handler) await server.start() try: resp = await IpcClient(sock_path).send({"cmd": "status"}) assert resp["ok"] is True assert resp["data"]["uptime"] == 99.0 resp2 = await IpcClient(sock_path).send({"cmd": "bogus"}) assert resp2["ok"] is False finally: await server.stop() @pytest.mark.skipif(sys.platform == "win32", reason="Unix socket test") async def test_client_raises_when_no_server(sock_path: Path) -> None: client = IpcClient(sock_path) with pytest.raises((ConnectionRefusedError, FileNotFoundError, OSError)): await client.send({"cmd": "ping"}) @pytest.mark.skipif(sys.platform == "win32", reason="Unix socket test") async def test_socket_file_chmod_600(sock_path: Path) -> None: async def handler(msg: IpcMessage) -> IpcResponse: return {"ok": True, "data": {}} server = IpcServer(sock_path, handler) await server.start() try: mode = oct(sock_path.stat().st_mode & 0o777) assert mode == oct(0o600), f"Expected 0o600, got {mode}" finally: await server.stop() @pytest.mark.skipif(sys.platform == "win32", reason="Unix socket test") async def test_stop_removes_socket_file(sock_path: Path) -> None: async def handler(msg: IpcMessage) -> IpcResponse: return {"ok": True, "data": {}} server = IpcServer(sock_path, handler) await server.start() assert sock_path.exists() await server.stop() assert not sock_path.exists()