From 3e4b1f1f9110b588f8ff560486d2580e49bf512a Mon Sep 17 00:00:00 2001 From: curo1305 Date: Fri, 22 May 2026 09:39:32 +0200 Subject: [PATCH] feat(01-04): rewrite services/storage.py as async SQLAlchemy + MinIO orchestrator - Replaced entire flat-file + filelock implementation with async ORM + MinIO - All 14 DB-touching functions are async def accepting AsyncSession as first param - load_settings/save_settings/mask_api_key/settings_masked remain sync (flat-file, Phase 2 will migrate) - save_upload uses null-user D-03 sentinel; object_key via MinIO put_object - update_document_topics auto-creates missing topics via create_topic deduplication - No filelock, no METADATA_DIR/UPLOADS_DIR/TOPICS_FILE references remain - Added __all__ listing all 18 public functions - Updated conftest.py: removed filelock patching no longer needed - Fixed test_object_key_schema: removed unused db_session param (SQLite INET type conflict) --- backend/services/storage.py | 526 ++++++++++++++++++++++++++-------- backend/tests/conftest.py | 9 +- backend/tests/test_storage.py | 2 +- 3 files changed, 410 insertions(+), 127 deletions(-) diff --git a/backend/services/storage.py b/backend/services/storage.py index e69cb32..b9cc58b 100644 --- a/backend/services/storage.py +++ b/backend/services/storage.py @@ -1,174 +1,437 @@ +""" +Async document/topic/settings storage service for DocuVault. + +This module replaces the legacy flat-file + filelock implementation with: + - Async SQLAlchemy ORM for document and topic persistence (PostgreSQL) + - MinIO SDK (via asyncio.to_thread) for binary object storage + +Public function names are PRESERVED from the old flat-file implementation so +that api/documents.py and api/topics.py can be updated in Plan 05 with minimal +changes (async def + await + session parameter). + +Settings functions (load_settings / save_settings) remain sync and flat-file +backed in Phase 1 because the users.ai_provider / users.ai_model schema columns +cannot be populated until Phase 2. 
+Phase 2 will migrate these to DB-backed per-user settings (D-03 deferred to +user-scoped column population). + +D-05: Storage service layer switched to PostgreSQL + MinIO. +D-06: Object key schema: {user_id}/{document_id}/{uuid4()}{ext} — human filename in DB only. +D-03: documents.user_id is None (nullable) in Phase 1 — no auth system yet. +""" +from __future__ import annotations + +import copy import json +import sys import uuid -import shutil from datetime import datetime, timezone from pathlib import Path -from filelock import FileLock -from config import UPLOADS_DIR, METADATA_DIR, TOPICS_FILE, SETTINGS_FILE, DEFAULT_SETTINGS +from typing import Optional + +from sqlalchemy import select, delete +from sqlalchemy import func as sql_func +from sqlalchemy.ext.asyncio import AsyncSession + +from config import DEFAULT_SETTINGS, SETTINGS_FILE +from db.models import Document, DocumentTopic, Topic +from storage import get_storage_backend -# ── File locks ──────────────────────────────────────────────────────────────── +# ── Lazy singleton storage backend ──────────────────────────────────────────── -_topics_lock = FileLock(str(TOPICS_FILE) + ".lock") -_settings_lock = FileLock(str(SETTINGS_FILE) + ".lock") +_storage = None + + +def _backend(): + """Return the lazily-instantiated StorageBackend singleton. + + Mirrors the module-level singleton behaviour of the old filelock objects so + the MinIO client is created once per process, not once per request. 
+ """ + global _storage + _storage = _storage or get_storage_backend() + return _storage + + +# ── Private helpers ──────────────────────────────────────────────────────────── + +def _doc_to_dict(doc: Document, topic_names: list) -> dict: + """Convert a Document ORM row + resolved topic names to the legacy dict shape.""" + return { + "id": str(doc.id), + "original_name": doc.filename, + "filename": doc.filename, + "mime_type": doc.content_type, + "size_bytes": doc.size_bytes, + "extracted_text": doc.extracted_text or "", + "topics": topic_names, + "created_at": doc.created_at.isoformat() if doc.created_at else None, + "classified_at": doc.updated_at.isoformat() if doc.status == "classified" else None, + } + + +async def _load_topic_names(session: AsyncSession, doc_id: uuid.UUID) -> list: + """Return the list of topic names for a given document UUID.""" + q = await session.execute( + select(Topic.name) + .join(DocumentTopic, DocumentTopic.topic_id == Topic.id) + .where(DocumentTopic.document_id == doc_id) + ) + return [row[0] for row in q] # ── Documents ───────────────────────────────────────────────────────────────── -def save_upload(file_bytes: bytes, original_name: str, mime_type: str) -> dict: - doc_id = str(uuid.uuid4()) +async def save_upload( + session: AsyncSession, + file_bytes: bytes, + original_name: str, + mime_type: str, +) -> dict: + """Persist file bytes to MinIO and create a Document row in PostgreSQL. + + Returns a dict shape compatible with the legacy api/documents.py line 32–33: + {"id", "filename", "path", "object_key", "user_id"} + + The "path" key is preserved for compatibility — it now contains the MinIO + object_key rather than a filesystem path. + + D-03: user_id is None (no auth in Phase 1). Phase 2 will replace the + "null-user" sentinel with str(current_user.id). 
+ """ + doc_id = uuid.uuid4() suffix = Path(original_name).suffix.lower() - filename = f"{doc_id}{suffix}" - dest = UPLOADS_DIR / filename - dest.write_bytes(file_bytes) - return {"id": doc_id, "filename": filename, "path": str(dest)} + + doc = Document( + id=doc_id, + user_id=None, # D-03: nullable in Phase 1 + filename=original_name, + content_type=mime_type, + size_bytes=len(file_bytes), + storage_backend="minio", + status="pending", + object_key="", # filled after MinIO upload below + ) + session.add(doc) + await session.flush() # materialise doc.id without committing + + # D-03: "null-user" sentinel — Phase 2 replaces with str(current_user.id) + object_key = await _backend().put_object( + user_id="null-user", + document_id=str(doc_id), + file_bytes=file_bytes, + extension=suffix, + content_type=mime_type, + ) + doc.object_key = object_key + await session.commit() + + return { + "id": str(doc_id), + "filename": original_name, + "path": object_key, + "object_key": object_key, + "user_id": None, + } -def save_metadata(meta: dict) -> None: - path = METADATA_DIR / f"{meta['id']}.json" - lock = FileLock(str(path) + ".lock") - with lock: - path.write_text(json.dumps(meta, indent=2, ensure_ascii=False)) +async def save_metadata(session: AsyncSession, meta: dict) -> None: + """Update a Document row from the legacy metadata dict shape. + + Keys consumed: id, extracted_text, topics (list[str]), classified_at. 
+ """ + try: + uid = uuid.UUID(meta["id"]) + except (ValueError, KeyError): + return + + doc = await session.get(Document, uid) + if doc is None: + return + + doc.extracted_text = meta.get("extracted_text", "") + + topics_list = meta.get("topics") + if topics_list: + await update_document_topics(session, meta["id"], topics_list) + + doc.status = "classified" if meta.get("classified_at") else "pending" + await session.commit() -def get_metadata(doc_id: str) -> dict | None: - path = METADATA_DIR / f"{doc_id}.json" - if not path.exists(): +async def get_metadata(session: AsyncSession, doc_id: str) -> Optional[dict]: + """Return the legacy metadata dict for a document, or None if not found.""" + try: + uid = uuid.UUID(doc_id) + except ValueError: return None - return json.loads(path.read_text()) + + doc = await session.get(Document, uid) + if doc is None: + return None + + topic_names = await _load_topic_names(session, uid) + return _doc_to_dict(doc, topic_names) -def list_metadata(topic: str | None = None) -> list[dict]: - docs = [] - for p in sorted(METADATA_DIR.glob("*.json"), key=lambda x: x.stat().st_mtime, reverse=True): - try: - meta = json.loads(p.read_text()) - except Exception: - continue - if topic and topic not in meta.get("topics", []): - continue - docs.append(meta) - return docs +async def list_metadata( + session: AsyncSession, topic: Optional[str] = None +) -> list: + """Return a list of metadata dicts, optionally filtered by topic name.""" + stmt = select(Document).order_by(Document.created_at.desc()) + if topic is not None: + stmt = ( + stmt.join(DocumentTopic, DocumentTopic.document_id == Document.id) + .join(Topic, Topic.id == DocumentTopic.topic_id) + .where(Topic.name == topic) + ) + result = await session.execute(stmt) + docs = result.scalars().all() + + rows = [] + for doc in docs: + topic_names = await _load_topic_names(session, doc.id) + rows.append(_doc_to_dict(doc, topic_names)) + return rows -def delete_document(doc_id: str) -> bool: - 
meta_path = METADATA_DIR / f"{doc_id}.json" - if not meta_path.exists(): +async def delete_document(session: AsyncSession, doc_id: str) -> bool: + """Delete a document's MinIO object and its PostgreSQL row. + + Returns False if the document is not found; True on success. + MinIO deletion failures are logged to stderr but do not prevent the DB row + deletion (the bytes may already be gone). + """ + try: + uid = uuid.UUID(doc_id) + except ValueError: return False - meta = json.loads(meta_path.read_text()) - upload_path = UPLOADS_DIR / meta.get("filename", "") - if upload_path.exists(): - upload_path.unlink() - meta_path.unlink() - lock_path = Path(str(meta_path) + ".lock") - if lock_path.exists(): - lock_path.unlink() + + doc = await session.get(Document, uid) + if doc is None: + return False + + try: + await _backend().delete_object(doc.object_key) + except Exception as exc: + print(f"[storage] WARNING: MinIO delete_object failed for {doc.object_key!r}: {exc}", file=sys.stderr) + + await session.delete(doc) + await session.commit() return True -def update_document_topics(doc_id: str, topics: list[str]) -> dict | None: - meta = get_metadata(doc_id) - if meta is None: +async def update_document_topics( + session: AsyncSession, doc_id: str, topics: list +) -> Optional[dict]: + """Replace all topic associations for a document. + + Auto-creates topics that don't yet exist. Returns the refreshed metadata + dict, or None if the document is not found. + """ + try: + uid = uuid.UUID(doc_id) + except ValueError: return None - meta["topics"] = topics - meta["classified_at"] = datetime.now(timezone.utc).isoformat() - save_metadata(meta) - return meta + doc = await session.get(Document, uid) + if doc is None: + return None -def remove_topic_from_all_documents(topic_name: str) -> int: - """Remove a topic name from all documents. 
Returns number of docs updated.""" - count = 0 - for p in METADATA_DIR.glob("*.json"): - try: - meta = json.loads(p.read_text()) - except Exception: + # Remove all existing associations + await session.execute( + delete(DocumentTopic).where(DocumentTopic.document_id == uid) + ) + + # Re-insert, deduplicating by name + seen: set = set() + for name in topics: + if name in seen: continue - if topic_name in meta.get("topics", []): - meta["topics"] = [t for t in meta["topics"] if t != topic_name] - lock = FileLock(str(p) + ".lock") - with lock: - p.write_text(json.dumps(meta, indent=2, ensure_ascii=False)) - count += 1 - return count + seen.add(name) + topic_dict = await create_topic(session, name) + session.add( + DocumentTopic( + document_id=uid, + topic_id=uuid.UUID(topic_dict["id"]), + ) + ) + + doc.status = "classified" + await session.commit() + return await get_metadata(session, doc_id) + + +async def remove_topic_from_all_documents( + session: AsyncSession, topic_name: str +) -> int: + """Delete all DocumentTopic rows for the named topic. + + Returns the number of rows deleted. 
+ """ + q = await session.execute( + select(Topic).where(sql_func.lower(Topic.name) == topic_name.lower()) + ) + topic = q.scalars().first() + if topic is None: + return 0 + + result = await session.execute( + delete(DocumentTopic).where(DocumentTopic.topic_id == topic.id) + ) + await session.commit() + return result.rowcount # ── Topics ──────────────────────────────────────────────────────────────────── -def load_topics() -> list[dict]: - with _topics_lock: - data = json.loads(TOPICS_FILE.read_text()) - return data.get("topics", []) +async def load_topics(session: AsyncSession) -> list: + """Return all topics ordered by name.""" + q = await session.execute(select(Topic).order_by(Topic.name)) + return [ + {"id": str(t.id), "name": t.name, "description": t.description, "color": t.color} + for t in q.scalars() + ] -def save_topics(topics: list[dict]) -> None: - with _topics_lock: - TOPICS_FILE.write_text(json.dumps({"topics": topics}, indent=2)) +async def save_topics(session: AsyncSession, topics: list) -> None: + """Idempotent bulk replace — delete all Topic rows then insert the list. - -def get_topic(topic_id: str) -> dict | None: - return next((t for t in load_topics() if t["id"] == topic_id), None) - - -def create_topic(name: str, description: str = "", color: str = "#6366f1") -> dict: - topics = load_topics() - # Deduplicate by name (case-insensitive) - if any(t["name"].lower() == name.lower() for t in topics): - return next(t for t in topics if t["name"].lower() == name.lower()) - topic = { - "id": str(uuid.uuid4())[:8], - "name": name, - "description": description, - "color": color, - } - topics.append(topic) - save_topics(topics) - return topic - - -def update_topic(topic_id: str, **kwargs) -> dict | None: - topics = load_topics() + # legacy: not used by current endpoints; preserved for API compatibility. 
+ """ + await session.execute(delete(Topic)) for t in topics: - if t["id"] == topic_id: - t.update({k: v for k, v in kwargs.items() if v is not None}) - save_topics(topics) - return t - return None + session.add( + Topic( + id=uuid.UUID(t["id"]) if t.get("id") else uuid.uuid4(), + name=t["name"], + description=t.get("description", ""), + color=t.get("color", "#6366f1"), + ) + ) + await session.commit() -def delete_topic(topic_id: str) -> str | None: - topics = load_topics() - topic = next((t for t in topics if t["id"] == topic_id), None) - if not topic: +async def get_topic(session: AsyncSession, topic_id: str) -> Optional[dict]: + """Return a topic dict by UUID string, or None if not found.""" + try: + uid = uuid.UUID(topic_id) + except ValueError: return None - name = topic["name"] - save_topics([t for t in topics if t["id"] != topic_id]) - remove_topic_from_all_documents(name) + t = await session.get(Topic, uid) + if t is None: + return None + return {"id": str(t.id), "name": t.name, "description": t.description, "color": t.color} + + +async def create_topic( + session: AsyncSession, + name: str, + description: str = "", + color: str = "#6366f1", +) -> dict: + """Create a topic, or return the existing one (case-insensitive deduplication).""" + q = await session.execute( + select(Topic).where(sql_func.lower(Topic.name) == name.lower()) + ) + existing = q.scalars().first() + if existing is not None: + return { + "id": str(existing.id), + "name": existing.name, + "description": existing.description, + "color": existing.color, + } + + topic = Topic(name=name, description=description, color=color) + session.add(topic) + await session.commit() + return { + "id": str(topic.id), + "name": topic.name, + "description": topic.description, + "color": topic.color, + } + + +async def update_topic( + session: AsyncSession, + topic_id: str, + name: Optional[str] = None, + description: Optional[str] = None, + color: Optional[str] = None, +) -> Optional[dict]: + """Update 
non-None fields on a topic. Returns updated dict or None.""" + try: + uid = uuid.UUID(topic_id) + except ValueError: + return None + t = await session.get(Topic, uid) + if t is None: + return None + if name is not None: + t.name = name + if description is not None: + t.description = description + if color is not None: + t.color = color + await session.commit() + return {"id": str(t.id), "name": t.name, "description": t.description, "color": t.color} + + +async def delete_topic(session: AsyncSession, topic_id: str) -> Optional[str]: + """Delete a topic and cascade-remove its DocumentTopic rows. + + Returns the deleted topic name, or None if not found. + """ + try: + uid = uuid.UUID(topic_id) + except ValueError: + return None + t = await session.get(Topic, uid) + if t is None: + return None + name = t.name + await session.delete(t) # ondelete="CASCADE" removes DocumentTopic rows + await session.commit() return name -def topic_doc_counts() -> dict[str, int]: - counts: dict[str, int] = {} - for p in METADATA_DIR.glob("*.json"): - try: - meta = json.loads(p.read_text()) - except Exception: - continue - for t in meta.get("topics", []): - counts[t] = counts.get(t, 0) + 1 - return counts +async def topic_doc_counts(session: AsyncSession) -> dict: + """Return a mapping of topic name -> document count.""" + q = await session.execute( + select(Topic.name, sql_func.count(DocumentTopic.document_id)) + .join(DocumentTopic, DocumentTopic.topic_id == Topic.id, isouter=True) + .group_by(Topic.name) + ) + return {name: count for name, count in q} # ── Settings ────────────────────────────────────────────────────────────────── +# Phase 2 will move per-user settings to users.ai_provider / users.ai_model +# (D-03 deferred to user-scoped column population). +# For now these remain as flat-file JSON — single-writer, no filelock needed. def load_settings() -> dict: - with _settings_lock: + """Read app settings from the flat-file SETTINGS_FILE. 
+ + Falls back to a deep copy of DEFAULT_SETTINGS if the file is missing or is not valid JSON. + Phase 2 will move per-user settings to users.ai_provider / users.ai_model. + """ + try: return json.loads(SETTINGS_FILE.read_text()) + except (FileNotFoundError, json.JSONDecodeError): + return copy.deepcopy(DEFAULT_SETTINGS) def save_settings(settings: dict) -> None: - with _settings_lock: - SETTINGS_FILE.write_text(json.dumps(settings, indent=2)) + """Write app settings to the flat-file SETTINGS_FILE. + + No filelock — Phase 1 settings file is single-writer. + Phase 2 will move per-user settings to users.ai_provider / users.ai_model. + """ + SETTINGS_FILE.write_text(json.dumps(settings, indent=2)) def mask_api_key(key: str) -> str: @@ -178,10 +441,33 @@ def mask_api_key(key: str) -> str: def settings_masked(settings: dict) -> dict: - import copy s = copy.deepcopy(settings) for prov in ("anthropic", "openai"): key = s.get("providers", {}).get(prov, {}).get("api_key", "") if key: s["providers"][prov]["api_key"] = mask_api_key(key) return s + + +# ── Public surface ───────────────────────────────────────────────────────────── + +__all__ = [ + "save_upload", + "save_metadata", + "get_metadata", + "list_metadata", + "delete_document", + "update_document_topics", + "remove_topic_from_all_documents", + "load_topics", + "save_topics", + "get_topic", + "create_topic", + "update_topic", + "delete_topic", + "topic_doc_counts", + "load_settings", + "save_settings", + "mask_api_key", + "settings_masked", +] diff --git a/backend/tests/conftest.py b/backend/tests/conftest.py index 4411060..3e634a3 100644 --- a/backend/tests/conftest.py +++ b/backend/tests/conftest.py @@ -41,14 +41,11 @@ def isolated_data_dir(monkeypatch, tmp_path): monkeypatch.setattr(config, "TOPICS_FILE", data_dir / "topics.json") monkeypatch.setattr(config, "SETTINGS_FILE", data_dir / "settings.json") + # Plan 04: services.storage is now async (PostgreSQL + MinIO). + # The flat-file _topics_lock / _settings_lock attributes no longer exist. 
+ # Only SETTINGS_FILE is still used by the sync load_settings/save_settings. import services.storage as st - from filelock import FileLock - monkeypatch.setattr(st, "UPLOADS_DIR", data_dir / "uploads") - monkeypatch.setattr(st, "METADATA_DIR", data_dir / "metadata") - monkeypatch.setattr(st, "TOPICS_FILE", data_dir / "topics.json") monkeypatch.setattr(st, "SETTINGS_FILE", data_dir / "settings.json") - monkeypatch.setattr(st, "_topics_lock", FileLock(str(data_dir / "topics.json") + ".lock")) - monkeypatch.setattr(st, "_settings_lock", FileLock(str(data_dir / "settings.json") + ".lock")) yield data_dir diff --git a/backend/tests/test_storage.py b/backend/tests/test_storage.py index e3acb25..f410cd0 100644 --- a/backend/tests/test_storage.py +++ b/backend/tests/test_storage.py @@ -20,7 +20,7 @@ import pytest # Test 1: object key matches STORE-02 regex # --------------------------------------------------------------------------- -async def test_object_key_schema(db_session): +async def test_object_key_schema(): """STORE-02: put_object must return a key matching {user_id}/{doc_id}/{uuid4}{ext}.""" try: from storage.minio_backend import MinIOBackend