""" Classification orchestrator. Loads settings, selects AI provider, classifies document, auto-creates suggested topics. Updated in Plan 05: classify_document and suggest_topics_for_document now accept an AsyncSession as their first argument so they can be called from the Celery task wrapper and from API route handlers that already hold a session. Updated in Plan 03-03: classify_document uses load_topics_for_user (D-17) to scope topic lookup to the document owner's namespace, and creates AI-suggested topics in the user's namespace via create_topic(user_id=doc.user_id) (D-11). """ from __future__ import annotations import uuid as _uuid from sqlalchemy.ext.asyncio import AsyncSession from db.models import Document from services import storage from ai import get_provider MAX_AI_CHARS = 8_000 async def classify_document( session: AsyncSession, doc_id: str, topic_names: list[str] | None = None, ) -> list[str]: """ Classify a document by its ID. Returns the list of assigned topic names. If topic_names is provided, restrict classification to those topics. Auto-creates any newly suggested topics in the document owner's namespace (D-11). """ meta = await storage.get_metadata(session, doc_id) if meta is None: raise ValueError(f"Document {doc_id} not found") settings = storage.load_settings() system_prompt = settings.get("system_prompt", "") provider = get_provider(settings) # Load the Document ORM object to get the owner's user_id (D-11, D-17) try: uid = _uuid.UUID(doc_id) except ValueError: uid = None doc = await session.get(Document, uid) if uid is not None else None doc_user_id = doc.user_id if doc is not None else None # Use namespace-scoped topic list if not specified (D-17) if topic_names is None: if doc_user_id is not None: all_topics = await storage.load_topics_for_user(session, user_id=doc_user_id) else: # Fallback for documents without a user (legacy / test data) all_topics = await storage.load_topics(session) topic_names = [t["name"] for t in all_topics] text = meta.get("extracted_text", "") result = await provider.classify(text[:MAX_AI_CHARS], topic_names, system_prompt) # Collect all topic names to persist (assigned + suggested) all_new_names = set(result.suggested_new_topics) | set(result.topics) # Auto-create any topic not already in the registry — in the user's namespace (D-11) existing_names = {t.lower() for t in topic_names} for name in all_new_names: if name.strip() and name.lower() not in existing_names: await storage.create_topic(session, name.strip(), user_id=doc_user_id) # Final list: everything the AI assigned or suggested final_topics = [t for t in list(set(result.topics + result.suggested_new_topics)) if t.strip()] await storage.update_document_topics(session, doc_id, final_topics) return final_topics async def suggest_topics_for_document(session: AsyncSession, doc_id: str) -> list[str]: """Return AI-suggested topic names without modifying the document.""" meta = await storage.get_metadata(session, doc_id) if meta is None: raise ValueError(f"Document {doc_id} not found") settings = storage.load_settings() system_prompt = settings.get("system_prompt", "") provider = get_provider(settings) text = meta.get("extracted_text", "") return await provider.suggest_topics(text[:MAX_AI_CHARS], system_prompt)