diff --git a/backend/api/documents.py b/backend/api/documents.py index b627475..13b7ad9 100644 --- a/backend/api/documents.py +++ b/backend/api/documents.py @@ -1,6 +1,11 @@ from datetime import datetime, timezone -from fastapi import APIRouter, UploadFile, File, Form, HTTPException, Query -from services import storage, extractor, classifier + +from fastapi import APIRouter, Depends, File, Form, HTTPException, Query, UploadFile +from sqlalchemy.ext.asyncio import AsyncSession + +from deps.db import get_db +from services import classifier, extractor, storage +from tasks.document_tasks import extract_and_classify router = APIRouter(prefix="/api/documents", tags=["documents"]) @@ -22,6 +27,7 @@ ALLOWED_MIME_TYPES = { async def upload_document( file: UploadFile = File(...), auto_classify: bool = Form(True), + session: AsyncSession = Depends(get_db), ): content = await file.read() if len(content) == 0: @@ -29,8 +35,10 @@ async def upload_document( mime = file.content_type or "application/octet-stream" - saved = storage.save_upload(content, file.filename or "upload", mime) - text = extractor.extract_text(saved["path"], mime) + saved = await storage.save_upload(session, content, file.filename or "upload", mime) + + # Extract text from the in-memory bytes (avoid a second MinIO round-trip at upload time) + text = extractor.extract_text_from_bytes(content, mime) now = datetime.now(timezone.utc).isoformat() meta = { @@ -40,20 +48,20 @@ async def upload_document( "mime_type": mime, "size_bytes": len(content), "extracted_text": text, + # Phase 1 cutover: topics are empty at upload time; the Celery worker + # fills them in asynchronously after extract_and_classify completes. "topics": [], "created_at": now, "classified_at": None, } - storage.save_metadata(meta) + await storage.save_metadata(session, meta) if auto_classify: - try: - topics = await classifier.classify_document(saved["id"]) - meta["topics"] = topics - meta["classified_at"] = datetime.now(timezone.utc).isoformat() - except Exception as e: - # Classification failure is non-fatal; document is still saved - meta["classification_error"] = str(e) + # Queue the extract+classify task on the Celery documents queue (STORE-08). + # The task re-fetches bytes from MinIO, extracts text, and classifies. + # The upload response returns topics=[] — polling GET /api/documents/{id} + # will show the populated topics once the worker completes. + extract_and_classify.delay(str(saved["id"])) return meta @@ -63,38 +71,46 @@ async def list_documents( topic: str | None = Query(None), page: int = Query(1, ge=1), per_page: int = Query(20, ge=1, le=100), + session: AsyncSession = Depends(get_db), ): - docs = storage.list_metadata(topic=topic) + docs = await storage.list_metadata(session, topic=topic) total = len(docs) start = (page - 1) * per_page return {"items": docs[start : start + per_page], "total": total, "page": page, "per_page": per_page} @router.get("/{doc_id}") -async def get_document(doc_id: str): - meta = storage.get_metadata(doc_id) +async def get_document(doc_id: str, session: AsyncSession = Depends(get_db)): + meta = await storage.get_metadata(session, doc_id) if meta is None: raise HTTPException(404, "Document not found") return meta @router.delete("/{doc_id}") -async def delete_document(doc_id: str): - ok = storage.delete_document(doc_id) +async def delete_document(doc_id: str, session: AsyncSession = Depends(get_db)): + ok = await storage.delete_document(session, doc_id) if not ok: raise HTTPException(404, "Document not found") return {"success": True} @router.post("/{doc_id}/classify") -async def classify_document(doc_id: str, body: dict = {}): - meta = storage.get_metadata(doc_id) +async def classify_document( + doc_id: str, + body: dict = {}, + session: AsyncSession = Depends(get_db), +): + meta = await storage.get_metadata(session, doc_id) if meta is None: raise HTTPException(404, "Document not found") topic_names = body.get("topics") if body else None try: - topics = await classifier.classify_document(doc_id, topic_names) + # The /classify endpoint calls classifier synchronously and returns the + # topic list immediately — this preserves the historical behavior. + # The upload-time path uses Celery .delay() instead (Phase 1 cutover). + topics = await classifier.classify_document(session, doc_id, topic_names) except Exception as e: raise HTTPException(500, f"Classification failed: {e}") diff --git a/backend/api/topics.py b/backend/api/topics.py index fa3d379..3a81d74 100644 --- a/backend/api/topics.py +++ b/backend/api/topics.py @@ -1,6 +1,9 @@ -from fastapi import APIRouter, HTTPException +from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel -from services import storage, classifier +from sqlalchemy.ext.asyncio import AsyncSession + +from deps.db import get_db +from services import classifier, storage router = APIRouter(prefix="/api/topics", tags=["topics"]) @@ -22,24 +25,29 @@ class SuggestRequest(BaseModel): @router.get("") -async def list_topics(): - topics = storage.load_topics() - counts = storage.topic_doc_counts() +async def list_topics(session: AsyncSession = Depends(get_db)): + topics = await storage.load_topics(session) + counts = await storage.topic_doc_counts(session) for t in topics: t["doc_count"] = counts.get(t["name"], 0) return {"topics": topics} @router.post("") -async def create_topic(body: TopicCreate): - topic = storage.create_topic(body.name, body.description, body.color) +async def create_topic(body: TopicCreate, session: AsyncSession = Depends(get_db)): + topic = await storage.create_topic(session, body.name, body.description, body.color) topic["doc_count"] = 0 return topic @router.patch("/{topic_id}") -async def update_topic(topic_id: str, body: TopicUpdate): - topic = storage.update_topic( +async def update_topic( + topic_id: str, + body: TopicUpdate, + session: AsyncSession = Depends(get_db), +): + topic = await storage.update_topic( + session, topic_id, name=body.name, description=body.description, @@ -47,26 +55,26 @@ async def update_topic(topic_id: str, body: TopicUpdate): ) if topic is None: raise HTTPException(404, "Topic not found") - counts = storage.topic_doc_counts() + counts = await storage.topic_doc_counts(session) topic["doc_count"] = counts.get(topic["name"], 0) return topic @router.delete("/{topic_id}") -async def delete_topic(topic_id: str): - name = storage.delete_topic(topic_id) +async def delete_topic(topic_id: str, session: AsyncSession = Depends(get_db)): + name = await storage.delete_topic(session, topic_id) if name is None: raise HTTPException(404, "Topic not found") return {"success": True, "removed_from_documents": True} @router.post("/suggest") -async def suggest_topics(body: SuggestRequest): - meta = storage.get_metadata(body.document_id) +async def suggest_topics(body: SuggestRequest, session: AsyncSession = Depends(get_db)): + meta = await storage.get_metadata(session, body.document_id) if meta is None: raise HTTPException(404, "Document not found") try: - suggestions = await classifier.suggest_topics_for_document(body.document_id) + suggestions = await classifier.suggest_topics_for_document(session, body.document_id) except Exception as e: raise HTTPException(500, f"Suggestion failed: {e}") return {"suggested": suggestions} diff --git a/backend/main.py b/backend/main.py index 112ddbd..7c54475 100644 --- a/backend/main.py +++ b/backend/main.py @@ -1,31 +1,85 @@ +import asyncio from contextlib import asynccontextmanager -from fastapi import FastAPI + +from fastapi import FastAPI, Request from fastapi.middleware.cors import CORSMiddleware -from config import ensure_data_dirs +from minio import Minio +from sqlalchemy import text + from api.documents import router as documents_router -from api.topics import router as topics_router from api.settings import router as settings_router +from api.topics import router as topics_router +from config import settings +from db.session import AsyncSessionLocal, engine @asynccontextmanager async def lifespan(app: FastAPI): - ensure_data_dirs() + """FastAPI lifespan: create MinIO bucket at startup, dispose engine at shutdown. + + D-07: bucket auto-create ensures the docuvault bucket exists on every reboot. + MinIO client stored on app.state.minio for use in the /health endpoint. + """ + # MinIO bucket initialization (RESEARCH.md Pattern 4) + minio_client = Minio( + settings.minio_endpoint, + access_key=settings.minio_access_key, + secret_key=settings.minio_secret_key, + secure=False, + ) + exists = await asyncio.to_thread(minio_client.bucket_exists, settings.minio_bucket) + if not exists: + await asyncio.to_thread(minio_client.make_bucket, settings.minio_bucket) + app.state.minio = minio_client + yield + # Shutdown: close all pooled connections + await engine.dispose() + app = FastAPI(title="Document Scanner API", version="1.0.0", lifespan=lifespan) app.add_middleware( CORSMiddleware, - allow_origins=["*"], + allow_origins=["*"], # Phase 1: locked down in Phase 2 after auth lands allow_methods=["*"], allow_headers=["*"], ) @app.get("/health") -async def health(): - return {"status": "ok"} +async def health(request: Request): + """Extended health probe: reports PostgreSQL and MinIO connectivity (D-07). + + Always returns HTTP 200 — 'degraded' status signals a partial outage without + causing load-balancer retries. + + Note (T-01-05-03): error strings expose Python exception class names — acceptable + for an internal/dev endpoint in Phase 1. Phase 2 will trim to 'error' or + 'unhealthy' once the endpoint is internet-facing. + """ + checks: dict = {} + + # PostgreSQL probe + try: + async with AsyncSessionLocal() as session: + await session.execute(text("SELECT 1")) + checks["postgres"] = "ok" + except Exception as e: + checks["postgres"] = f"error: {type(e).__name__}: {e}" + + # MinIO probe + try: + ok = await asyncio.to_thread( + request.app.state.minio.bucket_exists, settings.minio_bucket + ) + checks["minio"] = "ok" if ok else "error: bucket missing" + except Exception as e: + checks["minio"] = f"error: {type(e).__name__}: {e}" + + status = "ok" if all(v == "ok" for v in checks.values()) else "degraded" + return {"status": status, "checks": checks} app.include_router(documents_router)