"""
File-system watcher for the watch directory.

Uses the watchdog library to monitor a configured directory for new PDF files.
When a PDF is detected, it is automatically ingested into the document service
(copied to /data/documents, a DB record is created, and the AI pipeline runs).

Key design decisions:
  - No-remove policy: on_deleted and on_modified events are intentionally
    ignored. on_moved is handled only to ingest the *destination* of a
    move/rename. The watcher never deletes, moves, or modifies files on the
    watched volume.
  - Watch documents use user_id="watch" as a sentinel so they are visible to
    all authenticated users in the document list.
  - Subfolder names map to categories: a file at invoices/bill.pdf is assigned
    to an "invoices" category (auto-created if needed).
  - Suggestions: if ai_folder_suggestion or ai_rename_suggestion are enabled,
    the relevant fields are set on the document after AI processing so users
    can confirm/reject from the UI.
  - Thread → async bridge: watchdog runs in a daemon thread; asyncio coroutines
    are dispatched from that thread via run_coroutine_threadsafe.
"""

import asyncio
import json
import logging
import uuid
from pathlib import Path
from typing import Optional

from sqlalchemy import select
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer

from app.database import AsyncSessionLocal
from app.models.category import DocumentCategory
from app.models.category_assignment import CategoryAssignment
from app.models.document import Document
from app.services.storage import save_upload

logger = logging.getLogger(__name__)

# Must match _WATCH_USER_ID in app/routers/documents.py
WATCH_USER_ID = "watch"

# Length limits applied before values are written to the database.
_MAX_CATEGORY_NAME_LEN = 128
_MAX_SUGGESTED_FILENAME_LEN = 500

# Paths whose ingestion is currently running. Every ingest runs on the same
# event loop, so a plain set is sufficient to de-duplicate overlapping
# triggers (startup scan + created event, created + moved, ...).
_in_flight_paths: set[str] = set()


# ── Ingestion logic ───────────────────────────────────────────────────────────


def _is_pdf_name(name: str) -> bool:
    """Return True if *name* ends with ".pdf", ignoring case."""
    return name.lower().endswith(".pdf")


def _category_name_for(path: Path, watch_root: Path) -> Optional[str]:
    """Return the category name derived from the first subfolder, if any.

    Files directly inside ``watch_root`` (or outside of it) have no category.
    The name is truncated here, once, so that the lookup and the creation of
    the category always use the same value.
    """
    try:
        rel = path.relative_to(watch_root)
    except ValueError:
        return None
    if len(rel.parts) <= 1:
        return None
    return rel.parts[0][:_MAX_CATEGORY_NAME_LEN]


async def _assign_category(db, doc_id: str, category_name: str) -> None:
    """Attach *doc_id* to the watch category *category_name*, creating it if needed.

    ``db`` is an open session obtained from ``AsyncSessionLocal``.
    """
    cat_result = await db.execute(
        select(DocumentCategory).where(
            DocumentCategory.user_id == WATCH_USER_ID,
            DocumentCategory.name == category_name,
        )
    )
    cat = cat_result.scalar_one_or_none()
    if cat is None:
        cat = DocumentCategory(user_id=WATCH_USER_ID, name=category_name)
        db.add(cat)
        await db.commit()
        # Refresh so the generated primary key is available below.
        await db.refresh(cat)

    assign_result = await db.execute(
        select(CategoryAssignment).where(
            CategoryAssignment.document_id == doc_id,
            CategoryAssignment.category_id == cat.id,
        )
    )
    if assign_result.scalar_one_or_none() is None:
        db.add(CategoryAssignment(document_id=doc_id, category_id=cat.id))
        await db.commit()


async def _store_document(path: Path, path_str: str, watch_root: Path) -> Optional[str]:
    """Copy the file into storage and create/reset its Document record.

    Returns the document id when the file was (re-)ingested, or ``None`` when
    it was skipped (already tracked and not failed, or unreadable).
    """
    category_name = _category_name_for(path, watch_root)

    async with AsyncSessionLocal() as db:
        # Idempotency check — skip if already tracked (and not failed)
        existing_result = await db.execute(
            select(Document).where(Document.watch_path == path_str)
        )
        existing = existing_result.scalar_one_or_none()
        if existing is not None and existing.status != "failed":
            return None

        # Read file bytes in a worker thread so a large PDF or a slow volume
        # does not stall the event loop.
        try:
            file_data = await asyncio.to_thread(path.read_bytes)
        except OSError as exc:
            logger.warning("[watcher] Cannot read %s: %s", path_str, exc)
            return None

        # Save a copy to /data/documents/watch/{doc_id}.pdf
        doc_id = existing.id if existing is not None else str(uuid.uuid4())
        dest = await save_upload(file_data, WATCH_USER_ID, doc_id)

        if existing is not None:
            # Re-ingest a previously failed document
            existing.file_path = str(dest)
            existing.file_size = len(file_data)
            existing.status = "pending"
            existing.error_message = None
        else:
            db.add(
                Document(
                    id=doc_id,
                    user_id=WATCH_USER_ID,
                    source="watch",
                    watch_path=path_str,
                    filename=path.name,
                    file_path=str(dest),
                    file_size=len(file_data),
                    status="pending",
                )
            )
        await db.commit()

        # Auto-assign category from subfolder name
        if category_name:
            await _assign_category(db, doc_id, category_name)

    return doc_id


async def ingest_file(path_str: str, watch_root: Path, config: dict) -> None:
    """
    Ingest a single PDF file from the watch directory.

    Idempotent: skips files that already have a non-failed document record,
    and skips a path whose ingestion is already in progress on this loop.

    Args:
        path_str: Path of the file to ingest; also stored as ``watch_path``.
        watch_root: Root of the watched directory, used to derive the category
            from the first subfolder component.
        config: Watcher settings; ``ai_folder_suggestion`` and
            ``ai_rename_suggestion`` are read here.
    """
    if path_str in _in_flight_paths:
        return
    _in_flight_paths.add(path_str)
    try:
        path = Path(path_str)
        # is_file() is already False for a missing path.
        if not path.is_file():
            return

        doc_id = await _store_document(path, path_str, watch_root)
        if doc_id is None:
            return

        # Run AI pipeline (opens its own session internally); imported here
        # rather than at module level, as in the original code.
        from app.routers.documents import process_document

        await process_document(doc_id)

        # Set AI suggestions if enabled
        if config.get("ai_folder_suggestion") or config.get("ai_rename_suggestion"):
            await _apply_suggestions(doc_id, config)
    finally:
        _in_flight_paths.discard(path_str)


async def _apply_suggestions(doc_id: str, config: dict) -> None:
    """Populate suggested_folder / suggested_filename after AI processing.

    Does nothing unless the document exists, has status "done", and carries
    extracted data that decodes to a JSON object.
    """
    async with AsyncSessionLocal() as db:
        result = await db.execute(select(Document).where(Document.id == doc_id))
        doc = result.scalar_one_or_none()
        if doc is None or doc.status != "done" or not doc.extracted_data:
            return

        try:
            extracted = json.loads(doc.extracted_data)
        except (TypeError, ValueError) as exc:
            # json.JSONDecodeError is a ValueError subclass.
            logger.warning(
                "[watcher] Cannot parse extracted data for %s: %s", doc_id, exc
            )
            return
        if not isinstance(extracted, dict):
            return

        changed = False

        if config.get("ai_folder_suggestion"):
            suggestions = extracted.get("suggested_categories")
            # Only a non-empty list is usable; anything else is ignored.
            if isinstance(suggestions, list) and suggestions:
                doc.suggested_folder = str(suggestions[0])[:_MAX_CATEGORY_NAME_LEN]
                changed = True

        if config.get("ai_rename_suggestion"):
            title = extracted.get("title")
            if title:
                doc.suggested_filename = str(title)[:_MAX_SUGGESTED_FILENAME_LEN]
                changed = True

        if changed:
            await db.commit()


# ── Watchdog event handler ────────────────────────────────────────────────────


class _PdfEventHandler(FileSystemEventHandler):
    """Forwards PDF create/move events from the watchdog thread to the loop."""

    def __init__(
        self,
        watch_root: Path,
        loop: asyncio.AbstractEventLoop,
        config: dict,
    ) -> None:
        super().__init__()
        self._watch_root = watch_root
        self._loop = loop
        self._config = config

    def _dispatch_ingest(self, path_str: str) -> None:
        """Schedule ``ingest_file`` on the event loop for a PDF path."""
        if not _is_pdf_name(path_str):
            return

        coro = ingest_file(path_str, self._watch_root, self._config)
        try:
            future = asyncio.run_coroutine_threadsafe(coro, self._loop)
        except RuntimeError as exc:
            # The loop is already closed (shutdown); close the coroutine so
            # it is not reported as "never awaited".
            coro.close()
            logger.warning(
                "[watcher] Cannot schedule ingest for %s: %s", path_str, exc
            )
            return

        def _log_failure(done) -> None:
            # Without this callback an exception raised inside ingest_file
            # would stay on the discarded future and never be reported.
            if done.cancelled():
                return
            exc = done.exception()
            if exc is not None:
                logger.warning("[watcher] ingest failed for %s: %s", path_str, exc)

        future.add_done_callback(_log_failure)

    def on_created(self, event):  # type: ignore[override]
        if not event.is_directory:
            self._dispatch_ingest(event.src_path)

    def on_moved(self, event):  # type: ignore[override]
        # Handles atomic rename/move (e.g. Nextcloud or Syncthing completing a sync)
        if not event.is_directory:
            self._dispatch_ingest(event.dest_path)

    # on_deleted / on_modified: intentionally not overridden — no-remove policy


# ── Service ───────────────────────────────────────────────────────────────────


class FileWatcherService:
    """Manages the watchdog Observer lifecycle within the FastAPI lifespan."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        self._observer: Observer | None = None
        self._watch_root: Path | None = None
        self._config: dict = {}
        # Strong reference to the startup scan; the event loop itself only
        # keeps weak references to tasks.
        self._scan_task: asyncio.Task | None = None

    async def start(self, watch_path: str, config: dict) -> None:
        """Start watching *watch_path* and kick off the startup scan.

        If the path does not exist, a warning is logged and nothing is started.
        """
        self._watch_root = Path(watch_path)
        self._config = config

        if not self._watch_root.exists():
            logger.warning(
                "[watcher] Watch path %s does not exist — file watching disabled",
                watch_path,
            )
            return

        handler = _PdfEventHandler(self._watch_root, self._loop, config)
        self._observer = Observer()
        self._observer.schedule(handler, watch_path, recursive=True)
        self._observer.start()
        logger.info("[watcher] started, watching %s", watch_path)

        # Run startup scan as a background task so startup is not blocked
        self._scan_task = asyncio.create_task(self._scan_existing())

    @staticmethod
    def _list_existing_pdfs(root: Path) -> list:
        """Return all paths under *root* whose name ends in ".pdf" (any case), sorted."""
        return sorted(p for p in root.rglob("*") if _is_pdf_name(p.name))

    async def _scan_existing(self) -> None:
        """Ingest any PDFs already present in the watch directory."""
        if self._watch_root is None:
            return
        logger.info("[watcher] scanning existing files in %s", self._watch_root)

        # Walk the tree in a worker thread; a large or slow volume would
        # otherwise block the event loop for the whole traversal.
        try:
            pdf_paths = await asyncio.to_thread(
                self._list_existing_pdfs, self._watch_root
            )
        except OSError as exc:
            logger.warning(
                "[watcher] scan error for %s: %s", self._watch_root, exc
            )
            return

        count = 0
        for pdf_path in pdf_paths:
            try:
                await ingest_file(str(pdf_path), self._watch_root, self._config)
                count += 1
            except Exception as exc:
                # Top-level boundary: one bad file must not abort the scan.
                logger.warning("[watcher] scan error for %s: %s", pdf_path, exc)
        logger.info("[watcher] startup scan complete — processed %d file(s)", count)

    async def stop(self) -> None:
        """Cancel the startup scan (if still running) and stop the observer."""
        if self._scan_task is not None:
            self._scan_task.cancel()
            # gather(..., return_exceptions=True) absorbs the task's own
            # CancelledError without masking a cancellation of stop() itself.
            await asyncio.gather(self._scan_task, return_exceptions=True)
            self._scan_task = None

        if self._observer is not None:
            self._observer.stop()
            # join() blocks until the watchdog thread exits; keep it off the loop.
            await asyncio.to_thread(self._observer.join)
            self._observer = None
            logger.info("[watcher] stopped")