""" Admin API endpoints for DocuVault. All handlers require get_current_admin (SEC-07, D-08) — no handler uses get_current_user alone. Implements: GET /api/admin/users — list all users (ADMIN-01) POST /api/admin/users — create user (ADMIN-01) PATCH /api/admin/users/{id}/status — deactivate/reactivate (ADMIN-02) POST /api/admin/users/{id}/password-reset — initiate reset email (ADMIN-03) GET /api/admin/users/{id}/quota — view quota (ADMIN-04) PATCH /api/admin/users/{id}/quota — adjust quota (ADMIN-04) PATCH /api/admin/users/{id}/ai-config — assign AI provider/model (ADMIN-05) Security invariants: - Every handler injects Depends(get_current_admin) — verified by grep count - _user_to_dict() whitelist helper prevents accidental field leakage (T-02-27) - No impersonation endpoint — ADMIN-07 enforced by omission (T-02-28) - Admin-created users: password_must_change=True (ADMIN-01, T-02-32) - Deactivation of sole admin prevented (T-02-29) - Password reset sends email via Celery; does not return token (T-02-30) """ from __future__ import annotations import re import uuid from datetime import datetime from typing import Optional from fastapi import APIRouter, Depends, HTTPException, Request, status from pydantic import BaseModel, EmailStr, field_validator from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession from db.models import CloudConnection, Document, Quota, RefreshToken, Topic, User from deps.auth import get_current_admin from deps.db import get_db from services.audit import write_audit_log from services.auth import hash_password, revoke_all_refresh_tokens, verify_password from storage import get_storage_backend, get_storage_backend_for_document router = APIRouter(prefix="/api/admin", tags=["admin"]) # ── Constants ───────────────────────────────────────────────────────────────── _DEFAULT_QUOTA_BYTES = 104857600 # 100 MB free-tier default (D-06) _PASSWORD_DETAIL = ( "Password must be at least 12 characters and include uppercase, " 
"lowercase, a number, and a special character." ) # ── Safe response helper ────────────────────────────────────────────────────── def _user_to_dict(user: User) -> dict: """Return a safe subset of User fields — never includes password_hash, credentials_enc, totp_secret, or any document content (T-02-27, SEC-07). """ return { "id": str(user.id), "handle": user.handle, "email": user.email, "role": user.role, "is_active": user.is_active, "totp_enabled": user.totp_enabled, "ai_provider": user.ai_provider, "ai_model": user.ai_model, "password_must_change": user.password_must_change, "created_at": user.created_at.isoformat() if user.created_at else None, } # ── Password strength helper ────────────────────────────────────────────────── def _validate_password_strength(password: str) -> None: """Raise ValueError with the spec message if password fails any strength rule. Rules (AUTH-01): min 12 chars, has uppercase, has lowercase, has digit, has special char (non-alphanumeric). """ if len(password) < 12: raise ValueError(_PASSWORD_DETAIL) if not re.search(r"[A-Z]", password): raise ValueError(_PASSWORD_DETAIL) if not re.search(r"[a-z]", password): raise ValueError(_PASSWORD_DETAIL) if not re.search(r"[0-9]", password): raise ValueError(_PASSWORD_DETAIL) if not re.search(r"[^A-Za-z0-9]", password): raise ValueError(_PASSWORD_DETAIL) # ── Request models ──────────────────────────────────────────────────────────── class UserCreate(BaseModel): handle: str email: EmailStr password: str role: str = "user" @field_validator("password") @classmethod def password_strength(cls, v: str) -> str: try: _validate_password_strength(v) except ValueError as exc: raise ValueError(str(exc)) from exc return v class UserStatusUpdate(BaseModel): is_active: bool class QuotaUpdate(BaseModel): limit_bytes: int @field_validator("limit_bytes") @classmethod def must_be_positive(cls, v: int) -> int: if v <= 0: raise ValueError("limit_bytes must be greater than 0") return v class 
AiConfigUpdate(BaseModel): ai_provider: Optional[str] = None ai_model: Optional[str] = None class SystemTopicCreate(BaseModel): """Request model for admin system topic creation (D-09).""" name: str description: str = "" color: str = "#6366f1" class UserDeleteConfirm(BaseModel): """Admin password confirmation required before hard-deleting a user (ADMIN-02, T-05-11-01).""" admin_password: str # ── SEC-08: Safe CloudConnection response model ─────────────────────────────── class CloudConnectionOut(BaseModel): """SEC-08: credentials_enc deliberately excluded from this response model. Any admin or user endpoint returning CloudConnection ORM objects MUST use this model to prevent accidental exposure of encrypted credentials. Safe-by-default: whitelist of allowed fields (not blacklist). Note: id is declared as str and coerced via validator so UUID ORM values serialize correctly without json_encoders (Rule 1 fix — T-05-06 test suite). """ id: str provider: str display_name: str status: str connected_at: datetime model_config = {"from_attributes": True} @field_validator("id", mode="before") @classmethod def coerce_id_to_str(cls, v) -> str: """Coerce UUID objects to str so the model validates from ORM instances.""" return str(v) # ── Endpoints ───────────────────────────────────────────────────────────────── @router.get("/users") async def list_users( session: AsyncSession = Depends(get_db), _admin: User = Depends(get_current_admin), ) -> dict: """List all users, ordered by created_at DESC. Response shape: { items: [...safe user fields...] } Never includes password_hash, credentials_enc, or document content (T-02-27). 
""" result = await session.execute( select(User).order_by(User.created_at.desc()) ) users = result.scalars().all() return {"items": [_user_to_dict(u) for u in users]} @router.post("/users", status_code=status.HTTP_201_CREATED) async def create_user( request: Request, body: UserCreate, session: AsyncSession = Depends(get_db), _admin: User = Depends(get_current_admin), ) -> dict: """Admin creates a new user account (ADMIN-01). - password_must_change=True forces the user to change their password on first login (T-02-32, D-06). - Quota row initialized at 100 MB (D-06). - Returns 409 if email or handle is already taken. """ # Check uniqueness existing_email = await session.execute( select(User).where(User.email == str(body.email)) ) if existing_email.scalar_one_or_none() is not None: raise HTTPException( status_code=status.HTTP_409_CONFLICT, detail="Email already registered", ) existing_handle = await session.execute( select(User).where(User.handle == body.handle) ) if existing_handle.scalar_one_or_none() is not None: raise HTTPException( status_code=status.HTTP_409_CONFLICT, detail="Handle already taken", ) new_user = User( id=uuid.uuid4(), handle=body.handle, email=str(body.email), password_hash=hash_password(body.password), role=body.role, is_active=True, totp_enabled=False, password_must_change=True, # ADMIN-01: force password change on first login ) session.add(new_user) quota = Quota( user_id=new_user.id, limit_bytes=_DEFAULT_QUOTA_BYTES, used_bytes=0, ) session.add(quota) # D-13: admin user created event _ip = request.headers.get("X-Forwarded-For") or (request.client.host if request.client else None) await write_audit_log( session, event_type="admin.user_created", user_id=new_user.id, actor_id=_admin.id, resource_id=new_user.id, ip_address=_ip, ) await session.commit() return { "id": str(new_user.id), "handle": new_user.handle, "email": new_user.email, "role": new_user.role, "created_at": new_user.created_at.isoformat() if new_user.created_at else None, } 
def _client_ip(request: Request) -> Optional[str]:
    """Return the caller address to record in audit log entries.

    Uses the raw ``X-Forwarded-For`` header value when present and non-empty,
    otherwise the connection peer host; returns None when neither is available.
    """
    return request.headers.get("X-Forwarded-For") or (
        request.client.host if request.client else None
    )


def _log_cleanup_failure(backend_name: str, user_id: uuid.UUID, object_key) -> None:
    """Log a failed best-effort object deletion, including the active traceback.

    Must be called from inside an ``except`` block so ``exc_info=True`` can
    pick up the exception being handled.
    """
    import logging  # noqa: PLC0415 — function-scope import keeps this block self-contained

    logging.getLogger(__name__).warning(
        "Best-effort %s object cleanup failed for user %s (object_key=%s)",
        backend_name,
        user_id,
        object_key,
        exc_info=True,
    )


async def _purge_cloud_storage(
    session: AsyncSession,
    user: User,
    actor_id: uuid.UUID,
    ip_address: Optional[str],
) -> None:
    """Delete a user's cloud-stored objects and CloudConnection rows (SEC-09).

    For every CloudConnection of ``user``: delete each document object stored
    with that provider (best-effort — failures are logged and skipped), then
    delete the connection row. If at least one connection existed, the deletes
    are flushed and a ``cloud.credentials_purged`` audit entry is written.
    Nothing is committed here; the caller owns the transaction.
    """
    conns_result = await session.execute(
        select(CloudConnection).where(CloudConnection.user_id == user.id)
    )
    cloud_conns = conns_result.scalars().all()
    if not cloud_conns:
        return

    for conn in cloud_conns:
        # Delete cloud objects stored in this provider for this user
        docs_result = await session.execute(
            select(Document).where(
                Document.user_id == user.id,
                Document.storage_backend == conn.provider,
            )
        )
        for doc in docs_result.scalars().all():
            try:
                backend = await get_storage_backend_for_document(doc, user, session)
                await backend.delete_object(doc.object_key)
            except Exception:
                # Best-effort cloud object cleanup; deletion proceeds regardless,
                # but the failure is recorded so orphaned objects can be traced.
                _log_cleanup_failure(conn.provider, user.id, doc.object_key)
        # Purge the credentials row (FK cascade would also remove it, but explicit
        # deletion here guarantees credentials_enc is gone before commit — SEC-09)
        await session.delete(conn)

    await session.flush()  # Flush connection deletes before user delete
    await write_audit_log(
        session,
        event_type="cloud.credentials_purged",
        user_id=user.id,
        actor_id=actor_id,
        resource_id=user.id,
        ip_address=ip_address,
        metadata_={"providers": [c.provider for c in cloud_conns]},
    )


async def _purge_default_storage(session: AsyncSession, user_id: uuid.UUID) -> None:
    """Delete every document object of a user from the default storage backend.

    Iterates all Document rows for ``user_id`` and calls ``delete_object`` on
    the backend returned by ``get_storage_backend()``. Best-effort: a failed
    deletion is logged and does not stop the loop or raise.
    """
    docs_result = await session.execute(
        select(Document).where(Document.user_id == user_id)
    )
    user_docs = docs_result.scalars().all()

    storage = get_storage_backend()
    for doc in user_docs:
        try:
            await storage.delete_object(doc.object_key)
        except Exception:
            # Best-effort MinIO cleanup; DB deletion proceeds regardless.
            _log_cleanup_failure("minio", user_id, doc.object_key)


@router.patch("/users/{user_id}/status")
async def update_user_status(
    user_id: uuid.UUID,
    body: UserStatusUpdate,
    request: Request,
    session: AsyncSession = Depends(get_db),
    _admin: User = Depends(get_current_admin),
) -> dict:
    """Deactivate or reactivate a user account (ADMIN-02).

    - Prevents deactivating the last active admin (T-02-29).
    - On deactivation: all refresh tokens are revoked (family revocation).

    Raises:
        HTTPException(404): no user with ``user_id``.
        HTTPException(400): the request would deactivate the only active admin.
    """
    user = await session.get(User, user_id)
    if user is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")

    # Guard: cannot deactivate the only remaining active admin (T-02-29).
    # Only applies when the target is currently active: re-deactivating an
    # already-inactive admin does not reduce the number of active admins.
    if not body.is_active and user.role == "admin" and user.is_active:
        count_result = await session.execute(
            select(func.count(User.id)).where(
                User.role == "admin",
                User.is_active.is_(True),
            )
        )
        active_admin_count = count_result.scalar_one()
        if active_admin_count <= 1:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Cannot deactivate the only admin",
            )

    _ip = _client_ip(request)
    user.is_active = body.is_active

    if not body.is_active:
        # Revoke all refresh tokens on deactivation
        await revoke_all_refresh_tokens(session, user.id)

    session.add(user)

    # D-13: user deactivated/activated event
    _event = "admin.user_deactivated" if not body.is_active else "admin.user_activated"
    await write_audit_log(
        session,
        event_type=_event,
        user_id=user.id,
        actor_id=_admin.id,
        resource_id=user.id,
        ip_address=_ip,
    )
    await session.commit()

    return {
        "id": str(user.id),
        "handle": user.handle,
        "email": user.email,
        "is_active": user.is_active,
    }


@router.post("/users/{user_id}/password-reset", status_code=status.HTTP_202_ACCEPTED)
async def initiate_password_reset(
    user_id: uuid.UUID,
    session: AsyncSession = Depends(get_db),
    _admin: User = Depends(get_current_admin),
) -> dict:
    """Admin initiates a password reset for a user (ADMIN-03).

    Sends the reset email via Celery. Does NOT:
    - return a reset token (T-02-30)
    - grant admin access to the account
    - log in as the target user (ADMIN-07 — no impersonation)

    Returns 202 immediately regardless of email delivery status.

    Raises:
        HTTPException(404): no user with ``user_id``.
    """
    user = await session.get(User, user_id)
    if user is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")

    from services.auth import create_password_reset_token  # noqa: PLC0415
    from config import settings as _settings  # noqa: PLC0415

    reset_token = create_password_reset_token(str(user.id))
    reset_link = f"{_settings.frontend_url}/password-reset/confirm?token={reset_token}"

    # Deferred import to avoid circular imports (same pattern as document_tasks)
    from tasks.email_tasks import send_reset_email  # noqa: PLC0415

    # The token only travels inside the emailed link; it is never returned here.
    send_reset_email.delay(user.email, reset_link)

    return {"message": "Password reset email sent"}


@router.get("/users/{user_id}/quota")
async def get_user_quota(
    user_id: uuid.UUID,
    session: AsyncSession = Depends(get_db),
    _admin: User = Depends(get_current_admin),
) -> dict:
    """Return quota details for a user (ADMIN-04).

    Quota info is admin-visible operational data — no PII, no document content
    (T-02-31 disposition: accept). The ``*_mb`` values are whole mebibytes
    (integer division by 1048576).

    Raises:
        HTTPException(404): no quota row for ``user_id``.
    """
    quota = await session.get(Quota, user_id)
    if quota is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Quota not found")

    return {
        "user_id": str(quota.user_id),
        "limit_bytes": quota.limit_bytes,
        "used_bytes": quota.used_bytes,
        "limit_mb": quota.limit_bytes // 1048576,
        "used_mb": quota.used_bytes // 1048576,
    }


@router.patch("/users/{user_id}/quota")
async def update_user_quota(
    user_id: uuid.UUID,
    body: QuotaUpdate,
    request: Request,
    session: AsyncSession = Depends(get_db),
    _admin: User = Depends(get_current_admin),
) -> dict:
    """Adjust a user's storage quota (ADMIN-04).

    If the new limit is below current usage, still applies the change but
    returns warning=True with an explanatory message. Uploads will be blocked
    but existing documents are preserved.

    Raises:
        HTTPException(404): no quota row for ``user_id``.
    """
    quota = await session.get(Quota, user_id)
    if quota is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Quota not found")

    warning = body.limit_bytes < quota.used_bytes
    warning_message = (
        "New limit is below current usage. Uploads will be blocked but existing documents are preserved."
        if warning
        else None
    )

    _ip = _client_ip(request)
    old_limit = quota.limit_bytes  # captured before mutation for the audit entry
    quota.limit_bytes = body.limit_bytes
    session.add(quota)

    # D-13: quota changed event
    await write_audit_log(
        session,
        event_type="admin.quota_changed",
        user_id=user_id,
        actor_id=_admin.id,
        resource_id=None,
        ip_address=_ip,
        metadata_={"old_bytes": old_limit, "new_bytes": body.limit_bytes},
    )
    await session.commit()

    response: dict = {
        "user_id": str(quota.user_id),
        "limit_bytes": quota.limit_bytes,
        "used_bytes": quota.used_bytes,
        "warning": warning,
    }
    if warning_message:
        response["message"] = warning_message

    return response


@router.patch("/users/{user_id}/ai-config")
async def update_ai_config(
    user_id: uuid.UUID,
    body: AiConfigUpdate,
    request: Request,
    session: AsyncSession = Depends(get_db),
    _admin: User = Depends(get_current_admin),
) -> dict:
    """Assign AI provider and model for a user (ADMIN-05).

    Users cannot change their own AI provider or model (PROJECT.md Key
    Decision). Only admins have this capability.

    Both fields are always written: a field missing from the request body is
    stored as None.

    Raises:
        HTTPException(404): no user with ``user_id``.
    """
    user = await session.get(User, user_id)
    if user is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")

    _ip = _client_ip(request)
    user.ai_provider = body.ai_provider
    user.ai_model = body.ai_model
    session.add(user)

    # D-13: AI provider assigned event
    await write_audit_log(
        session,
        event_type="admin.ai_provider_assigned",
        user_id=user_id,
        actor_id=_admin.id,
        resource_id=None,
        ip_address=_ip,
        metadata_={"provider": body.ai_provider, "model": body.ai_model},
    )
    await session.commit()

    return {
        "id": str(user.id),
        "email": user.email,
        "ai_provider": user.ai_provider,
        "ai_model": user.ai_model,
    }


@router.delete("/users/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user(
    user_id: uuid.UUID,
    body: UserDeleteConfirm,
    request: Request,
    session: AsyncSession = Depends(get_db),
    _admin: User = Depends(get_current_admin),
) -> None:
    """Delete a user account and clean up all their MinIO objects (SEC-09, D-19).

    Security invariants:
    - Admin password verified via Argon2 before any deletion (T-05-11-01)
    - Cannot delete admin accounts (T-04-07-04)
    - Cloud and MinIO objects are deleted BEFORE DB records are removed (SEC-09)
    - Object deletion is best-effort — failures are logged and the DB row is
      deleted regardless
    - Audit log written with event_type="admin.user_deleted"

    Raises:
        HTTPException(403): ``admin_password`` does not match the calling admin.
        HTTPException(404): no user with ``user_id``.
        HTTPException(400): the target user is an admin.
    """
    # T-05-11-01: Verify admin password before performing any destructive action.
    # Fail fast — no DB reads for the target user until the admin is confirmed.
    if not verify_password(body.admin_password, _admin.password_hash):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Invalid admin password",
        )

    user = await session.get(User, user_id)
    if user is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")

    # T-04-07-04: Cannot delete admin accounts
    if user.role == "admin":
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Cannot delete admin accounts",
        )

    _ip = _client_ip(request)

    # SEC-09 (cloud): purge cloud-stored documents and credentials BEFORE DB delete.
    # Must run before MinIO cleanup so that credentials are still available to build
    # the cloud backend instances for delete_object calls.
    await _purge_cloud_storage(session, user, _admin.id, _ip)

    # SEC-09 (minio): delete all of the user's MinIO objects BEFORE DB delete
    await _purge_default_storage(session, user_id)

    # D-13: audit log BEFORE deleting the user row (user FK still valid at flush time)
    await write_audit_log(
        session,
        event_type="admin.user_deleted",
        user_id=user_id,
        actor_id=_admin.id,
        resource_id=user_id,
        ip_address=_ip,
    )
    await session.flush()

    # Delete user record (CASCADE removes quota, documents, refresh_tokens, etc.)
    await session.delete(user)
    await session.commit()


@router.post("/topics", status_code=status.HTTP_201_CREATED)
async def create_system_topic(
    body: SystemTopicCreate,
    session: AsyncSession = Depends(get_db),
    _admin: User = Depends(get_current_admin),
) -> dict:
    """Create a system topic visible to all users (D-09, DOC-04).

    System topics have user_id = NULL, making them visible to every user as
    defaults in their topic namespace. Only admins can create system topics.
    Regular users create per-user topics via POST /api/topics.

    Deduplication: case-insensitive match within the system namespace
    (user_id IS NULL). Returns the existing system topic if one with the same
    name already exists.
    """
    # Deferred import; note this local name is the services.storage module,
    # not the top-level ``storage`` package imported by this file.
    from services import storage  # noqa: PLC0415

    topic = await storage.create_topic(
        session, body.name, body.description, body.color, user_id=None
    )
    return topic