18a638bc3a
- Fix: list_plugins imported _REGISTRY as a direct reference to the empty list that existed at import time; register_services() replaces _REGISTRY with a new list so the imported reference was always []. Added get_registry() helper so callers access the live list via the module namespace. GET /api/plugins now correctly returns accessible plugins for the current user. - Fix: switch watchdog from InotifyObserver to PollingObserver. Inotify events from the macOS host are not forwarded through the Docker bind mount, so new files were only detected via the startup scan. PollingObserver (1s default interval) works reliably on all platforms including macOS+Docker bind mounts. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
257 lines
9.5 KiB
Python
257 lines
9.5 KiB
Python
"""
|
|
File-system watcher for the watch directory.
|
|
|
|
Uses the watchdog library to monitor a configured directory for new PDF files.
|
|
When a PDF is detected, it is automatically ingested into the document service
|
|
(copied to /data/documents, a DB record is created, and the AI pipeline runs).
|
|
|
|
Key design decisions:
|
|
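- No-remove policy: on_deleted and on_modified events are intentionally ignored; on_moved only ingests the destination path (e.g. a sync client renaming a finished file into place).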
|
|
The watcher never deletes, moves, or modifies files on the watched volume.
|
|
- Watch documents use user_id="watch" as a sentinel so they are visible to
|
|
all authenticated users in the document list.
|
|
- The first subfolder name maps to a category: a file at invoices/bill.pdf is assigned
|
|
to an "invoices" category (auto-created if needed).
|
|
- Suggestions: if ai_folder_suggestion or ai_rename_suggestion are enabled,
|
|
the relevant fields are set on the document after AI processing so users
|
|
can confirm/reject from the UI.
|
|
- Thread → async bridge: watchdog runs in a daemon thread; asyncio coroutines
|
|
are dispatched from that thread via run_coroutine_threadsafe.
|
|
"""
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import uuid
|
|
from pathlib import Path
|
|
|
|
from watchdog.events import FileSystemEventHandler
|
|
from watchdog.observers.polling import PollingObserver
|
|
|
|
from app.database import AsyncSessionLocal
|
|
from app.models.category import DocumentCategory
|
|
from app.models.category_assignment import CategoryAssignment
|
|
from app.models.document import Document
|
|
from app.services.storage import save_upload
|
|
|
|
# Per-module logger; records are emitted under this module's dotted name.
logger = logging.getLogger(name=__name__)

# Sentinel user_id stamped on every document ingested from the watch directory.
# Must match _WATCH_USER_ID in app/routers/documents.py
WATCH_USER_ID = "watch"
|
|
|
|
|
|
# ── Ingestion logic ───────────────────────────────────────────────────────────
|
|
|
|
|
|
async def ingest_file(path_str: str, watch_root: Path, config: dict) -> None:
|
|
"""
|
|
Ingest a single PDF file from the watch directory.
|
|
|
|
Idempotent: skips files that already have a non-failed document record.
|
|
"""
|
|
from sqlalchemy import select
|
|
|
|
path = Path(path_str)
|
|
if not path.exists() or not path.is_file():
|
|
return
|
|
|
|
async with AsyncSessionLocal() as db:
|
|
# Idempotency check — skip if already tracked (and not failed)
|
|
existing_result = await db.execute(
|
|
select(Document).where(Document.watch_path == path_str)
|
|
)
|
|
existing = existing_result.scalar_one_or_none()
|
|
if existing is not None and existing.status != "failed":
|
|
return
|
|
|
|
# Determine category from the first subfolder component
|
|
try:
|
|
rel = path.relative_to(watch_root)
|
|
folder_name = rel.parts[0] if len(rel.parts) > 1 else None
|
|
except ValueError:
|
|
folder_name = None
|
|
|
|
# Read file bytes
|
|
try:
|
|
file_data = path.read_bytes()
|
|
except OSError as exc:
|
|
logger.warning("[watcher] Cannot read %s: %s", path_str, exc)
|
|
return
|
|
|
|
# Save a copy to /data/documents/watch/{doc_id}.pdf
|
|
doc_id = existing.id if existing is not None else str(uuid.uuid4())
|
|
dest = await save_upload(file_data, WATCH_USER_ID, doc_id)
|
|
|
|
if existing is not None:
|
|
# Re-ingest a previously failed document
|
|
existing.file_path = str(dest)
|
|
existing.file_size = len(file_data)
|
|
existing.status = "pending"
|
|
existing.error_message = None
|
|
await db.commit()
|
|
else:
|
|
doc = Document(
|
|
id=doc_id,
|
|
user_id=WATCH_USER_ID,
|
|
source="watch",
|
|
watch_path=path_str,
|
|
filename=path.name,
|
|
file_path=str(dest),
|
|
file_size=len(file_data),
|
|
status="pending",
|
|
)
|
|
db.add(doc)
|
|
await db.commit()
|
|
|
|
# Auto-assign category from subfolder name
|
|
if folder_name:
|
|
cat_result = await db.execute(
|
|
select(DocumentCategory).where(
|
|
DocumentCategory.user_id == WATCH_USER_ID,
|
|
DocumentCategory.name == folder_name,
|
|
)
|
|
)
|
|
cat = cat_result.scalar_one_or_none()
|
|
if cat is None:
|
|
cat = DocumentCategory(user_id=WATCH_USER_ID, name=folder_name[:128])
|
|
db.add(cat)
|
|
await db.commit()
|
|
await db.refresh(cat)
|
|
|
|
exists_assign = await db.execute(
|
|
select(CategoryAssignment).where(
|
|
CategoryAssignment.document_id == doc_id,
|
|
CategoryAssignment.category_id == cat.id,
|
|
)
|
|
)
|
|
if exists_assign.scalar_one_or_none() is None:
|
|
db.add(CategoryAssignment(document_id=doc_id, category_id=cat.id))
|
|
await db.commit()
|
|
|
|
# Run AI pipeline (opens its own session internally)
|
|
from app.routers.documents import process_document
|
|
await process_document(doc_id)
|
|
|
|
# Set AI suggestions if enabled
|
|
if config.get("ai_folder_suggestion") or config.get("ai_rename_suggestion"):
|
|
await _apply_suggestions(doc_id, config)
|
|
|
|
|
|
async def _apply_suggestions(doc_id: str, config: dict) -> None:
    """Populate suggested_folder / suggested_filename after AI processing.

    Only acts on a document whose status is "done" and that has extracted_data.
    The extracted_data JSON is decoded and, depending on *config*:

    - ``ai_folder_suggestion``: the first entry of "suggested_categories" (or
      the value itself when it is a plain string) becomes suggested_folder,
      truncated to 128 characters.
    - ``ai_rename_suggestion``: "title" becomes suggested_filename, truncated
      to 500 characters.

    Undecodable or non-object JSON is logged and ignored rather than raised, so
    a malformed AI payload never fails ingestion. Commits only if a field changed.
    """
    from sqlalchemy import select

    async with AsyncSessionLocal() as db:
        result = await db.execute(select(Document).where(Document.id == doc_id))
        doc = result.scalar_one_or_none()
        if doc is None or doc.status != "done" or not doc.extracted_data:
            return

        try:
            extracted = json.loads(doc.extracted_data)
        except (TypeError, ValueError) as exc:  # JSONDecodeError is a ValueError
            logger.warning(
                "[watcher] Ignoring unparsable extracted_data for %s: %s", doc_id, exc
            )
            return
        if not isinstance(extracted, dict):
            # A JSON list/scalar has no .get(); previously this raised AttributeError.
            logger.warning("[watcher] Ignoring non-object extracted_data for %s", doc_id)
            return

        changed = False

        if config.get("ai_folder_suggestion"):
            suggestions = extracted.get("suggested_categories")
            # A bare string is one suggestion, not a sequence of characters.
            if isinstance(suggestions, str):
                suggestions = [suggestions] if suggestions else []
            if isinstance(suggestions, (list, tuple)) and suggestions:
                doc.suggested_folder = str(suggestions[0])[:128]
                changed = True

        if config.get("ai_rename_suggestion"):
            title = extracted.get("title")
            if title:
                doc.suggested_filename = str(title)[:500]
                changed = True

        if changed:
            await db.commit()
|
|
|
|
|
|
# ── Watchdog event handler ────────────────────────────────────────────────────
|
|
|
|
|
|
class _PdfEventHandler(FileSystemEventHandler):
    """Watchdog handler that schedules ingest_file for new or moved-in PDF files.

    Watchdog calls the ``on_*`` hooks from its observer thread, so ingestion is
    handed to the asyncio event loop with ``asyncio.run_coroutine_threadsafe``
    instead of being awaited here. Deletions and modifications are ignored
    (no-remove policy): the watcher never acts on files disappearing or changing.
    """

    def __init__(
        self,
        watch_root: Path,
        loop: asyncio.AbstractEventLoop,
        config: dict,
    ) -> None:
        super().__init__()
        self._watch_root = watch_root
        self._loop = loop
        self._config = config

    def _dispatch_ingest(self, path_str: str) -> None:
        """Schedule ingestion of *path_str* on the loop if it ends in .pdf (any case)."""
        if not path_str.lower().endswith(".pdf"):
            return
        future = asyncio.run_coroutine_threadsafe(
            ingest_file(path_str, self._watch_root, self._config),
            self._loop,
        )
        # Nothing awaits this future, so without a callback an exception raised
        # inside ingest_file would be discarded silently.
        future.add_done_callback(lambda fut: self._log_ingest_failure(path_str, fut))

    @staticmethod
    def _log_ingest_failure(path_str: str, future) -> None:
        """Log the exception, if any, of a finished ingestion future."""
        if future.cancelled():
            return
        exc = future.exception()
        if exc is not None:
            logger.error("[watcher] ingestion failed for %s", path_str, exc_info=exc)

    def on_created(self, event):  # type: ignore[override]
        if not event.is_directory:
            self._dispatch_ingest(event.src_path)

    def on_moved(self, event):  # type: ignore[override]
        # Handles atomic rename/move (e.g. Nextcloud or Syncthing completing a sync)
        if not event.is_directory:
            self._dispatch_ingest(event.dest_path)

    # on_deleted / on_modified: intentionally not overridden — no-remove policy
|
|
|
|
|
|
# ── Service ───────────────────────────────────────────────────────────────────
|
|
|
|
|
|
class FileWatcherService:
    """Manages the watchdog Observer lifecycle within the FastAPI lifespan.

    start() begins polling the watch directory recursively and launches a
    background scan of PDFs already present; stop() stops and joins the
    observer thread and cancels the scan if it is still running.
    """

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        self._observer: PollingObserver | None = None
        self._watch_root: Path | None = None
        self._config: dict = {}
        # Strong reference to the startup-scan task: the event loop only keeps
        # weak references to tasks, so an unreferenced one may be collected mid-run.
        self._scan_task: asyncio.Task | None = None

    async def start(self, watch_path: str, config: dict) -> None:
        """Start watching *watch_path*; logs and returns if the path is missing."""
        self._watch_root = Path(watch_path)
        self._config = config

        if not self._watch_root.exists():
            logger.warning(
                "[watcher] Watch path %s does not exist — file watching disabled",
                watch_path,
            )
            return

        handler = _PdfEventHandler(self._watch_root, self._loop, config)
        self._observer = PollingObserver()
        self._observer.schedule(handler, watch_path, recursive=True)
        self._observer.start()
        logger.info("[watcher] started, watching %s", watch_path)

        # Run startup scan as a background task so startup is not blocked
        self._scan_task = asyncio.create_task(self._scan_existing())

    @staticmethod
    def _list_pdfs(root: Path) -> list[Path]:
        """Return all regular files under *root* whose suffix is .pdf in any case, sorted.

        Matches the case-insensitive check used by the event handler; a plain
        ``rglob("*.pdf")`` is case-sensitive on Linux and would skip ``.PDF``.
        """
        return sorted(
            p for p in root.rglob("*") if p.suffix.lower() == ".pdf" and p.is_file()
        )

    async def _scan_existing(self) -> None:
        """Ingest any PDFs already present in the watch directory."""
        if self._watch_root is None:
            return
        logger.info("[watcher] scanning existing files in %s", self._watch_root)
        # The directory walk is blocking I/O; keep it off the event loop.
        pdf_paths = await asyncio.to_thread(self._list_pdfs, self._watch_root)
        count = 0
        for pdf_path in pdf_paths:
            try:
                await ingest_file(str(pdf_path), self._watch_root, self._config)
                count += 1
            except Exception as exc:
                logger.warning("[watcher] scan error for %s: %s", pdf_path, exc)
        logger.info("[watcher] startup scan complete — processed %d file(s)", count)

    async def stop(self) -> None:
        """Stop the observer (if running) and cancel an unfinished startup scan."""
        if self._observer is not None:
            self._observer.stop()
            await asyncio.to_thread(self._observer.join)
            self._observer = None
            logger.info("[watcher] stopped")

        task, self._scan_task = self._scan_task, None
        if task is not None and not task.done():
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass
|