""" File-system watcher for the watch directory. Uses the watchdog library to monitor a configured directory for new PDF files. When a PDF is detected, it is automatically ingested into the document service (uploaded to storage-service, a DB record is created, and the AI pipeline runs). Key design decisions: - No-remove policy: on_deleted and on_moved events are intentionally ignored. The watcher never deletes, moves, or modifies files on the watched volume. - Watch documents use user_id="watch" as a sentinel so they are visible to all authenticated users in the document list. - Subfolder names map to categories: a file at invoices/bill.pdf is assigned to an "Invoices" category (auto-created if needed; folder name is converted to PascalCase-with-dashes: "vendor-invoices" → "Vendor-Invoices"). - Suggestions: if ai_folder_suggestion or ai_rename_suggestion are enabled, the relevant fields are set on the document after AI processing so users can confirm/reject from the UI. - Thread → async bridge: watchdog runs in a daemon thread; asyncio coroutines are dispatched from that thread via run_coroutine_threadsafe. """ import asyncio import json import logging import re import uuid from pathlib import Path from watchdog.events import FileSystemEventHandler from watchdog.observers.polling import PollingObserver from app.database import AsyncSessionLocal from app.models.category import DocumentCategory from app.models.category_assignment import CategoryAssignment from app.models.document import Document from app.services.storage import save_upload logger = logging.getLogger(__name__) # Must match _WATCH_USER_ID in app/routers/documents.py WATCH_USER_ID = "watch" # ── Ingestion logic ─────────────────────────────────────────────────────────── async def ingest_file(path_str: str, watch_root: Path, config: dict) -> None: """ Ingest a single PDF file from the watch directory. Idempotent: skips files that already have a non-failed document record. """ from sqlalchemy import select path = Path(path_str) if not path.exists() or not path.is_file(): return async with AsyncSessionLocal() as db: # Idempotency check — skip if already tracked (and not failed) existing_result = await db.execute( select(Document).where(Document.watch_path == path_str) ) existing = existing_result.scalar_one_or_none() if existing is not None and existing.status != "failed": return # Determine category from the first subfolder component try: rel = path.relative_to(watch_root) folder_name = ( "-".join(p.capitalize() for p in re.split(r"[-_\s]+", rel.parts[0]) if p) if len(rel.parts) > 1 else None ) except ValueError: folder_name = None # Read file bytes try: file_data = path.read_bytes() except OSError as exc: logger.warning("[watcher] Cannot read %s: %s", path_str, exc) return # Upload to storage-service under documents/watch/{doc_id}.pdf doc_id = existing.id if existing is not None else str(uuid.uuid4()) storage_key = await save_upload(file_data, WATCH_USER_ID, doc_id) if existing is not None: # Re-ingest a previously failed document existing.storage_key = storage_key existing.file_size = len(file_data) existing.status = "pending" existing.error_message = None await db.commit() else: doc = Document( id=doc_id, user_id=WATCH_USER_ID, source="watch", watch_path=path_str, filename=path.name, storage_key=storage_key, file_size=len(file_data), status="pending", ) db.add(doc) await db.commit() # Auto-assign category from subfolder name if folder_name: cat_result = await db.execute( select(DocumentCategory).where( DocumentCategory.user_id == WATCH_USER_ID, DocumentCategory.name == folder_name, ) ) cat = cat_result.scalar_one_or_none() if cat is None: cat = DocumentCategory(user_id=WATCH_USER_ID, name=folder_name[:128]) db.add(cat) await db.commit() await db.refresh(cat) exists_assign = await db.execute( select(CategoryAssignment).where( CategoryAssignment.document_id == doc_id, CategoryAssignment.category_id == cat.id, ) ) if exists_assign.scalar_one_or_none() is None: db.add(CategoryAssignment(document_id=doc_id, category_id=cat.id)) await db.commit() # Run AI pipeline (opens its own session internally) from app.routers.documents import process_document await process_document(doc_id) # Set AI suggestions if enabled if config.get("ai_folder_suggestion") or config.get("ai_rename_suggestion"): await _apply_suggestions(doc_id, config) async def _apply_suggestions(doc_id: str, config: dict) -> None: """Populate suggested_folder / suggested_filename after AI processing.""" from sqlalchemy import select async with AsyncSessionLocal() as db: result = await db.execute(select(Document).where(Document.id == doc_id)) doc = result.scalar_one_or_none() if doc is None or doc.status != "done" or not doc.extracted_data: return try: extracted = json.loads(doc.extracted_data) except Exception: return changed = False if config.get("ai_folder_suggestion"): suggestions = extracted.get("suggested_categories", []) if suggestions: doc.suggested_folder = str(suggestions[0])[:128] changed = True if config.get("ai_rename_suggestion"): title = extracted.get("title") if title: doc.suggested_filename = str(title)[:500] changed = True if changed: await db.commit() # ── Watchdog event handler ──────────────────────────────────────────────────── class _PdfEventHandler(FileSystemEventHandler): def __init__( self, watch_root: Path, loop: asyncio.AbstractEventLoop, config: dict, ) -> None: super().__init__() self._watch_root = watch_root self._loop = loop self._config = config def _dispatch_ingest(self, path_str: str) -> None: if path_str.lower().endswith(".pdf"): asyncio.run_coroutine_threadsafe( ingest_file(path_str, self._watch_root, self._config), self._loop, ) def on_created(self, event): # type: ignore[override] if not event.is_directory: self._dispatch_ingest(event.src_path) def on_moved(self, event): # type: ignore[override] # Handles atomic rename/move (e.g. Nextcloud or Syncthing completing a sync) if not event.is_directory: self._dispatch_ingest(event.dest_path) # on_deleted / on_modified: intentionally not overridden — no-remove policy # ── Service ─────────────────────────────────────────────────────────────────── class FileWatcherService: """Manages the watchdog Observer lifecycle within the FastAPI lifespan.""" def __init__(self, loop: asyncio.AbstractEventLoop) -> None: self._loop = loop self._observer: Observer | None = None self._watch_root: Path | None = None self._config: dict = {} async def start(self, watch_path: str, config: dict) -> None: self._watch_root = Path(watch_path) self._config = config if not self._watch_root.exists(): logger.warning( "[watcher] Watch path %s does not exist — file watching disabled", watch_path, ) return handler = _PdfEventHandler(self._watch_root, self._loop, config) self._observer = PollingObserver() self._observer.schedule(handler, watch_path, recursive=True) self._observer.start() logger.info("[watcher] started, watching %s", watch_path) # Run startup scan as a background task so startup is not blocked asyncio.create_task(self._scan_existing()) async def _scan_existing(self) -> None: """Ingest any PDFs already present in the watch directory.""" if self._watch_root is None: return logger.info("[watcher] scanning existing files in %s", self._watch_root) count = 0 for pdf_path in sorted(self._watch_root.rglob("*.pdf")): try: await ingest_file(str(pdf_path), self._watch_root, self._config) count += 1 except Exception as exc: logger.warning("[watcher] scan error for %s: %s", pdf_path, exc) logger.info("[watcher] startup scan complete — processed %d file(s)", count) async def stop(self) -> None: if self._observer is not None: self._observer.stop() await asyncio.to_thread(self._observer.join) self._observer = None logger.info("[watcher] stopped")