Files
Business-Management/features/doc-service/app/routers/documents.py
T
curo1305 0d34867a69 Add PDF document service with AI extraction and per-app settings
- New `features/doc-service` FastAPI microservice: PDF upload, async
  text extraction (pdfplumber), AI classification via Anthropic/Ollama/
  LM Studio, per-user categories, file download
- Alembic migration isolated with `alembic_version_doc_service` table
- Main backend: httpx proxy routers for /api/documents/* and
  /api/documents/categories/*, admin settings API at /api/settings/*
- Runtime config in /config/doc_service_config.json (shared Docker
  volume); api_key masking on reads; atomic write with os.replace()
- Frontend: DocumentsPage, DocumentAdminSettingsPage, updated AppsPage
  launcher hub, simplified Nav (removed Settings link), new routes
- docker-compose: doc-service service, doc_data + app_config volumes,
  removed internal:true from backend-net for outbound AI API calls
- Fix pre-commit hook: probe Docker socket path so git subprocess picks
  up Docker Desktop on macOS
- Fix security_check.py: use sys.executable for bandit so venv python
  is used instead of system python

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 05:28:11 +02:00

305 lines
10 KiB
Python

import asyncio
import json
import uuid
from datetime import datetime, timezone
import aiofiles
import pdfplumber
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, UploadFile
from fastapi.responses import StreamingResponse
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.database import AsyncSessionLocal, get_db
from app.deps import get_user_id
from app.models.category import DocumentCategory
from app.models.category_assignment import CategoryAssignment
from app.models.document import Document
from app.schemas.document import DocumentOut, DocumentStatusOut, DocumentTypeUpdate
from app.services.ai import get_provider
from app.services.config_reader import load_doc_config
from app.services.storage import delete_file, get_upload_path, save_upload
# Router that every endpoint decorator in this module registers on.
router = APIRouter()
# Fallback upload limit (20 MiB), used when the loaded config has no
# "documents" -> "max_pdf_bytes" entry.
_DEFAULT_MAX_BYTES = 20 * 1024 * 1024
# ── Helpers ───────────────────────────────────────────────────────────────────
async def _get_user_doc(doc_id: str, user_id: str, db: AsyncSession) -> Document:
    """Fetch one document owned by ``user_id`` with its categories eager-loaded.

    The query filters on both the document id and the owner id, and uses
    ``selectinload`` for ``category_assignments`` and each assignment's
    ``category``.

    Raises:
        HTTPException: 404 when no row matches both ``doc_id`` and ``user_id``.
    """
    eager_categories = selectinload(Document.category_assignments).selectinload(
        CategoryAssignment.category
    )
    stmt = (
        select(Document)
        .where(Document.id == doc_id, Document.user_id == user_id)
        .options(eager_categories)
    )
    found = (await db.execute(stmt)).scalar_one_or_none()
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Document not found")
def _doc_with_categories(doc: Document) -> DocumentOut:
    """Build the ``DocumentOut`` response for ``doc``, including its categories.

    Reads ``doc.category_assignments`` and each assignment's ``category`` to
    produce the ``categories`` list; every other field is copied from ``doc``.
    """
    from app.schemas.document import CategoryOut

    categories = []
    for assignment in doc.category_assignments:
        category = assignment.category
        categories.append(CategoryOut(id=category.id, name=category.name))

    # Plain column values copied one-to-one onto the response schema.
    copied_fields = (
        "id",
        "user_id",
        "filename",
        "file_size",
        "status",
        "document_type",
        "extracted_data",
        "tags",
        "error_message",
        "created_at",
        "processed_at",
    )
    payload = {name: getattr(doc, name) for name in copied_fields}
    return DocumentOut(**payload, categories=categories)
def _extract_pdf_text(file_path: str) -> str:
    """Return the text of every page in the PDF, joined with newlines.

    Pages for which ``extract_text()`` returns nothing are skipped.
    Synchronous — must be called via asyncio.to_thread.
    """
    with pdfplumber.open(file_path) as pdf:
        per_page = (page.extract_text() for page in pdf.pages)
        # Joined inside the ``with`` so the lazy generator runs while the PDF is open.
        return "\n".join(chunk for chunk in per_page if chunk)
# ── Background processing ─────────────────────────────────────────────────────
async def process_document(doc_id: str) -> None:
    """Extract text from an uploaded PDF, classify it, and persist the outcome.

    Runs after the upload response is sent.
    Opens its own DB session — never use the request's Depends session here.
    Loads AI config fresh from the config file so settings changes apply without restart.

    The document moves through ``"processing"`` to either ``"done"`` (with
    ``raw_text``, ``extracted_data``, ``document_type``, ``tags`` and
    ``processed_at`` filled in) or ``"failed"`` (with ``error_message`` set).
    Returns silently when no document with ``doc_id`` exists.
    """
    import logging

    async with AsyncSessionLocal() as db:
        doc = await db.get(Document, doc_id)
        if doc is None:
            return
        # Read the path before committing so the extraction step does not
        # depend on ORM attributes still being loaded after the commit.
        file_path = doc.file_path
        doc.status = "processing"
        await db.commit()
        try:
            text = await asyncio.to_thread(_extract_pdf_text, file_path)
            config = await load_doc_config()
            provider = get_provider(config["ai"])
            result = await provider.classify_document(text)
            # Compute everything that can still fail before touching the row,
            # so a late error never leaves a half-populated "failed" document.
            extracted_data = json.dumps(result)
            # ``or`` also covers an explicit null from the provider, which
            # ``dict.get(key, default)`` would pass through unchanged.
            document_type = result.get("document_type") or "unknown"
            tags = json.dumps(result.get("tags") or [])
        except Exception as exc:
            # Deliberate catch-all: this is the top of a background task and
            # the failure has to be recorded on the document row.
            logging.getLogger(__name__).exception(
                "Processing of document %s failed", doc_id
            )
            doc.status = "failed"
            # Some exceptions stringify to "", so fall back to the class name.
            doc.error_message = (str(exc) or type(exc).__name__)[:500]
        else:
            doc.raw_text = text[:500_000]  # cap stored text at 500k chars
            doc.extracted_data = extracted_data
            doc.document_type = document_type
            doc.tags = tags
            doc.status = "done"
            doc.processed_at = datetime.now(timezone.utc)
        await db.commit()
# ── Routes ────────────────────────────────────────────────────────────────────
@router.post("/upload", response_model=DocumentOut, status_code=202)
async def upload_document(
    file: UploadFile,
    background_tasks: BackgroundTasks,
    user_id: str = Depends(get_user_id),
    db: AsyncSession = Depends(get_db),
) -> DocumentOut:
    """Accept a PDF upload, store it, and schedule background processing.

    Returns the new document (status ``"pending"``) with HTTP 202;
    ``process_document`` is queued as a background task.

    Raises:
        HTTPException: 415 when the content type is neither
            ``application/pdf`` nor ``application/octet-stream`` and the
            filename does not end in ``.pdf``; 413 when the payload exceeds
            the configured ``documents.max_pdf_bytes`` (default 20 MiB).
    """
    if file.content_type not in ("application/pdf", "application/octet-stream"):
        if not (file.filename or "").lower().endswith(".pdf"):
            raise HTTPException(status_code=415, detail="Only PDF files are accepted")
    config = await load_doc_config()
    max_bytes = config.get("documents", {}).get("max_pdf_bytes", _DEFAULT_MAX_BYTES)
    # Read at most one byte past the limit: enough to detect an oversized
    # upload without pulling an arbitrarily large body into memory.
    file_data = await file.read(max_bytes + 1)
    if len(file_data) > max_bytes:
        raise HTTPException(
            status_code=413,
            detail=f"File exceeds maximum size of {max_bytes // (1024*1024)} MB",
        )
    doc_id = str(uuid.uuid4())
    dest = await save_upload(file_data, user_id, doc_id)
    doc = Document(
        id=doc_id,
        user_id=user_id,
        filename=file.filename or "upload.pdf",
        file_path=str(dest),
        file_size=len(file_data),
        status="pending",
    )
    try:
        db.add(doc)
        await db.commit()
    except Exception:
        # The row was not persisted, so remove the stored file rather than
        # leaving an orphan that no document references.
        delete_file(str(dest))
        raise
    await db.refresh(doc)
    background_tasks.add_task(process_document, doc_id)
    return _doc_with_categories(doc)
@router.get("", response_model=list[DocumentOut])
async def list_documents(
    user_id: str = Depends(get_user_id),
    db: AsyncSession = Depends(get_db),
) -> list[DocumentOut]:
    """Return all documents owned by the caller, newest ``created_at`` first."""
    eager_categories = selectinload(Document.category_assignments).selectinload(
        CategoryAssignment.category
    )
    stmt = (
        select(Document)
        .where(Document.user_id == user_id)
        .options(eager_categories)
        .order_by(Document.created_at.desc())
    )
    rows = await db.execute(stmt)
    documents = rows.scalars().all()
    return list(map(_doc_with_categories, documents))
@router.get("/{doc_id}", response_model=DocumentOut)
async def get_document(
    doc_id: str,
    user_id: str = Depends(get_user_id),
    db: AsyncSession = Depends(get_db),
) -> DocumentOut:
    """Return a single document owned by the caller, with its categories.

    The 404 for an unknown or foreign document is raised by ``_get_user_doc``.
    """
    return _doc_with_categories(await _get_user_doc(doc_id, user_id, db))
@router.get("/{doc_id}/status", response_model=DocumentStatusOut)
async def get_document_status(
    doc_id: str,
    user_id: str = Depends(get_user_id),
    db: AsyncSession = Depends(get_db),
) -> Document:
    """Return the caller's document for serialization as ``DocumentStatusOut``.

    Uses a plain select without eager-loading the category relationships.

    Raises:
        HTTPException: 404 when no document matches ``doc_id`` and ``user_id``.
    """
    stmt = select(Document).where(
        Document.id == doc_id, Document.user_id == user_id
    )
    found = (await db.execute(stmt)).scalar_one_or_none()
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Document not found")
@router.patch("/{doc_id}/type", response_model=DocumentOut)
async def update_document_type(
    doc_id: str,
    body: DocumentTypeUpdate,
    user_id: str = Depends(get_user_id),
    db: AsyncSession = Depends(get_db),
) -> DocumentOut:
    """Overwrite the document's ``document_type`` with the value from ``body``.

    The 404 for an unknown or foreign document is raised by ``_get_user_doc``.
    """
    document = await _get_user_doc(doc_id, user_id, db)
    document.document_type = body.document_type

    await db.commit()
    # NOTE(review): the serializer below reads ``category_assignments`` after
    # this refresh — confirm the relationship is still loaded at that point.
    await db.refresh(document)

    response = _doc_with_categories(document)
    return response
@router.delete("/{doc_id}", status_code=204)
async def delete_document(
    doc_id: str,
    user_id: str = Depends(get_user_id),
    db: AsyncSession = Depends(get_db),
) -> None:
    """Delete the caller's document row and then its stored file.

    Raises:
        HTTPException: 404 when no document matches ``doc_id`` and ``user_id``.
    """
    result = await db.execute(
        select(Document).where(Document.id == doc_id, Document.user_id == user_id)
    )
    doc = result.scalar_one_or_none()
    if doc is None:
        raise HTTPException(status_code=404, detail="Document not found")
    # Keep the path in a local: the ORM object is deleted by the commit below.
    file_path = doc.file_path
    await db.delete(doc)
    await db.commit()
    # Remove the file only once the row is gone. If the commit fails, the
    # document stays fully intact instead of pointing at a missing file.
    delete_file(file_path)
@router.get("/{doc_id}/file")
async def download_file(
    doc_id: str,
    user_id: str = Depends(get_user_id),
    db: AsyncSession = Depends(get_db),
) -> StreamingResponse:
    """Stream the stored PDF of one of the caller's documents.

    The file is sent in 64 KiB chunks as ``application/pdf`` with an
    ``inline`` Content-Disposition.

    Raises:
        HTTPException: 404 when no document matches ``doc_id`` and
            ``user_id``, or when the stored file is missing on disk.
    """
    import os
    from urllib.parse import quote

    result = await db.execute(
        select(Document).where(Document.id == doc_id, Document.user_id == user_id)
    )
    doc = result.scalar_one_or_none()
    if doc is None:
        raise HTTPException(status_code=404, detail="Document not found")

    # Copy plain values now: the generator runs after this handler returns
    # and should not touch the ORM object at that point.
    file_path = doc.file_path
    filename = doc.filename or "upload.pdf"

    # Fail before any headers are sent instead of breaking mid-stream.
    if not os.path.isfile(file_path):
        raise HTTPException(status_code=404, detail="Document file not found")

    async def file_generator():
        async with aiofiles.open(file_path, "rb") as f:
            while chunk := await f.read(64 * 1024):
                yield chunk

    # The filename is user-supplied. Quotes, backslashes, control characters
    # and non-ASCII text are unsafe in a quoted header value, so send a
    # sanitized ASCII fallback plus the exact name percent-encoded as UTF-8.
    ascii_name = "".join(
        ch if ch.isascii() and ch.isprintable() and ch not in '"\\' else "_"
        for ch in filename
    )
    encoded_name = quote(filename, safe="")
    disposition = (
        f"inline; filename=\"{ascii_name}\"; filename*=UTF-8''{encoded_name}"
    )
    return StreamingResponse(
        file_generator(),
        media_type="application/pdf",
        headers={"Content-Disposition": disposition},
    )
# ── Category assignment ───────────────────────────────────────────────────────
@router.post("/{doc_id}/categories/{cat_id}", status_code=204)
async def assign_category(
    doc_id: str,
    cat_id: str,
    user_id: str = Depends(get_user_id),
    db: AsyncSession = Depends(get_db),
) -> None:
    """Link a category to a document; a no-op when the link already exists.

    Raises:
        HTTPException: 404 when the document or the category does not exist
            for this ``user_id``.
    """
    # Ownership checks: both the document and the category must be the caller's.
    owned_doc_stmt = select(Document).where(
        Document.id == doc_id, Document.user_id == user_id
    )
    if (await db.execute(owned_doc_stmt)).scalar_one_or_none() is None:
        raise HTTPException(status_code=404, detail="Document not found")

    owned_cat_stmt = select(DocumentCategory).where(
        DocumentCategory.id == cat_id, DocumentCategory.user_id == user_id
    )
    if (await db.execute(owned_cat_stmt)).scalar_one_or_none() is None:
        raise HTTPException(status_code=404, detail="Category not found")

    # Insert only when the pair is not linked yet.
    link_stmt = select(CategoryAssignment).where(
        CategoryAssignment.document_id == doc_id,
        CategoryAssignment.category_id == cat_id,
    )
    already_linked = (await db.execute(link_stmt)).scalar_one_or_none() is not None
    if already_linked:
        return
    db.add(CategoryAssignment(document_id=doc_id, category_id=cat_id))
    await db.commit()
@router.delete("/{doc_id}/categories/{cat_id}", status_code=204)
async def remove_category(
    doc_id: str,
    cat_id: str,
    user_id: str = Depends(get_user_id),
    db: AsyncSession = Depends(get_db),
) -> None:
    """Remove a category link from one of the caller's documents.

    A no-op when the document is the caller's but the link does not exist.

    Raises:
        HTTPException: 404 when no document matches ``doc_id`` and ``user_id``.
    """
    # Verify the document belongs to this user before touching its links;
    # otherwise any caller could unlink categories from someone else's
    # document just by knowing the ids.
    doc_result = await db.execute(
        select(Document).where(Document.id == doc_id, Document.user_id == user_id)
    )
    if doc_result.scalar_one_or_none() is None:
        raise HTTPException(status_code=404, detail="Document not found")
    result = await db.execute(
        select(CategoryAssignment).where(
            CategoryAssignment.document_id == doc_id,
            CategoryAssignment.category_id == cat_id,
        )
    )
    assignment = result.scalar_one_or_none()
    if assignment is not None:
        await db.delete(assignment)
        await db.commit()