""" Async document/topic/settings storage service for DocuVault. This module replaces the legacy flat-file + filelock implementation with: - Async SQLAlchemy ORM for document and topic persistence (PostgreSQL) - MinIO SDK (via asyncio.to_thread) for binary object storage Public function names are PRESERVED from the old flat-file implementation so that api/documents.py and api/topics.py can be updated in Plan 05 with minimal changes (async def + await + session parameter). Phase 3 D-12: load_settings / save_settings / mask_api_key / settings_masked removed. All AI config comes from DB (users.ai_provider / users.ai_model set by admin). D-05: Storage service layer switched to PostgreSQL + MinIO. D-06: Object key schema: {user_id}/{document_id}/{uuid4()}{ext} — human filename in DB only. D-03: documents.user_id is None (nullable) in Phase 1 — no auth system yet. """ from __future__ import annotations import sys import uuid from datetime import datetime, timezone from typing import Optional from sqlalchemy import select, delete, text, or_ from sqlalchemy import func as sql_func from sqlalchemy.ext.asyncio import AsyncSession from db.models import Document, DocumentTopic, Topic from storage import get_storage_backend # ── Lazy singleton storage backend ──────────────────────────────────────────── _storage = None def _backend(): """Return the lazily-instantiated StorageBackend singleton. Mirrors the module-level singleton behaviour of the old filelock objects so the MinIO client is created once per process, not once per request. """ global _storage _storage = _storage or get_storage_backend() return _storage # ── Private helpers ──────────────────────────────────────────────────────────── def _doc_to_dict(doc: Document, topic_names: list) -> dict: """Convert a Document ORM row + resolved topic names to the legacy dict shape.""" return { "id": str(doc.id), "original_name": doc.filename, "filename": doc.filename, "mime_type": doc.content_type, "size_bytes": doc.size_bytes, "extracted_text": doc.extracted_text or "", "topics": topic_names, "created_at": doc.created_at.isoformat() if doc.created_at else None, "classified_at": doc.updated_at.isoformat() if doc.status == "classified" else None, } async def _load_topic_names(session: AsyncSession, doc_id: uuid.UUID) -> list: """Return the list of topic names for a given document UUID.""" q = await session.execute( select(Topic.name) .join(DocumentTopic, DocumentTopic.topic_id == Topic.id) .where(DocumentTopic.document_id == doc_id) ) return [row[0] for row in q] # ── Documents ───────────────────────────────────────────────────────────────── async def save_metadata(session: AsyncSession, meta: dict) -> None: """Update a Document row from the legacy metadata dict shape. Keys consumed: id, extracted_text, topics (list[str]), classified_at. """ try: uid = uuid.UUID(meta["id"]) except (ValueError, KeyError): return doc = await session.get(Document, uid) if doc is None: return doc.extracted_text = meta.get("extracted_text", "") topics_list = meta.get("topics") if topics_list: await update_document_topics(session, meta["id"], topics_list) doc.status = "classified" if meta.get("classified_at") else "pending" await session.commit() async def get_metadata(session: AsyncSession, doc_id: str) -> Optional[dict]: """Return the legacy metadata dict for a document, or None if not found.""" try: uid = uuid.UUID(doc_id) except ValueError: return None doc = await session.get(Document, uid) if doc is None: return None topic_names = await _load_topic_names(session, uid) return _doc_to_dict(doc, topic_names) async def list_metadata( session: AsyncSession, user_id: uuid.UUID, topic: Optional[str] = None ) -> list: """Return a list of metadata dicts for a specific user, optionally filtered by topic name. D-16: always filters by user_id — a user can only see their own documents. """ stmt = select(Document).where(Document.user_id == user_id).order_by(Document.created_at.desc()) if topic is not None: stmt = ( stmt.join(DocumentTopic, DocumentTopic.document_id == Document.id) .join(Topic, Topic.id == DocumentTopic.topic_id) .where(Topic.name == topic) ) result = await session.execute(stmt) docs = result.scalars().all() rows = [] for doc in docs: topic_names = await _load_topic_names(session, doc.id) rows.append(_doc_to_dict(doc, topic_names)) return rows async def delete_document(session: AsyncSession, doc_id: str) -> bool: """Delete a document's MinIO object and its PostgreSQL row. Returns False if the document is not found; True on success. MinIO deletion failures are logged to stderr but do not prevent the DB row deletion (the bytes may already be gone). """ try: uid = uuid.UUID(doc_id) except ValueError: return False doc = await session.get(Document, uid) if doc is None: return False try: await _backend().delete_object(doc.object_key) except Exception as exc: print(f"[storage] WARNING: MinIO delete_object failed for {doc.object_key!r}: {exc}", file=sys.stderr) # Atomic quota decrement (STORE-06, D-07). # user_id is always set post-migration (Plan 03-03+) — guard removed. # Use CASE WHEN instead of GREATEST() for SQLite compatibility # (PostgreSQL supports both; SQLite lacks the GREATEST scalar function). await session.execute( text( "UPDATE quotas " "SET used_bytes = CASE WHEN used_bytes > :delta THEN used_bytes - :delta ELSE 0 END " "WHERE user_id = :uid" ), {"delta": doc.size_bytes, "uid": str(doc.user_id)}, ) await session.delete(doc) await session.commit() return True async def update_document_topics( session: AsyncSession, doc_id: str, topics: list ) -> Optional[dict]: """Replace all topic associations for a document. Auto-creates topics that don't yet exist. Returns the refreshed metadata dict, or None if the document is not found. """ try: uid = uuid.UUID(doc_id) except ValueError: return None doc = await session.get(Document, uid) if doc is None: return None # Remove all existing associations await session.execute( delete(DocumentTopic).where(DocumentTopic.document_id == uid) ) # Re-insert, deduplicating by name seen: set = set() for name in topics: if name in seen: continue seen.add(name) topic_dict = await create_topic(session, name) session.add( DocumentTopic( document_id=uid, topic_id=uuid.UUID(topic_dict["id"]), ) ) doc.status = "classified" await session.commit() return await get_metadata(session, doc_id) async def remove_topic_from_all_documents( session: AsyncSession, topic_name: str ) -> int: """Delete all DocumentTopic rows for the named topic. Returns the number of rows deleted. """ q = await session.execute( select(Topic).where(sql_func.lower(Topic.name) == topic_name.lower()) ) topic = q.scalars().first() if topic is None: return 0 result = await session.execute( delete(DocumentTopic).where(DocumentTopic.topic_id == topic.id) ) await session.commit() return result.rowcount # ── Topics ──────────────────────────────────────────────────────────────────── async def load_topics(session: AsyncSession) -> list: """Return all topics ordered by name.""" q = await session.execute(select(Topic).order_by(Topic.name)) return [ {"id": str(t.id), "name": t.name, "description": t.description, "color": t.color} for t in q.scalars() ] async def load_topics_for_user(session: AsyncSession, user_id: uuid.UUID) -> list: """Return system topics (user_id IS NULL) + the user's own topics, ordered by name. D-08 + D-17 + DOC-04: layered topic namespace. System topics are visible to all users; per-user topics are visible only to their owner. A user's topic list is the union of both sets. """ q = await session.execute( select(Topic).where( or_(Topic.user_id == user_id, Topic.user_id.is_(None)) ).order_by(Topic.name) ) return [ {"id": str(t.id), "name": t.name, "description": t.description, "color": t.color} for t in q.scalars() ] async def save_topics(session: AsyncSession, topics: list) -> None: """Idempotent bulk replace — delete all Topic rows then insert the list. # legacy: not used by current endpoints; preserved for API compatibility. """ await session.execute(delete(Topic)) for t in topics: session.add( Topic( id=uuid.UUID(t["id"]) if t.get("id") else uuid.uuid4(), name=t["name"], description=t.get("description", ""), color=t.get("color", "#6366f1"), ) ) await session.commit() async def get_topic(session: AsyncSession, topic_id: str) -> Optional[dict]: """Return a topic dict by UUID string, or None if not found.""" try: uid = uuid.UUID(topic_id) except ValueError: return None t = await session.get(Topic, uid) if t is None: return None return {"id": str(t.id), "name": t.name, "description": t.description, "color": t.color} async def create_topic( session: AsyncSession, name: str, description: str = "", color: str = "#6366f1", user_id: Optional[uuid.UUID] = None, ) -> dict: """Create a topic, or return the existing one (case-insensitive, namespace-scoped dedup). D-08: user_id=None creates a system topic (visible to all users). D-08: user_id= creates a per-user topic (visible only to that user). Deduplication is scoped by user_id namespace: - System topics (user_id=None) dedup against other system topics only - Per-user topics dedup within that user's namespace only This allows "Finance" to exist as both a system topic and a per-user topic. SQLite note: Uses a branching approach instead of IS NOT DISTINCT FROM (SQLite doesn't support that PostgreSQL construct for NULL comparison). """ if user_id is None: q = await session.execute( select(Topic).where( sql_func.lower(Topic.name) == name.lower(), Topic.user_id.is_(None), ) ) else: q = await session.execute( select(Topic).where( sql_func.lower(Topic.name) == name.lower(), Topic.user_id == user_id, ) ) existing = q.scalars().first() if existing is not None: return { "id": str(existing.id), "name": existing.name, "description": existing.description, "color": existing.color, } topic = Topic(name=name, description=description, color=color, user_id=user_id) session.add(topic) await session.commit() return { "id": str(topic.id), "name": topic.name, "description": topic.description, "color": topic.color, } async def update_topic( session: AsyncSession, topic_id: str, name: Optional[str] = None, description: Optional[str] = None, color: Optional[str] = None, ) -> Optional[dict]: """Update non-None fields on a topic. Returns updated dict or None.""" try: uid = uuid.UUID(topic_id) except ValueError: return None t = await session.get(Topic, uid) if t is None: return None if name is not None: t.name = name if description is not None: t.description = description if color is not None: t.color = color await session.commit() return {"id": str(t.id), "name": t.name, "description": t.description, "color": t.color} async def delete_topic(session: AsyncSession, topic_id: str) -> Optional[str]: """Delete a topic and cascade-remove its DocumentTopic rows. Returns the deleted topic name, or None if not found. """ try: uid = uuid.UUID(topic_id) except ValueError: return None t = await session.get(Topic, uid) if t is None: return None name = t.name await session.delete(t) # ondelete="CASCADE" removes DocumentTopic rows await session.commit() return name async def topic_doc_counts( session: AsyncSession, user_id: Optional[uuid.UUID] = None ) -> dict: """Return a mapping of topic name -> document count. If user_id is provided, counts only documents belonging to that user. This ensures a user sees the count of their own documents for each topic, not the global count across all users. """ stmt = ( select(Topic.name, sql_func.count(DocumentTopic.document_id)) .join(DocumentTopic, DocumentTopic.topic_id == Topic.id, isouter=True) ) if user_id is not None: stmt = stmt.join( Document, Document.id == DocumentTopic.document_id, isouter=True ).where( or_(Document.user_id == user_id, Document.user_id.is_(None)) ) stmt = stmt.group_by(Topic.name) q = await session.execute(stmt) return {name: count for name, count in q} # ── Public surface ───────────────────────────────────────────────────────────── __all__ = [ "save_metadata", "get_metadata", "list_metadata", "delete_document", "update_document_topics", "remove_topic_from_all_documents", "load_topics", "load_topics_for_user", "save_topics", "get_topic", "create_topic", "update_topic", "delete_topic", "topic_doc_counts", ]