Files
kite/backend/services/storage.py
T
curo1305 3e4b1f1f91 feat(01-04): rewrite services/storage.py as async SQLAlchemy + MinIO orchestrator
- Replaced entire flat-file + filelock implementation with async ORM + MinIO
- All 14 DB-touching functions are async def accepting AsyncSession as first param
- load_settings/save_settings/mask_api_key/settings_masked remain sync (flat-file, Phase 2 will migrate)
- save_upload uses null-user D-03 sentinel; object_key via MinIO put_object
- update_document_topics auto-creates missing topics via create_topic deduplication
- No filelock, no METADATA_DIR/UPLOADS_DIR/TOPICS_FILE references remain
- Added __all__ listing all 18 public functions
- Updated conftest.py: removed filelock patching no longer needed
- Fixed test_object_key_schema: removed unused db_session param (SQLite INET type conflict)
2026-05-22 09:39:32 +02:00

474 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Async document/topic/settings storage service for DocuVault.
This module replaces the legacy flat-file + filelock implementation with:
- Async SQLAlchemy ORM for document and topic persistence (PostgreSQL)
- MinIO SDK (via asyncio.to_thread) for binary object storage
Public function names are PRESERVED from the old flat-file implementation so
that api/documents.py and api/topics.py can be updated in Plan 05 with minimal
changes (async def + await + session parameter).
Settings functions (load_settings / save_settings) remain synchronous and
flat-file backed in Phase 1 because the users.ai_provider / users.ai_model
schema columns cannot be populated until Phase 2, which will migrate them to
DB-backed per-user settings (D-03 is deferred until those user-scoped columns
are populated).
D-05: Storage service layer switched to PostgreSQL + MinIO.
D-06: Object key schema: {user_id}/{document_id}/{uuid4()}{ext} — human filename in DB only.
D-03: documents.user_id is None (nullable) in Phase 1 — no auth system yet.
"""
from __future__ import annotations
import copy
import json
import sys
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
from sqlalchemy import select, delete
from sqlalchemy import func as sql_func
from sqlalchemy.ext.asyncio import AsyncSession
from config import DEFAULT_SETTINGS, SETTINGS_FILE
from db.models import Document, DocumentTopic, Topic
from storage import get_storage_backend
# ── Lazy singleton storage backend ────────────────────────────────────────────
# Cached StorageBackend instance; populated on the first _backend() call.
_storage = None


def _backend():
    """Return the process-wide StorageBackend, building it on first use.

    get_storage_backend() is not called at import time; the first caller
    creates the backend and every later call reuses the cached instance, so
    the MinIO client is created once per process rather than once per request.
    """
    global _storage
    if not _storage:
        _storage = get_storage_backend()
    return _storage
# ── Private helpers ────────────────────────────────────────────────────────────
def _doc_to_dict(doc: Document, topic_names: list) -> dict:
    """Convert a Document ORM row plus its resolved topic names to the legacy dict.

    The shape mirrors the old flat-file metadata records so API handlers keep
    working unchanged:

    * ``original_name`` and ``filename`` both map to ``doc.filename`` for
      legacy compatibility.
    * ``extracted_text`` is never None (falls back to ``""``).
    * ``classified_at`` is derived from ``updated_at`` and is only set when the
      document status is ``"classified"``.

    Timestamps are ISO-8601 strings, or None when the column holds no value.
    Both timestamp columns are guarded against None; previously a classified
    row with a NULL ``updated_at`` raised AttributeError.
    """
    classified_at = None
    if doc.status == "classified" and doc.updated_at is not None:
        classified_at = doc.updated_at.isoformat()
    return {
        "id": str(doc.id),
        "original_name": doc.filename,
        "filename": doc.filename,
        "mime_type": doc.content_type,
        "size_bytes": doc.size_bytes,
        "extracted_text": doc.extracted_text or "",
        "topics": topic_names,
        "created_at": doc.created_at.isoformat() if doc.created_at else None,
        "classified_at": classified_at,
    }
async def _load_topic_names(session: AsyncSession, doc_id: uuid.UUID) -> list:
    """Fetch the names of every topic linked to *doc_id* through DocumentTopic.

    The query has no ORDER BY, so names come back in database order.
    """
    stmt = (
        select(Topic.name)
        .join(DocumentTopic, DocumentTopic.topic_id == Topic.id)
        .where(DocumentTopic.document_id == doc_id)
    )
    result = await session.execute(stmt)
    # scalars() yields the single selected column (Topic.name) of each row.
    return list(result.scalars())
# ── Documents ─────────────────────────────────────────────────────────────────
async def save_upload(
    session: AsyncSession,
    file_bytes: bytes,
    original_name: str,
    mime_type: str,
) -> dict:
    """Store uploaded bytes in MinIO and record a Document row in PostgreSQL.

    Returns the dict shape the legacy api/documents.py upload handler consumes:
        {"id", "filename", "path", "object_key", "user_id"}
    "path" is kept for compatibility; it now holds the MinIO object_key
    rather than a filesystem path.

    Failure handling:
    * If the MinIO upload fails, the session is rolled back (discarding the
      flushed row) and the exception is re-raised.
    * If the final commit fails, the just-uploaded object is deleted (best
      effort; failures are logged to stderr) to avoid orphaned bytes, the
      session is rolled back, and the exception is re-raised.

    D-03: user_id is None (no auth in Phase 1). The object key uses the
    "null-user" sentinel until Phase 2 passes str(current_user.id).
    """
    doc_id = uuid.uuid4()
    suffix = Path(original_name).suffix.lower()
    doc = Document(
        id=doc_id,
        user_id=None,  # D-03: nullable in Phase 1
        filename=original_name,
        content_type=mime_type,
        size_bytes=len(file_bytes),
        storage_backend="minio",
        status="pending",
        object_key="",  # placeholder; filled after the MinIO upload below
    )
    session.add(doc)
    # Flush (without committing) so DB-side errors on the INSERT surface
    # before any bytes are written to object storage.
    await session.flush()

    try:
        # D-03: "null-user" sentinel — Phase 2 replaces with str(current_user.id)
        object_key = await _backend().put_object(
            user_id="null-user",
            document_id=str(doc_id),
            file_bytes=file_bytes,
            extension=suffix,
            content_type=mime_type,
        )
    except Exception:
        # Do not leave a flushed row with an empty object_key in the session.
        await session.rollback()
        raise

    doc.object_key = object_key
    try:
        await session.commit()
    except Exception:
        # The bytes are in MinIO but the row never landed: compensate.
        try:
            await _backend().delete_object(object_key)
        except Exception as exc:
            print(
                f"[storage] WARNING: cleanup delete_object failed for {object_key!r}: {exc}",
                file=sys.stderr,
            )
        await session.rollback()
        raise

    # Built from locals, not ORM attributes, so nothing is read from the
    # instance after commit.
    return {
        "id": str(doc_id),
        "filename": original_name,
        "path": object_key,
        "object_key": object_key,
        "user_id": None,
    }
async def save_metadata(session: AsyncSession, meta: dict) -> None:
    """Apply a legacy metadata dict to the matching Document row.

    Keys consumed:
      id             -- document UUID string; a missing, malformed or unknown
                        id makes this a silent no-op
      extracted_text -- stored as given, defaulting to "" when absent
      topics         -- list[str]; when the key is present (even as an empty
                        list) it REPLACES the document's topic associations
      classified_at  -- truthy => status "classified", otherwise "pending"

    Commits the session.
    """
    try:
        uid = uuid.UUID(meta["id"])
    except (KeyError, TypeError, ValueError, AttributeError):
        # KeyError: no "id"; ValueError: bad UUID string;
        # TypeError/AttributeError: id is None or not a string.
        return
    doc = await session.get(Document, uid)
    if doc is None:
        return
    doc.extracted_text = meta.get("extracted_text", "")
    topics_list = meta.get("topics")
    # `is not None` (not truthiness) so an explicit [] clears all topics,
    # matching the legacy "write the whole record" semantics.
    if topics_list is not None:
        # update_document_topics commits, so extracted_text above is
        # committed together with the new associations.
        await update_document_topics(session, str(uid), topics_list)
    doc.status = "classified" if meta.get("classified_at") else "pending"
    await session.commit()
async def get_metadata(session: AsyncSession, doc_id: str) -> Optional[dict]:
    """Look up one document and return it in the legacy metadata dict shape.

    Returns None both for strings that are not valid UUIDs and for UUIDs with
    no matching Document row.
    """
    try:
        key = uuid.UUID(doc_id)
    except ValueError:
        return None
    record = await session.get(Document, key)
    if record is not None:
        return _doc_to_dict(record, await _load_topic_names(session, key))
    return None
async def list_metadata(
    session: AsyncSession, topic: Optional[str] = None
) -> list:
    """Return legacy metadata dicts for all documents, newest first.

    If *topic* is given, only documents linked to a topic with exactly that
    name (case-sensitive comparison) are returned. Each returned dict still
    lists ALL of the document's topics, not just the filter topic.

    Topic names are fetched in batches (one query per chunk of documents)
    instead of one query per document, avoiding an N+1 query pattern.
    """
    stmt = select(Document).order_by(Document.created_at.desc())
    if topic is not None:
        stmt = (
            stmt.join(DocumentTopic, DocumentTopic.document_id == Document.id)
            .join(Topic, Topic.id == DocumentTopic.topic_id)
            .where(Topic.name == topic)
        )
    result = await session.execute(stmt)
    docs = result.scalars().all()

    names_by_doc: dict = {}
    doc_ids = [doc.id for doc in docs]
    # Chunk the IN list so very large libraries stay well below driver
    # bind-parameter limits.
    chunk_size = 1000
    for start in range(0, len(doc_ids), chunk_size):
        chunk = doc_ids[start:start + chunk_size]
        pairs = await session.execute(
            select(DocumentTopic.document_id, Topic.name)
            .select_from(DocumentTopic)
            .join(Topic, Topic.id == DocumentTopic.topic_id)
            .where(DocumentTopic.document_id.in_(chunk))
        )
        for owner_id, topic_name in pairs:
            names_by_doc.setdefault(owner_id, []).append(topic_name)

    return [_doc_to_dict(doc, names_by_doc.get(doc.id, [])) for doc in docs]
async def delete_document(session: AsyncSession, doc_id: str) -> bool:
    """Delete a document's PostgreSQL row and its MinIO object.

    Returns False if *doc_id* is not a valid UUID or no such document exists,
    and True once the row has been deleted and committed.

    The row is deleted and committed FIRST and the object removed afterwards.
    If the commit fails, the bytes are still intact and the exception
    propagates. If the object delete fails, the worst case is an orphaned
    object rather than a row pointing at missing bytes. MinIO failures are
    logged to stderr and do not change the return value.
    """
    try:
        uid = uuid.UUID(doc_id)
    except ValueError:
        return False
    doc = await session.get(Document, uid)
    if doc is None:
        return False
    # Capture before commit: the instance is deleted/expired afterwards.
    object_key = doc.object_key
    await session.delete(doc)
    await session.commit()
    # An empty key is the placeholder save_upload assigns before uploading;
    # there is no object to delete for it.
    if object_key:
        try:
            await _backend().delete_object(object_key)
        except Exception as exc:
            print(
                f"[storage] WARNING: MinIO delete_object failed for {object_key!r}: {exc}",
                file=sys.stderr,
            )
    return True
async def update_document_topics(
    session: AsyncSession, doc_id: str, topics: list
) -> Optional[dict]:
    """Replace all topic associations for a document.

    Each name is resolved through create_topic, which matches existing topics
    case-insensitively and creates missing ones. Names that resolve to the
    same topic (e.g. "Finance" and "finance") produce a single association.
    Deduplicating by exact name alone would insert the same
    (document_id, topic_id) pair twice.

    Marks the document "classified" and commits. Returns the refreshed
    metadata dict, or None if *doc_id* is not a valid UUID or the document
    does not exist.

    Note: create_topic commits when it inserts a new topic, so the
    replacement is not atomic if topic creation fails part-way.
    """
    try:
        uid = uuid.UUID(doc_id)
    except ValueError:
        return None
    doc = await session.get(Document, uid)
    if doc is None:
        return None
    # Remove all existing associations.
    await session.execute(
        delete(DocumentTopic).where(DocumentTopic.document_id == uid)
    )
    # Re-insert, deduplicating by the resolved topic id.
    linked: set = set()
    for name in topics:
        topic_dict = await create_topic(session, name)
        topic_id = uuid.UUID(topic_dict["id"])
        if topic_id in linked:
            continue
        linked.add(topic_id)
        session.add(DocumentTopic(document_id=uid, topic_id=topic_id))
    doc.status = "classified"
    await session.commit()
    return await get_metadata(session, doc_id)
async def remove_topic_from_all_documents(
    session: AsyncSession, topic_name: str
) -> int:
    """Unlink the named topic (matched case-insensitively) from every document.

    The Topic row itself is kept; only its DocumentTopic rows are deleted.
    Commits the session when a matching topic exists. Returns the number of
    association rows deleted, or 0 when no topic matches.
    """
    lookup = select(Topic).where(
        sql_func.lower(Topic.name) == topic_name.lower()
    )
    match = (await session.execute(lookup)).scalars().first()
    if match is None:
        return 0
    outcome = await session.execute(
        delete(DocumentTopic).where(DocumentTopic.topic_id == match.id)
    )
    await session.commit()
    return outcome.rowcount
# ── Topics ────────────────────────────────────────────────────────────────────
async def load_topics(session: AsyncSession) -> list:
    """List every topic as a dict (id, name, description, color), sorted by name."""
    result = await session.execute(select(Topic).order_by(Topic.name))
    topics = []
    for row in result.scalars():
        topics.append(
            {
                "id": str(row.id),
                "name": row.name,
                "description": row.description,
                "color": row.color,
            }
        )
    return topics
async def save_topics(session: AsyncSession, topics: list) -> None:
    """Bulk-replace the topic table with *topics*.

    Each entry is a dict with ``name`` plus optional ``id``, ``description``
    (default "") and ``color`` (default "#6366f1"). Entries without an id get
    a fresh UUID.

    Topics whose id is NOT listed are deleted; their DocumentTopic rows are
    removed by the ON DELETE CASCADE on the foreign key. Topics whose id IS
    listed are updated in place via Session.merge instead of being deleted and
    re-inserted, so their document associations survive. A blanket DELETE
    would cascade away every association even when identical topics are
    written back. Commits the session.

    Legacy: not used by current endpoints; preserved for API compatibility.
    """
    rows = [
        Topic(
            id=uuid.UUID(t["id"]) if t.get("id") else uuid.uuid4(),
            name=t["name"],
            description=t.get("description", ""),
            color=t.get("color", "#6366f1"),
        )
        for t in topics
    ]
    keep_ids = [row.id for row in rows]
    stale = delete(Topic)
    if keep_ids:
        stale = stale.where(Topic.id.not_in(keep_ids))
    await session.execute(stale)
    for row in rows:
        # merge() updates the persistent row with this id, or inserts it.
        await session.merge(row)
    await session.commit()
async def get_topic(session: AsyncSession, topic_id: str) -> Optional[dict]:
    """Fetch a single topic by its UUID string.

    Returns a dict with id, name, description and color, or None when
    *topic_id* is not a valid UUID or no such topic exists.
    """
    try:
        key = uuid.UUID(topic_id)
    except ValueError:
        return None
    found = await session.get(Topic, key)
    if found is None:
        return None
    return {
        "id": str(found.id),
        "name": found.name,
        "description": found.description,
        "color": found.color,
    }
async def create_topic(
    session: AsyncSession,
    name: str,
    description: str = "",
    color: str = "#6366f1",
) -> dict:
    """Create a topic, or return the existing one with the same name.

    Name matching is case-insensitive (``lower()`` on both sides), so asking
    for "Finance" returns an existing "finance" topic unchanged. *description*
    and *color* are only used when a new row is created.

    When a new topic is inserted the session is committed, which also commits
    any other pending changes in that session.

    Returns a dict with id, name, description and color.
    """
    q = await session.execute(
        select(Topic).where(sql_func.lower(Topic.name) == name.lower())
    )
    existing = q.scalars().first()
    if existing is not None:
        return {
            "id": str(existing.id),
            "name": existing.name,
            "description": existing.description,
            "color": existing.color,
        }
    topic = Topic(name=name, description=description, color=color)
    session.add(topic)
    # Flush to obtain the primary key, then snapshot BEFORE committing.
    # With the default expire_on_commit=True, commit expires the instance,
    # and reading its attributes afterwards would need an implicit lazy load
    # that AsyncSession cannot perform. The snapshot is harmless when
    # expire_on_commit=False.
    await session.flush()
    created = {
        "id": str(topic.id),
        "name": topic.name,
        "description": topic.description,
        "color": topic.color,
    }
    await session.commit()
    return created
async def update_topic(
    session: AsyncSession,
    topic_id: str,
    name: Optional[str] = None,
    description: Optional[str] = None,
    color: Optional[str] = None,
) -> Optional[dict]:
    """Update the given fields of a topic and return its new state.

    Only arguments that are not None are applied; an empty string is applied
    like any other value. *name* is not checked against other topics' names
    here. Commits the session.

    Returns a dict with id, name, description and color, or None when
    *topic_id* is not a valid UUID or no such topic exists.
    """
    try:
        uid = uuid.UUID(topic_id)
    except ValueError:
        return None
    t = await session.get(Topic, uid)
    if t is None:
        return None
    if name is not None:
        t.name = name
    if description is not None:
        t.description = description
    if color is not None:
        t.color = color
    # Snapshot before commit. Commit expires loaded instances by default
    # (expire_on_commit=True), and reading expired attributes afterwards would
    # trigger an implicit lazy load that AsyncSession cannot perform.
    updated = {"id": str(t.id), "name": t.name, "description": t.description, "color": t.color}
    await session.commit()
    return updated
async def delete_topic(session: AsyncSession, topic_id: str) -> Optional[str]:
    """Delete the topic identified by a UUID string and commit.

    The topic's DocumentTopic rows are removed through the foreign key's
    ondelete="CASCADE". Returns the deleted topic's name, or None when
    *topic_id* is not a valid UUID or no such topic exists.
    """
    try:
        key = uuid.UUID(topic_id)
    except ValueError:
        return None
    victim = await session.get(Topic, key)
    if victim is None:
        return None
    # Read the name before the instance is deleted and committed.
    deleted_name = victim.name
    await session.delete(victim)
    await session.commit()
    return deleted_name
async def topic_doc_counts(session: AsyncSession) -> dict:
    """Map each topic name to the number of documents linked to it.

    The outer join keeps topics with no documents; COUNT over the (then NULL)
    document_id column yields 0 for them.
    """
    stmt = (
        select(Topic.name, sql_func.count(DocumentTopic.document_id))
        .outerjoin(DocumentTopic, DocumentTopic.topic_id == Topic.id)
        .group_by(Topic.name)
    )
    rows = await session.execute(stmt)
    counts = {}
    for label, total in rows:
        counts[label] = total
    return counts
# ── Settings ──────────────────────────────────────────────────────────────────
# Phase 2 will move per-user settings to users.ai_provider / users.ai_model
# (D-03 deferred to user-scoped column population).
# For now these remain as flat-file JSON — single-writer, no filelock needed.
def load_settings() -> dict:
    """Load application settings from the JSON file at SETTINGS_FILE.

    A missing file or invalid JSON yields a deep copy of DEFAULT_SETTINGS, so
    callers may mutate the result without altering the module defaults.

    Phase 2 will move per-user settings to users.ai_provider / users.ai_model.
    """
    try:
        raw_text = SETTINGS_FILE.read_text()
        return json.loads(raw_text)
    except (FileNotFoundError, json.JSONDecodeError):
        return copy.deepcopy(DEFAULT_SETTINGS)
def save_settings(settings: dict) -> None:
    """Persist app settings as pretty-printed JSON at SETTINGS_FILE.

    The JSON is first written to a sibling ``.tmp`` file, which is then
    renamed over SETTINGS_FILE (an atomic replace on POSIX filesystems). A
    crash mid-write therefore leaves the previous file intact instead of a
    truncated one. This matters because load_settings falls back to
    DEFAULT_SETTINGS on invalid JSON, which would silently discard stored
    provider API keys. No filelock: the Phase 1 settings file is
    single-writer.

    Phase 2 will move per-user settings to users.ai_provider / users.ai_model.
    """
    # Serialize first so an unserializable dict never touches the disk.
    payload = json.dumps(settings, indent=2)
    tmp_path = SETTINGS_FILE.with_name(SETTINGS_FILE.name + ".tmp")
    try:
        tmp_path.write_text(payload)
        tmp_path.replace(SETTINGS_FILE)
    except Exception:
        # Don't leave a stray temp file behind on failure.
        tmp_path.unlink(missing_ok=True)
        raise
def mask_api_key(key: str) -> str:
    """Hide all but the last four characters of an API key.

    Empty/None keys, and keys of four characters or fewer, are masked
    entirely as "****" so short secrets are never revealed.
    """
    visible = 4
    if key and len(key) > visible:
        return "****" + key[-visible:]
    return "****"
def settings_masked(settings: dict) -> dict:
    """Return a deep copy of *settings* with provider API keys masked.

    Only ``providers.anthropic.api_key`` and ``providers.openai.api_key`` are
    rewritten (via mask_api_key), and only when non-empty. The input dict is
    never modified.
    """
    masked = copy.deepcopy(settings)
    providers = masked.get("providers", {})
    for provider_name in ("anthropic", "openai"):
        api_key = providers.get(provider_name, {}).get("api_key", "")
        if api_key:
            providers[provider_name]["api_key"] = mask_api_key(api_key)
    return masked
# ── Public surface ─────────────────────────────────────────────────────────────
__all__ = [
    # Documents
    "save_upload",
    "save_metadata",
    "get_metadata",
    "list_metadata",
    "delete_document",
    "update_document_topics",
    "remove_topic_from_all_documents",
    # Topics
    "load_topics",
    "save_topics",
    "get_topic",
    "create_topic",
    "update_topic",
    "delete_topic",
    "topic_doc_counts",
    # Settings (flat-file until Phase 2)
    "load_settings",
    "save_settings",
    "mask_api_key",
    "settings_masked",
]