Files
curo1305 3a66aeeec5 fix: rename download_file import to storage_download to avoid shadow
The route handler async def download_file() shadowed the storage import
of the same name, causing the endpoint to call itself recursively.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-21 11:48:04 +02:00

859 lines
30 KiB
Python

import asyncio
import io
import json
import math
import uuid
from datetime import datetime, timezone
import pdfplumber
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Query, UploadFile
from fastapi.responses import StreamingResponse
from sqlalchemy import func, or_, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.database import AsyncSessionLocal, get_db
from app.deps import get_user_admin_groups, get_user_groups, get_user_id, get_user_is_admin
from app.models.category import DocumentCategory
from app.models.category_assignment import CategoryAssignment
from app.models.document import Document
from app.models.document_share import DocumentShare
from app.schemas.document import (
DocumentOut,
DocumentPage,
DocumentStatusOut,
DocumentTypeUpdate,
TagsUpdate,
TitleUpdate,
)
from app.schemas.share import DocumentShareCreate, DocumentShareOut, SharedDocumentOut
from app.services.ai_client import AIServiceError, classify_document
from app.services.config_reader import load_doc_config
from app.services.storage import delete_file, download_file as storage_download, save_upload
# Router that every endpoint in this module registers on via @router.<method>.
router = APIRouter()

# Fallback upload size limit (20 MiB), used when the loaded config does not
# provide documents.max_pdf_bytes.
_DEFAULT_MAX_BYTES = 20 * 1024 * 1024

# Sentinel user_id used for watch-directory-ingested documents.
# These documents are visible to all authenticated users.
_WATCH_USER_ID = "watch"
# ── Helpers ───────────────────────────────────────────────────────────────────
async def _get_user_doc(doc_id: str, user_id: str, db: AsyncSession) -> Document:
    """Load one document the caller may access, with its categories eager-loaded.

    A document qualifies when it is owned by ``user_id`` or carries the
    watch sentinel owner (``_WATCH_USER_ID``), which is visible to everyone.

    Raises:
        HTTPException: 404 when no such document exists.
    """
    visible_to_caller = or_(
        Document.user_id == user_id,
        Document.user_id == _WATCH_USER_ID,
    )
    eager_categories = selectinload(Document.category_assignments).selectinload(
        CategoryAssignment.category
    )
    stmt = (
        select(Document)
        .where(Document.id == doc_id, visible_to_caller)
        .options(eager_categories)
    )
    found = (await db.execute(stmt)).scalar_one_or_none()
    if found is None:
        raise HTTPException(status_code=404, detail="Document not found")
    return found
async def _get_share_counts(doc_ids: list[str], db: AsyncSession) -> dict[str, int]:
"""Return a mapping of doc_id → share count for the given document IDs."""
if not doc_ids:
return {}
rows = await db.execute(
select(DocumentShare.document_id, func.count(DocumentShare.id))
.where(DocumentShare.document_id.in_(doc_ids))
.group_by(DocumentShare.document_id)
)
return {row[0]: row[1] for row in rows.all()}
async def _get_deletable_doc_ids(
doc_ids: list[str], user_groups: list[str], db: AsyncSession
) -> set[str]:
"""Return doc IDs for which the user has delete permission via a group share."""
if not doc_ids or not user_groups:
return set()
rows = await db.execute(
select(DocumentShare.document_id)
.where(
DocumentShare.document_id.in_(doc_ids),
DocumentShare.group_id.in_(user_groups),
DocumentShare.can_delete.is_(True),
)
)
return {row[0] for row in rows.all()}
def _doc_with_categories(
    doc: Document, share_count: int = 0, viewer_can_delete: bool = False
) -> DocumentOut:
    """Convert a ``Document`` row into the ``DocumentOut`` response schema.

    Scalar fields are copied verbatim; each category assignment becomes a
    ``CategoryOut``. ``share_count`` and ``viewer_can_delete`` are passed
    through as supplied by the caller.

    This reads ``doc.category_assignments`` and each assignment's
    ``category``; the queries in this module eager-load both beforehand.
    """
    from app.schemas.document import CategoryOut

    copied_fields = (
        "id",
        "user_id",
        "filename",
        "title",
        "file_size",
        "status",
        "document_type",
        "extracted_data",
        "tags",
        "error_message",
        "created_at",
        "processed_at",
        "source",
        "watch_path",
        "suggested_folder",
        "suggested_filename",
    )
    categories = [
        CategoryOut(id=link.category.id, name=link.category.name)
        for link in doc.category_assignments
    ]
    payload = {field: getattr(doc, field) for field in copied_fields}
    return DocumentOut(
        **payload,
        categories=categories,
        share_count=share_count,
        viewer_can_delete=viewer_can_delete,
    )
def _extract_pdf_text(pdf_bytes: bytes) -> str:
    """Return the text of all pages that yield any, joined with newlines.

    Pages whose extraction returns nothing (``None`` or an empty string) are
    skipped. Synchronous — must be called via asyncio.to_thread.
    """
    with pdfplumber.open(io.BytesIO(pdf_bytes)) as pdf:
        # The generator is fully consumed by join() before the PDF is closed.
        return "\n".join(
            page_text for page in pdf.pages if (page_text := page.extract_text())
        )
# ── Background processing ─────────────────────────────────────────────────────
async def process_document(doc_id: str) -> None:
    """
    Extract text from a stored PDF, classify it, and persist the outcome.

    Runs after the upload response is sent.
    Opens its own DB session — never use the request's Depends session here.
    Original note (not verifiable from this function alone): AI config is
    loaded fresh from the config file so settings changes apply without
    restart.

    The document moves to "processing", then to "done" on success or to
    "failed" (with the exception text truncated to 500 chars) on any error.
    Does nothing when ``doc_id`` no longer exists.
    """
    async with AsyncSessionLocal() as session:
        document = await session.get(Document, doc_id)
        if document is None:
            return

        document.status = "processing"
        await session.commit()

        try:
            raw_pdf = await storage_download(document.storage_key)
            # pdfplumber is synchronous, so run it off the event loop.
            extracted = await asyncio.to_thread(_extract_pdf_text, raw_pdf)
            classification = await classify_document(extracted)
            document.raw_text = extracted[:500_000]  # cap stored text at 500k chars
            document.extracted_data = json.dumps(classification)
            document.title = classification.get("title") or None
            document.document_type = classification.get("document_type", "unknown")
            document.tags = json.dumps(classification.get("tags", []))
            document.status = "done"
            document.processed_at = datetime.now(timezone.utc)
        except Exception as exc:
            # Background boundary: record the failure on the row instead of raising.
            document.status = "failed"
            document.error_message = str(exc)[:500]

        await session.commit()
# ── Routes ────────────────────────────────────────────────────────────────────
@router.post("/upload", response_model=DocumentOut, status_code=202)
async def upload_document(
    file: UploadFile,
    background_tasks: BackgroundTasks,
    user_id: str = Depends(get_user_id),
    db: AsyncSession = Depends(get_db),
) -> DocumentOut:
    """Accept a PDF upload, store it, and schedule background processing.

    The upload is accepted when its content type is ``application/pdf`` or
    ``application/octet-stream``, or when its filename ends in ``.pdf``.
    The size limit comes from ``documents.max_pdf_bytes`` in the loaded
    config, falling back to ``_DEFAULT_MAX_BYTES``.

    Returns:
        The newly created document in ``pending`` state (HTTP 202).

    Raises:
        HTTPException: 415 for non-PDF uploads, 413 when the file exceeds
            the configured maximum size.
    """
    has_pdf_extension = (file.filename or "").lower().endswith(".pdf")
    has_pdf_content_type = file.content_type in (
        "application/pdf",
        "application/octet-stream",
    )
    if not has_pdf_content_type and not has_pdf_extension:
        raise HTTPException(status_code=415, detail="Only PDF files are accepted")

    config = await load_doc_config()
    max_bytes = config.get("documents", {}).get("max_pdf_bytes", _DEFAULT_MAX_BYTES)

    # Read at most one byte past the limit: enough to detect an oversized
    # upload without pulling an arbitrarily large body into memory.
    file_data = await file.read(max_bytes + 1)
    if len(file_data) > max_bytes:
        raise HTTPException(
            status_code=413,
            detail=f"File exceeds maximum size of {max_bytes // (1024*1024)} MB",
        )

    doc_id = str(uuid.uuid4())
    storage_key = await save_upload(file_data, user_id, doc_id)
    doc = Document(
        id=doc_id,
        user_id=user_id,
        filename=file.filename or "upload.pdf",
        storage_key=storage_key,
        file_size=len(file_data),
        status="pending",
    )
    db.add(doc)
    await db.commit()
    background_tasks.add_task(process_document, doc_id)

    # Re-query with selectinload so category_assignments is eagerly loaded.
    doc = await _get_user_doc(doc_id, user_id, db)
    return _doc_with_categories(doc)
# Allowed values of the `sort` query parameter, mapped to the Document column
# each one orders by. Lookups fall back to Document.created_at for other values.
_SORT_COLUMNS = {
    field: getattr(Document, field)
    for field in (
        "created_at",
        "processed_at",
        "filename",
        "title",
        "file_size",
        "status",
        "document_type",
    )
}
@router.get("", response_model=DocumentPage)
async def list_documents(
    page: int = Query(default=1, ge=1),
    per_page: int = Query(default=20, ge=1, le=100),
    sort: str = Query(default="created_at"),
    order: str = Query(default="desc", pattern="^(asc|desc)$"),
    status: str | None = Query(default=None),
    document_type: str | None = Query(default=None),
    search: str | None = Query(default=None),
    category_id: str | None = Query(default=None),
    user_id: str = Depends(get_user_id),
    user_groups: list[str] = Depends(get_user_groups),
    is_admin: bool = Depends(get_user_is_admin),
    db: AsyncSession = Depends(get_db),
) -> DocumentPage:
    """Return one page of the caller's own and watch-ingested documents.

    The optional filters (``status``, ``document_type``, ``search``,
    ``category_id``) are combined with AND; ``search`` is a case-insensitive
    substring match over title, filename, tags and document type. A ``sort``
    value not present in ``_SORT_COLUMNS`` falls back to ``created_at``.

    Every item carries its share count and a ``viewer_can_delete`` flag:
    always true for admins, otherwise true for owned documents and for
    documents shared with one of the caller's groups with delete rights.
    """
    order_column = _SORT_COLUMNS.get(sort, Document.created_at)
    if order == "desc":
        ordering = order_column.desc()
    else:
        ordering = order_column.asc()

    # Watch-ingested documents (user_id = "watch") are visible to all users.
    filters = [or_(Document.user_id == user_id, Document.user_id == _WATCH_USER_ID)]
    if status:
        filters.append(Document.status == status)
    if document_type:
        filters.append(Document.document_type == document_type)
    if search:
        pattern = f"%{search}%"
        searchable = (
            Document.title,
            Document.filename,
            Document.tags,
            Document.document_type,
        )
        filters.append(or_(*(column.ilike(pattern) for column in searchable)))
    if category_id:
        filters.append(
            Document.id.in_(
                select(CategoryAssignment.document_id).where(
                    CategoryAssignment.category_id == category_id
                )
            )
        )

    count_stmt = select(func.count(Document.id)).where(*filters)
    total = (await db.execute(count_stmt)).scalar_one()

    page_stmt = (
        select(Document)
        .where(*filters)
        .options(
            selectinload(Document.category_assignments).selectinload(
                CategoryAssignment.category
            )
        )
        .order_by(ordering)
        .offset((page - 1) * per_page)
        .limit(per_page)
    )
    docs = (await db.execute(page_stmt)).scalars().all()
    doc_ids = [doc.id for doc in docs]
    share_counts = await _get_share_counts(doc_ids, db)

    if is_admin:
        deletable_ids = set(doc_ids)
    else:
        owned_ids = {doc.id for doc in docs if doc.user_id == user_id}
        shared_deletable_ids = await _get_deletable_doc_ids(doc_ids, user_groups, db)
        deletable_ids = owned_ids | shared_deletable_ids

    items = [
        _doc_with_categories(
            doc,
            share_counts.get(doc.id, 0),
            viewer_can_delete=doc.id in deletable_ids,
        )
        for doc in docs
    ]
    return DocumentPage(
        items=items,
        total=total,
        page=page,
        pages=max(1, math.ceil(total / per_page)),
    )
# NOTE: This route must be registered BEFORE /{doc_id} to avoid path collision.
@router.get("/shared-with-me", response_model=DocumentPage)
async def list_shared_with_me(
    page: int = Query(default=1, ge=1),
    per_page: int = Query(default=20, ge=1, le=100),
    sort: str = Query(default="created_at"),
    order: str = Query(default="desc", pattern="^(asc|desc)$"),
    search: str | None = Query(default=None),
    document_type: str | None = Query(default=None),
    user_id: str = Depends(get_user_id),
    user_groups: list[str] = Depends(get_user_groups),
    db: AsyncSession = Depends(get_db),
) -> DocumentPage:
    """Return documents shared with the current user via any of their groups.

    Excludes documents the user owns (those appear in their regular list).
    A user without groups gets an empty page. ``sort`` values not present in
    ``_SORT_COLUMNS`` fall back to ``created_at``.

    Each item reports one of the matching shares as its origin
    (``shared_by_user_id`` / ``shared_via_group_id``). ``viewer_can_delete``
    is true when ANY share between the document and one of the caller's
    groups grants delete rights, not just the share picked as the origin.
    """
    if not user_groups:
        return DocumentPage(items=[], total=0, page=page, pages=1)

    sort_col = _SORT_COLUMNS.get(sort, Document.created_at)
    sort_expr = sort_col.desc() if order == "desc" else sort_col.asc()

    shared_doc_ids_subq = (
        select(DocumentShare.document_id)
        .where(DocumentShare.group_id.in_(user_groups))
        .scalar_subquery()
    )
    conditions = [
        Document.id.in_(shared_doc_ids_subq),
        Document.user_id != user_id,  # exclude own docs
    ]
    if document_type:
        conditions.append(Document.document_type == document_type)
    if search:
        like = f"%{search}%"
        conditions.append(
            or_(
                Document.title.ilike(like),
                Document.filename.ilike(like),
                Document.tags.ilike(like),
                Document.document_type.ilike(like),
            )
        )

    count_result = await db.execute(
        select(func.count(Document.id)).where(*conditions)
    )
    total = count_result.scalar_one()

    items_result = await db.execute(
        select(Document)
        .where(*conditions)
        .options(
            selectinload(Document.category_assignments)
            .selectinload(CategoryAssignment.category)
        )
        .order_by(sort_expr)
        .offset((page - 1) * per_page)
        .limit(per_page)
    )
    docs = items_result.scalars().all()
    doc_ids = [d.id for d in docs]

    # doc_id → first share row found (used to report who shared it and via
    # which group), plus the set of docs for which at least one matching
    # share grants delete rights.
    share_map: dict[str, DocumentShare] = {}
    deletable_ids: set[str] = set()
    if doc_ids:  # nothing to look up on an empty page
        share_rows_result = await db.execute(
            select(DocumentShare).where(
                DocumentShare.document_id.in_(doc_ids),
                DocumentShare.group_id.in_(user_groups),
            )
        )
        for share in share_rows_result.scalars().all():
            share_map.setdefault(share.document_id, share)
            if share.can_delete:
                deletable_ids.add(share.document_id)

    from app.schemas.document import CategoryOut

    items: list[SharedDocumentOut] = []
    for doc in docs:
        cats = [
            CategoryOut(id=a.category.id, name=a.category.name)
            for a in doc.category_assignments
        ]
        share = share_map.get(doc.id)
        items.append(
            SharedDocumentOut(
                id=doc.id,
                user_id=doc.user_id,
                filename=doc.filename,
                title=doc.title,
                file_size=doc.file_size,
                status=doc.status,
                document_type=doc.document_type,
                extracted_data=doc.extracted_data,
                tags=doc.tags,
                error_message=doc.error_message,
                created_at=doc.created_at,
                processed_at=doc.processed_at,
                categories=cats,
                source=doc.source,
                shared_by_user_id=share.shared_by_user_id if share else "",
                shared_via_group_id=share.group_id if share else "",
                viewer_can_delete=doc.id in deletable_ids,
            )
        )

    return DocumentPage(
        items=items,  # type: ignore[arg-type]
        total=total,
        page=page,
        pages=max(1, math.ceil(total / per_page)),
    )
@router.get("/{doc_id}", response_model=DocumentOut)
async def get_document(
    doc_id: str,
    user_id: str = Depends(get_user_id),
    user_groups: list[str] = Depends(get_user_groups),
    is_admin: bool = Depends(get_user_is_admin),
    db: AsyncSession = Depends(get_db),
) -> DocumentOut:
    """Return a single owned or watch-ingested document.

    The response includes the document's share count and whether the caller
    may delete it (admin, owner, or member of a group whose share grants
    delete rights). Responds 404 via ``_get_user_doc`` when not found.
    """
    doc = await _get_user_doc(doc_id, user_id, db)
    share_counts = await _get_share_counts([doc.id], db)

    if is_admin or doc.user_id == user_id:
        may_delete = True
    else:
        deletable = await _get_deletable_doc_ids([doc.id], user_groups, db)
        may_delete = bool(deletable)

    return _doc_with_categories(
        doc, share_counts.get(doc.id, 0), viewer_can_delete=may_delete
    )
@router.get("/{doc_id}/status", response_model=DocumentStatusOut)
async def get_document_status(
    doc_id: str,
    user_id: str = Depends(get_user_id),
    db: AsyncSession = Depends(get_db),
) -> Document:
    """Return an owned or watch-ingested document, serialised as ``DocumentStatusOut``.

    Unlike ``_get_user_doc`` this lookup does not eager-load categories.

    Raises:
        HTTPException: 404 when the document does not exist or is not
            visible to the caller.
    """
    visible_to_caller = or_(
        Document.user_id == user_id,
        Document.user_id == _WATCH_USER_ID,
    )
    stmt = select(Document).where(Document.id == doc_id, visible_to_caller)
    found = (await db.execute(stmt)).scalar_one_or_none()
    if found is None:
        raise HTTPException(status_code=404, detail="Document not found")
    return found
@router.patch("/{doc_id}/type", response_model=DocumentOut)
async def update_document_type(
    doc_id: str,
    body: DocumentTypeUpdate,
    user_id: str = Depends(get_user_id),
    db: AsyncSession = Depends(get_db),
) -> DocumentOut:
    """Set the document's ``document_type`` and return the updated document.

    Responds 404 (via ``_get_user_doc``) when the document does not exist or
    is not visible to the caller.
    """
    doc = await _get_user_doc(doc_id, user_id, db)
    doc.document_type = body.document_type
    await db.commit()
    # Re-query instead of db.refresh(): the response needs category_assignments
    # eagerly loaded, which _get_user_doc guarantees and refresh() does not.
    # This matches the other PATCH handlers in this module.
    doc = await _get_user_doc(doc_id, user_id, db)
    return _doc_with_categories(doc)
@router.patch("/{doc_id}/tags", response_model=DocumentOut)
async def update_document_tags(
    doc_id: str,
    body: TagsUpdate,
    user_id: str = Depends(get_user_id),
    db: AsyncSession = Depends(get_db),
) -> DocumentOut:
    """Replace the document's tags with a normalised copy of ``body.tags``.

    Tags are stripped of surrounding whitespace, empty entries are dropped,
    and case-insensitive duplicates are removed, keeping the first spelling
    and the original order. The result is stored as a JSON array string.
    """
    doc = await _get_user_doc(doc_id, user_id, db)

    seen_lowercase: set[str] = set()
    normalised: list[str] = []
    for raw_tag in body.tags:
        tag = raw_tag.strip()
        if not tag:
            continue
        key = tag.lower()
        if key in seen_lowercase:
            continue
        seen_lowercase.add(key)
        normalised.append(tag)

    doc.tags = json.dumps(normalised)
    await db.commit()

    # Load again after the commit so the response has categories attached.
    refreshed = await _get_user_doc(doc_id, user_id, db)
    return _doc_with_categories(refreshed)
@router.patch("/{doc_id}/title", response_model=DocumentOut)
async def update_document_title(
    doc_id: str,
    body: TitleUpdate,
    user_id: str = Depends(get_user_id),
    db: AsyncSession = Depends(get_db),
) -> DocumentOut:
    """Set the document title; a blank (whitespace-only) title clears it to ``None``."""
    doc = await _get_user_doc(doc_id, user_id, db)
    trimmed = body.title.strip()
    doc.title = trimmed if trimmed else None
    await db.commit()

    # Load again after the commit so the response has categories attached.
    refreshed = await _get_user_doc(doc_id, user_id, db)
    return _doc_with_categories(refreshed)
@router.post("/{doc_id}/reprocess", response_model=DocumentOut)
async def reprocess_document(
    doc_id: str,
    background_tasks: BackgroundTasks,
    user_id: str = Depends(get_user_id),
    db: AsyncSession = Depends(get_db),
) -> DocumentOut:
    """Reset a document to ``pending`` and schedule ``process_document`` again.

    Raises:
        HTTPException: 409 when the document is already pending or being
            processed; 404 (via ``_get_user_doc``) when it is not found.
    """
    doc = await _get_user_doc(doc_id, user_id, db)

    in_flight_states = ("pending", "processing")
    if doc.status in in_flight_states:
        raise HTTPException(status_code=409, detail="Document is already being processed")

    # Clear the previous failure (if any) before queueing the new run.
    doc.error_message = None
    doc.status = "pending"
    await db.commit()
    background_tasks.add_task(process_document, doc_id)

    refreshed = await _get_user_doc(doc_id, user_id, db)
    return _doc_with_categories(refreshed)
@router.delete("/{doc_id}", status_code=204)
async def delete_document(
    doc_id: str,
    user_id: str = Depends(get_user_id),
    is_admin: bool = Depends(get_user_is_admin),
    user_groups: list[str] = Depends(get_user_groups),
    user_admin_groups: list[str] = Depends(get_user_admin_groups),
    db: AsyncSession = Depends(get_db),
) -> None:
    """Delete a document and its stored file.

    Deletion is allowed for:
      * admins (any document),
      * the document's owner,
      * members of a group the document is shared with, when that share has
        ``can_delete`` set,
      * group admins of any group the document is shared with.

    Raises:
        HTTPException: 404 when the document does not exist or is not
            visible to the caller; 403 when it is visible but the caller
            lacks delete permission.
    """
    if is_admin:
        # Admins can delete any document — fetch unconditionally.
        result = await db.execute(select(Document).where(Document.id == doc_id))
    else:
        # Fetch the document (owner, watch, or shared with user's groups)
        result = await db.execute(
            select(Document).where(
                Document.id == doc_id,
                or_(
                    Document.user_id == user_id,
                    Document.user_id == _WATCH_USER_ID,
                    Document.id.in_(
                        select(DocumentShare.document_id).where(
                            DocumentShare.group_id.in_(user_groups)
                        )
                    ) if user_groups else False,
                ),
            )
        )
    doc = result.scalar_one_or_none()
    if doc is None:
        raise HTTPException(status_code=404, detail="Document not found")

    is_owner = doc.user_id == user_id
    if not is_owner and not is_admin:

        async def _share_exists(*conditions) -> bool:
            """Return True when at least one share of this document matches."""
            # A document can be shared with several of the caller's groups,
            # so more than one row may match. LIMIT 1 + first() tolerates
            # that, whereas scalar_one_or_none() would raise
            # MultipleResultsFound and surface as a 500.
            stmt = (
                select(DocumentShare.id)
                .where(DocumentShare.document_id == doc_id, *conditions)
                .limit(1)
            )
            return (await db.execute(stmt)).first() is not None

        # Check: shared with a group where the user has can_delete=True
        can_delete_via_share = False
        if user_groups:
            can_delete_via_share = await _share_exists(
                DocumentShare.group_id.in_(user_groups),
                DocumentShare.can_delete.is_(True),
            )

        # Check: user is a group admin for a group the doc is shared with
        can_delete_as_group_admin = False
        if not can_delete_via_share and user_admin_groups:
            can_delete_as_group_admin = await _share_exists(
                DocumentShare.group_id.in_(user_admin_groups),
            )

        if not can_delete_via_share and not can_delete_as_group_admin:
            raise HTTPException(status_code=403, detail="Not allowed to delete this document")

    await delete_file(doc.storage_key)
    await db.delete(doc)
    await db.commit()
@router.get("/{doc_id}/file")
async def download_file(
    doc_id: str,
    user_id: str = Depends(get_user_id),
    user_groups: list[str] = Depends(get_user_groups),
    db: AsyncSession = Depends(get_db),
) -> StreamingResponse:
    """Return the stored PDF for inline display.

    Access is granted to the owner, to everyone for watch-ingested
    documents, and to members of any group the document is shared with.

    Raises:
        HTTPException: 404 when the document is not found / not accessible,
            or when its file is missing from storage.
    """
    from urllib.parse import quote

    # Allow access if: owner, watch doc, or shared with any of user's groups
    result = await db.execute(
        select(Document).where(
            Document.id == doc_id,
            or_(
                Document.user_id == user_id,
                Document.user_id == _WATCH_USER_ID,
                Document.id.in_(
                    select(DocumentShare.document_id).where(
                        DocumentShare.group_id.in_(user_groups)
                    )
                ) if user_groups else False,
            ),
        )
    )
    doc = result.scalar_one_or_none()
    if doc is None:
        raise HTTPException(status_code=404, detail="Document not found")

    try:
        pdf_bytes = await storage_download(doc.storage_key)
    except FileNotFoundError as exc:
        raise HTTPException(status_code=404, detail="File not found in storage") from exc

    # The filename originates from the uploader, so it must not be placed in
    # the header verbatim: quotes or control characters would corrupt the
    # header, and non-latin-1 characters cannot be encoded into it at all.
    # Plain names keep the simple quoted form; anything else is sent
    # percent-encoded through the RFC 6266 `filename*` parameter.
    filename = doc.filename or "upload.pdf"
    encoded = quote(filename)
    if encoded == filename:
        disposition = f'inline; filename="{filename}"'
    else:
        disposition = f"inline; filename*=utf-8''{encoded}"

    return StreamingResponse(
        iter([pdf_bytes]),
        media_type="application/pdf",
        headers={"Content-Disposition": disposition},
    )
# ── Category assignment ───────────────────────────────────────────────────────
@router.post("/{doc_id}/categories/{cat_id}", status_code=204)
async def assign_category(
    doc_id: str,
    cat_id: str,
    user_id: str = Depends(get_user_id),
    user_groups: list[str] = Depends(get_user_groups),
    db: AsyncSession = Depends(get_db),
) -> None:
    """Attach a category to an owned or watch-ingested document.

    The category must be the caller's own, belong to one of the caller's
    groups, or have ``scope == "system"``. Assigning a category that is
    already attached is a no-op.

    Raises:
        HTTPException: 404 when either the document or the category is not
            found / not accessible.
    """
    doc_stmt = select(Document).where(
        Document.id == doc_id,
        or_(Document.user_id == user_id, Document.user_id == _WATCH_USER_ID),
    )
    if (await db.execute(doc_stmt)).scalar_one_or_none() is None:
        raise HTTPException(status_code=404, detail="Document not found")

    # Accept personal, group (user is a member), or system categories
    usable_by_caller = or_(
        DocumentCategory.user_id == user_id,
        DocumentCategory.group_id.in_(user_groups) if user_groups else False,
        DocumentCategory.scope == "system",
    )
    cat_stmt = select(DocumentCategory).where(
        DocumentCategory.id == cat_id, usable_by_caller
    )
    if (await db.execute(cat_stmt)).scalar_one_or_none() is None:
        raise HTTPException(status_code=404, detail="Category not found")

    link_stmt = select(CategoryAssignment).where(
        CategoryAssignment.document_id == doc_id,
        CategoryAssignment.category_id == cat_id,
    )
    already_linked = (await db.execute(link_stmt)).scalar_one_or_none()
    if already_linked is None:
        db.add(CategoryAssignment(document_id=doc_id, category_id=cat_id))
        await db.commit()
@router.delete("/{doc_id}/categories/{cat_id}", status_code=204)
async def remove_category(
    doc_id: str,
    cat_id: str,
    user_id: str = Depends(get_user_id),
    db: AsyncSession = Depends(get_db),
) -> None:
    """Detach a category from a document the caller owns.

    Removing an assignment that does not exist is a no-op.

    Raises:
        HTTPException: 403 when no document with this id is owned by the
            caller (this also covers documents that do not exist).
    """
    # Only the document owner may remove a category assignment
    owned_stmt = select(Document).where(
        Document.id == doc_id, Document.user_id == user_id
    )
    if (await db.execute(owned_stmt)).scalar_one_or_none() is None:
        raise HTTPException(status_code=403, detail="Only the document owner can remove category assignments")

    link_stmt = select(CategoryAssignment).where(
        CategoryAssignment.document_id == doc_id,
        CategoryAssignment.category_id == cat_id,
    )
    link = (await db.execute(link_stmt)).scalar_one_or_none()
    if not link:
        return
    await db.delete(link)
    await db.commit()
# ── AI suggestion confirmation ────────────────────────────────────────────────
@router.post("/{doc_id}/suggestions/folder/confirm", status_code=204)
async def confirm_folder_suggestion(
    doc_id: str,
    user_id: str = Depends(get_user_id),
    db: AsyncSession = Depends(get_db),
) -> None:
    """Accept the suggested folder by assigning it to the document as a category.

    The category is looked up among those owned by the watch sentinel user
    and created there if missing. The document is then linked to it (unless
    already linked) and the pending suggestion is cleared.

    Raises:
        HTTPException: 400 when the document has no pending folder
            suggestion; 404 (via ``_get_user_doc``) when it is not found.
    """
    doc = await _get_user_doc(doc_id, user_id, db)
    if not doc.suggested_folder:
        raise HTTPException(status_code=400, detail="No folder suggestion pending")

    # Category names are stored truncated to 128 characters, so the lookup
    # must use the same truncated value; otherwise a longer suggestion never
    # matches the category it created and a duplicate is inserted each time.
    folder_name = doc.suggested_folder[:128]

    cat_result = await db.execute(
        select(DocumentCategory).where(
            DocumentCategory.user_id == _WATCH_USER_ID,
            DocumentCategory.name == folder_name,
        )
    )
    cat = cat_result.scalar_one_or_none()
    if cat is None:
        cat = DocumentCategory(user_id=_WATCH_USER_ID, name=folder_name)
        db.add(cat)
        await db.commit()
        await db.refresh(cat)

    exists = await db.execute(
        select(CategoryAssignment).where(
            CategoryAssignment.document_id == doc_id,
            CategoryAssignment.category_id == cat.id,
        )
    )
    if exists.scalar_one_or_none() is None:
        db.add(CategoryAssignment(document_id=doc_id, category_id=cat.id))

    doc.suggested_folder = None
    await db.commit()
@router.post("/{doc_id}/suggestions/folder/reject", status_code=204)
async def reject_folder_suggestion(
    doc_id: str,
    user_id: str = Depends(get_user_id),
    db: AsyncSession = Depends(get_db),
) -> None:
    """Discard the document's folder suggestion (no error if none is pending)."""
    document = await _get_user_doc(doc_id, user_id, db)
    document.suggested_folder = None
    await db.commit()
@router.post("/{doc_id}/suggestions/filename/confirm", status_code=204)
async def confirm_filename_suggestion(
    doc_id: str,
    user_id: str = Depends(get_user_id),
    db: AsyncSession = Depends(get_db),
) -> None:
    """Accept the suggested filename: it becomes the document's title.

    Raises:
        HTTPException: 400 when the document has no pending filename
            suggestion; 404 (via ``_get_user_doc``) when it is not found.
    """
    document = await _get_user_doc(doc_id, user_id, db)
    accepted_name = document.suggested_filename
    if not accepted_name:
        raise HTTPException(status_code=400, detail="No filename suggestion pending")
    document.title = accepted_name
    document.suggested_filename = None
    await db.commit()
@router.post("/{doc_id}/suggestions/filename/reject", status_code=204)
async def reject_filename_suggestion(
    doc_id: str,
    user_id: str = Depends(get_user_id),
    db: AsyncSession = Depends(get_db),
) -> None:
    """Discard the document's filename suggestion (no error if none is pending)."""
    document = await _get_user_doc(doc_id, user_id, db)
    document.suggested_filename = None
    await db.commit()
# ── Document sharing ──────────────────────────────────────────────────────────
@router.get("/{doc_id}/shares", response_model=list[DocumentShareOut])
async def list_document_shares(
    doc_id: str,
    user_id: str = Depends(get_user_id),
    db: AsyncSession = Depends(get_db),
) -> list[DocumentShare]:
    """List all group shares for a document. Owner only.

    Raises:
        HTTPException: 404 when no document with this id is owned by the
            caller.
    """
    owned_stmt = select(Document).where(
        Document.id == doc_id, Document.user_id == user_id
    )
    if (await db.execute(owned_stmt)).scalar_one_or_none() is None:
        raise HTTPException(status_code=404, detail="Document not found")

    shares_stmt = select(DocumentShare).where(DocumentShare.document_id == doc_id)
    return (await db.execute(shares_stmt)).scalars().all()
@router.post("/{doc_id}/shares", response_model=DocumentShareOut, status_code=201)
async def add_document_share(
    doc_id: str,
    body: DocumentShareCreate,
    user_id: str = Depends(get_user_id),
    user_groups: list[str] = Depends(get_user_groups),
    db: AsyncSession = Depends(get_db),
) -> DocumentShare:
    """Share a document with a group.

    The sharing user must own the document and must be a member of the
    target group. If the document is already shared with that group, the
    existing share is returned unchanged.

    Raises:
        HTTPException: 404 when the caller owns no document with this id;
            403 when the caller is not a member of ``body.group_id``.
    """
    owned_stmt = select(Document).where(
        Document.id == doc_id, Document.user_id == user_id
    )
    if (await db.execute(owned_stmt)).scalar_one_or_none() is None:
        raise HTTPException(status_code=404, detail="Document not found")

    if body.group_id not in user_groups:
        raise HTTPException(
            status_code=403,
            detail="You can only share with groups you belong to",
        )

    # Idempotent — return existing share if already shared with this group
    lookup_stmt = select(DocumentShare).where(
        DocumentShare.document_id == doc_id,
        DocumentShare.group_id == body.group_id,
    )
    current = (await db.execute(lookup_stmt)).scalar_one_or_none()
    if current is not None:
        return current

    new_share = DocumentShare(
        document_id=doc_id,
        group_id=body.group_id,
        shared_by_user_id=user_id,
        can_delete=body.can_delete,
    )
    db.add(new_share)
    await db.commit()
    await db.refresh(new_share)
    return new_share
@router.delete("/{doc_id}/shares/{group_id}", status_code=204)
async def remove_document_share(
    doc_id: str,
    group_id: str,
    user_id: str = Depends(get_user_id),
    db: AsyncSession = Depends(get_db),
) -> None:
    """Remove a group share. Owner only.

    Removing a share that does not exist is a no-op.

    Raises:
        HTTPException: 404 when no document with this id is owned by the
            caller.
    """
    owned_stmt = select(Document).where(
        Document.id == doc_id, Document.user_id == user_id
    )
    if (await db.execute(owned_stmt)).scalar_one_or_none() is None:
        raise HTTPException(status_code=404, detail="Document not found")

    share_stmt = select(DocumentShare).where(
        DocumentShare.document_id == doc_id,
        DocumentShare.group_id == group_id,
    )
    target = (await db.execute(share_stmt)).scalar_one_or_none()
    if not target:
        return
    await db.delete(target)
    await db.commit()