feat(01-04): rewrite services/storage.py as async SQLAlchemy + MinIO orchestrator

- Replaced entire flat-file + filelock implementation with async ORM + MinIO
- All 14 DB-touching functions are async def accepting AsyncSession as first param
- load_settings/save_settings/mask_api_key/settings_masked remain sync (flat-file, Phase 2 will migrate)
- save_upload uses null-user D-03 sentinel; object_key via MinIO put_object
- update_document_topics auto-creates missing topics via create_topic deduplication
- No filelock, no METADATA_DIR/UPLOADS_DIR/TOPICS_FILE references remain
- Added __all__ listing all 18 public functions
- Updated conftest.py: removed filelock patching no longer needed
- Fixed test_object_key_schema: removed unused db_session param (SQLite INET type conflict)
This commit is contained in:
curo1305
2026-05-22 09:39:32 +02:00
parent eaf86a832a
commit 3e4b1f1f91
3 changed files with 410 additions and 127 deletions
+409 -123
View File
@@ -1,173 +1,436 @@
"""
Async document/topic/settings storage service for DocuVault.
This module replaces the legacy flat-file + filelock implementation with:
- Async SQLAlchemy ORM for document and topic persistence (PostgreSQL)
- MinIO SDK (via asyncio.to_thread) for binary object storage
Public function names are PRESERVED from the old flat-file implementation so
that api/documents.py and api/topics.py can be updated in Plan 05 with minimal
changes (async def + await + session parameter).
Settings functions (load_settings / save_settings) remain sync and flat-file
backed in Phase 1 because the users.ai_provider / users.ai_model schema columns
cannot be populated until Phase 2.
Phase 2 will migrate these settings functions to DB-backed per-user settings
(D-03 deferred to user-scoped column population).
D-05: Storage service layer switched to PostgreSQL + MinIO.
D-06: Object key schema: {user_id}/{document_id}/{uuid4()}{ext} — human filename in DB only.
D-03: documents.user_id is None (nullable) in Phase 1 — no auth system yet.
"""
from __future__ import annotations
import copy
import json import json
import sys
import uuid import uuid
import shutil
from datetime import datetime, timezone from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
from filelock import FileLock from typing import Optional
from config import UPLOADS_DIR, METADATA_DIR, TOPICS_FILE, SETTINGS_FILE, DEFAULT_SETTINGS
from sqlalchemy import select, delete
from sqlalchemy import func as sql_func
from sqlalchemy.ext.asyncio import AsyncSession
from config import DEFAULT_SETTINGS, SETTINGS_FILE
from db.models import Document, DocumentTopic, Topic
from storage import get_storage_backend
# ── File locks ──────────────────────────────────────────────────────────────── # ── Lazy singleton storage backend ────────────────────────────────────────────
_topics_lock = FileLock(str(TOPICS_FILE) + ".lock") _storage = None
_settings_lock = FileLock(str(SETTINGS_FILE) + ".lock")
def _backend():
    """Hand out the shared StorageBackend, building it on first use.

    The instance is cached in the module-level ``_storage`` slot. While that
    slot is falsy, ``get_storage_backend()`` is called and its result stored;
    every later call returns the cached object unchanged.
    """
    global _storage
    if not _storage:
        _storage = get_storage_backend()
    return _storage
# ── Private helpers ────────────────────────────────────────────────────────────
def _doc_to_dict(doc: Document, topic_names: list) -> dict:
    """Convert a Document ORM row plus its topic names to the legacy dict shape.

    Args:
        doc: Row supplying id, filename, content_type, size_bytes,
            extracted_text, created_at, updated_at and status.
        topic_names: Already-resolved topic names; stored under "topics" as-is.

    Returns:
        A dict with the keys id, original_name, filename, mime_type,
        size_bytes, extracted_text, topics, created_at and classified_at.
        "original_name" and "filename" both carry ``doc.filename``.
        "classified_at" is the ISO form of ``doc.updated_at`` only when the
        status is "classified" and a timestamp is present; otherwise None.
    """
    # Guard updated_at the same way created_at is guarded: a classified row
    # without a timestamp must yield None rather than raise AttributeError.
    has_classified_stamp = (
        doc.status == "classified" and doc.updated_at is not None
    )
    return {
        "id": str(doc.id),
        "original_name": doc.filename,
        "filename": doc.filename,
        "mime_type": doc.content_type,
        "size_bytes": doc.size_bytes,
        "extracted_text": doc.extracted_text or "",
        "topics": topic_names,
        "created_at": doc.created_at.isoformat() if doc.created_at else None,
        "classified_at": (
            doc.updated_at.isoformat() if has_classified_stamp else None
        ),
    }
async def _load_topic_names(session: AsyncSession, doc_id: uuid.UUID) -> list:
    """Fetch the names of every topic linked to one document.

    Joins Topic to DocumentTopic and keeps the rows whose ``document_id``
    equals ``doc_id``. No ORDER BY is applied, so the names come back in
    whatever order the database returns them.
    """
    stmt = (
        select(Topic.name)
        .join(DocumentTopic, DocumentTopic.topic_id == Topic.id)
        .where(DocumentTopic.document_id == doc_id)
    )
    result = await session.execute(stmt)
    # Only one column is selected, so the scalar view is the list of names.
    return result.scalars().all()
# ── Documents ───────────────────────────────────────────────────────────────── # ── Documents ─────────────────────────────────────────────────────────────────
def save_upload(file_bytes: bytes, original_name: str, mime_type: str) -> dict: async def save_upload(
doc_id = str(uuid.uuid4()) session: AsyncSession,
file_bytes: bytes,
original_name: str,
mime_type: str,
) -> dict:
"""Persist file bytes to MinIO and create a Document row in PostgreSQL.
Returns a dict shape compatible with the legacy api/documents.py line 3233:
{"id", "filename", "path", "object_key", "user_id"}
The "path" key is preserved for compatibility — it now contains the MinIO
object_key rather than a filesystem path.
D-03: user_id is None (no auth in Phase 1). Phase 2 will replace the
"null-user" sentinel with str(current_user.id).
"""
doc_id = uuid.uuid4()
suffix = Path(original_name).suffix.lower() suffix = Path(original_name).suffix.lower()
filename = f"{doc_id}{suffix}"
dest = UPLOADS_DIR / filename doc = Document(
dest.write_bytes(file_bytes) id=doc_id,
return {"id": doc_id, "filename": filename, "path": str(dest)} user_id=None, # D-03: nullable in Phase 1
filename=original_name,
content_type=mime_type,
size_bytes=len(file_bytes),
storage_backend="minio",
status="pending",
object_key="", # filled after MinIO upload below
)
session.add(doc)
await session.flush() # materialise doc.id without committing
# D-03: "null-user" sentinel — Phase 2 replaces with str(current_user.id)
object_key = await _backend().put_object(
user_id="null-user",
document_id=str(doc_id),
file_bytes=file_bytes,
extension=suffix,
content_type=mime_type,
)
doc.object_key = object_key
await session.commit()
return {
"id": str(doc_id),
"filename": original_name,
"path": object_key,
"object_key": object_key,
"user_id": None,
}
def save_metadata(meta: dict) -> None: async def save_metadata(session: AsyncSession, meta: dict) -> None:
path = METADATA_DIR / f"{meta['id']}.json" """Update a Document row from the legacy metadata dict shape.
lock = FileLock(str(path) + ".lock")
with lock:
path.write_text(json.dumps(meta, indent=2, ensure_ascii=False))
Keys consumed: id, extracted_text, topics (list[str]), classified_at.
def get_metadata(doc_id: str) -> dict | None: """
path = METADATA_DIR / f"{doc_id}.json"
if not path.exists():
return None
return json.loads(path.read_text())
def list_metadata(topic: str | None = None) -> list[dict]:
docs = []
for p in sorted(METADATA_DIR.glob("*.json"), key=lambda x: x.stat().st_mtime, reverse=True):
try: try:
meta = json.loads(p.read_text()) uid = uuid.UUID(meta["id"])
except Exception: except (ValueError, KeyError):
continue return
if topic and topic not in meta.get("topics", []):
continue doc = await session.get(Document, uid)
docs.append(meta) if doc is None:
return docs return
doc.extracted_text = meta.get("extracted_text", "")
topics_list = meta.get("topics")
if topics_list:
await update_document_topics(session, meta["id"], topics_list)
doc.status = "classified" if meta.get("classified_at") else "pending"
await session.commit()
def delete_document(doc_id: str) -> bool: async def get_metadata(session: AsyncSession, doc_id: str) -> Optional[dict]:
meta_path = METADATA_DIR / f"{doc_id}.json" """Return the legacy metadata dict for a document, or None if not found."""
if not meta_path.exists(): try:
uid = uuid.UUID(doc_id)
except ValueError:
return None
doc = await session.get(Document, uid)
if doc is None:
return None
topic_names = await _load_topic_names(session, uid)
return _doc_to_dict(doc, topic_names)
async def list_metadata(
    session: AsyncSession, topic: Optional[str] = None
) -> list:
    """List legacy-shaped metadata dicts for documents, newest first.

    Args:
        session: Async session used for every query issued here.
        topic: When not None, only documents linked to a topic whose name
            equals this value exactly are returned.

    Returns:
        One dict per matching document (built by ``_doc_to_dict``), ordered
        by ``created_at`` descending.
    """
    newest_first = select(Document).order_by(Document.created_at.desc())
    if topic is None:
        query = newest_first
    else:
        query = (
            newest_first.join(DocumentTopic, DocumentTopic.document_id == Document.id)
            .join(Topic, Topic.id == DocumentTopic.topic_id)
            .where(Topic.name == topic)
        )

    found = (await session.execute(query)).scalars().all()

    # NOTE(review): topic names are resolved with one query per document;
    # consider batching if listings grow large.
    return [
        _doc_to_dict(doc, await _load_topic_names(session, doc.id))
        for doc in found
    ]
async def delete_document(session: AsyncSession, doc_id: str) -> bool:
"""Delete a document's MinIO object and its PostgreSQL row.
Returns False if the document is not found; True on success.
MinIO deletion failures are logged to stderr but do not prevent the DB row
deletion (the bytes may already be gone).
"""
try:
uid = uuid.UUID(doc_id)
except ValueError:
return False return False
meta = json.loads(meta_path.read_text())
upload_path = UPLOADS_DIR / meta.get("filename", "") doc = await session.get(Document, uid)
if upload_path.exists(): if doc is None:
upload_path.unlink() return False
meta_path.unlink()
lock_path = Path(str(meta_path) + ".lock") try:
if lock_path.exists(): await _backend().delete_object(doc.object_key)
lock_path.unlink() except Exception as exc:
print(f"[storage] WARNING: MinIO delete_object failed for {doc.object_key!r}: {exc}", file=sys.stderr)
await session.delete(doc)
await session.commit()
return True return True
def update_document_topics(doc_id: str, topics: list[str]) -> dict | None: async def update_document_topics(
meta = get_metadata(doc_id) session: AsyncSession, doc_id: str, topics: list
if meta is None: ) -> Optional[dict]:
return None """Replace all topic associations for a document.
meta["topics"] = topics
meta["classified_at"] = datetime.now(timezone.utc).isoformat()
save_metadata(meta)
return meta
Auto-creates topics that don't yet exist. Returns the refreshed metadata
def remove_topic_from_all_documents(topic_name: str) -> int: dict, or None if the document is not found.
"""Remove a topic name from all documents. Returns number of docs updated.""" """
count = 0
for p in METADATA_DIR.glob("*.json"):
try: try:
meta = json.loads(p.read_text()) uid = uuid.UUID(doc_id)
except Exception: except ValueError:
return None
doc = await session.get(Document, uid)
if doc is None:
return None
# Remove all existing associations
await session.execute(
delete(DocumentTopic).where(DocumentTopic.document_id == uid)
)
# Re-insert, deduplicating by name
seen: set = set()
for name in topics:
if name in seen:
continue continue
if topic_name in meta.get("topics", []): seen.add(name)
meta["topics"] = [t for t in meta["topics"] if t != topic_name] topic_dict = await create_topic(session, name)
lock = FileLock(str(p) + ".lock") session.add(
with lock: DocumentTopic(
p.write_text(json.dumps(meta, indent=2, ensure_ascii=False)) document_id=uid,
count += 1 topic_id=uuid.UUID(topic_dict["id"]),
return count )
)
doc.status = "classified"
await session.commit()
return await get_metadata(session, doc_id)
async def remove_topic_from_all_documents(
    session: AsyncSession, topic_name: str
) -> int:
    """Detach the named topic from every document that carries it.

    The topic is located by a case-insensitive name match. If no such topic
    exists, nothing is deleted or committed and 0 is returned. Otherwise all
    DocumentTopic rows pointing at it are deleted, the session is committed,
    and the DELETE's reported row count is returned. The Topic row itself is
    left in place.
    """
    wanted = topic_name.lower()
    lookup = select(Topic).where(sql_func.lower(Topic.name) == wanted)
    topic = (await session.execute(lookup)).scalars().first()
    if topic is None:
        return 0

    removal = delete(DocumentTopic).where(DocumentTopic.topic_id == topic.id)
    outcome = await session.execute(removal)
    await session.commit()
    return outcome.rowcount
# ── Topics ──────────────────────────────────────────────────────────────────── # ── Topics ────────────────────────────────────────────────────────────────────
def load_topics() -> list[dict]: async def load_topics(session: AsyncSession) -> list:
with _topics_lock: """Return all topics ordered by name."""
data = json.loads(TOPICS_FILE.read_text()) q = await session.execute(select(Topic).order_by(Topic.name))
return data.get("topics", []) return [
{"id": str(t.id), "name": t.name, "description": t.description, "color": t.color}
for t in q.scalars()
]
def save_topics(topics: list[dict]) -> None: async def save_topics(session: AsyncSession, topics: list) -> None:
with _topics_lock: """Idempotent bulk replace — delete all Topic rows then insert the list.
TOPICS_FILE.write_text(json.dumps({"topics": topics}, indent=2))
# legacy: not used by current endpoints; preserved for API compatibility.
def get_topic(topic_id: str) -> dict | None: """
return next((t for t in load_topics() if t["id"] == topic_id), None) await session.execute(delete(Topic))
def create_topic(name: str, description: str = "", color: str = "#6366f1") -> dict:
topics = load_topics()
# Deduplicate by name (case-insensitive)
if any(t["name"].lower() == name.lower() for t in topics):
return next(t for t in topics if t["name"].lower() == name.lower())
topic = {
"id": str(uuid.uuid4())[:8],
"name": name,
"description": description,
"color": color,
}
topics.append(topic)
save_topics(topics)
return topic
def update_topic(topic_id: str, **kwargs) -> dict | None:
topics = load_topics()
for t in topics: for t in topics:
if t["id"] == topic_id: session.add(
t.update({k: v for k, v in kwargs.items() if v is not None}) Topic(
save_topics(topics) id=uuid.UUID(t["id"]) if t.get("id") else uuid.uuid4(),
return t name=t["name"],
return None description=t.get("description", ""),
color=t.get("color", "#6366f1"),
)
)
await session.commit()
def delete_topic(topic_id: str) -> str | None: async def get_topic(session: AsyncSession, topic_id: str) -> Optional[dict]:
topics = load_topics() """Return a topic dict by UUID string, or None if not found."""
topic = next((t for t in topics if t["id"] == topic_id), None) try:
if not topic: uid = uuid.UUID(topic_id)
except ValueError:
return None return None
name = topic["name"] t = await session.get(Topic, uid)
save_topics([t for t in topics if t["id"] != topic_id]) if t is None:
remove_topic_from_all_documents(name) return None
return {"id": str(t.id), "name": t.name, "description": t.description, "color": t.color}
async def create_topic(
    session: AsyncSession,
    name: str,
    description: str = "",
    color: str = "#6366f1",
) -> dict:
    """Create a topic, or return the existing one with the same name.

    Names are compared case-insensitively. When a match exists it is returned
    unchanged and nothing is written or committed; ``description`` and
    ``color`` are ignored in that case.

    Args:
        session: Async session used for the lookup and the insert.
        name: Topic name; also the deduplication key (case-insensitive).
        description: Description stored on a newly created topic.
        color: Color string stored on a newly created topic.

    Returns:
        Dict with the keys id (str), name, description and color.
    """
    q = await session.execute(
        select(Topic).where(sql_func.lower(Topic.name) == name.lower())
    )
    topic = q.scalars().first()
    created = topic is None
    if created:
        topic = Topic(name=name, description=description, color=color)
        session.add(topic)
        # Flush first so the primary key is assigned while the row is still
        # un-expired; the result dict can then be built before committing.
        await session.flush()

    # Snapshot before commit: with expire_on_commit left at its default the
    # attributes are expired by commit(), and reading them afterwards would
    # need implicit IO, which an AsyncSession cannot perform.
    snapshot = {
        "id": str(topic.id),
        "name": topic.name,
        "description": topic.description,
        "color": topic.color,
    }
    if created:
        await session.commit()
    return snapshot
async def update_topic(
    session: AsyncSession,
    topic_id: str,
    name: Optional[str] = None,
    description: Optional[str] = None,
    color: Optional[str] = None,
) -> Optional[dict]:
    """Apply the non-None fields to a topic and commit.

    Args:
        session: Async session used for the lookup and the update.
        topic_id: Topic UUID in string form.
        name: New name, or None to leave the name unchanged.
        description: New description, or None to leave it unchanged.
        color: New color, or None to leave it unchanged.

    Returns:
        Dict with the keys id (str), name, description and color reflecting
        the updated topic, or None when ``topic_id`` is not a valid UUID or
        no such topic exists.
    """
    try:
        uid = uuid.UUID(topic_id)
    except ValueError:
        return None
    t = await session.get(Topic, uid)
    if t is None:
        return None
    if name is not None:
        t.name = name
    if description is not None:
        t.description = description
    if color is not None:
        t.color = color

    # Snapshot before commit: with expire_on_commit left at its default the
    # attributes are expired by commit(), and reading them afterwards would
    # need implicit IO, which an AsyncSession cannot perform.
    snapshot = {
        "id": str(t.id),
        "name": t.name,
        "description": t.description,
        "color": t.color,
    }
    await session.commit()
    return snapshot
async def delete_topic(session: AsyncSession, topic_id: str) -> Optional[str]:
"""Delete a topic and cascade-remove its DocumentTopic rows.
Returns the deleted topic name, or None if not found.
"""
try:
uid = uuid.UUID(topic_id)
except ValueError:
return None
t = await session.get(Topic, uid)
if t is None:
return None
name = t.name
await session.delete(t) # ondelete="CASCADE" removes DocumentTopic rows
await session.commit()
return name return name
def topic_doc_counts() -> dict[str, int]: async def topic_doc_counts(session: AsyncSession) -> dict:
counts: dict[str, int] = {} """Return a mapping of topic name -> document count."""
for p in METADATA_DIR.glob("*.json"): q = await session.execute(
try: select(Topic.name, sql_func.count(DocumentTopic.document_id))
meta = json.loads(p.read_text()) .join(DocumentTopic, DocumentTopic.topic_id == Topic.id, isouter=True)
except Exception: .group_by(Topic.name)
continue )
for t in meta.get("topics", []): return {name: count for name, count in q}
counts[t] = counts.get(t, 0) + 1
return counts
# ── Settings ────────────────────────────────────────────────────────────────── # ── Settings ──────────────────────────────────────────────────────────────────
# Phase 2 will move per-user settings to users.ai_provider / users.ai_model
# (D-03 deferred to user-scoped column population).
# For now these remain as flat-file JSON — single-writer, no filelock needed.
def load_settings() -> dict: def load_settings() -> dict:
with _settings_lock: """Read app settings from the flat-file SETTINGS_FILE.
Falls back to DEFAULT_SETTINGS if the file is missing.
# Phase 2 will move per-user settings to users.ai_provider / users.ai_model.
"""
try:
return json.loads(SETTINGS_FILE.read_text()) return json.loads(SETTINGS_FILE.read_text())
except (FileNotFoundError, json.JSONDecodeError):
return copy.deepcopy(DEFAULT_SETTINGS)
def save_settings(settings: dict) -> None: def save_settings(settings: dict) -> None:
with _settings_lock: """Write app settings to the flat-file SETTINGS_FILE.
No filelock — Phase 1 settings file is single-writer.
# Phase 2 will move per-user settings to users.ai_provider / users.ai_model.
"""
SETTINGS_FILE.write_text(json.dumps(settings, indent=2)) SETTINGS_FILE.write_text(json.dumps(settings, indent=2))
@@ -178,10 +441,33 @@ def mask_api_key(key: str) -> str:
def settings_masked(settings: dict) -> dict: def settings_masked(settings: dict) -> dict:
import copy
s = copy.deepcopy(settings) s = copy.deepcopy(settings)
for prov in ("anthropic", "openai"): for prov in ("anthropic", "openai"):
key = s.get("providers", {}).get(prov, {}).get("api_key", "") key = s.get("providers", {}).get(prov, {}).get("api_key", "")
if key: if key:
s["providers"][prov]["api_key"] = mask_api_key(key) s["providers"][prov]["api_key"] = mask_api_key(key)
return s return s
# ── Public surface ─────────────────────────────────────────────────────────────
__all__ = [
"save_upload",
"save_metadata",
"get_metadata",
"list_metadata",
"delete_document",
"update_document_topics",
"remove_topic_from_all_documents",
"load_topics",
"save_topics",
"get_topic",
"create_topic",
"update_topic",
"delete_topic",
"topic_doc_counts",
"load_settings",
"save_settings",
"mask_api_key",
"settings_masked",
]
+3 -6
View File
@@ -41,14 +41,11 @@ def isolated_data_dir(monkeypatch, tmp_path):
monkeypatch.setattr(config, "TOPICS_FILE", data_dir / "topics.json") monkeypatch.setattr(config, "TOPICS_FILE", data_dir / "topics.json")
monkeypatch.setattr(config, "SETTINGS_FILE", data_dir / "settings.json") monkeypatch.setattr(config, "SETTINGS_FILE", data_dir / "settings.json")
# Plan 04: services.storage is now async (PostgreSQL + MinIO).
# The flat-file _topics_lock / _settings_lock attributes no longer exist.
# Only SETTINGS_FILE is still used by the sync load_settings/save_settings.
import services.storage as st import services.storage as st
from filelock import FileLock
monkeypatch.setattr(st, "UPLOADS_DIR", data_dir / "uploads")
monkeypatch.setattr(st, "METADATA_DIR", data_dir / "metadata")
monkeypatch.setattr(st, "TOPICS_FILE", data_dir / "topics.json")
monkeypatch.setattr(st, "SETTINGS_FILE", data_dir / "settings.json") monkeypatch.setattr(st, "SETTINGS_FILE", data_dir / "settings.json")
monkeypatch.setattr(st, "_topics_lock", FileLock(str(data_dir / "topics.json") + ".lock"))
monkeypatch.setattr(st, "_settings_lock", FileLock(str(data_dir / "settings.json") + ".lock"))
yield data_dir yield data_dir
+1 -1
View File
@@ -20,7 +20,7 @@ import pytest
# Test 1: object key matches STORE-02 regex # Test 1: object key matches STORE-02 regex
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
async def test_object_key_schema(db_session): async def test_object_key_schema():
"""STORE-02: put_object must return a key matching {user_id}/{doc_id}/{uuid4}{ext}.""" """STORE-02: put_object must return a key matching {user_id}/{doc_id}/{uuid4}{ext}."""
try: try:
from storage.minio_backend import MinIOBackend from storage.minio_backend import MinIOBackend