diff --git a/features/doc-service/alembic/versions/0008_rename_file_path_to_storage_key.py b/features/doc-service/alembic/versions/0008_rename_file_path_to_storage_key.py
new file mode 100644
index 0000000..5228a30
--- /dev/null
+++ b/features/doc-service/alembic/versions/0008_rename_file_path_to_storage_key.py
@@ -0,0 +1,56 @@
+"""rename file_path to storage_key and strip filesystem prefix from existing rows
+
+Revision ID: 0008
+Revises: 0007
+Create Date: 2026-04-20
+
+Renames the documents.file_path column to storage_key.
+Existing rows have paths like '/data/documents/{user_id}/{doc_id}.pdf' or
+'/data/documents/watch/{doc_id}.pdf'. The migration strips the leading
+'/data/documents/' prefix so the value becomes a plain storage key
+(e.g. '{user_id}/{doc_id}.pdf') that the storage-service uses as the object key.
+"""
+from typing import Sequence, Union
+
+from alembic import op
+import sqlalchemy as sa
+
+revision: str = "0008"
+down_revision: Union[str, None] = "0007"
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade() -> None:
+    with op.batch_alter_table("documents") as batch_op:
+        batch_op.alter_column(
+            "file_path",
+            new_column_name="storage_key",
+            existing_type=sa.String(),
+            existing_nullable=False,
+        )
+
+    # Strip only the leading '/data/documents/' prefix: SUBSTR is anchored, REPLACE would rewrite every occurrence.
+    op.execute(
+        sa.text(
+            "UPDATE documents SET storage_key = SUBSTR(storage_key, LENGTH('/data/documents/') + 1)"
+            " WHERE storage_key LIKE '/data/documents/%'"
+        )
+    )
+
+
+def downgrade() -> None:
+    # Restore the filesystem prefix so old code can still find the files.
+    op.execute(
+        sa.text(
+            "UPDATE documents SET storage_key = '/data/documents/' || storage_key"
+            " WHERE storage_key NOT LIKE '/data/documents/%'"
+        )
+    )
+    with op.batch_alter_table("documents") as batch_op:
+        batch_op.alter_column(
+            "storage_key",
+            new_column_name="file_path",
+            existing_type=sa.String(),
+            existing_nullable=False,
+        )
diff --git a/features/doc-service/app/core/config.py b/features/doc-service/app/core/config.py
index 8af5112..978c4db 100644
--- a/features/doc-service/app/core/config.py
+++ b/features/doc-service/app/core/config.py
@@ -7,6 +7,7 @@ class Settings(BaseSettings):
     DATA_DIR: str = "/data/documents"
     CONFIG_PATH: str = "/config/doc_service_config.json"
     AI_SERVICE_URL: str = "http://ai-service:8010"
+    STORAGE_SERVICE_URL: str = "http://storage-service:8020"
 
     class Config:
         env_file = ".env"
diff --git a/features/doc-service/app/models/document.py b/features/doc-service/app/models/document.py
index 6339c3e..7aaa0b9 100644
--- a/features/doc-service/app/models/document.py
+++ b/features/doc-service/app/models/document.py
@@ -13,7 +13,7 @@ class Document(Base):
     id: Mapped[str] = mapped_column(String, primary_key=True, default=lambda: str(uuid.uuid4()))
     user_id: Mapped[str] = mapped_column(String, nullable=False, index=True)
     filename: Mapped[str] = mapped_column(String, nullable=False)
-    file_path: Mapped[str] = mapped_column(String, nullable=False)
+    storage_key: Mapped[str] = mapped_column(String, nullable=False)
     file_size: Mapped[int] = mapped_column(Integer, nullable=False)
     status: Mapped[str] = mapped_column(String, nullable=False, default="pending")
     title: Mapped[str | None] = mapped_column(String(500), nullable=True)
diff --git a/features/doc-service/app/routers/documents.py b/features/doc-service/app/routers/documents.py
index 942c997..fd54855 100644
--- a/features/doc-service/app/routers/documents.py
+++ b/features/doc-service/app/routers/documents.py
@@ -1,10 +1,10 @@
 import asyncio
+import io
 import json
 import math
 import uuid
 from datetime import datetime, timezone
 
-import aiofiles
 import pdfplumber
 from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Query, UploadFile
 from fastapi.responses import StreamingResponse
@@ -29,7 +29,7 @@ from app.schemas.document import (
 from app.schemas.share import DocumentShareCreate, DocumentShareOut, SharedDocumentOut
 from app.services.ai_client import AIServiceError, classify_document
 from app.services.config_reader import load_doc_config
-from app.services.storage import delete_file, get_upload_path, save_upload
+from app.services.storage import delete_file, download_file as fetch_stored_file, save_upload
 
 router = APIRouter()
 
@@ -118,10 +118,10 @@ def _doc_with_categories(
     )
 
 
-def _extract_pdf_text(file_path: str) -> str:
+def _extract_pdf_text(pdf_bytes: bytes) -> str:
     """Synchronous — must be called via asyncio.to_thread."""
     text_parts = []
-    with pdfplumber.open(file_path) as pdf:
+    with pdfplumber.open(io.BytesIO(pdf_bytes)) as pdf:
         for page in pdf.pages:
             page_text = page.extract_text()
             if page_text:
@@ -146,7 +146,8 @@ async def process_document(doc_id: str) -> None:
         await db.commit()
 
         try:
-            text = await asyncio.to_thread(_extract_pdf_text, doc.file_path)
+            pdf_bytes = await fetch_stored_file(doc.storage_key)
+            text = await asyncio.to_thread(_extract_pdf_text, pdf_bytes)
             result = await classify_document(text)
 
             doc.raw_text = text[:500_000]  # cap stored text at 500k chars
@@ -187,13 +188,13 @@ async def upload_document(
         )
 
     doc_id = str(uuid.uuid4())
-    dest = await save_upload(file_data, user_id, doc_id)
+    storage_key = await save_upload(file_data, user_id, doc_id)
 
     doc = Document(
         id=doc_id,
         user_id=user_id,
         filename=file.filename or "upload.pdf",
-        file_path=str(dest),
+        storage_key=storage_key,
         file_size=len(file_data),
         status="pending",
     )
@@ -578,7 +579,7 @@ async def delete_document(
     if not can_delete_via_share and not can_delete_as_group_admin:
         raise HTTPException(status_code=403, detail="Not allowed to delete this document")
 
-    delete_file(doc.file_path)
+    await delete_file(doc.storage_key)
     await db.delete(doc)
     await db.commit()
 
@@ -609,13 +610,13 @@ async def download_file(
     if doc is None:
         raise HTTPException(status_code=404, detail="Document not found")
 
-    async def file_generator():
-        async with aiofiles.open(doc.file_path, "rb") as f:
-            while chunk := await f.read(64 * 1024):
-                yield chunk
+    try:
+        pdf_bytes = await fetch_stored_file(doc.storage_key)  # alias: this route is itself named download_file
+    except FileNotFoundError:
+        raise HTTPException(status_code=404, detail="File not found in storage")
 
     return StreamingResponse(
-        file_generator(),
+        iter([pdf_bytes]),
         media_type="application/pdf",
         headers={"Content-Disposition": f'inline; filename="{doc.filename}"'},
     )
diff --git a/features/doc-service/app/services/file_watcher.py b/features/doc-service/app/services/file_watcher.py
index adb06f7..99e04fe 100644
--- a/features/doc-service/app/services/file_watcher.py
+++ b/features/doc-service/app/services/file_watcher.py
@@ -3,7 +3,7 @@ File-system watcher for the watch directory.
 
 Uses the watchdog library to monitor a configured directory for new PDF files.
 When a PDF is detected, it is automatically ingested into the document service
-(copied to /data/documents, a DB record is created, and the AI pipeline runs).
+(uploaded to storage-service, a DB record is created, and the AI pipeline runs).
 
 Key design decisions:
 - No-remove policy: on_deleted and on_moved events are intentionally ignored.
@@ -82,13 +82,13 @@ async def ingest_file(path_str: str, watch_root: Path, config: dict) -> None:
             logger.warning("[watcher] Cannot read %s: %s", path_str, exc)
             return
 
-        # Save a copy to /data/documents/watch/{doc_id}.pdf
+        # Upload to storage-service under documents/watch/{doc_id}.pdf
         doc_id = existing.id if existing is not None else str(uuid.uuid4())
-        dest = await save_upload(file_data, WATCH_USER_ID, doc_id)
+        storage_key = await save_upload(file_data, WATCH_USER_ID, doc_id)
 
         if existing is not None:
             # Re-ingest a previously failed document
-            existing.file_path = str(dest)
+            existing.storage_key = storage_key
             existing.file_size = len(file_data)
             existing.status = "pending"
             existing.error_message = None
@@ -100,7 +100,7 @@ async def ingest_file(path_str: str, watch_root: Path, config: dict) -> None:
                 source="watch",
                 watch_path=path_str,
                 filename=path.name,
-                file_path=str(dest),
+                storage_key=storage_key,
                 file_size=len(file_data),
                 status="pending",
             )
diff --git a/features/doc-service/app/services/storage.py b/features/doc-service/app/services/storage.py
index 45ea418..c0fb45d 100644
--- a/features/doc-service/app/services/storage.py
+++ b/features/doc-service/app/services/storage.py
@@ -1,27 +1,61 @@
-import asyncio
-from pathlib import Path
+"""
+Storage client for the storage-service HTTP API.
 
-import aiofiles
+All persistent file I/O goes through storage-service:8020.
+The bucket for all document PDFs is 'documents'.
+Keys follow the pattern:
+  uploaded:       {user_id}/{doc_id}.pdf
+  watch-ingested: watch/{doc_id}.pdf
+"""
+import logging
+
+import httpx
 
 from app.core.config import settings
 
+logger = logging.getLogger(__name__)
 
-def get_upload_path(user_id: str, doc_id: str) -> Path:
-    """Return /data/documents/{user_id}/{doc_id}.pdf, creating the directory if needed."""
-    user_dir = Path(settings.DATA_DIR) / user_id
-    user_dir.mkdir(parents=True, exist_ok=True)
-    return user_dir / f"{doc_id}.pdf"
+_BUCKET = "documents"
 
 
-async def save_upload(file_data: bytes, user_id: str, doc_id: str) -> Path:
-    dest = get_upload_path(user_id, doc_id)
-    async with aiofiles.open(dest, "wb") as f:
-        await f.write(file_data)
-    return dest
+def _storage_url(key: str) -> str:
+    return f"{settings.STORAGE_SERVICE_URL}/objects/{_BUCKET}/{key}"
 
 
-def delete_file(file_path: str) -> None:
+def build_storage_key(user_id: str, doc_id: str) -> str:
+    """Return the canonical storage key for a document."""
+    return f"{user_id}/{doc_id}.pdf"
+
+
+async def save_upload(file_data: bytes, user_id: str, doc_id: str) -> str:
+    """Upload bytes to storage-service. Returns the storage key."""
+    key = build_storage_key(user_id, doc_id)
+    async with httpx.AsyncClient(timeout=30.0) as client:
+        resp = await client.put(
+            _storage_url(key),
+            content=file_data,
+            headers={"Content-Type": "application/octet-stream"},
+        )
+        resp.raise_for_status()
+    return key
+
+
+async def download_file(storage_key: str) -> bytes:
+    """Download bytes from storage-service by storage key."""
+    async with httpx.AsyncClient(timeout=60.0) as client:
+        resp = await client.get(_storage_url(storage_key))
+    if resp.status_code == 404:
+        raise FileNotFoundError(f"Object not found: {storage_key}")
+    resp.raise_for_status()
+    return resp.content
+
+
+async def delete_file(storage_key: str) -> None:
+    """Delete an object from storage-service; any 2xx or 404 counts as deleted. Swallows errors — deletion failure must not 500."""
     try:
-        Path(file_path).unlink(missing_ok=True)
-    except OSError:
-        pass  # log but do not raise — deletion failure must not 500
+        async with httpx.AsyncClient(timeout=10.0) as client:
+            resp = await client.delete(_storage_url(storage_key))
+        if not resp.is_success and resp.status_code != 404:
+            logger.warning("storage-service DELETE returned %s for key %s", resp.status_code, storage_key)
+    except Exception as exc:
+        logger.warning("Could not delete %s from storage-service: %s", storage_key, exc)