chore: initial commit — existing single-user document scanner codebase

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
curo1305
2026-05-22 08:53:28 +02:00
parent 6fed5ba531
commit 7a34807fa0
71 changed files with 16408 additions and 0 deletions
View File
+59
View File
@@ -0,0 +1,59 @@
"""
Classification orchestrator.
Loads settings, selects AI provider, classifies document, auto-creates suggested topics.
"""
from services import storage
from ai import get_provider
# Maximum number of characters of a document's extracted text that is passed
# to the AI provider in a single classify / suggest call.
MAX_AI_CHARS = 8_000
async def classify_document(doc_id: str, topic_names: list[str] | None = None) -> list[str]:
    """
    Classify a document by its ID and persist the resulting topics.

    The document's extracted text (capped at MAX_AI_CHARS characters) is sent
    to the configured AI provider together with the candidate topic names.
    Every topic the provider assigned or suggested is stored on the document;
    names that are not among the candidates are created in the registry.

    Args:
        doc_id: ID of the document whose metadata holds the extracted text.
        topic_names: Candidate topics offered to the provider. When None,
            every topic currently in the registry is used.

    Returns:
        The topic names stored on the document: whitespace-trimmed,
        de-duplicated case-insensitively, in the order the provider returned
        them (assigned topics first, then suggestions).

    Raises:
        ValueError: If no metadata exists for ``doc_id``.
    """
    meta = storage.get_metadata(doc_id)
    if meta is None:
        raise ValueError(f"Document {doc_id} not found")

    settings = storage.load_settings()
    system_prompt = settings.get("system_prompt", "")
    provider = get_provider(settings)

    # Use all known topics if not specified
    if topic_names is None:
        topic_names = [t["name"] for t in storage.load_topics()]

    # `or ""` guards against a stored null, which would break the slice below.
    text = meta.get("extracted_text") or ""
    result = await provider.classify(text[:MAX_AI_CHARS], topic_names, system_prompt)

    # Lower-cased name -> canonical spelling. Seeded with the candidates so a
    # provider answer like "finance" is stored as the existing "Finance"
    # rather than as a second spelling that exact-match lookups would miss.
    canonical = {name.lower(): name for name in topic_names}

    final_topics: list[str] = []
    seen: set[str] = set()
    for raw_name in [*result.topics, *result.suggested_new_topics]:
        name = raw_name.strip()
        key = name.lower()
        if not name or key in seen:
            continue
        seen.add(key)
        if key not in canonical:
            # create_topic de-duplicates by name itself and returns the
            # registry entry, so its "name" is the spelling to store.
            canonical[key] = storage.create_topic(name)["name"]
        final_topics.append(canonical[key])

    storage.update_document_topics(doc_id, final_topics)
    return final_topics
async def suggest_topics_for_document(doc_id: str) -> list[str]:
    """
    Ask the AI provider for topic suggestions without touching the document.

    Only the first MAX_AI_CHARS characters of the extracted text are sent.

    Raises:
        ValueError: If no metadata exists for ``doc_id``.
    """
    document = storage.get_metadata(doc_id)
    if document is None:
        raise ValueError(f"Document {doc_id} not found")

    config = storage.load_settings()
    prompt = config.get("system_prompt", "")
    ai = get_provider(config)

    excerpt = document.get("extracted_text", "")[:MAX_AI_CHARS]
    suggestions = await ai.suggest_topics(excerpt, prompt)
    return suggestions
+71
View File
@@ -0,0 +1,71 @@
"""
Text extraction dispatcher.
Supports: PDF (PyMuPDF), DOCX (python-docx), plain text, images (pytesseract).
"""
from pathlib import Path
# Extracted text longer than this many characters is cut off by _truncate()
# and a "[... truncated ...]" marker is appended.
MAX_STORED_CHARS = 50_000
def extract_text(file_path: str, mime_type: str) -> str:
    """
    Extract text from a file, choosing the extractor by MIME type or suffix.

    Dispatch order: PDF, Word document, image (OCR), then plain text as the
    fallback. Each format is recognised by its MIME type or, failing that, by
    the file suffix, so a missing or generic MIME type still reaches the
    right extractor.

    Args:
        file_path: Path of the file to read.
        mime_type: MIME type reported for the file; may be empty or None.

    Returns:
        The extracted text, or a bracketed ``[Extraction error: ...]``
        message if the chosen extractor raised.
    """
    path = Path(file_path)
    suffix = path.suffix.lower()
    mime = mime_type or ""
    # Suffix fallback for images, mirroring the PDF and DOCX branches.
    image_suffixes = (".png", ".jpg", ".jpeg", ".gif", ".bmp", ".tif", ".tiff", ".webp")
    try:
        if mime == "application/pdf" or suffix == ".pdf":
            return _extract_pdf(path)
        if mime in (
            "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
            "application/msword",
        ) or suffix in (".docx", ".doc"):
            return _extract_docx(path)
        if mime.startswith("image/") or suffix in image_suffixes:
            return _extract_image(path)
        return _extract_text_file(path)
    except Exception as e:
        # Best-effort boundary: callers receive a message instead of a crash.
        return f"[Extraction error: {e}]"
def _extract_pdf(path: Path) -> str:
    """
    Return the text of every page of a PDF, joined by newlines and truncated.

    The document is closed even if reading a page raises.
    """
    import fitz  # PyMuPDF

    doc = fitz.open(str(path))
    try:
        pages = [page.get_text() for page in doc]
    finally:
        doc.close()
    return _truncate("\n".join(pages))
def _extract_docx(path: Path) -> str:
    """
    Return the non-blank paragraphs of a Word document, joined by newlines
    and truncated.
    """
    from docx import Document

    document = Document(str(path))
    lines = []
    for para in document.paragraphs:
        content = para.text
        if content.strip():
            lines.append(content)
    return _truncate("\n".join(lines))
def _extract_image(path: Path) -> str:
    """
    OCR an image with pytesseract and return the truncated text.

    Returns a bracketed message instead of raising when the OCR libraries
    cannot be imported or when OCR itself fails.
    """
    try:
        from PIL import Image
        import pytesseract

        # Context manager closes the underlying file handle after OCR.
        with Image.open(str(path)) as img:
            text = pytesseract.image_to_string(img)
        return _truncate(text)
    except ImportError:
        return "[OCR unavailable: pytesseract or Pillow not installed]"
    except Exception as e:
        return f"[OCR error: {e}]"
def _extract_text_file(path: Path) -> str:
    """
    Read a plain-text file, trying progressively more permissive encodings.

    UTF-8 is tried first, then cp1252. latin-1 is the last resort because it
    maps every byte to a character and therefore never fails; placing it
    before cp1252 would make the cp1252 attempt unreachable.
    """
    for enc in ("utf-8", "cp1252"):
        try:
            return _truncate(path.read_text(encoding=enc))
        except UnicodeDecodeError:
            continue
    # latin-1 decodes any byte sequence, so no further fallback is needed.
    return _truncate(path.read_text(encoding="latin-1"))
def _truncate(text: str) -> str:
    """
    Strip surrounding whitespace and cap the text at MAX_STORED_CHARS.

    Text over the cap is cut and a "[... truncated ...]" marker is appended
    on its own line.
    """
    stripped = text.strip()
    if len(stripped) <= MAX_STORED_CHARS:
        return stripped
    return stripped[:MAX_STORED_CHARS] + "\n[... truncated ...]"
+187
View File
@@ -0,0 +1,187 @@
import json
import uuid
import shutil
from datetime import datetime, timezone
from pathlib import Path
from filelock import FileLock
from config import UPLOADS_DIR, METADATA_DIR, TOPICS_FILE, SETTINGS_FILE, DEFAULT_SETTINGS
# ── File locks ────────────────────────────────────────────────────────────────
# Held by load_topics() / save_topics() while the topics file is read or written.
_topics_lock = FileLock(str(TOPICS_FILE) + ".lock")
# Held by load_settings() / save_settings() while the settings file is read or written.
_settings_lock = FileLock(str(SETTINGS_FILE) + ".lock")
# ── Documents ─────────────────────────────────────────────────────────────────
def save_upload(file_bytes: bytes, original_name: str, mime_type: str) -> dict:
    """
    Store uploaded bytes under a fresh UUID-based filename in UPLOADS_DIR.

    The stored name is the new document ID plus the lower-cased suffix of
    ``original_name``. ``mime_type`` is accepted but not used here.

    Returns:
        A dict with the new ``id``, the stored ``filename`` and its ``path``.
    """
    new_id = str(uuid.uuid4())
    stored_name = new_id + Path(original_name).suffix.lower()
    target = UPLOADS_DIR / stored_name
    target.write_bytes(file_bytes)
    return {"id": new_id, "filename": stored_name, "path": str(target)}
def save_metadata(meta: dict) -> None:
    """
    Write a document's metadata to ``<METADATA_DIR>/<id>.json``.

    The write happens while holding that file's ``.lock`` file lock.
    """
    target = METADATA_DIR / f"{meta['id']}.json"
    with FileLock(f"{target}.lock"):
        serialized = json.dumps(meta, indent=2, ensure_ascii=False)
        target.write_text(serialized)
def get_metadata(doc_id: str) -> dict | None:
    """
    Load the metadata dict for ``doc_id``.

    Returns:
        The parsed metadata, or None when no metadata file exists.
    """
    path = METADATA_DIR / f"{doc_id}.json"
    # Read first and handle absence, rather than checking exists() and then
    # reading: the file can be deleted between the two steps.
    try:
        raw = path.read_text()
    except (FileNotFoundError, NotADirectoryError):
        return None
    return json.loads(raw)
def list_metadata(topic: str | None = None) -> list[dict]:
    """
    Return all document metadata, most recently modified file first.

    Files that cannot be read or parsed are skipped. When ``topic`` is given,
    only documents whose ``topics`` list contains it are returned.
    """
    newest_first = sorted(
        METADATA_DIR.glob("*.json"),
        key=lambda entry: entry.stat().st_mtime,
        reverse=True,
    )
    matches = []
    for entry in newest_first:
        try:
            record = json.loads(entry.read_text())
        except Exception:
            continue
        if not topic or topic in record.get("topics", []):
            matches.append(record)
    return matches
def delete_document(doc_id: str) -> bool:
    """
    Delete a document's uploaded file, its metadata and its lock file.

    Returns:
        True if the document existed and was removed, False if no metadata
        file was found for ``doc_id``.
    """
    meta_path = METADATA_DIR / f"{doc_id}.json"
    if not meta_path.exists():
        return False
    meta = json.loads(meta_path.read_text())

    # An empty or missing filename would resolve to UPLOADS_DIR itself, and
    # unlink() on a directory raises, so only remove an actual file.
    filename = meta.get("filename")
    if filename:
        upload_path = UPLOADS_DIR / filename
        if upload_path.is_file():
            upload_path.unlink()

    meta_path.unlink()
    lock_path = Path(str(meta_path) + ".lock")
    if lock_path.exists():
        lock_path.unlink()
    return True
def update_document_topics(doc_id: str, topics: list[str]) -> dict | None:
    """
    Replace a document's topics and stamp ``classified_at`` with the current
    UTC time.

    Returns:
        The updated metadata, or None when the document does not exist.
    """
    record = get_metadata(doc_id)
    if record is not None:
        record["topics"] = topics
        record["classified_at"] = datetime.now(timezone.utc).isoformat()
        save_metadata(record)
    return record
def remove_topic_from_all_documents(topic_name: str) -> int:
    """
    Remove a topic name from all documents.

    Unreadable or unparsable metadata files are skipped.

    Returns:
        Number of documents updated.
    """
    count = 0
    for p in METADATA_DIR.glob("*.json"):
        # Cheap unlocked pre-check so untouched documents need no lock.
        try:
            meta = json.loads(p.read_text())
        except Exception:
            continue
        if topic_name not in meta.get("topics", []):
            continue

        with FileLock(str(p) + ".lock"):
            # Re-read under the lock: the copy read above may be stale if the
            # file was saved in between, and writing it back would discard
            # that newer content.
            try:
                meta = json.loads(p.read_text())
            except (OSError, ValueError):
                continue
            topics = meta.get("topics", [])
            if topic_name not in topics:
                continue
            meta["topics"] = [t for t in topics if t != topic_name]
            p.write_text(json.dumps(meta, indent=2, ensure_ascii=False))
        count += 1
    return count
# ── Topics ────────────────────────────────────────────────────────────────────
def load_topics() -> list[dict]:
    """
    Read the topics file under the topics lock and return its ``topics``
    list, or an empty list when that key is absent.
    """
    with _topics_lock:
        payload = json.loads(TOPICS_FILE.read_text())
        return payload.get("topics", [])
def save_topics(topics: list[dict]) -> None:
    """Write the given topics to the topics file under the topics lock."""
    with _topics_lock:
        document = {"topics": topics}
        TOPICS_FILE.write_text(json.dumps(document, indent=2))
def get_topic(topic_id: str) -> dict | None:
    """Return the first topic whose ``id`` equals ``topic_id``, or None."""
    for candidate in load_topics():
        if candidate["id"] == topic_id:
            return candidate
    return None
def create_topic(name: str, description: str = "", color: str = "#6366f1") -> dict:
    """
    Create a topic, or return the existing one with the same name.

    Names are compared case-insensitively. The topics lock is held across
    the whole read-check-append-write sequence so that two concurrent
    callers cannot both load the old list and overwrite each other's new
    topic.

    Args:
        name: Display name of the topic.
        description: Optional free-text description.
        color: Colour string stored with the topic.

    Returns:
        The newly created topic dict, or the already-registered topic whose
        name matches.
    """
    wanted = name.lower()
    # filelock locks are re-entrant, so the nested acquisitions inside
    # load_topics() / save_topics() do not block.
    with _topics_lock:
        topics = load_topics()
        # Deduplicate by name (case-insensitive), in a single pass.
        existing = next((t for t in topics if t["name"].lower() == wanted), None)
        if existing is not None:
            return existing
        topic = {
            "id": str(uuid.uuid4())[:8],
            "name": name,
            "description": description,
            "color": color,
        }
        topics.append(topic)
        save_topics(topics)
    return topic
def update_topic(topic_id: str, **kwargs) -> dict | None:
    """
    Apply the non-None keyword arguments to the topic with the given ID.

    Returns:
        The updated topic, or None when no topic has that ID.
    """
    changes = {field: value for field, value in kwargs.items() if value is not None}
    topics = load_topics()
    target = next((t for t in topics if t["id"] == topic_id), None)
    if target is None:
        return None
    target.update(changes)
    save_topics(topics)
    return target
def delete_topic(topic_id: str) -> str | None:
    """
    Delete a topic from the registry and strip its name from every document.

    Returns:
        The deleted topic's name, or None when no topic has that ID.
    """
    topics = load_topics()
    matches = [t for t in topics if t["id"] == topic_id]
    if not matches:
        return None
    name = matches[0]["name"]
    survivors = [t for t in topics if t["id"] != topic_id]
    save_topics(survivors)
    remove_topic_from_all_documents(name)
    return name
def topic_doc_counts() -> dict[str, int]:
    """
    Count how many documents carry each topic name.

    Metadata files that cannot be read or parsed are skipped.
    """
    tally: dict[str, int] = {}
    for entry in METADATA_DIR.glob("*.json"):
        try:
            record = json.loads(entry.read_text())
        except Exception:
            continue
        for label in record.get("topics", []):
            tally.setdefault(label, 0)
            tally[label] += 1
    return tally
# ── Settings ──────────────────────────────────────────────────────────────────
def load_settings() -> dict:
    """Read and parse the settings file while holding the settings lock."""
    with _settings_lock:
        raw = SETTINGS_FILE.read_text()
        return json.loads(raw)
def save_settings(settings: dict) -> None:
    """Serialize the settings to the settings file while holding the settings lock."""
    with _settings_lock:
        serialized = json.dumps(settings, indent=2)
        SETTINGS_FILE.write_text(serialized)
def mask_api_key(key: str) -> str:
    """
    Mask an API key for display.

    Keys longer than four characters keep their last four characters after
    the mask; empty, missing or shorter keys are replaced by the mask alone.
    """
    mask = "****"
    if key and len(key) > 4:
        return mask + key[-4:]
    return mask
def settings_masked(settings: dict) -> dict:
    """
    Return a deep copy of the settings with provider API keys masked.

    Only the "anthropic" and "openai" entries under "providers" are
    inspected; a key is masked only when it is non-empty. The input dict is
    left unmodified.
    """
    import copy

    masked = copy.deepcopy(settings)
    providers = masked.get("providers", {})
    for prov in ("anthropic", "openai"):
        key = providers.get(prov, {}).get("api_key", "")
        if key:
            masked["providers"][prov]["api_key"] = mask_api_key(key)
    return masked