Files
kite/backend/api/admin.py
T
curo1305 5950a3f5c2 feat(03-03): wire get_current_user into /api/topics/*; add load_topics_for_user; POST /api/admin/topics
- api/topics.py: add get_current_user dep to all 5 handlers (list, create, update, delete, suggest)
- list_topics: uses load_topics_for_user (system topics + user's own) with user-scoped doc counts
- create_topic: passes user_id=current_user.id (never creates system topics via regular endpoint)
- update_topic/delete_topic: ownership assertion — system topics and other users' topics return 404
- api/admin.py: add SystemTopicCreate model + POST /api/admin/topics (user_id=NULL, admin-only)
- services/storage.py: add or_ import; load_topics_for_user (D-17); create_topic gains user_id param with namespace-scoped dedup; topic_doc_counts gains optional user_id for user-scoped counts; add load_topics_for_user to __all__
- services/classifier.py: replace load_topics with load_topics_for_user(doc.user_id); pass user_id=doc.user_id to create_topic for AI-suggested topics (D-11)
- Tests: update all topic tests to pass auth headers; implement test_topic_namespace, test_admin_create_system_topic, test_regular_user_cannot_create_system_topic, test_topics_require_auth
2026-05-23 20:15:44 +02:00

412 lines
14 KiB
Python

"""
Admin API endpoints for DocuVault.
All handlers require get_current_admin (SEC-07, D-08) — no handler uses
get_current_user alone.
Implements:
GET /api/admin/users — list all users (ADMIN-01)
POST /api/admin/users — create user (ADMIN-01)
PATCH /api/admin/users/{id}/status — deactivate/reactivate (ADMIN-02)
POST /api/admin/users/{id}/password-reset — initiate reset email (ADMIN-03)
GET /api/admin/users/{id}/quota — view quota (ADMIN-04)
PATCH /api/admin/users/{id}/quota — adjust quota (ADMIN-04)
PATCH /api/admin/users/{id}/ai-config — assign AI provider/model (ADMIN-05)
Security invariants:
- Every handler injects Depends(get_current_admin) — verified by grep count
- _user_to_dict() whitelist helper prevents accidental field leakage (T-02-27)
- No impersonation endpoint — ADMIN-07 enforced by omission (T-02-28)
- Admin-created users: password_must_change=True (ADMIN-01, T-02-32)
- Deactivation of sole admin prevented (T-02-29)
- Password reset sends email via Celery; does not return token (T-02-30)
"""
from __future__ import annotations
import re
import uuid
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel, EmailStr, field_validator
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from db.models import Quota, RefreshToken, Topic, User
from deps.auth import get_current_admin
from deps.db import get_db
from services.auth import hash_password, revoke_all_refresh_tokens
router = APIRouter(prefix="/api/admin", tags=["admin"])
# ── Constants ─────────────────────────────────────────────────────────────────
_DEFAULT_QUOTA_BYTES = 104857600 # 100 MB free-tier default (D-06)
_PASSWORD_DETAIL = (
"Password must be at least 12 characters and include uppercase, "
"lowercase, a number, and a special character."
)
# ── Safe response helper ──────────────────────────────────────────────────────
def _user_to_dict(user: User) -> dict:
"""Return a safe subset of User fields — never includes password_hash,
credentials_enc, totp_secret, or any document content (T-02-27, SEC-07).
"""
return {
"id": str(user.id),
"handle": user.handle,
"email": user.email,
"role": user.role,
"is_active": user.is_active,
"totp_enabled": user.totp_enabled,
"ai_provider": user.ai_provider,
"ai_model": user.ai_model,
"password_must_change": user.password_must_change,
"created_at": user.created_at.isoformat() if user.created_at else None,
}
# ── Password strength helper ──────────────────────────────────────────────────
def _validate_password_strength(password: str) -> None:
"""Raise ValueError with the spec message if password fails any strength rule.
Rules (AUTH-01): min 12 chars, has uppercase, has lowercase, has digit,
has special char (non-alphanumeric).
"""
if len(password) < 12:
raise ValueError(_PASSWORD_DETAIL)
if not re.search(r"[A-Z]", password):
raise ValueError(_PASSWORD_DETAIL)
if not re.search(r"[a-z]", password):
raise ValueError(_PASSWORD_DETAIL)
if not re.search(r"[0-9]", password):
raise ValueError(_PASSWORD_DETAIL)
if not re.search(r"[^A-Za-z0-9]", password):
raise ValueError(_PASSWORD_DETAIL)
# ── Request models ────────────────────────────────────────────────────────────
class UserCreate(BaseModel):
handle: str
email: EmailStr
password: str
role: str = "user"
@field_validator("password")
@classmethod
def password_strength(cls, v: str) -> str:
try:
_validate_password_strength(v)
except ValueError as exc:
raise ValueError(str(exc)) from exc
return v
class UserStatusUpdate(BaseModel):
is_active: bool
class QuotaUpdate(BaseModel):
limit_bytes: int
@field_validator("limit_bytes")
@classmethod
def must_be_positive(cls, v: int) -> int:
if v <= 0:
raise ValueError("limit_bytes must be greater than 0")
return v
class AiConfigUpdate(BaseModel):
ai_provider: Optional[str] = None
ai_model: Optional[str] = None
class SystemTopicCreate(BaseModel):
"""Request model for admin system topic creation (D-09)."""
name: str
description: str = ""
color: str = "#6366f1"
# ── Endpoints ─────────────────────────────────────────────────────────────────
@router.get("/users")
async def list_users(
session: AsyncSession = Depends(get_db),
_admin: User = Depends(get_current_admin),
) -> dict:
"""List all users, ordered by created_at DESC.
Response shape: { items: [...safe user fields...] }
Never includes password_hash, credentials_enc, or document content (T-02-27).
"""
result = await session.execute(
select(User).order_by(User.created_at.desc())
)
users = result.scalars().all()
return {"items": [_user_to_dict(u) for u in users]}
@router.post("/users", status_code=status.HTTP_201_CREATED)
async def create_user(
body: UserCreate,
session: AsyncSession = Depends(get_db),
_admin: User = Depends(get_current_admin),
) -> dict:
"""Admin creates a new user account (ADMIN-01).
- password_must_change=True forces the user to change their password on
first login (T-02-32, D-06).
- Quota row initialized at 100 MB (D-06).
- Returns 409 if email or handle is already taken.
"""
# Check uniqueness
existing_email = await session.execute(
select(User).where(User.email == str(body.email))
)
if existing_email.scalar_one_or_none() is not None:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Email already registered",
)
existing_handle = await session.execute(
select(User).where(User.handle == body.handle)
)
if existing_handle.scalar_one_or_none() is not None:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Handle already taken",
)
new_user = User(
id=uuid.uuid4(),
handle=body.handle,
email=str(body.email),
password_hash=hash_password(body.password),
role=body.role,
is_active=True,
totp_enabled=False,
password_must_change=True, # ADMIN-01: force password change on first login
)
session.add(new_user)
quota = Quota(
user_id=new_user.id,
limit_bytes=_DEFAULT_QUOTA_BYTES,
used_bytes=0,
)
session.add(quota)
await session.flush()
return {
"id": str(new_user.id),
"handle": new_user.handle,
"email": new_user.email,
"role": new_user.role,
"created_at": new_user.created_at.isoformat() if new_user.created_at else None,
}
@router.patch("/users/{user_id}/status")
async def update_user_status(
user_id: uuid.UUID,
body: UserStatusUpdate,
session: AsyncSession = Depends(get_db),
_admin: User = Depends(get_current_admin),
) -> dict:
"""Deactivate or reactivate a user account (ADMIN-02).
- Prevents deactivating the last active admin (T-02-29).
- On deactivation: all refresh tokens are revoked (family revocation).
"""
user = await session.get(User, user_id)
if user is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
# Guard: cannot deactivate the only remaining active admin (T-02-29)
if not body.is_active and user.role == "admin":
count_result = await session.execute(
select(func.count(User.id)).where(
User.role == "admin",
User.is_active.is_(True),
)
)
active_admin_count = count_result.scalar_one()
if active_admin_count <= 1:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Cannot deactivate the only admin",
)
user.is_active = body.is_active
if not body.is_active:
# Revoke all refresh tokens on deactivation
await revoke_all_refresh_tokens(session, user.id)
session.add(user)
await session.flush()
return {
"id": str(user.id),
"handle": user.handle,
"email": user.email,
"is_active": user.is_active,
}
@router.post("/users/{user_id}/password-reset", status_code=status.HTTP_202_ACCEPTED)
async def initiate_password_reset(
user_id: uuid.UUID,
session: AsyncSession = Depends(get_db),
_admin: User = Depends(get_current_admin),
) -> dict:
"""Admin initiates a password reset for a user (ADMIN-03).
Sends the reset email via Celery. Does NOT:
- return a reset token (T-02-30)
- grant admin access to the account
- log in as the target user (ADMIN-07 — no impersonation)
Returns 202 immediately regardless of email delivery status.
"""
user = await session.get(User, user_id)
if user is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
from services.auth import create_password_reset_token # noqa: PLC0415
from config import settings as _settings # noqa: PLC0415
reset_token = create_password_reset_token(str(user.id))
reset_link = f"{_settings.frontend_url}/password-reset/confirm?token={reset_token}"
# Deferred import to avoid circular imports (same pattern as document_tasks)
from tasks.email_tasks import send_reset_email # noqa: PLC0415
send_reset_email.delay(user.email, reset_link)
return {"message": "Password reset email sent"}
@router.get("/users/{user_id}/quota")
async def get_user_quota(
user_id: uuid.UUID,
session: AsyncSession = Depends(get_db),
_admin: User = Depends(get_current_admin),
) -> dict:
"""Return quota details for a user (ADMIN-04).
Quota info is admin-visible operational data — no PII, no document content
(T-02-31 disposition: accept).
"""
quota = await session.get(Quota, user_id)
if quota is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Quota not found")
return {
"user_id": str(quota.user_id),
"limit_bytes": quota.limit_bytes,
"used_bytes": quota.used_bytes,
"limit_mb": quota.limit_bytes // 1048576,
"used_mb": quota.used_bytes // 1048576,
}
@router.patch("/users/{user_id}/quota")
async def update_user_quota(
user_id: uuid.UUID,
body: QuotaUpdate,
session: AsyncSession = Depends(get_db),
_admin: User = Depends(get_current_admin),
) -> dict:
"""Adjust a user's storage quota (ADMIN-04).
If the new limit is below current usage, still applies the change but
returns warning=True with an explanatory message. Uploads will be blocked
but existing documents are preserved.
"""
quota = await session.get(Quota, user_id)
if quota is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Quota not found")
warning = body.limit_bytes < quota.used_bytes
warning_message = (
"New limit is below current usage. Uploads will be blocked but existing documents are preserved."
if warning
else None
)
quota.limit_bytes = body.limit_bytes
session.add(quota)
await session.flush()
response: dict = {
"user_id": str(quota.user_id),
"limit_bytes": quota.limit_bytes,
"used_bytes": quota.used_bytes,
"warning": warning,
}
if warning_message:
response["message"] = warning_message
return response
@router.patch("/users/{user_id}/ai-config")
async def update_ai_config(
user_id: uuid.UUID,
body: AiConfigUpdate,
session: AsyncSession = Depends(get_db),
_admin: User = Depends(get_current_admin),
) -> dict:
"""Assign AI provider and model for a user (ADMIN-05).
Users cannot change their own AI provider or model (PROJECT.md Key Decision).
Only admins have this capability.
"""
user = await session.get(User, user_id)
if user is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
user.ai_provider = body.ai_provider
user.ai_model = body.ai_model
session.add(user)
await session.flush()
return {
"id": str(user.id),
"email": user.email,
"ai_provider": user.ai_provider,
"ai_model": user.ai_model,
}
@router.post("/topics", status_code=status.HTTP_201_CREATED)
async def create_system_topic(
body: SystemTopicCreate,
session: AsyncSession = Depends(get_db),
_admin: User = Depends(get_current_admin),
) -> dict:
"""Create a system topic visible to all users (D-09, DOC-04).
System topics have user_id = NULL, making them visible to every user as
defaults in their topic namespace. Only admins can create system topics.
Regular users create per-user topics via POST /api/topics.
Deduplication: case-insensitive match within the system namespace (user_id IS NULL).
Returns the existing system topic if one with the same name already exists.
"""
from services import storage # noqa: PLC0415
topic = await storage.create_topic(
session, body.name, body.description, body.color, user_id=None
)
return topic