feat(memory): sandboxed markdown memory system
- memory/index.py: auto-regenerate MEMORY_INDEX.md on every write - memory/reader.py: list_memories(), read_memory(), load_context_for_session() all go through assert_safe_path() + relative_to check - memory/writer.py: write_memory(), append_memory() — relative names only, no absolute paths or traversal, calls update_index() after every write Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,30 @@
|
|||||||
|
import datetime
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from pyra.utils.paths import pyra_home, safe_chmod
|
||||||
|
|
||||||
|
_MEMORY_ROOT = pyra_home() / "memory"
|
||||||
|
_INDEX_FILE = _MEMORY_ROOT / "MEMORY_INDEX.md"
|
||||||
|
|
||||||
|
|
||||||
|
def update_index() -> None:
|
||||||
|
files = sorted(_MEMORY_ROOT.rglob("*.md"))
|
||||||
|
files = [f for f in files if f.name != "MEMORY_INDEX.md"]
|
||||||
|
|
||||||
|
rows: list[str] = []
|
||||||
|
for f in files:
|
||||||
|
rel = f.relative_to(_MEMORY_ROOT)
|
||||||
|
category = rel.parts[0] if len(rel.parts) > 1 else "root"
|
||||||
|
mtime = datetime.datetime.fromtimestamp(f.stat().st_mtime).strftime("%Y-%m-%d %H:%M")
|
||||||
|
rows.append(f"| {rel} | {category} | {mtime} |")
|
||||||
|
|
||||||
|
table = "\n".join(rows) if rows else "| _(no memory files)_ | — | — |"
|
||||||
|
content = (
|
||||||
|
"# Memory Index\n\n"
|
||||||
|
"| File | Category | Last Modified |\n"
|
||||||
|
"|------|----------|---------------|\n"
|
||||||
|
f"{table}\n\n"
|
||||||
|
"_Auto-maintained by Pyra. Do not edit manually._\n"
|
||||||
|
)
|
||||||
|
_INDEX_FILE.write_text(content)
|
||||||
|
safe_chmod(_INDEX_FILE, 0o600)
|
||||||
@@ -0,0 +1,69 @@
|
|||||||
|
import datetime
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from pyra.security.boundaries import assert_safe_path
|
||||||
|
from pyra.utils.paths import pyra_home
|
||||||
|
|
||||||
|
_MEMORY_ROOT = pyra_home() / "memory"
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class MemoryFile:
|
||||||
|
name: str
|
||||||
|
path: Path
|
||||||
|
category: str
|
||||||
|
size_bytes: int
|
||||||
|
modified: datetime.datetime
|
||||||
|
|
||||||
|
|
||||||
|
def list_memories() -> list[MemoryFile]:
|
||||||
|
files = sorted(_MEMORY_ROOT.rglob("*.md"))
|
||||||
|
result: list[MemoryFile] = []
|
||||||
|
for f in files:
|
||||||
|
assert_safe_path(f)
|
||||||
|
rel = f.relative_to(_MEMORY_ROOT)
|
||||||
|
category = rel.parts[0] if len(rel.parts) > 1 else "root"
|
||||||
|
stat = f.stat()
|
||||||
|
result.append(MemoryFile(
|
||||||
|
name=str(rel),
|
||||||
|
path=f,
|
||||||
|
category=category,
|
||||||
|
size_bytes=stat.st_size,
|
||||||
|
modified=datetime.datetime.fromtimestamp(stat.st_mtime),
|
||||||
|
))
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
def read_memory(name: str) -> str:
|
||||||
|
path = (_MEMORY_ROOT / name).resolve()
|
||||||
|
assert_safe_path(path)
|
||||||
|
|
||||||
|
# Confirm path stays within memory root
|
||||||
|
try:
|
||||||
|
path.relative_to(_MEMORY_ROOT.resolve())
|
||||||
|
except ValueError:
|
||||||
|
raise PermissionError(f"Path escapes memory directory: {name!r}")
|
||||||
|
|
||||||
|
if not path.exists():
|
||||||
|
raise FileNotFoundError(f"Memory file not found: {name!r}")
|
||||||
|
|
||||||
|
return path.read_text()
|
||||||
|
|
||||||
|
|
||||||
|
def load_context_for_session() -> str:
|
||||||
|
memories = list_memories()
|
||||||
|
if not memories:
|
||||||
|
return ""
|
||||||
|
|
||||||
|
parts: list[str] = ["## Long-term Memory\n"]
|
||||||
|
for mem in memories:
|
||||||
|
if mem.name == "MEMORY_INDEX.md":
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
content = read_memory(mem.name)
|
||||||
|
parts.append(f"### {mem.name}\n{content}\n")
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
return "\n".join(parts)
|
||||||
@@ -0,0 +1,43 @@
|
|||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from pyra.memory.index import update_index
|
||||||
|
from pyra.security.boundaries import assert_safe_path
|
||||||
|
from pyra.utils.paths import pyra_home, safe_chmod
|
||||||
|
|
||||||
|
_MEMORY_ROOT = pyra_home() / "memory"
|
||||||
|
|
||||||
|
|
||||||
|
def _resolve_and_validate(name: str) -> Path:
|
||||||
|
if name.startswith("/") or name.startswith("~"):
|
||||||
|
raise ValueError(f"Memory name must be relative, got: {name!r}")
|
||||||
|
path = (_MEMORY_ROOT / name).resolve()
|
||||||
|
assert_safe_path(path)
|
||||||
|
try:
|
||||||
|
path.relative_to(_MEMORY_ROOT.resolve())
|
||||||
|
except ValueError:
|
||||||
|
raise PermissionError(f"Path escapes memory directory: {name!r}")
|
||||||
|
if path.suffix.lower() != ".md":
|
||||||
|
path = path.with_suffix(".md")
|
||||||
|
return path
|
||||||
|
|
||||||
|
|
||||||
|
def write_memory(name: str, content: str) -> Path:
|
||||||
|
path = _resolve_and_validate(name)
|
||||||
|
path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
path.write_text(content)
|
||||||
|
safe_chmod(path, 0o600)
|
||||||
|
update_index()
|
||||||
|
return path
|
||||||
|
|
||||||
|
|
||||||
|
def append_memory(name: str, content: str) -> Path:
|
||||||
|
path = _resolve_and_validate(name)
|
||||||
|
if path.exists():
|
||||||
|
existing = path.read_text()
|
||||||
|
path.write_text(existing.rstrip() + "\n\n" + content)
|
||||||
|
else:
|
||||||
|
path.write_text(content)
|
||||||
|
path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
safe_chmod(path, 0o600)
|
||||||
|
update_index()
|
||||||
|
return path
|
||||||
Reference in New Issue
Block a user