Files
Business-Management/features/doc-service/app/services/file_watcher.py
T
curo1305 1c8b35399c fix: capitalize watch-folder names to PascalCase-with-dashes on ingest
Folder names like "invoices" and "vendor-invoices" are now converted to
"Invoices" and "Vendor-Invoices" when the watcher auto-creates categories,
matching the naming convention enforced on user-created categories.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-18 22:26:24 +02:00

262 lines
9.7 KiB
Python

"""
File-system watcher for the watch directory.
Uses the watchdog library to monitor a configured directory for new PDF files.
When a PDF is detected, it is automatically ingested into the document service
(copied to /data/documents, a DB record is created, and the AI pipeline runs).
Key design decisions:
- No-remove policy: on_deleted and on_modified events are intentionally ignored
(on_moved only ingests the destination path of a completed rename/move).
The watcher never deletes, moves, or modifies files on the watched volume.
- Watch documents use user_id="watch" as a sentinel so they are visible to
all authenticated users in the document list.
- Subfolder names map to categories: a file at invoices/bill.pdf is assigned
to an "Invoices" category (auto-created if needed; folder name is converted
to PascalCase-with-dashes: "vendor-invoices" → "Vendor-Invoices").
- Suggestions: if ai_folder_suggestion or ai_rename_suggestion are enabled,
the relevant fields are set on the document after AI processing so users
can confirm/reject from the UI.
- Thread → async bridge: watchdog runs in a daemon thread; asyncio coroutines
are dispatched from that thread via run_coroutine_threadsafe.
"""
import asyncio
import json
import logging
import re
import uuid
from pathlib import Path
from watchdog.events import FileSystemEventHandler
from watchdog.observers.polling import PollingObserver
from app.database import AsyncSessionLocal
from app.models.category import DocumentCategory
from app.models.category_assignment import CategoryAssignment
from app.models.document import Document
from app.services.storage import save_upload
# Module-level logger; messages emitted by this module carry a "[watcher]" prefix.
logger = logging.getLogger(__name__)
# Must match _WATCH_USER_ID in app/routers/documents.py
# Sentinel owner id stored on every document and category created by the watcher.
WATCH_USER_ID = "watch"
# ── Ingestion logic ───────────────────────────────────────────────────────────
def _category_name_from_path(
    path: Path, watch_root: Path, max_len: int = 128
) -> "str | None":
    """Derive the category name for *path* from its first subfolder.

    The first path component below *watch_root* is split on dashes,
    underscores and whitespace, each word is capitalized, and the words are
    re-joined with dashes ("vendor_invoices" -> "Vendor-Invoices"). The result
    is truncated to *max_len* characters so that the value used to look a
    category up is exactly the value that gets stored.

    Returns None when the file sits directly in the watch root, lies outside
    of it, or the folder name contains no usable characters.
    """
    try:
        rel = path.relative_to(watch_root)
    except ValueError:
        # path is not located below watch_root
        return None
    if len(rel.parts) < 2:
        # No subfolder component -> no category.
        return None
    words = [word for word in re.split(r"[-_\s]+", rel.parts[0]) if word]
    name = "-".join(word.capitalize() for word in words)[:max_len]
    return name or None


async def ingest_file(path_str: str, watch_root: Path, config: dict) -> None:
    """Ingest a single PDF file from the watch directory.

    Steps: skip paths that are not regular files, skip files that already have
    a non-failed ``Document`` row for the same ``watch_path``, store a copy of
    the bytes through ``save_upload``, create (or reset) the ``Document`` row,
    attach the category derived from the first subfolder, run the AI pipeline
    and finally apply AI suggestions when enabled in *config*.

    Idempotent: a file whose document record exists and is not ``"failed"`` is
    left untouched; a failed record is re-ingested under its existing id.

    Args:
        path_str: Path of the candidate file; also stored as ``watch_path``.
        watch_root: Root of the watched tree, used to derive the category.
        config: Watcher settings; only ``ai_folder_suggestion`` and
            ``ai_rename_suggestion`` are read here.
    """
    path = Path(path_str)
    # is_file() is already False for missing paths, so one check covers both.
    if not path.is_file():
        return

    from sqlalchemy import select

    async with AsyncSessionLocal() as db:
        # Idempotency check — skip if already tracked (and not failed)
        lookup = await db.execute(
            select(Document).where(Document.watch_path == path_str)
        )
        existing = lookup.scalar_one_or_none()
        if existing is not None and existing.status != "failed":
            return

        # Truncated up front so lookup and creation below use the same value;
        # comparing the untruncated name never matched long folder names and
        # re-created the category on every ingest.
        folder_name = _category_name_from_path(path, watch_root)

        # Read the file in a worker thread so a large PDF does not stall the
        # event loop.
        try:
            file_data = await asyncio.to_thread(path.read_bytes)
        except OSError as exc:
            logger.warning("[watcher] Cannot read %s: %s", path_str, exc)
            return

        # Store a copy of the bytes; a failed document keeps its original id.
        if existing is None:
            doc_id = str(uuid.uuid4())
        else:
            doc_id = existing.id
        dest = await save_upload(file_data, WATCH_USER_ID, doc_id)

        if existing is not None:
            # Re-ingest a previously failed document
            existing.file_path = str(dest)
            existing.file_size = len(file_data)
            existing.status = "pending"
            existing.error_message = None
        else:
            db.add(
                Document(
                    id=doc_id,
                    user_id=WATCH_USER_ID,
                    source="watch",
                    watch_path=path_str,
                    filename=path.name,
                    file_path=str(dest),
                    file_size=len(file_data),
                    status="pending",
                )
            )
        await db.commit()

        # Auto-assign category from subfolder name
        if folder_name:
            cat_lookup = await db.execute(
                select(DocumentCategory).where(
                    DocumentCategory.user_id == WATCH_USER_ID,
                    DocumentCategory.name == folder_name,
                )
            )
            cat = cat_lookup.scalar_one_or_none()
            if cat is None:
                cat = DocumentCategory(user_id=WATCH_USER_ID, name=folder_name)
                db.add(cat)
                await db.commit()
                # Refresh so the generated primary key is available below.
                await db.refresh(cat)

            assign_lookup = await db.execute(
                select(CategoryAssignment).where(
                    CategoryAssignment.document_id == doc_id,
                    CategoryAssignment.category_id == cat.id,
                )
            )
            if assign_lookup.scalar_one_or_none() is None:
                db.add(CategoryAssignment(document_id=doc_id, category_id=cat.id))
                await db.commit()

    # Run AI pipeline (opens its own session internally).
    # NOTE(review): imported lazily, presumably to avoid a circular import
    # with app.routers.documents — confirm before hoisting.
    from app.routers.documents import process_document

    await process_document(doc_id)

    # Set AI suggestions if enabled
    if config.get("ai_folder_suggestion") or config.get("ai_rename_suggestion"):
        await _apply_suggestions(doc_id, config)
async def _apply_suggestions(doc_id: str, config: dict) -> None:
    """Populate suggested_folder / suggested_filename after AI processing.

    Nothing is written unless the document exists, has status ``"done"`` and
    carries ``extracted_data`` that decodes to a JSON object.

    Args:
        doc_id: Id of the document to update.
        config: Watcher settings. ``ai_folder_suggestion`` enables copying the
            first entry of ``suggested_categories`` (max 128 chars) and
            ``ai_rename_suggestion`` enables copying ``title`` (max 500 chars).
    """
    want_folder = bool(config.get("ai_folder_suggestion"))
    want_rename = bool(config.get("ai_rename_suggestion"))
    if not (want_folder or want_rename):
        # Neither suggestion is enabled, so there is nothing to look up.
        return

    from sqlalchemy import select

    async with AsyncSessionLocal() as db:
        result = await db.execute(select(Document).where(Document.id == doc_id))
        doc = result.scalar_one_or_none()
        if doc is None or doc.status != "done" or not doc.extracted_data:
            return

        try:
            extracted = json.loads(doc.extracted_data)
        except (TypeError, ValueError, RecursionError) as exc:
            # Best-effort feature: log instead of failing the ingest, but do
            # not hide the reason the suggestions are missing.
            logger.warning(
                "[watcher] Cannot parse extracted_data for %s: %s", doc_id, exc
            )
            return
        if not isinstance(extracted, dict):
            # A JSON array or scalar has no .get(); treat it as unusable.
            logger.warning(
                "[watcher] extracted_data for %s is not a JSON object", doc_id
            )
            return

        changed = False
        if want_folder:
            suggestions = extracted.get("suggested_categories")
            # Only index into real sequences; a bare string or dict here would
            # yield a single character or raise KeyError.
            if isinstance(suggestions, (list, tuple)) and suggestions:
                first = suggestions[0]
                if first is not None and first != "":
                    doc.suggested_folder = str(first)[:128]
                    changed = True
        if want_rename:
            title = extracted.get("title")
            if title:
                doc.suggested_filename = str(title)[:500]
                changed = True

        if changed:
            await db.commit()
# ── Watchdog event handler ────────────────────────────────────────────────────
class _PdfEventHandler(FileSystemEventHandler):
    """Watchdog handler that schedules PDF ingestion on the asyncio loop.

    Watchdog calls the ``on_*`` hooks from its own thread, so the ingest
    coroutine is handed to the event loop with
    ``asyncio.run_coroutine_threadsafe`` instead of being awaited here.
    """

    def __init__(
        self,
        watch_root: Path,
        loop: asyncio.AbstractEventLoop,
        config: dict,
    ) -> None:
        """Store the watch root, the target event loop and the watcher config."""
        super().__init__()
        self._watch_root = watch_root
        self._loop = loop
        self._config = config

    @staticmethod
    def _log_ingest_result(path_str: str, future) -> None:
        """Log the exception, if any, carried by a finished ingest future."""
        if future.cancelled():
            return
        exc = future.exception()
        if exc is not None:
            logger.error(
                "[watcher] ingest failed for %s: %s", path_str, exc, exc_info=exc
            )

    def _dispatch_ingest(self, path_str: str) -> None:
        """Schedule ``ingest_file`` for *path_str* if it names a PDF.

        The extension check is case-insensitive. Failures inside the
        coroutine are logged through a done-callback; without it the returned
        future would be dropped and the exception lost.
        """
        if not path_str.lower().endswith(".pdf"):
            return
        coro = ingest_file(path_str, self._watch_root, self._config)
        try:
            future = asyncio.run_coroutine_threadsafe(coro, self._loop)
        except RuntimeError as exc:
            # Raised when the loop is already closed (e.g. during shutdown).
            # Close the coroutine so it is not reported as never awaited.
            coro.close()
            logger.warning("[watcher] cannot schedule %s: %s", path_str, exc)
            return
        future.add_done_callback(
            lambda fut: self._log_ingest_result(path_str, fut)
        )

    def on_created(self, event):  # type: ignore[override]
        """Ingest newly created files; directory events are skipped."""
        if not event.is_directory:
            self._dispatch_ingest(event.src_path)

    def on_moved(self, event):  # type: ignore[override]
        """Ingest the destination of a file move; the source is never touched."""
        # Handles atomic rename/move (e.g. Nextcloud or Syncthing completing a sync)
        if not event.is_directory:
            self._dispatch_ingest(event.dest_path)

    # on_deleted / on_modified: intentionally not overridden — no-remove policy
# ── Service ───────────────────────────────────────────────────────────────────
class FileWatcherService:
    """Manages the watchdog Observer lifecycle within the FastAPI lifespan."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        """Remember the event loop that ingest coroutines are scheduled on."""
        self._loop = loop
        self._observer: PollingObserver | None = None
        self._watch_root: Path | None = None
        self._config: dict = {}
        # Strong reference to the startup scan; the event loop itself only
        # keeps weak references to tasks.
        self._scan_task: asyncio.Task | None = None

    async def start(self, watch_path: str, config: dict) -> None:
        """Start polling *watch_path* recursively and launch the startup scan.

        If the path does not exist a warning is logged and watching stays
        disabled; no observer and no scan task are created.
        """
        self._watch_root = Path(watch_path)
        self._config = config
        if not self._watch_root.exists():
            logger.warning(
                "[watcher] Watch path %s does not exist — file watching disabled",
                watch_path,
            )
            return

        handler = _PdfEventHandler(self._watch_root, self._loop, config)
        self._observer = PollingObserver()
        self._observer.schedule(handler, watch_path, recursive=True)
        self._observer.start()
        logger.info("[watcher] started, watching %s", watch_path)

        # Run startup scan as a background task so startup is not blocked.
        # The task is stored so it cannot be garbage-collected mid-run and
        # can be cancelled by stop().
        self._scan_task = asyncio.create_task(self._scan_existing())

    @staticmethod
    def _find_pdfs(root: Path) -> "list[Path]":
        """Return all regular files below *root* ending in ``.pdf``, sorted.

        The extension match is case-insensitive, mirroring the check applied
        to live watchdog events, so ``X.PDF`` is found on every platform.
        """
        return sorted(
            candidate
            for candidate in root.rglob("*")
            if candidate.name.lower().endswith(".pdf") and candidate.is_file()
        )

    async def _scan_existing(self) -> None:
        """Ingest any PDFs already present in the watch directory."""
        root = self._watch_root
        if root is None:
            return
        logger.info("[watcher] scanning existing files in %s", root)

        # Walk the tree in a worker thread; a large directory would otherwise
        # block the event loop for the whole traversal.
        try:
            pdf_paths = await asyncio.to_thread(self._find_pdfs, root)
        except OSError as exc:
            logger.warning("[watcher] cannot list %s: %s", root, exc)
            return

        count = 0
        for pdf_path in pdf_paths:
            try:
                await ingest_file(str(pdf_path), root, self._config)
                count += 1
            except Exception as exc:
                # One bad file must not abort the rest of the scan.
                logger.warning("[watcher] scan error for %s: %s", pdf_path, exc)
        logger.info("[watcher] startup scan complete — processed %d file(s)", count)

    async def stop(self) -> None:
        """Stop the observer thread and cancel a still-running startup scan."""
        if self._observer is not None:
            self._observer.stop()
            # join() blocks until the observer thread exits; keep it off the loop.
            await asyncio.to_thread(self._observer.join)
            self._observer = None
            logger.info("[watcher] stopped")

        scan_task, self._scan_task = self._scan_task, None
        if scan_task is not None and not scan_task.done():
            scan_task.cancel()
            # return_exceptions=True collects the task's CancelledError instead
            # of raising it into the caller.
            await asyncio.gather(scan_task, return_exceptions=True)