Files
Business-Management/features/doc-service/app/services/file_watcher.py
T
curo1305 18a638bc3a Fix plugin list bug and switch watcher to PollingObserver
- Fix: list_plugins imported _REGISTRY as a direct reference to the
  empty list that existed at import time; register_services() replaces
  _REGISTRY with a new list so the imported reference was always [].
  Added get_registry() helper so callers access the live list via the
  module namespace. GET /api/plugins now correctly returns accessible
  plugins for the current user.

- Fix: switch watchdog from InotifyObserver to PollingObserver. Inotify
  events from the macOS host are not forwarded through the Docker bind
  mount, so new files were only detected via the startup scan. PollingObserver
  (1s default interval) works reliably on all platforms including
  macOS+Docker bind mounts.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-18 02:25:16 +02:00

257 lines
9.5 KiB
Python

"""
File-system watcher for the watch directory.
Uses the watchdog library to monitor a configured directory for new PDF files.
When a PDF is detected, it is automatically ingested into the document service
(copied to /data/documents, a DB record is created, and the AI pipeline runs).
Key design decisions:
- No-remove policy: on_deleted and on_modified events are intentionally ignored;
on_moved is handled only to ingest the destination path of a rename/move.
The watcher never deletes, moves, or modifies files on the watched volume.
- Watch documents use user_id="watch" as a sentinel so they are visible to
all authenticated users in the document list.
- Subfolder names map to categories: a file at invoices/bill.pdf is assigned
to an "invoices" category (auto-created if needed).
- Suggestions: if ai_folder_suggestion or ai_rename_suggestion are enabled,
the relevant fields are set on the document after AI processing so users
can confirm/reject from the UI.
- Thread → async bridge: watchdog runs in a daemon thread; asyncio coroutines
are dispatched from that thread via run_coroutine_threadsafe.
"""
import asyncio
import json
import logging
import uuid
from pathlib import Path
from watchdog.events import FileSystemEventHandler
from watchdog.observers.polling import PollingObserver
from app.database import AsyncSessionLocal
from app.models.category import DocumentCategory
from app.models.category_assignment import CategoryAssignment
from app.models.document import Document
from app.services.storage import save_upload
logger = logging.getLogger(__name__)
# Must match _WATCH_USER_ID in app/routers/documents.py
WATCH_USER_ID = "watch"
# ── Ingestion logic ───────────────────────────────────────────────────────────
async def ingest_file(path_str: str, watch_root: Path, config: dict) -> None:
    """
    Ingest a single PDF file from the watch directory.

    Idempotent: skips files that already have a non-failed document record.
    A record whose status is "failed" is reset to "pending" and re-processed
    under its existing id.

    Args:
        path_str: Path of the candidate file. It is stored as the document's
            ``watch_path`` and doubles as the idempotency key.
        watch_root: Root of the watched directory. The first path component
            below it (if the file sits in a subfolder) becomes the category.
        config: Watcher settings. Only ``ai_folder_suggestion`` and
            ``ai_rename_suggestion`` are read here.

    Returns:
        None. Returns early (without raising) when the path is missing, is
        not a regular file, is already tracked, or cannot be read.
    """
    from sqlalchemy import select

    path = Path(path_str)
    if not path.exists() or not path.is_file():
        return

    async with AsyncSessionLocal() as db:
        # Idempotency check — skip if already tracked (and not failed)
        existing_result = await db.execute(
            select(Document).where(Document.watch_path == path_str)
        )
        existing = existing_result.scalar_one_or_none()
        if existing is not None and existing.status != "failed":
            return

        # Determine category from the first subfolder component
        try:
            rel = path.relative_to(watch_root)
            folder_name = rel.parts[0] if len(rel.parts) > 1 else None
        except ValueError:
            # File is not located under watch_root — no category.
            folder_name = None

        # Read file bytes in a worker thread so a slow volume does not stall
        # the event loop.
        try:
            file_data = await asyncio.to_thread(path.read_bytes)
        except OSError as exc:
            logger.warning("[watcher] Cannot read %s: %s", path_str, exc)
            return

        # Save a copy to /data/documents/watch/{doc_id}.pdf
        doc_id = existing.id if existing is not None else str(uuid.uuid4())
        dest = await save_upload(file_data, WATCH_USER_ID, doc_id)

        if existing is not None:
            # Re-ingest a previously failed document
            existing.file_path = str(dest)
            existing.file_size = len(file_data)
            existing.status = "pending"
            existing.error_message = None
            await db.commit()
        else:
            doc = Document(
                id=doc_id,
                user_id=WATCH_USER_ID,
                source="watch",
                watch_path=path_str,
                filename=path.name,
                file_path=str(dest),
                file_size=len(file_data),
                status="pending",
            )
            db.add(doc)
            await db.commit()

        # Auto-assign category from subfolder name
        if folder_name:
            # Truncate ONCE and use the same value for lookup and creation.
            # Looking up the untruncated name would never match a stored
            # (truncated) row for folder names longer than 128 characters.
            category_name = folder_name[:128]
            cat_result = await db.execute(
                select(DocumentCategory).where(
                    DocumentCategory.user_id == WATCH_USER_ID,
                    DocumentCategory.name == category_name,
                )
            )
            cat = cat_result.scalar_one_or_none()
            if cat is None:
                cat = DocumentCategory(user_id=WATCH_USER_ID, name=category_name)
                db.add(cat)
                await db.commit()
                await db.refresh(cat)

            exists_assign = await db.execute(
                select(CategoryAssignment).where(
                    CategoryAssignment.document_id == doc_id,
                    CategoryAssignment.category_id == cat.id,
                )
            )
            if exists_assign.scalar_one_or_none() is None:
                db.add(CategoryAssignment(document_id=doc_id, category_id=cat.id))
                await db.commit()

    # Run AI pipeline (opens its own session internally)
    from app.routers.documents import process_document

    await process_document(doc_id)

    # Set AI suggestions if enabled
    if config.get("ai_folder_suggestion") or config.get("ai_rename_suggestion"):
        await _apply_suggestions(doc_id, config)
async def _apply_suggestions(doc_id: str, config: dict) -> None:
    """Populate suggested_folder / suggested_filename after AI processing.

    Loads the document, parses its ``extracted_data`` JSON and, depending on
    the enabled config flags, copies the first entry of
    ``suggested_categories`` into ``suggested_folder`` (max 128 chars) and
    ``title`` into ``suggested_filename`` (max 500 chars).

    Does nothing when the document is missing, its status is not "done", it
    has no extracted data, or the extracted data is not a JSON object.

    Args:
        doc_id: Id of the document to update.
        config: Watcher settings. Reads ``ai_folder_suggestion`` and
            ``ai_rename_suggestion``.
    """
    from sqlalchemy import select

    async with AsyncSessionLocal() as db:
        result = await db.execute(select(Document).where(Document.id == doc_id))
        doc = result.scalar_one_or_none()
        if doc is None or doc.status != "done" or not doc.extracted_data:
            return

        try:
            extracted = json.loads(doc.extracted_data)
        except (TypeError, ValueError) as exc:
            # json.JSONDecodeError is a ValueError; TypeError covers a
            # non-string payload. Log instead of failing silently.
            logger.warning(
                "[watcher] Cannot parse extracted_data for %s: %s", doc_id, exc
            )
            return

        # Valid JSON is not necessarily an object; .get() below needs a dict.
        if not isinstance(extracted, dict):
            return

        changed = False

        if config.get("ai_folder_suggestion"):
            suggestions = extracted.get("suggested_categories", [])
            # Require a real list: indexing a bare string would yield its
            # first character, and indexing a dict would raise KeyError.
            if isinstance(suggestions, list) and suggestions and suggestions[0]:
                doc.suggested_folder = str(suggestions[0])[:128]
                changed = True

        if config.get("ai_rename_suggestion"):
            title = extracted.get("title")
            if title:
                doc.suggested_filename = str(title)[:500]
                changed = True

        if changed:
            await db.commit()
# ── Watchdog event handler ────────────────────────────────────────────────────
class _PdfEventHandler(FileSystemEventHandler):
    """Watchdog handler that schedules ingestion of PDFs that appear on disk.

    Watchdog invokes the ``on_*`` callbacks from its own thread, so each
    ingestion coroutine is handed over to the asyncio loop with
    ``run_coroutine_threadsafe`` rather than awaited directly.
    """

    def __init__(
        self,
        watch_root: Path,
        loop: asyncio.AbstractEventLoop,
        config: dict,
    ) -> None:
        """Store the watch root, target event loop and watcher config."""
        super().__init__()
        self._watch_root = watch_root
        self._loop = loop
        self._config = config

    def _dispatch_ingest(self, path_str: str) -> None:
        """Schedule ``ingest_file`` on the event loop for ``*.pdf`` paths.

        The extension check is case-insensitive. Non-PDF paths are ignored.
        """
        if not path_str.lower().endswith(".pdf"):
            return
        future = asyncio.run_coroutine_threadsafe(
            ingest_file(path_str, self._watch_root, self._config),
            self._loop,
        )
        # Nobody awaits this future, so without a callback any exception
        # raised by ingest_file would vanish without a trace.
        future.add_done_callback(
            lambda done: self._log_ingest_failure(path_str, done)
        )

    @staticmethod
    def _log_ingest_failure(path_str: str, future) -> None:
        """Log the exception (if any) carried by a finished ingest future.

        ``future`` is the ``concurrent.futures.Future`` returned by
        ``asyncio.run_coroutine_threadsafe``.
        """
        # Future.exception() raises CancelledError on a cancelled future.
        if future.cancelled():
            return
        exc = future.exception()
        if exc is not None:
            logger.error(
                "[watcher] ingest failed for %s: %s", path_str, exc, exc_info=exc
            )

    def on_created(self, event):  # type: ignore[override]
        """Ingest a newly created file (directories are skipped)."""
        if not event.is_directory:
            self._dispatch_ingest(event.src_path)

    def on_moved(self, event):  # type: ignore[override]
        """Ingest the destination of a move (directories are skipped)."""
        # Handles atomic rename/move (e.g. Nextcloud or Syncthing completing a sync)
        if not event.is_directory:
            self._dispatch_ingest(event.dest_path)

    # on_deleted / on_modified: intentionally not overridden — no-remove policy
# ── Service ───────────────────────────────────────────────────────────────────
class FileWatcherService:
    """Manages the watchdog Observer lifecycle within the FastAPI lifespan."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        """Create an idle service; nothing is watched until ``start``.

        Args:
            loop: Event loop that watchdog callbacks dispatch ingestion
                coroutines onto.
        """
        self._loop = loop
        self._observer: PollingObserver | None = None
        self._watch_root: Path | None = None
        self._config: dict = {}
        # Strong reference to the startup-scan task: the event loop only
        # keeps weak references, so an unreferenced task may be collected
        # before it finishes.
        self._scan_task: asyncio.Task | None = None

    async def start(self, watch_path: str, config: dict) -> None:
        """Start polling ``watch_path`` and kick off the startup scan.

        If ``watch_path`` does not exist, a warning is logged and watching
        stays disabled.

        Args:
            watch_path: Directory to watch recursively.
            config: Watcher settings passed through to ``ingest_file``.
        """
        self._watch_root = Path(watch_path)
        self._config = config
        if not self._watch_root.exists():
            logger.warning(
                "[watcher] Watch path %s does not exist — file watching disabled",
                watch_path,
            )
            return

        handler = _PdfEventHandler(self._watch_root, self._loop, config)
        self._observer = PollingObserver()
        self._observer.schedule(handler, watch_path, recursive=True)
        self._observer.start()
        logger.info("[watcher] started, watching %s", watch_path)

        # Run startup scan as a background task so startup is not blocked
        self._scan_task = asyncio.create_task(self._scan_existing())

    @staticmethod
    def _find_pdfs(root: Path) -> list[Path]:
        """Return all paths under ``root`` ending in ``.pdf``, sorted.

        The match is case-insensitive, the same rule the event handler
        applies, so ``X.PDF`` is picked up on case-sensitive filesystems too.
        """
        return sorted(
            candidate
            for candidate in root.rglob("*")
            if candidate.name.lower().endswith(".pdf")
        )

    async def _scan_existing(self) -> None:
        """Ingest any PDFs already present in the watch directory."""
        if self._watch_root is None:
            return
        logger.info("[watcher] scanning existing files in %s", self._watch_root)

        # Walk the tree in a worker thread; a large or slow volume would
        # otherwise block the event loop.
        try:
            pdf_paths = await asyncio.to_thread(self._find_pdfs, self._watch_root)
        except OSError as exc:
            logger.warning("[watcher] cannot scan %s: %s", self._watch_root, exc)
            return

        count = 0
        for pdf_path in pdf_paths:
            try:
                await ingest_file(str(pdf_path), self._watch_root, self._config)
                count += 1
            except Exception as exc:
                # Best effort: one bad file must not abort the whole scan.
                logger.warning("[watcher] scan error for %s: %s", pdf_path, exc)
        logger.info("[watcher] startup scan complete — processed %d file(s)", count)

    async def stop(self) -> None:
        """Cancel a running startup scan and shut down the observer thread."""
        if self._scan_task is not None:
            self._scan_task.cancel()
            # return_exceptions=True absorbs the resulting CancelledError
            # (or any error the task already finished with).
            await asyncio.gather(self._scan_task, return_exceptions=True)
            self._scan_task = None

        if self._observer is not None:
            self._observer.stop()
            # join() blocks until the observer thread exits; keep it off the loop.
            await asyncio.to_thread(self._observer.join)
            self._observer = None
            logger.info("[watcher] stopped")