feat(01-05): wire main.py lifespan+health and rewrite documents+topics to async session

- Rewrite main.py lifespan: MinIO client created at startup, docuvault bucket
  auto-created if missing, stored on app.state.minio; engine.dispose() on shutdown
- Extend /health endpoint: probes PostgreSQL (SELECT 1) and MinIO (bucket_exists)
  returning {"status": "ok"|"degraded", "checks": {"postgres": ..., "minio": ...}}
- Rewrite api/documents.py: all routes inject session: AsyncSession = Depends(get_db);
  save_upload/save_metadata/list_metadata/get_metadata/delete_document all async;
  upload handler queues extract_and_classify.delay() instead of inline classification;
  /classify endpoint retains synchronous await classifier.classify_document() for
  backward-compatible immediate response
- Rewrite api/topics.py: all routes inject session dependency; all storage calls
  are async with session parameter; Pydantic models TopicCreate/TopicUpdate/
  SuggestRequest preserved verbatim
This commit is contained in:
curo1305
2026-05-22 09:47:00 +02:00
parent 32d67de1ca
commit c1931fd566
3 changed files with 120 additions and 42 deletions
+36 -20
View File
@@ -1,6 +1,11 @@
from datetime import datetime, timezone from datetime import datetime, timezone
from fastapi import APIRouter, UploadFile, File, Form, HTTPException, Query
from services import storage, extractor, classifier from fastapi import APIRouter, Depends, File, Form, HTTPException, Query, UploadFile
from sqlalchemy.ext.asyncio import AsyncSession
from deps.db import get_db
from services import classifier, extractor, storage
from tasks.document_tasks import extract_and_classify
router = APIRouter(prefix="/api/documents", tags=["documents"]) router = APIRouter(prefix="/api/documents", tags=["documents"])
@@ -22,6 +27,7 @@ ALLOWED_MIME_TYPES = {
async def upload_document( async def upload_document(
file: UploadFile = File(...), file: UploadFile = File(...),
auto_classify: bool = Form(True), auto_classify: bool = Form(True),
session: AsyncSession = Depends(get_db),
): ):
content = await file.read() content = await file.read()
if len(content) == 0: if len(content) == 0:
@@ -29,8 +35,10 @@ async def upload_document(
mime = file.content_type or "application/octet-stream" mime = file.content_type or "application/octet-stream"
saved = storage.save_upload(content, file.filename or "upload", mime) saved = await storage.save_upload(session, content, file.filename or "upload", mime)
text = extractor.extract_text(saved["path"], mime)
# Extract text from the in-memory bytes (avoid a second MinIO round-trip at upload time)
text = extractor.extract_text_from_bytes(content, mime)
now = datetime.now(timezone.utc).isoformat() now = datetime.now(timezone.utc).isoformat()
meta = { meta = {
@@ -40,20 +48,20 @@ async def upload_document(
"mime_type": mime, "mime_type": mime,
"size_bytes": len(content), "size_bytes": len(content),
"extracted_text": text, "extracted_text": text,
# Phase 1 cutover: topics are empty at upload time; the Celery worker
# fills them in asynchronously after extract_and_classify completes.
"topics": [], "topics": [],
"created_at": now, "created_at": now,
"classified_at": None, "classified_at": None,
} }
storage.save_metadata(meta) await storage.save_metadata(session, meta)
if auto_classify: if auto_classify:
try: # Queue the extract+classify task on the Celery documents queue (STORE-08).
topics = await classifier.classify_document(saved["id"]) # The task re-fetches bytes from MinIO, extracts text, and classifies.
meta["topics"] = topics # The upload response returns topics=[] — polling GET /api/documents/{id}
meta["classified_at"] = datetime.now(timezone.utc).isoformat() # will show the populated topics once the worker completes.
except Exception as e: extract_and_classify.delay(str(saved["id"]))
# Classification failure is non-fatal; document is still saved
meta["classification_error"] = str(e)
return meta return meta
@@ -63,38 +71,46 @@ async def list_documents(
topic: str | None = Query(None), topic: str | None = Query(None),
page: int = Query(1, ge=1), page: int = Query(1, ge=1),
per_page: int = Query(20, ge=1, le=100), per_page: int = Query(20, ge=1, le=100),
session: AsyncSession = Depends(get_db),
): ):
docs = storage.list_metadata(topic=topic) docs = await storage.list_metadata(session, topic=topic)
total = len(docs) total = len(docs)
start = (page - 1) * per_page start = (page - 1) * per_page
return {"items": docs[start : start + per_page], "total": total, "page": page, "per_page": per_page} return {"items": docs[start : start + per_page], "total": total, "page": page, "per_page": per_page}
@router.get("/{doc_id}") @router.get("/{doc_id}")
async def get_document(doc_id: str): async def get_document(doc_id: str, session: AsyncSession = Depends(get_db)):
meta = storage.get_metadata(doc_id) meta = await storage.get_metadata(session, doc_id)
if meta is None: if meta is None:
raise HTTPException(404, "Document not found") raise HTTPException(404, "Document not found")
return meta return meta
@router.delete("/{doc_id}") @router.delete("/{doc_id}")
async def delete_document(doc_id: str): async def delete_document(doc_id: str, session: AsyncSession = Depends(get_db)):
ok = storage.delete_document(doc_id) ok = await storage.delete_document(session, doc_id)
if not ok: if not ok:
raise HTTPException(404, "Document not found") raise HTTPException(404, "Document not found")
return {"success": True} return {"success": True}
@router.post("/{doc_id}/classify") @router.post("/{doc_id}/classify")
async def classify_document(doc_id: str, body: dict = {}): async def classify_document(
meta = storage.get_metadata(doc_id) doc_id: str,
body: dict = {},
session: AsyncSession = Depends(get_db),
):
meta = await storage.get_metadata(session, doc_id)
if meta is None: if meta is None:
raise HTTPException(404, "Document not found") raise HTTPException(404, "Document not found")
topic_names = body.get("topics") if body else None topic_names = body.get("topics") if body else None
try: try:
topics = await classifier.classify_document(doc_id, topic_names) # The /classify endpoint calls classifier synchronously and returns the
# topic list immediately — this preserves the historical behavior.
# The upload-time path uses Celery .delay() instead (Phase 1 cutover).
topics = await classifier.classify_document(session, doc_id, topic_names)
except Exception as e: except Exception as e:
raise HTTPException(500, f"Classification failed: {e}") raise HTTPException(500, f"Classification failed: {e}")
+23 -15
View File
@@ -1,6 +1,9 @@
from fastapi import APIRouter, HTTPException from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel from pydantic import BaseModel
from services import storage, classifier from sqlalchemy.ext.asyncio import AsyncSession
from deps.db import get_db
from services import classifier, storage
router = APIRouter(prefix="/api/topics", tags=["topics"]) router = APIRouter(prefix="/api/topics", tags=["topics"])
@@ -22,24 +25,29 @@ class SuggestRequest(BaseModel):
@router.get("") @router.get("")
async def list_topics(): async def list_topics(session: AsyncSession = Depends(get_db)):
topics = storage.load_topics() topics = await storage.load_topics(session)
counts = storage.topic_doc_counts() counts = await storage.topic_doc_counts(session)
for t in topics: for t in topics:
t["doc_count"] = counts.get(t["name"], 0) t["doc_count"] = counts.get(t["name"], 0)
return {"topics": topics} return {"topics": topics}
@router.post("") @router.post("")
async def create_topic(body: TopicCreate): async def create_topic(body: TopicCreate, session: AsyncSession = Depends(get_db)):
topic = storage.create_topic(body.name, body.description, body.color) topic = await storage.create_topic(session, body.name, body.description, body.color)
topic["doc_count"] = 0 topic["doc_count"] = 0
return topic return topic
@router.patch("/{topic_id}") @router.patch("/{topic_id}")
async def update_topic(topic_id: str, body: TopicUpdate): async def update_topic(
topic = storage.update_topic( topic_id: str,
body: TopicUpdate,
session: AsyncSession = Depends(get_db),
):
topic = await storage.update_topic(
session,
topic_id, topic_id,
name=body.name, name=body.name,
description=body.description, description=body.description,
@@ -47,26 +55,26 @@ async def update_topic(topic_id: str, body: TopicUpdate):
) )
if topic is None: if topic is None:
raise HTTPException(404, "Topic not found") raise HTTPException(404, "Topic not found")
counts = storage.topic_doc_counts() counts = await storage.topic_doc_counts(session)
topic["doc_count"] = counts.get(topic["name"], 0) topic["doc_count"] = counts.get(topic["name"], 0)
return topic return topic
@router.delete("/{topic_id}") @router.delete("/{topic_id}")
async def delete_topic(topic_id: str): async def delete_topic(topic_id: str, session: AsyncSession = Depends(get_db)):
name = storage.delete_topic(topic_id) name = await storage.delete_topic(session, topic_id)
if name is None: if name is None:
raise HTTPException(404, "Topic not found") raise HTTPException(404, "Topic not found")
return {"success": True, "removed_from_documents": True} return {"success": True, "removed_from_documents": True}
@router.post("/suggest") @router.post("/suggest")
async def suggest_topics(body: SuggestRequest): async def suggest_topics(body: SuggestRequest, session: AsyncSession = Depends(get_db)):
meta = storage.get_metadata(body.document_id) meta = await storage.get_metadata(session, body.document_id)
if meta is None: if meta is None:
raise HTTPException(404, "Document not found") raise HTTPException(404, "Document not found")
try: try:
suggestions = await classifier.suggest_topics_for_document(body.document_id) suggestions = await classifier.suggest_topics_for_document(session, body.document_id)
except Exception as e: except Exception as e:
raise HTTPException(500, f"Suggestion failed: {e}") raise HTTPException(500, f"Suggestion failed: {e}")
return {"suggested": suggestions} return {"suggested": suggestions}
+61 -7
View File
@@ -1,31 +1,85 @@
import asyncio
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
from config import ensure_data_dirs from minio import Minio
from sqlalchemy import text
from api.documents import router as documents_router from api.documents import router as documents_router
from api.topics import router as topics_router
from api.settings import router as settings_router from api.settings import router as settings_router
from api.topics import router as topics_router
from config import settings
from db.session import AsyncSessionLocal, engine
@asynccontextmanager @asynccontextmanager
async def lifespan(app: FastAPI): async def lifespan(app: FastAPI):
ensure_data_dirs() """FastAPI lifespan: create MinIO bucket at startup, dispose engine at shutdown.
D-07: bucket auto-create ensures the docuvault bucket exists on every reboot.
MinIO client stored on app.state.minio for use in the /health endpoint.
"""
# MinIO bucket initialization (RESEARCH.md Pattern 4)
minio_client = Minio(
settings.minio_endpoint,
access_key=settings.minio_access_key,
secret_key=settings.minio_secret_key,
secure=False,
)
exists = await asyncio.to_thread(minio_client.bucket_exists, settings.minio_bucket)
if not exists:
await asyncio.to_thread(minio_client.make_bucket, settings.minio_bucket)
app.state.minio = minio_client
yield yield
# Shutdown: close all pooled connections
await engine.dispose()
app = FastAPI(title="Document Scanner API", version="1.0.0", lifespan=lifespan) app = FastAPI(title="Document Scanner API", version="1.0.0", lifespan=lifespan)
app.add_middleware( app.add_middleware(
CORSMiddleware, CORSMiddleware,
allow_origins=["*"], allow_origins=["*"], # Phase 1: locked down in Phase 2 after auth lands
allow_methods=["*"], allow_methods=["*"],
allow_headers=["*"], allow_headers=["*"],
) )
@app.get("/health") @app.get("/health")
async def health(): async def health(request: Request):
return {"status": "ok"} """Extended health probe: reports PostgreSQL and MinIO connectivity (D-07).
Always returns HTTP 200 — 'degraded' status signals a partial outage without
causing load-balancer retries.
Note (T-01-05-03): error strings expose Python exception class names — acceptable
for an internal/dev endpoint in Phase 1. Phase 2 will trim to 'error' or
'unhealthy' once the endpoint is internet-facing.
"""
checks: dict = {}
# PostgreSQL probe
try:
async with AsyncSessionLocal() as session:
await session.execute(text("SELECT 1"))
checks["postgres"] = "ok"
except Exception as e:
checks["postgres"] = f"error: {type(e).__name__}: {e}"
# MinIO probe
try:
ok = await asyncio.to_thread(
request.app.state.minio.bucket_exists, settings.minio_bucket
)
checks["minio"] = "ok" if ok else "error: bucket missing"
except Exception as e:
checks["minio"] = f"error: {type(e).__name__}: {e}"
status = "ok" if all(v == "ok" for v in checks.values()) else "degraded"
return {"status": status, "checks": checks}
app.include_router(documents_router) app.include_router(documents_router)