Files
kite/backend/services/storage.py
T
curo1305 95c7ed786a feat(06.2-03): backend — cloud-aware delete routing + skip_quota + remove_only param
- storage.delete_document gains skip_quota=False param; quota decrement gated on it
- DELETE /api/documents/{id} gains remove_only=bool query param
- Cloud docs (storage_backend != minio): attempt cloud backend delete_object first
  - On failure: return HTTP 200 {success: false, cloud_delete_failed: true} (not 4xx)
  - On success or remove_only: delete DB row with skip_quota=True
- Cloud creds/exception message never included in response body (T-06.2-03-02)
- Promote 3 xfail stubs to real tests (propagates, failure, remove_only)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-31 15:09:44 +02:00

446 lines
15 KiB
Python

"""
Async document/topic/settings storage service for DocuVault.
This module replaces the legacy flat-file + filelock implementation with:
- Async SQLAlchemy ORM for document and topic persistence (PostgreSQL)
- MinIO SDK (via asyncio.to_thread) for binary object storage
Public function names are PRESERVED from the old flat-file implementation so
that api/documents.py and api/topics.py can be updated in Plan 05 with minimal
changes (async def + await + session parameter).
Phase 3 D-12: load_settings / save_settings / mask_api_key / settings_masked removed.
All AI config comes from DB (users.ai_provider / users.ai_model set by admin).
D-05: Storage service layer switched to PostgreSQL + MinIO.
D-06: Object key schema: {user_id}/{document_id}/{uuid4()}{ext} — human filename in DB only.
D-03: documents.user_id was nullable in Phase 1 (no auth system yet); since Plan 03-03 every document has an owning user.
"""
from __future__ import annotations
import sys
import uuid
from datetime import datetime, timezone
from typing import Optional
from sqlalchemy import select, delete, text, or_
from sqlalchemy import func as sql_func
from sqlalchemy.ext.asyncio import AsyncSession
from db.models import Document, DocumentTopic, Topic
from storage import get_storage_backend
# ── Lazy singleton storage backend ────────────────────────────────────────────
_storage = None


def _backend():
    """Hand back the process-wide StorageBackend, building it on first use.

    The backend is created by get_storage_backend() at most once per process
    and then reused by every caller, mirroring the module-level singletons of
    the old filelock implementation (one client per process, not per request).
    """
    global _storage
    if not _storage:
        _storage = get_storage_backend()
    return _storage
# ── Private helpers ────────────────────────────────────────────────────────────
def _doc_to_dict(doc: Document, topic_names: list) -> dict:
    """Convert a Document ORM row + resolved topic names to the legacy dict shape.

    Args:
        doc: The Document row to serialise.
        topic_names: Topic names already resolved for this document.

    Returns:
        A dict with the keys the legacy flat-file API exposed. ``created_at``
        and ``classified_at`` are ISO-8601 strings, or None when unavailable.
    """
    # There is no dedicated classified_at column: the last update time of a
    # classified document stands in for it. Guard a missing updated_at so a
    # classified row without a timestamp does not raise AttributeError.
    classified_at = None
    if doc.status == "classified" and doc.updated_at is not None:
        classified_at = doc.updated_at.isoformat()
    return {
        "id": str(doc.id),
        "original_name": doc.filename,
        "filename": doc.filename,
        "mime_type": doc.content_type,
        "size_bytes": doc.size_bytes,
        "extracted_text": doc.extracted_text or "",
        "topics": topic_names,
        "created_at": doc.created_at.isoformat() if doc.created_at else None,
        "classified_at": classified_at,
    }
async def _load_topic_names(session: AsyncSession, doc_id: uuid.UUID) -> list:
    """Fetch the names of every topic attached to the document *doc_id*."""
    names_stmt = (
        select(Topic.name)
        .join(DocumentTopic, DocumentTopic.topic_id == Topic.id)
        .where(DocumentTopic.document_id == doc_id)
    )
    result = await session.execute(names_stmt)
    # Each row carries exactly one column (Topic.name), so unpack it directly.
    return [name for (name,) in result]
# ── Documents ─────────────────────────────────────────────────────────────────
async def save_metadata(session: AsyncSession, meta: dict) -> None:
    """Persist a legacy-shaped metadata dict onto its Document row.

    Reads the keys ``id``, ``extracted_text``, ``topics`` (list of names) and
    ``classified_at``. Does nothing when ``id`` is missing, is not a valid
    UUID string, or names no existing document.
    """
    try:
        doc_uuid = uuid.UUID(meta["id"])
    except (ValueError, KeyError):
        return
    document = await session.get(Document, doc_uuid)
    if document is None:
        return
    document.extracted_text = meta.get("extracted_text", "")
    new_topics = meta.get("topics")
    # An empty or absent topic list leaves the existing associations alone.
    if new_topics:
        await update_document_topics(session, meta["id"], new_topics)
    # The status is driven by classified_at and overrides the "classified"
    # status that update_document_topics sets on the same row.
    document.status = "pending" if not meta.get("classified_at") else "classified"
    await session.commit()
async def get_metadata(session: AsyncSession, doc_id: str) -> Optional[dict]:
    """Look up one document and return it in the legacy metadata dict shape.

    Returns None when *doc_id* is not a valid UUID string or when no document
    with that id exists.
    """
    try:
        key = uuid.UUID(doc_id)
    except ValueError:
        return None
    row = await session.get(Document, key)
    if row is not None:
        return _doc_to_dict(row, await _load_topic_names(session, key))
    return None
async def list_metadata(
    session: AsyncSession, user_id: uuid.UUID, topic: Optional[str] = None
) -> list:
    """Return a list of metadata dicts for a specific user, optionally filtered by topic name.

    D-16: always filters by user_id — a user can only see their own documents.

    Args:
        session: Open async session.
        user_id: Owner whose documents are listed.
        topic: When given, only documents tagged with a topic of exactly this
            name are returned.

    Returns:
        Legacy metadata dicts (see ``_doc_to_dict``), newest document first,
        each document appearing once.
    """
    stmt = (
        select(Document)
        .where(Document.user_id == user_id)
        .order_by(Document.created_at.desc())
    )
    if topic is not None:
        # Filter with an IN-subquery rather than a JOIN: a system topic and a
        # per-user topic may share a name (see create_topic), and a JOIN would
        # then yield the same document once per matching topic.
        tagged_ids = (
            select(DocumentTopic.document_id)
            .join(Topic, Topic.id == DocumentTopic.topic_id)
            .where(Topic.name == topic)
        )
        stmt = stmt.where(Document.id.in_(tagged_ids))
    result = await session.execute(stmt)
    docs = result.scalars().all()
    if not docs:
        return []

    # Resolve topic names for all listed documents in a single query instead
    # of one query per document (N+1).
    names_by_doc: dict = {doc.id: [] for doc in docs}
    pairs = await session.execute(
        select(DocumentTopic.document_id, Topic.name)
        .join(Topic, Topic.id == DocumentTopic.topic_id)
        .where(DocumentTopic.document_id.in_(list(names_by_doc)))
    )
    for doc_id, name in pairs:
        names_by_doc[doc_id].append(name)
    return [_doc_to_dict(doc, names_by_doc[doc.id]) for doc in docs]
async def delete_document(session: AsyncSession, doc_id: str, skip_quota: bool = False) -> bool:
    """Delete a document's stored object and its PostgreSQL row.

    Args:
        session: Open async session; committed on success.
        doc_id: UUID string of the document to delete.
        skip_quota: When True, the owner's quota is not decremented. Used for
            cloud-stored documents that were never charged against the user's
            MinIO quota (T-06.2-03-01).

    Returns:
        False if *doc_id* is not a valid UUID or the document is not found;
        True once the row has been deleted and the session committed.

    Object deletion failures are logged to stderr but do not prevent the DB
    row deletion (the bytes may already be gone).
    """
    try:
        uid = uuid.UUID(doc_id)
    except ValueError:
        return False
    doc = await session.get(Document, uid)
    if doc is None:
        return False
    # NOTE(review): this always targets the default backend returned by
    # get_storage_backend(), also when skip_quota=True — confirm that is
    # intended for cloud-stored documents.
    try:
        await _backend().delete_object(doc.object_key)
    except Exception as exc:
        # Deliberately best-effort: a missing or unreachable object must not
        # leave an undeletable row behind.
        print(f"[storage] WARNING: MinIO delete_object failed for {doc.object_key!r}: {exc}", file=sys.stderr)
    # A document without an owner has no quota row to adjust; binding the
    # string "None" against a uuid column would fail on PostgreSQL.
    if not skip_quota and doc.user_id is not None:
        # Atomic quota decrement (STORE-06, D-07).
        # Use CASE WHEN instead of GREATEST() for SQLite compatibility
        # (PostgreSQL supports both; SQLite lacks the GREATEST scalar function).
        # A NULL delta would make the CASE fall through to ELSE 0 and wipe the
        # whole counter, so an unknown size is treated as 0 bytes.
        await session.execute(
            text(
                "UPDATE quotas "
                "SET used_bytes = CASE WHEN used_bytes > :delta THEN used_bytes - :delta ELSE 0 END "
                "WHERE user_id = :uid"
            ),
            {"delta": doc.size_bytes or 0, "uid": str(doc.user_id)},
        )
    await session.delete(doc)
    await session.commit()
    return True
async def update_document_topics(
    session: AsyncSession, doc_id: str, topics: list
) -> Optional[dict]:
    """Replace all topic associations for a document.

    Topic names that don't exist yet are auto-created as system topics (via
    create_topic without a user_id). The document's status is set to
    "classified".

    Args:
        session: Open async session; committed before returning.
        doc_id: UUID string of the document.
        topics: Topic names to attach; duplicates are ignored.

    Returns:
        The refreshed metadata dict, or None if *doc_id* is not a valid UUID
        or the document is not found.
    """
    try:
        uid = uuid.UUID(doc_id)
    except ValueError:
        return None
    doc = await session.get(Document, uid)
    if doc is None:
        return None
    # Remove all existing associations.
    # NOTE: create_topic commits when it creates a new topic, which also
    # commits this delete, so the replacement as a whole is not atomic.
    await session.execute(
        delete(DocumentTopic).where(DocumentTopic.document_id == uid)
    )
    # Re-insert, deduplicating on the resolved topic id rather than the raw
    # name: create_topic matches names case-insensitively, so "Tax" and "tax"
    # resolve to the same topic and would otherwise produce two identical
    # association rows.
    linked: set = set()
    for name in topics:
        topic_dict = await create_topic(session, name)
        topic_uid = uuid.UUID(topic_dict["id"])
        if topic_uid in linked:
            continue
        linked.add(topic_uid)
        session.add(DocumentTopic(document_id=uid, topic_id=topic_uid))
    doc.status = "classified"
    await session.commit()
    return await get_metadata(session, doc_id)
async def remove_topic_from_all_documents(
    session: AsyncSession, topic_name: str
) -> int:
    """Detach the topic called *topic_name* (case-insensitive) from every document.

    Only one topic is affected: the first match the database returns. No
    ORDER BY is applied, so if several topics share the name, which one is
    chosen is not defined here. The topic row itself is kept.

    Returns:
        The number of DocumentTopic rows deleted, or 0 when no topic has
        that name.
    """
    needle = topic_name.lower()
    lookup = await session.execute(
        select(Topic).where(sql_func.lower(Topic.name) == needle)
    )
    match = lookup.scalars().first()
    if match is None:
        return 0
    removed = await session.execute(
        delete(DocumentTopic).where(DocumentTopic.topic_id == match.id)
    )
    await session.commit()
    return removed.rowcount
# ── Topics ────────────────────────────────────────────────────────────────────
async def load_topics(session: AsyncSession) -> list:
    """List every topic, with no owner filter, sorted by name, as plain dicts."""
    ordered = await session.execute(select(Topic).order_by(Topic.name))
    return [
        dict(id=str(row.id), name=row.name, description=row.description, color=row.color)
        for row in ordered.scalars()
    ]
async def load_topics_for_user(session: AsyncSession, user_id: uuid.UUID) -> list:
    """List the topics visible to *user_id*, sorted by name, as plain dicts.

    D-08 + D-17 + DOC-04: layered topic namespace. A user sees the union of
    the system topics (rows with user_id IS NULL) and the topics they own;
    other users' per-user topics are excluded.
    """
    visible = or_(Topic.user_id.is_(None), Topic.user_id == user_id)
    ordered = await session.execute(
        select(Topic).where(visible).order_by(Topic.name)
    )
    return [
        dict(id=str(row.id), name=row.name, description=row.description, color=row.color)
        for row in ordered.scalars()
    ]
async def save_topics(session: AsyncSession, topics: list) -> None:
    """Replace the whole topics table with *topics* (legacy bulk replace).

    Legacy: not used by current endpoints; preserved for API compatibility.
    Every existing Topic row is deleted, then one row is inserted per entry.
    Entries without an ``id`` get a fresh uuid4; ``description`` and
    ``color`` default to "" and "#6366f1". No user_id is passed to the new
    rows, so any per-user ownership is not carried over.
    """
    await session.execute(delete(Topic))
    for entry in topics:
        raw_id = entry.get("id")
        session.add(
            Topic(
                id=uuid.UUID(raw_id) if raw_id else uuid.uuid4(),
                name=entry["name"],
                description=entry.get("description", ""),
                color=entry.get("color", "#6366f1"),
            )
        )
    await session.commit()
async def get_topic(session: AsyncSession, topic_id: str) -> Optional[dict]:
    """Fetch one topic by its UUID string.

    Returns the topic as a dict, or None when *topic_id* is malformed or no
    such topic exists.
    """
    try:
        key = uuid.UUID(topic_id)
    except ValueError:
        return None
    found = await session.get(Topic, key)
    if found is not None:
        return {
            "id": str(found.id),
            "name": found.name,
            "description": found.description,
            "color": found.color,
        }
    return None
async def create_topic(
    session: AsyncSession,
    name: str,
    description: str = "",
    color: str = "#6366f1",
    user_id: Optional[uuid.UUID] = None,
) -> dict:
    """Return the topic named *name* in the given namespace, creating it if absent.

    D-08: user_id=None addresses the system namespace (topics visible to all
    users); a UUID addresses that user's private namespace.

    Matching is case-insensitive and confined to one namespace, so "Finance"
    may exist both as a system topic and as a per-user topic. When a match is
    found, it is returned unchanged: *description* and *color* are only used
    for a newly created topic, which is committed immediately.

    The namespace condition is chosen in Python (IS NULL vs equality) instead
    of using IS NOT DISTINCT FROM, for SQLite compatibility.
    """
    if user_id is None:
        same_owner = Topic.user_id.is_(None)
    else:
        same_owner = Topic.user_id == user_id
    lookup = await session.execute(
        select(Topic).where(sql_func.lower(Topic.name) == name.lower(), same_owner)
    )
    topic = lookup.scalars().first()
    if topic is None:
        topic = Topic(name=name, description=description, color=color, user_id=user_id)
        session.add(topic)
        await session.commit()
    return {
        "id": str(topic.id),
        "name": topic.name,
        "description": topic.description,
        "color": topic.color,
    }
async def update_topic(
    session: AsyncSession,
    topic_id: str,
    name: Optional[str] = None,
    description: Optional[str] = None,
    color: Optional[str] = None,
) -> Optional[dict]:
    """Apply the non-None fields to a topic and commit.

    Returns the updated topic as a dict, or None when *topic_id* is malformed
    or no such topic exists.
    """
    try:
        key = uuid.UUID(topic_id)
    except ValueError:
        return None
    topic = await session.get(Topic, key)
    if topic is None:
        return None
    requested = {"name": name, "description": description, "color": color}
    for attr, value in requested.items():
        if value is not None:
            setattr(topic, attr, value)
    await session.commit()
    return {
        "id": str(topic.id),
        "name": topic.name,
        "description": topic.description,
        "color": topic.color,
    }
async def delete_topic(session: AsyncSession, topic_id: str) -> Optional[str]:
    """Delete a topic and commit.

    Returns the name the topic had, or None when *topic_id* is malformed or
    no such topic exists.
    """
    try:
        key = uuid.UUID(topic_id)
    except ValueError:
        return None
    doomed = await session.get(Topic, key)
    if doomed is None:
        return None
    deleted_name = doomed.name
    # The DocumentTopic rows are expected to go via the FK's
    # ondelete="CASCADE"; nothing here removes them explicitly.
    await session.delete(doomed)
    await session.commit()
    return deleted_name
async def topic_doc_counts(
    session: AsyncSession, user_id: Optional[uuid.UUID] = None
) -> dict:
    """Return a mapping of topic name -> number of tagged documents.

    Every topic appears in the result, with 0 when nothing matches. Topics
    that share a name (e.g. a system and a per-user "Finance") are merged
    into one entry whose count is the sum.

    Args:
        session: Open async session.
        user_id: When given, only documents owned by this user are counted,
            so a user sees the count of their own documents for each topic
            rather than the global count across all users. When None, every
            document-topic association is counted.
    """
    on_clause = DocumentTopic.topic_id == Topic.id
    if user_id is not None:
        # Put the ownership filter inside the LEFT JOIN condition, not in
        # WHERE. Filtering the outer-joined side in WHERE would drop topics
        # whose documents all belong to other users (instead of reporting 0).
        # An "OR user_id IS NULL" escape hatch would count ownerless
        # documents for every user.
        owned_doc_ids = select(Document.id).where(Document.user_id == user_id)
        on_clause = on_clause & DocumentTopic.document_id.in_(owned_doc_ids)
    stmt = (
        select(Topic.name, sql_func.count(DocumentTopic.document_id))
        .join(DocumentTopic, on_clause, isouter=True)
        .group_by(Topic.name)
    )
    q = await session.execute(stmt)
    return {name: count for name, count in q}
# ── Public surface ─────────────────────────────────────────────────────────────
# Names re-exported by `from services.storage import *`; order matches the
# definitions above.
__all__ = [
    # Documents
    "save_metadata",
    "get_metadata",
    "list_metadata",
    "delete_document",
    "update_document_topics",
    "remove_topic_from_all_documents",
    # Topics
    "load_topics",
    "load_topics_for_user",
    "save_topics",
    "get_topic",
    "create_topic",
    "update_topic",
    "delete_topic",
    "topic_doc_counts",
]