"""SQLite-backed metadata table and FTS5 full-text index for memory files."""

import datetime
import json
import sqlite3
from collections.abc import Iterator
from contextlib import contextmanager
from pathlib import Path

from pyra.memory import _MEMORY_ROOT
from pyra.utils.paths import safe_chmod

_DB_PATH = _MEMORY_ROOT / "memory.db"
_EXCLUDED = {"MEMORY_INDEX.md", "memory_index.json"}


@contextmanager
def _connect() -> Iterator[sqlite3.Connection]:
    """Yield a connection to the memory DB.

    Rows are returned as ``sqlite3.Row``. The transaction is committed when
    the ``with`` body finishes normally, rolled back (and the exception
    re-raised) when it raises, and the connection is always closed.
    """
    conn = sqlite3.connect(str(_DB_PATH))
    conn.row_factory = sqlite3.Row
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()


def init_db() -> None:
    """Create the ``memory_meta`` table and ``memory_fts`` index if missing.

    After the schema is in place the DB file mode is set to ``0o600`` via
    ``safe_chmod``.
    """
    with _connect() as conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS memory_meta (
                id INTEGER PRIMARY KEY,
                path TEXT UNIQUE NOT NULL,
                category TEXT NOT NULL DEFAULT 'root',
                size_bytes INTEGER NOT NULL DEFAULT 0,
                modified TEXT NOT NULL DEFAULT '',
                summary TEXT NOT NULL DEFAULT '',
                keywords TEXT NOT NULL DEFAULT '[]',
                embedding BLOB DEFAULT NULL
            )
        """)
        conn.execute("""
            CREATE VIRTUAL TABLE IF NOT EXISTS memory_fts USING fts5(
                path UNINDEXED,
                body,
                summary,
                keywords
            )
        """)
    safe_chmod(_DB_PATH, 0o600)


def _write_entry(
    conn: sqlite3.Connection,
    path: str,
    *,
    content: str,
    category: str,
    size_bytes: int,
    modified: str,
    summary: str,
    keywords: list[str] | None,
) -> None:
    """Insert or replace one entry in both tables using an open connection."""
    kw_list = keywords or []
    kw_json = json.dumps(kw_list)
    kw_text = " ".join(kw_list)
    conn.execute(
        """INSERT INTO memory_meta
               (path, category, size_bytes, modified, summary, keywords)
           VALUES (?, ?, ?, ?, ?, ?)
           ON CONFLICT(path) DO UPDATE SET
               category=excluded.category,
               size_bytes=excluded.size_bytes,
               modified=excluded.modified,
               summary=excluded.summary,
               keywords=excluded.keywords""",
        (path, category, size_bytes, modified, summary, kw_json),
    )
    # The FTS table has no uniqueness constraint on path, so replace the row
    # by deleting any existing one before inserting the new text.
    conn.execute("DELETE FROM memory_fts WHERE path = ?", (path,))
    conn.execute(
        "INSERT INTO memory_fts (path, body, summary, keywords) VALUES (?, ?, ?, ?)",
        (path, content, summary, kw_text),
    )


def upsert(
    path: str,
    *,
    content: str,
    category: str = "root",
    size_bytes: int = 0,
    modified: str = "",
    summary: str = "",
    keywords: list[str] | None = None,
) -> None:
    """Insert or update the metadata row and full-text row for *path*.

    ``keywords`` is stored as a JSON array in ``memory_meta`` and as a
    space-joined string in ``memory_fts``; ``None`` is treated as empty.
    """
    with _connect() as conn:
        _write_entry(
            conn,
            path,
            content=content,
            category=category,
            size_bytes=size_bytes,
            modified=modified,
            summary=summary,
            keywords=keywords,
        )


def remove(path: str) -> None:
    """Delete *path* from both the metadata table and the full-text index."""
    with _connect() as conn:
        conn.execute("DELETE FROM memory_meta WHERE path = ?", (path,))
        conn.execute("DELETE FROM memory_fts WHERE path = ?", (path,))


def search(query: str, limit: int = 20) -> list[dict]:
    """FTS5 full-text search; returns [{file, summary, keywords, snippet}].

    Results are ordered by FTS rank. An empty list is returned when the query
    is blank, the DB file does not exist, nothing matches, or SQLite raises
    ``OperationalError`` (for example on an invalid FTS5 query expression).
    """
    # A blank query can never match; skip opening the DB for it.
    if not query or not query.strip():
        return []
    if not _DB_PATH.exists():
        return []
    try:
        with _connect() as conn:
            fts_rows = conn.execute(
                """SELECT path, snippet(memory_fts, 1, '[', ']', '…', 32) AS snip
                   FROM memory_fts
                   WHERE memory_fts MATCH ?
                   ORDER BY rank
                   LIMIT ?""",
                (query, limit),
            ).fetchall()
            if not fts_rows:
                return []

            paths = [r["path"] for r in fts_rows]
            snippets = {r["path"]: r["snip"] for r in fts_rows}
            placeholders = ",".join("?" * len(paths))
            meta_rows = conn.execute(
                f"SELECT path, summary, keywords FROM memory_meta WHERE path IN ({placeholders})",
                paths,
            ).fetchall()
            meta = {
                r["path"]: {
                    "summary": r["summary"],
                    "keywords": json.loads(r["keywords"] or "[]"),
                }
                for r in meta_rows
            }
            # Iterate `paths` (not `meta`) to keep rank order; hits without a
            # metadata row are dropped.
            return [
                {
                    "file": p,
                    "summary": meta[p]["summary"],
                    "keywords": meta[p]["keywords"],
                    "snippet": snippets[p],
                }
                for p in paths
                if p in meta
            ]
    except sqlite3.OperationalError:
        return []


def list_all() -> list[dict]:
    """Return all rows from memory_meta ordered by path.

    Each dict has the keys ``path``, ``category``, ``size_bytes``,
    ``modified``, ``summary`` and ``keywords`` (decoded from JSON). Returns an
    empty list when the DB file does not exist.
    """
    if not _DB_PATH.exists():
        return []
    with _connect() as conn:
        rows = conn.execute(
            "SELECT path, category, size_bytes, modified, summary, keywords "
            "FROM memory_meta ORDER BY path"
        ).fetchall()
    return [
        {
            "path": r["path"],
            "category": r["category"],
            "size_bytes": r["size_bytes"],
            "modified": r["modified"],
            "summary": r["summary"],
            "keywords": json.loads(r["keywords"] or "[]"),
        }
        for r in rows
    ]


def _load_legacy_index(index_path: Path) -> dict:
    """Return the JSON index at *index_path* as a dict, or ``{}`` if unusable.

    A missing file, an unreadable file, undecodable bytes, invalid JSON, or a
    top-level value that is not an object all yield ``{}``.
    """
    try:
        data = json.loads(index_path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        return {}
    return data if isinstance(data, dict) else {}


def migrate_from_files() -> None:
    """Populate DB from existing .md files on first run; no-op if DB already has entries.

    Does nothing when the DB file does not exist. Summaries and keywords are
    carried over from ``memory_index.json`` when it contains an entry for the
    file's POSIX-style relative path. The category is the first path
    component, or ``"root"`` for files directly under the memory root.

    All rows are written in a single transaction: if anything fails the DB is
    rolled back to empty, so the next call retries the migration instead of
    treating a half-populated table as already migrated.
    """
    if not _DB_PATH.exists():
        return

    with _connect() as conn:
        count = conn.execute("SELECT COUNT(*) FROM memory_meta").fetchone()[0]
        if count > 0:
            return

        existing = _load_legacy_index(_MEMORY_ROOT / "memory_index.json")

        for f in sorted(_MEMORY_ROOT.rglob("*.md")):
            if f.name in _EXCLUDED:
                continue
            # Skip directories that happen to end in .md, broken symlinks,
            # and files that disappear between listing and stat.
            try:
                if not f.is_file():
                    continue
                st = f.stat()
            except OSError:
                continue

            rel = f.relative_to(_MEMORY_ROOT)
            rel_key = rel.as_posix()
            category = rel.parts[0] if len(rel.parts) > 1 else "root"
            mtime = datetime.datetime.fromtimestamp(st.st_mtime).isoformat(
                timespec="seconds"
            )

            try:
                # errors="replace" keeps a file with stray bytes searchable
                # instead of aborting the whole migration.
                content = f.read_text(encoding="utf-8", errors="replace")
            except OSError:
                content = ""

            prev = existing.get(rel_key)
            if not isinstance(prev, dict):
                prev = {}
            summary = prev.get("summary")
            if not isinstance(summary, str):
                summary = ""  # column is NOT NULL; never pass None through
            raw_keywords = prev.get("keywords")
            keywords = (
                [str(k) for k in raw_keywords]
                if isinstance(raw_keywords, list)
                else []
            )

            _write_entry(
                conn,
                rel_key,
                content=content,
                category=category,
                size_bytes=st.st_size,
                modified=mtime,
                summary=summary,
                keywords=keywords,
            )