Compare commits

3 Commits

Author SHA1 Message Date
curo1305 399ed8b5df test: add memory database tests and update conftest for DB isolation
conftest patches mdb._DB_PATH and calls init_db() after directory creation
so all existing tests continue to work with the new DB layer. New
test_memory_db.py covers upsert, search, remove, migration, and the
updated list_memories/lookup_memories integration paths.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-18 15:23:57 +02:00
curo1305 b9b0918d3a feat(memory): wire database into reader, writer, and bootstrap
- reader: list_memories() queries memory_meta; lookup_memories() uses FTS5 with
  fallback to JSON index substring search
- writer: write_memory() and append_memory() upsert to DB after every file write
- dirs: bootstrap() calls init_db() + migrate_from_files() on startup

Existing .md files remain the canonical store; SQLite is the search index.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-18 15:23:49 +02:00
curo1305 45e6ec32ec feat(memory): add SQLite+FTS5 database layer
New memory/database.py with memory_meta table (path, category, size_bytes,
modified, summary, keywords, embedding BLOB reserved for Stage 8) and
memory_fts virtual table for full-text search. Public API: init_db, upsert,
remove, search, list_all, migrate_from_files.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-18 15:23:43 +02:00
6 changed files with 380 additions and 7 deletions
+4
View File
@@ -43,6 +43,10 @@ def bootstrap() -> None:
_create_if_missing(home / "memory" / "MEMORY_INDEX.md", _MEMORY_INDEX_TEMPLATE, 0o600)
_create_if_missing(home / "memory" / "user" / "profile.md", _USER_PROFILE_TEMPLATE, 0o600)
from pyra.memory.database import init_db, migrate_from_files
init_db()
migrate_from_files()
config = home / "config.yaml"
if config.exists():
safe_chmod(config, 0o600)
+194
View File
@@ -0,0 +1,194 @@
import datetime
import json
import sqlite3
from contextlib import contextmanager
from pathlib import Path
from pyra.memory import _MEMORY_ROOT
from pyra.utils.paths import safe_chmod
_DB_PATH = _MEMORY_ROOT / "memory.db"
_EXCLUDED = {"MEMORY_INDEX.md", "memory_index.json"}
@contextmanager
def _connect():
    """Open a connection to the memory database for one ``with`` block.

    Rows come back as ``sqlite3.Row`` so callers can index them by column
    name. The transaction is committed when the block finishes normally; if
    the block (or the commit itself) raises an ``Exception`` the transaction
    is rolled back and the error is re-raised. The connection is closed in
    every case.
    """
    db = sqlite3.connect(str(_DB_PATH))
    db.row_factory = sqlite3.Row
    try:
        yield db
        db.commit()
    except Exception:
        db.rollback()
        raise
    finally:
        db.close()
def init_db() -> None:
    """Create the schema if it is missing and restrict the DB file to 0o600.

    Two objects are created, both with ``IF NOT EXISTS`` so repeated calls are
    harmless: the ``memory_meta`` table (one row per memory file) and the
    ``memory_fts`` FTS5 virtual table used for full-text search.
    """
    schema = (
        """
        CREATE TABLE IF NOT EXISTS memory_meta (
            id INTEGER PRIMARY KEY,
            path TEXT UNIQUE NOT NULL,
            category TEXT NOT NULL DEFAULT 'root',
            size_bytes INTEGER NOT NULL DEFAULT 0,
            modified TEXT NOT NULL DEFAULT '',
            summary TEXT NOT NULL DEFAULT '',
            keywords TEXT NOT NULL DEFAULT '[]',
            embedding BLOB DEFAULT NULL
        )
        """,
        """
        CREATE VIRTUAL TABLE IF NOT EXISTS memory_fts USING fts5(
            path UNINDEXED,
            body,
            summary,
            keywords
        )
        """,
    )
    with _connect() as conn:
        for ddl in schema:
            conn.execute(ddl)
    # Tighten permissions once the file is guaranteed to exist.
    safe_chmod(_DB_PATH, 0o600)
def upsert(
    path: str,
    *,
    content: str,
    category: str = "root",
    size_bytes: int = 0,
    modified: str = "",
    summary: str = "",
    keywords: list[str] | None = None,
) -> None:
    """Insert or refresh the metadata row and the full-text row for *path*.

    ``memory_meta`` stores the keywords as a JSON array; ``memory_fts`` stores
    them as one space-separated string alongside *content* and *summary*.
    Both writes happen on the same connection, so they commit or roll back
    together. The ``embedding`` column is not written here.
    """
    kw_list = keywords or []
    meta_sql = """INSERT INTO memory_meta (path, category, size_bytes, modified, summary, keywords)
        VALUES (?, ?, ?, ?, ?, ?)
        ON CONFLICT(path) DO UPDATE SET
            category=excluded.category,
            size_bytes=excluded.size_bytes,
            modified=excluded.modified,
            summary=excluded.summary,
            keywords=excluded.keywords"""
    meta_args = (path, category, size_bytes, modified, summary, json.dumps(kw_list))
    fts_sql = "INSERT INTO memory_fts (path, body, summary, keywords) VALUES (?, ?, ?, ?)"
    fts_args = (path, content, summary, " ".join(kw_list))

    with _connect() as conn:
        conn.execute(meta_sql, meta_args)
        # memory_fts declares no unique key on path, so the previous row is
        # deleted explicitly before the fresh one is inserted.
        conn.execute("DELETE FROM memory_fts WHERE path = ?", (path,))
        conn.execute(fts_sql, fts_args)
def remove(path: str) -> None:
    """Delete *path* from both the metadata table and the full-text index."""
    params = (path,)
    with _connect() as conn:
        for table in ("memory_meta", "memory_fts"):
            conn.execute(f"DELETE FROM {table} WHERE path = ?", params)
def _quote_fts_terms(query: str) -> str:
    """Rewrite free text as FTS5 string literals, one per whitespace-separated term.

    Embedded double quotes are doubled, which is how FTS5 escapes them inside
    a quoted string. The terms are joined with spaces (implicit AND).
    """
    return " ".join('"' + term.replace('"', '""') + '"' for term in query.split())


def _search_once(match_expr: str, limit: int) -> list[dict]:
    """Run one FTS5 MATCH and join the hits with their metadata rows."""
    with _connect() as conn:
        fts_rows = conn.execute(
            """SELECT path, snippet(memory_fts, 1, '[', ']', '', 32) AS snip
               FROM memory_fts
               WHERE memory_fts MATCH ?
               ORDER BY rank
               LIMIT ?""",
            (match_expr, limit),
        ).fetchall()
        if not fts_rows:
            return []
        paths = [r["path"] for r in fts_rows]
        placeholders = ",".join("?" * len(paths))
        meta_rows = conn.execute(
            f"SELECT path, summary, keywords FROM memory_meta WHERE path IN ({placeholders})",
            paths,
        ).fetchall()
        meta = {r["path"]: r for r in meta_rows}
        # Keep the FTS rank order; drop hits that have no metadata row.
        return [
            {
                "file": r["path"],
                "summary": meta[r["path"]]["summary"],
                "keywords": json.loads(meta[r["path"]]["keywords"] or "[]"),
                "snippet": r["snip"],
            }
            for r in fts_rows
            if r["path"] in meta
        ]


def search(query: str, limit: int = 20) -> list[dict]:
    """FTS5 full-text search; returns [{file, summary, keywords, snippet}].

    The query is first passed to FTS5 unchanged so its operators (phrases,
    AND/OR/NOT, prefix ``*``) keep working. If SQLite rejects it with an
    ``OperationalError`` -- typically because free text contains punctuation
    that is not valid FTS5 syntax -- the search is retried once with every
    term quoted as a literal. Returns ``[]`` when the database file does not
    exist, when nothing matches, or when both attempts fail.
    """
    if not _DB_PATH.exists():
        return []
    attempts = [query]
    literal = _quote_fts_terms(query)
    if literal and literal != query:
        attempts.append(literal)
    for match_expr in attempts:
        try:
            return _search_once(match_expr, limit)
        except sqlite3.OperationalError:
            # Not valid FTS5 syntax (or the index is unusable): try the next form.
            continue
    return []
def list_all() -> list[dict]:
    """Return every ``memory_meta`` row as a dict, sorted by path.

    Each dict has the keys path, category, size_bytes, modified, summary and
    keywords; ``keywords`` is decoded from its stored JSON text. Returns an
    empty list when the database file does not exist.
    """
    if not _DB_PATH.exists():
        return []
    sql = (
        "SELECT path, category, size_bytes, modified, summary, keywords "
        "FROM memory_meta ORDER BY path"
    )
    with _connect() as conn:
        fetched = conn.execute(sql).fetchall()
    entries: list[dict] = []
    for record in fetched:
        entry = dict(record)
        entry["keywords"] = json.loads(record["keywords"] or "[]")
        entries.append(entry)
    return entries
def migrate_from_files() -> None:
    """Populate DB from existing .md files on first run; no-op if DB already has entries.

    Does nothing when the database file is missing or ``memory_meta`` already
    holds at least one row. Otherwise every ``*.md`` file under the memory
    root (except the names in ``_EXCLUDED``) is upserted, reusing the summary
    and keywords recorded in ``memory_index.json`` when that file is readable.

    Migration is best-effort: an unreadable or malformed JSON index is
    ignored, files that vanish or cannot be stat'ed are skipped, and bytes
    that cannot be decoded are replaced rather than aborting the run.
    """
    if not _DB_PATH.exists():
        return
    with _connect() as conn:
        count = conn.execute("SELECT COUNT(*) FROM memory_meta").fetchone()[0]
    if count > 0:
        return

    existing: dict = {}
    json_index_path = _MEMORY_ROOT / "memory_index.json"
    if json_index_path.exists():
        try:
            loaded = json.loads(json_index_path.read_text())
        except (ValueError, OSError):
            # ValueError covers both invalid JSON and undecodable bytes.
            loaded = None
        if isinstance(loaded, dict):
            existing = loaded

    for f in sorted(_MEMORY_ROOT.rglob("*.md")):
        if f.name in _EXCLUDED:
            continue
        try:
            if not f.is_file():
                # A directory whose name ends in .md is not a memory file.
                continue
            stat = f.stat()
        except OSError:
            continue
        try:
            content = f.read_text(errors="replace")
        except OSError:
            content = ""

        rel = f.relative_to(_MEMORY_ROOT)
        rel_key = rel.as_posix()
        category = rel.parts[0] if len(rel.parts) > 1 else "root"
        mtime = datetime.datetime.fromtimestamp(stat.st_mtime).isoformat(timespec="seconds")

        prev = existing.get(rel_key)
        if not isinstance(prev, dict):
            prev = {}
        summary = prev.get("summary", "")
        if not isinstance(summary, str):
            summary = ""
        keywords = prev.get("keywords", [])
        if not isinstance(keywords, list):
            keywords = []

        upsert(
            rel_key,
            content=content,
            category=category,
            size_bytes=stat.st_size,
            modified=mtime,
            summary=summary,
            keywords=[str(k) for k in keywords],
        )
+32 -5
View File
@@ -18,7 +18,8 @@ class MemoryFile:
modified: datetime.datetime
def list_memories() -> list[MemoryFile]:
def _scan_files() -> list[MemoryFile]:
"""Fallback: scan .md files directly (used when DB is unavailable)."""
files = sorted(_MEMORY_ROOT.rglob("*.md"))
result: list[MemoryFile] = []
for f in files:
@@ -36,6 +37,27 @@ def list_memories() -> list[MemoryFile]:
return result
def list_memories() -> list[MemoryFile]:
    """List memory files from the database, falling back to a directory scan.

    The scan fallback is used when the database returns no rows and also when
    querying it raises a ``sqlite3`` error, so an unusable database does not
    break listing. A ``modified`` value that cannot be parsed as an ISO
    timestamp is replaced with the current time.
    """
    import sqlite3

    from pyra.memory import database

    try:
        rows = database.list_all()
    except sqlite3.Error:
        # The .md files are the canonical store; the DB is only an index.
        rows = []
    if not rows:
        return _scan_files()

    result: list[MemoryFile] = []
    for row in rows:
        try:
            modified = datetime.datetime.fromisoformat(row["modified"])
        except (ValueError, TypeError):
            modified = datetime.datetime.now()
        result.append(MemoryFile(
            name=row["path"],
            path=_MEMORY_ROOT / row["path"],
            category=row["category"],
            size_bytes=row["size_bytes"],
            modified=modified,
        ))
    return result
def read_memory(name: str) -> str:
path = (_MEMORY_ROOT / name).resolve()
assert_safe_path(path)
@@ -63,19 +85,24 @@ def read_index() -> dict:
def lookup_memories(query: str) -> list[dict]:
"""Case-insensitive substring search over summary text and keywords."""
"""Full-text search via FTS5; falls back to JSON index substring search."""
from pyra.memory import database
results = database.search(query)
if results:
return results
# Fallback: case-insensitive substring search over JSON index
q = query.lower()
results: list[dict] = []
fallback: list[dict] = []
for rel_path, entry in read_index().items():
summary = entry.get("summary", "").lower()
keywords = [k.lower() for k in entry.get("keywords", [])]
if q in summary or any(q in k or k in q for k in keywords):
results.append({
fallback.append({
"file": rel_path,
"summary": entry.get("summary", ""),
"keywords": entry.get("keywords", []),
})
return results
return fallback
def load_context_for_session() -> str:
+26 -2
View File
@@ -1,3 +1,4 @@
import datetime
from pathlib import Path
from pyra.memory import _MEMORY_ROOT
@@ -20,6 +21,25 @@ def _resolve_and_validate(name: str) -> Path:
return path
def _upsert_to_db(path: Path, content: str, summary: str = "", keywords: list[str] | None = None) -> None:
    """Mirror a just-written memory file into the search database.

    Does nothing when the database file does not exist. The row key is the
    file's POSIX path relative to the memory root; the category is the first
    path component, or ``"root"`` for files directly under the root. Size and
    modification time are read from the file on disk.
    """
    from pyra.memory import database

    if not database._DB_PATH.exists():
        return

    rel_key = path.relative_to(_MEMORY_ROOT).as_posix()
    top_level, slash, _ = rel_key.partition("/")
    file_stat = path.stat()
    stamp = datetime.datetime.fromtimestamp(file_stat.st_mtime)
    database.upsert(
        rel_key,
        content=content,
        category=top_level if slash else "root",
        size_bytes=file_stat.st_size,
        modified=stamp.isoformat(timespec="seconds"),
        summary=summary,
        keywords=keywords,
    )
def write_memory(
name: str,
content: str,
@@ -34,6 +54,7 @@ def write_memory(
if summary or keywords:
rel_key = path.relative_to(_MEMORY_ROOT).as_posix()
update_json_entry(rel_key, summary, keywords or [])
_upsert_to_db(path, content, summary, keywords)
return path
@@ -42,9 +63,12 @@ def append_memory(name: str, content: str) -> Path:
path.parent.mkdir(parents=True, exist_ok=True)
if path.exists():
existing = path.read_text()
path.write_text(existing.rstrip() + "\n\n" + content)
new_content = existing.rstrip() + "\n\n" + content
path.write_text(new_content)
else:
path.write_text(content)
new_content = content
path.write_text(new_content)
safe_chmod(path, 0o600)
update_index()
_upsert_to_db(path, new_content)
return path
+5
View File
@@ -28,6 +28,7 @@ def tmp_pyra_home(tmp_path, monkeypatch):
import pyra.plugins.loader as pl
import pyra.plugins.executor as pe
import pyra.memory.database as mdb
b.VAULT_PATH = fake_home / "vault"
b.BLOCKED_PREFIXES = [b.VAULT_PATH]
@@ -35,6 +36,8 @@ def tmp_pyra_home(tmp_path, monkeypatch):
mi._INDEX_FILE = fake_home / "memory" / "MEMORY_INDEX.md"
mr._MEMORY_ROOT = fake_home / "memory"
mw._MEMORY_ROOT = fake_home / "memory"
mdb._DB_PATH = fake_home / "memory" / "memory.db"
mdb._MEMORY_ROOT = fake_home / "memory"
vr._KEYS_FILE = fake_home / "vault" / "secrets" / "api_keys.json"
vw._KEYS_FILE = fake_home / "vault" / "secrets" / "api_keys.json"
si._LOG_FILE = fake_home / "security.log"
@@ -52,6 +55,8 @@ def tmp_pyra_home(tmp_path, monkeypatch):
(fake_home / "plugins").mkdir()
(fake_home / "logs").mkdir()
mdb.init_db()
# Reset plugin registry singleton so tests don't share state
from pyra.plugins.registry import PluginRegistry
PluginRegistry.reset()
+119
View File
@@ -0,0 +1,119 @@
import json
def test_init_creates_db(tmp_pyra_home):
    """The database file exists once the fixture has run."""
    from pyra.memory import database as db

    db_file = db._DB_PATH
    assert db_file.exists()
def test_upsert_and_list(tmp_pyra_home):
    """A single upsert is returned by list_all() with its metadata intact."""
    from pyra.memory import database

    fields = {
        "content": "# Profile\n\nI am a developer.",
        "category": "user",
        "size_bytes": 30,
        "modified": "2026-05-18T10:00:00",
        "summary": "Developer profile",
        "keywords": ["developer", "profile"],
    }
    database.upsert("user/profile.md", **fields)

    listed = database.list_all()
    assert len(listed) == 1
    entry = listed[0]
    assert entry["path"] == "user/profile.md"
    assert entry["category"] == "user"
    assert entry["summary"] == "Developer profile"
    assert entry["keywords"] == ["developer", "profile"]
def test_upsert_overwrites(tmp_pyra_home):
    """A second upsert for the same path replaces both the metadata and the FTS row."""
    from pyra.memory import database

    database.upsert("context/notes.md", content="old", category="context",
                    modified="2026-05-18T10:00:00")
    database.upsert("context/notes.md", content="new", category="context",
                    summary="updated", modified="2026-05-18T11:00:00")

    rows = database.list_all()
    assert len(rows) == 1
    assert rows[0]["summary"] == "updated"

    # The full-text index must hold only the latest body, not a stale or
    # duplicated row for the same path.
    assert database.search("old") == []
    hits = database.search("new")
    assert [h["file"] for h in hits] == ["context/notes.md"]
def test_remove(tmp_pyra_home):
    """remove() deletes the entry from the metadata table and the FTS index."""
    from pyra.memory import database

    database.upsert("knowledge/facts.md", content="Some facts.", category="knowledge",
                    modified="2026-05-18T10:00:00")
    assert len(database.list_all()) == 1
    assert len(database.search("facts")) == 1

    database.remove("knowledge/facts.md")

    assert len(database.list_all()) == 0
    # The body must no longer be searchable once the entry is removed.
    assert database.search("facts") == []
def test_search_fts(tmp_pyra_home):
    """A full-text query returns only the matching file, with its metadata."""
    from pyra.memory import database

    stamp = "2026-05-18T10:00:00"
    database.upsert(
        "user/profile.md",
        content="I enjoy building AI tools.",
        category="user",
        modified=stamp,
        summary="Personal bio",
        keywords=["AI", "tools"],
    )
    database.upsert(
        "knowledge/cooking.md",
        content="Pasta recipes and techniques.",
        category="knowledge",
        modified=stamp,
        summary="Cooking notes",
        keywords=["pasta", "cooking"],
    )

    hits = database.search("AI tools")
    assert len(hits) == 1
    top = hits[0]
    assert top["file"] == "user/profile.md"
    assert top["summary"] == "Personal bio"
    assert "AI" in top["keywords"]
def test_search_no_match(tmp_pyra_home):
    """A term that appears in no document yields an empty result list."""
    from pyra.memory import database

    database.upsert(
        "user/profile.md",
        content="Hello world.",
        category="user",
        modified="2026-05-18T10:00:00",
    )
    hits = database.search("xyzzy")
    assert hits == []
def test_search_invalid_query_returns_empty(tmp_pyra_home):
    """A query that is not valid FTS5 syntax yields no hits instead of raising."""
    from pyra.memory import database

    database.upsert("user/profile.md", content="Hello world.", category="user",
                    modified="2026-05-18T10:00:00")
    # An unbalanced double quote is a syntax error in FTS5; search() must
    # absorb it. Neither word occurs in the indexed text, so the result is
    # empty however the malformed query is interpreted.
    results = database.search('"unclosed quote')
    assert results == []
def test_migrate_from_files(tmp_pyra_home):
    """migrate_from_files() rebuilds an empty database from the .md files on disk."""
    from pyra.memory import database
    from pyra.memory.writer import write_memory

    target = "user/note.md"
    write_memory(target, "Migration test content.")

    # Drop the row again so the database looks freshly created.
    database.remove(target)
    assert database.list_all() == []

    # The file is still on disk, so migration should index it again.
    database.migrate_from_files()
    migrated_paths = [entry["path"] for entry in database.list_all()]
    assert target in migrated_paths
def test_list_memories_uses_db(tmp_pyra_home):
    """A file written through write_memory() shows up in list_memories()."""
    from pyra.memory.reader import list_memories
    from pyra.memory.writer import write_memory

    write_memory("context/project.md", "# Project\n\nActive tasks.")

    listed_names = {memory.name for memory in list_memories()}
    assert "context/project.md" in listed_names
def test_lookup_memories_uses_fts(tmp_pyra_home):
    """lookup_memories() finds a file by words that appear only in its body."""
    from pyra.memory.reader import lookup_memories
    from pyra.memory.writer import write_memory

    body = "Python is a high-level programming language."
    write_memory(
        "knowledge/python.md",
        body,
        summary="Python overview",
        keywords=["python", "programming"],
    )

    hits = lookup_memories("programming language")
    assert len(hits) >= 1
    first = hits[0]
    assert first["file"] == "knowledge/python.md"