feat: migrate doc-service to use storage-service for file I/O (Phase 2)
- storage.py: replace aiofiles filesystem ops with httpx calls to
storage-service PUT/GET/DELETE /objects/documents/{key}
- Document model: rename file_path → storage_key (plain object key, no path prefix)
- Migration 0008: ALTER COLUMN + data migration strips /data/documents/ prefix
- documents.py: update upload, delete, download endpoints; _extract_pdf_text
now takes bytes (pdfplumber.open(BytesIO)) instead of a filesystem path
- file_watcher.py: store storage_key instead of file_path on ingestion
- doc-service config: add STORAGE_SERVICE_URL env var
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,56 @@
|
||||
"""rename file_path to storage_key and strip filesystem prefix from existing rows
|
||||
|
||||
Revision ID: 0008
|
||||
Revises: 0007
|
||||
Create Date: 2026-04-20
|
||||
|
||||
Renames the documents.file_path column to storage_key.
|
||||
Existing rows have paths like '/data/documents/{user_id}/{doc_id}.pdf' or
|
||||
'/data/documents/watch/{doc_id}.pdf'. The migration strips the leading
|
||||
'/data/documents/' prefix so the value becomes a plain storage key
|
||||
(e.g. '{user_id}/{doc_id}.pdf') that the storage-service uses as the object key.
|
||||
"""
|
||||
from typing import Sequence, Union
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
revision: str = "0008"
|
||||
down_revision: Union[str, None] = "0007"
|
||||
branch_labels: Union[str, Sequence[str], None] = None
|
||||
depends_on: Union[str, Sequence[str], None] = None
|
||||
|
||||
|
||||
def upgrade() -> None:
    """Rename ``documents.file_path`` to ``storage_key`` and strip the legacy prefix.

    Rows whose value starts with '/data/documents/' have exactly that leading
    prefix removed, turning the filesystem path into a plain object key.
    Rows that do not start with the prefix are left unchanged.
    """
    with op.batch_alter_table("documents") as batch_op:
        batch_op.alter_column(
            "file_path",
            new_column_name="storage_key",
            existing_type=sa.String(),
            existing_nullable=False,
        )

    # Strip the '/data/documents/' filesystem prefix from pre-migration rows.
    # SUBSTR drops only the leading prefix; REPLACE would also delete any
    # later occurrence of the same text inside the remainder of the value.
    op.execute(
        sa.text(
            "UPDATE documents"
            " SET storage_key = SUBSTR(storage_key, LENGTH('/data/documents/') + 1)"
            " WHERE storage_key LIKE '/data/documents/%'"
        )
    )
|
||||
|
||||
|
||||
def downgrade() -> None:
    """Restore the '/data/documents/' prefix and rename ``storage_key`` back to ``file_path``.

    The data update runs first, while the column is still called
    ``storage_key``; the rename happens afterwards.
    """
    # Restore the filesystem prefix so old code can still find the files.
    # Only relative keys are prefixed. A value starting with '/' is either
    # already prefixed or an absolute path that upgrade() never stripped
    # (it did not match '/data/documents/%'); prepending to it would corrupt it.
    op.execute(
        sa.text(
            "UPDATE documents SET storage_key = '/data/documents/' || storage_key"
            " WHERE storage_key NOT LIKE '/%'"
        )
    )
    with op.batch_alter_table("documents") as batch_op:
        batch_op.alter_column(
            "storage_key",
            new_column_name="file_path",
            existing_type=sa.String(),
            existing_nullable=False,
        )
|
||||
@@ -7,6 +7,7 @@ class Settings(BaseSettings):
|
||||
DATA_DIR: str = "/data/documents"
|
||||
CONFIG_PATH: str = "/config/doc_service_config.json"
|
||||
AI_SERVICE_URL: str = "http://ai-service:8010"
|
||||
STORAGE_SERVICE_URL: str = "http://storage-service:8020"
|
||||
|
||||
class Config:
|
||||
env_file = ".env"
|
||||
|
||||
@@ -13,7 +13,7 @@ class Document(Base):
|
||||
id: Mapped[str] = mapped_column(String, primary_key=True, default=lambda: str(uuid.uuid4()))
|
||||
user_id: Mapped[str] = mapped_column(String, nullable=False, index=True)
|
||||
filename: Mapped[str] = mapped_column(String, nullable=False)
|
||||
file_path: Mapped[str] = mapped_column(String, nullable=False)
|
||||
storage_key: Mapped[str] = mapped_column(String, nullable=False)
|
||||
file_size: Mapped[int] = mapped_column(Integer, nullable=False)
|
||||
status: Mapped[str] = mapped_column(String, nullable=False, default="pending")
|
||||
title: Mapped[str | None] = mapped_column(String(500), nullable=True)
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
import asyncio
|
||||
import io
|
||||
import json
|
||||
import math
|
||||
import uuid
|
||||
from datetime import datetime, timezone
|
||||
|
||||
import aiofiles
|
||||
import pdfplumber
|
||||
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Query, UploadFile
|
||||
from fastapi.responses import StreamingResponse
|
||||
@@ -29,7 +29,7 @@ from app.schemas.document import (
|
||||
from app.schemas.share import DocumentShareCreate, DocumentShareOut, SharedDocumentOut
|
||||
from app.services.ai_client import AIServiceError, classify_document
|
||||
from app.services.config_reader import load_doc_config
|
||||
from app.services.storage import delete_file, get_upload_path, save_upload
|
||||
from app.services.storage import delete_file, download_file, save_upload
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
@@ -118,10 +118,10 @@ def _doc_with_categories(
|
||||
)
|
||||
|
||||
|
||||
def _extract_pdf_text(file_path: str) -> str:
|
||||
def _extract_pdf_text(pdf_bytes: bytes) -> str:
|
||||
"""Synchronous — must be called via asyncio.to_thread."""
|
||||
text_parts = []
|
||||
with pdfplumber.open(file_path) as pdf:
|
||||
with pdfplumber.open(io.BytesIO(pdf_bytes)) as pdf:
|
||||
for page in pdf.pages:
|
||||
page_text = page.extract_text()
|
||||
if page_text:
|
||||
@@ -146,7 +146,8 @@ async def process_document(doc_id: str) -> None:
|
||||
await db.commit()
|
||||
|
||||
try:
|
||||
text = await asyncio.to_thread(_extract_pdf_text, doc.file_path)
|
||||
pdf_bytes = await download_file(doc.storage_key)
|
||||
text = await asyncio.to_thread(_extract_pdf_text, pdf_bytes)
|
||||
result = await classify_document(text)
|
||||
|
||||
doc.raw_text = text[:500_000] # cap stored text at 500k chars
|
||||
@@ -187,13 +188,13 @@ async def upload_document(
|
||||
)
|
||||
|
||||
doc_id = str(uuid.uuid4())
|
||||
dest = await save_upload(file_data, user_id, doc_id)
|
||||
storage_key = await save_upload(file_data, user_id, doc_id)
|
||||
|
||||
doc = Document(
|
||||
id=doc_id,
|
||||
user_id=user_id,
|
||||
filename=file.filename or "upload.pdf",
|
||||
file_path=str(dest),
|
||||
storage_key=storage_key,
|
||||
file_size=len(file_data),
|
||||
status="pending",
|
||||
)
|
||||
@@ -578,7 +579,7 @@ async def delete_document(
|
||||
if not can_delete_via_share and not can_delete_as_group_admin:
|
||||
raise HTTPException(status_code=403, detail="Not allowed to delete this document")
|
||||
|
||||
delete_file(doc.file_path)
|
||||
await delete_file(doc.storage_key)
|
||||
await db.delete(doc)
|
||||
await db.commit()
|
||||
|
||||
@@ -609,13 +610,13 @@ async def download_file(
|
||||
if doc is None:
|
||||
raise HTTPException(status_code=404, detail="Document not found")
|
||||
|
||||
async def file_generator():
|
||||
async with aiofiles.open(doc.file_path, "rb") as f:
|
||||
while chunk := await f.read(64 * 1024):
|
||||
yield chunk
|
||||
try:
|
||||
pdf_bytes = await download_file(doc.storage_key)
|
||||
except FileNotFoundError:
|
||||
raise HTTPException(status_code=404, detail="File not found in storage")
|
||||
|
||||
return StreamingResponse(
|
||||
file_generator(),
|
||||
iter([pdf_bytes]),
|
||||
media_type="application/pdf",
|
||||
headers={"Content-Disposition": f'inline; filename="{doc.filename}"'},
|
||||
)
|
||||
|
||||
@@ -3,7 +3,7 @@ File-system watcher for the watch directory.
|
||||
|
||||
Uses the watchdog library to monitor a configured directory for new PDF files.
|
||||
When a PDF is detected, it is automatically ingested into the document service
|
||||
(copied to /data/documents, a DB record is created, and the AI pipeline runs).
|
||||
(uploaded to storage-service, a DB record is created, and the AI pipeline runs).
|
||||
|
||||
Key design decisions:
|
||||
- No-remove policy: on_deleted and on_moved events are intentionally ignored.
|
||||
@@ -82,13 +82,13 @@ async def ingest_file(path_str: str, watch_root: Path, config: dict) -> None:
|
||||
logger.warning("[watcher] Cannot read %s: %s", path_str, exc)
|
||||
return
|
||||
|
||||
# Save a copy to /data/documents/watch/{doc_id}.pdf
|
||||
# Upload to storage-service under documents/watch/{doc_id}.pdf
|
||||
doc_id = existing.id if existing is not None else str(uuid.uuid4())
|
||||
dest = await save_upload(file_data, WATCH_USER_ID, doc_id)
|
||||
storage_key = await save_upload(file_data, WATCH_USER_ID, doc_id)
|
||||
|
||||
if existing is not None:
|
||||
# Re-ingest a previously failed document
|
||||
existing.file_path = str(dest)
|
||||
existing.storage_key = storage_key
|
||||
existing.file_size = len(file_data)
|
||||
existing.status = "pending"
|
||||
existing.error_message = None
|
||||
@@ -100,7 +100,7 @@ async def ingest_file(path_str: str, watch_root: Path, config: dict) -> None:
|
||||
source="watch",
|
||||
watch_path=path_str,
|
||||
filename=path.name,
|
||||
file_path=str(dest),
|
||||
storage_key=storage_key,
|
||||
file_size=len(file_data),
|
||||
status="pending",
|
||||
)
|
||||
|
||||
@@ -1,27 +1,61 @@
|
||||
import asyncio
|
||||
from pathlib import Path
|
||||
"""
|
||||
Storage client for the storage-service HTTP API.
|
||||
|
||||
import aiofiles
|
||||
All persistent file I/O goes through storage-service:8020.
|
||||
The bucket for all document PDFs is 'documents'.
|
||||
Keys follow the pattern:
|
||||
uploaded: {user_id}/{doc_id}.pdf
|
||||
watch-ingested: watch/{doc_id}.pdf
|
||||
"""
|
||||
import logging
|
||||
|
||||
import httpx
|
||||
|
||||
from app.core.config import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
def get_upload_path(user_id: str, doc_id: str) -> Path:
|
||||
"""Return /data/documents/{user_id}/{doc_id}.pdf, creating the directory if needed."""
|
||||
user_dir = Path(settings.DATA_DIR) / user_id
|
||||
user_dir.mkdir(parents=True, exist_ok=True)
|
||||
return user_dir / f"{doc_id}.pdf"
|
||||
_BUCKET = "documents"
|
||||
|
||||
|
||||
async def save_upload(file_data: bytes, user_id: str, doc_id: str) -> Path:
|
||||
dest = get_upload_path(user_id, doc_id)
|
||||
async with aiofiles.open(dest, "wb") as f:
|
||||
await f.write(file_data)
|
||||
return dest
|
||||
def _storage_url(key: str) -> str:
    """Return the storage-service object URL for *key* in the documents bucket.

    The key is percent-encoded (keeping '/' as the segment separator) so that
    characters such as '?', '#' or spaces cannot change which object the URL
    addresses. A trailing slash on STORAGE_SERVICE_URL is dropped so the
    result never contains '//objects'.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import quote

    base = settings.STORAGE_SERVICE_URL.rstrip("/")
    return f"{base}/objects/{_BUCKET}/{quote(key, safe='/')}"
|
||||
|
||||
|
||||
def delete_file(file_path: str) -> None:
|
||||
def build_storage_key(user_id: str, doc_id: str) -> str:
    """Build the object key for a document: ``{user_id}/{doc_id}.pdf``."""
    return "{}/{}.pdf".format(user_id, doc_id)
|
||||
|
||||
|
||||
async def save_upload(file_data: bytes, user_id: str, doc_id: str) -> str:
    """PUT *file_data* to storage-service and return the key it was stored under.

    The key is derived from *user_id* and *doc_id* via build_storage_key().
    A non-success response raises through ``raise_for_status()``.
    """
    object_key = build_storage_key(user_id, doc_id)
    octet_headers = {"Content-Type": "application/octet-stream"}
    async with httpx.AsyncClient(timeout=30.0) as http:
        response = await http.put(
            _storage_url(object_key),
            content=file_data,
            headers=octet_headers,
        )
        response.raise_for_status()
    return object_key
|
||||
|
||||
|
||||
async def download_file(storage_key: str) -> bytes:
    """Fetch the object stored under *storage_key* and return its raw bytes.

    Raises FileNotFoundError when storage-service answers 404; any other
    error status raises through ``raise_for_status()``.
    """
    async with httpx.AsyncClient(timeout=60.0) as http:
        response = await http.get(_storage_url(storage_key))
        missing = response.status_code == 404
        if missing:
            raise FileNotFoundError(f"Object not found: {storage_key}")
        response.raise_for_status()
        payload = response.content
    return payload
|
||||
|
||||
|
||||
async def delete_file(storage_key: str) -> None:
|
||||
"""Delete an object from storage-service. Swallows errors — deletion failure must not 500."""
|
||||
try:
|
||||
Path(file_path).unlink(missing_ok=True)
|
||||
except OSError:
|
||||
pass # log but do not raise — deletion failure must not 500
|
||||
async with httpx.AsyncClient(timeout=10.0) as client:
|
||||
resp = await client.delete(_storage_url(storage_key))
|
||||
if resp.status_code not in (204, 404):
|
||||
logger.warning("storage-service DELETE returned %s for key %s", resp.status_code, storage_key)
|
||||
except Exception as exc:
|
||||
logger.warning("Could not delete %s from storage-service: %s", storage_key, exc)
|
||||
|
||||
Reference in New Issue
Block a user