Add admin user management with role-gated access

Backend:
- schemas/user.py: is_admin (validation_alias=is_superuser) on UserOut and
  UserAdminOut; UserAdminCreate extends UserCreate with is_admin flag
- deps.py: get_current_admin dependency — 403 for non-superusers
- routers/admin.py: GET/POST /api/admin/users, DELETE and PATCH /active per
  user; self-delete and self-deactivate blocked
- main.py: register /api/admin router
- scripts/seed.py: seed test user with is_superuser=True; promotes existing
  user if already created without the flag

Frontend:
- api/client.ts: UserData type with is_admin, admin API functions
- components/Nav.tsx: Admin link visible only when user.is_admin is true
- pages/AdminPage.tsx: user table with add-user form, delete, toggle active
- App.tsx: AdminRoute guard (403-redirects non-admins to /); /admin route

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
curo1305
2026-04-13 18:40:05 +02:00
parent d46191789d
commit 456681fdfa
11 changed files with 359 additions and 8 deletions
+11
View File
@@ -30,3 +30,14 @@ async def get_current_user(
if not user or not user.is_active:
raise credentials_exception
return user
async def get_current_admin(
current_user: User = Depends(get_current_user),
) -> User:
if not current_user.is_superuser:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Admin access required",
)
return current_user
+2 -1
View File
@@ -2,7 +2,7 @@ from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.core.config import settings
from app.routers import auth, profile, users
from app.routers import admin, auth, profile, users
app = FastAPI(title=settings.PROJECT_NAME, version="0.1.0")
@@ -17,6 +17,7 @@ app.add_middleware(
app.include_router(auth.router, prefix="/api/auth", tags=["auth"])
app.include_router(users.router, prefix="/api/users", tags=["users"])
app.include_router(profile.router, prefix="/api/profile", tags=["profile"])
app.include_router(admin.router, prefix="/api/admin", tags=["admin"])
@app.get("/api/health")
+80
View File
@@ -0,0 +1,80 @@
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.security import hash_password
from app.database import get_db
from app.deps import get_current_admin
from app.models.user import User
from app.schemas.user import UserAdminCreate, UserAdminOut
router = APIRouter()
@router.get("/users", response_model=list[UserAdminOut])
async def list_users(
_admin: User = Depends(get_current_admin),
db: AsyncSession = Depends(get_db),
) -> list[User]:
result = await db.execute(select(User).order_by(User.email))
return list(result.scalars().all())
@router.post("/users", response_model=UserAdminOut, status_code=status.HTTP_201_CREATED)
async def create_user(
body: UserAdminCreate,
_admin: User = Depends(get_current_admin),
db: AsyncSession = Depends(get_db),
) -> User:
existing = await db.execute(select(User).where(User.email == body.email))
if existing.scalar_one_or_none():
raise HTTPException(status_code=400, detail="Email already registered")
user = User(
email=body.email,
hashed_password=hash_password(body.password),
full_name=body.full_name,
is_superuser=body.is_admin,
)
db.add(user)
await db.commit()
await db.refresh(user)
return user
@router.delete("/users/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user(
user_id: str,
admin: User = Depends(get_current_admin),
db: AsyncSession = Depends(get_db),
) -> None:
if user_id == admin.id:
raise HTTPException(status_code=400, detail="Cannot delete your own account")
result = await db.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if not user:
raise HTTPException(status_code=404, detail="User not found")
await db.delete(user)
await db.commit()
@router.patch("/users/{user_id}/active", response_model=UserAdminOut)
async def toggle_active(
user_id: str,
admin: User = Depends(get_current_admin),
db: AsyncSession = Depends(get_db),
) -> User:
if user_id == admin.id:
raise HTTPException(status_code=400, detail="Cannot change your own active status")
result = await db.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if not user:
raise HTTPException(status_code=404, detail="User not found")
user.is_active = not user.is_active
await db.commit()
await db.refresh(user)
return user
+23 -2
View File
@@ -1,6 +1,6 @@
import re
from pydantic import BaseModel, EmailStr, field_validator
from pydantic import BaseModel, EmailStr, Field, field_validator
from app.core.sanitize import normalize_email, sanitize_str
@@ -68,8 +68,29 @@ class UserOut(BaseModel):
email: str
full_name: str | None
is_active: bool
# validation_alias reads is_superuser from the ORM object; the JSON key
# in the response is the field name "is_admin" (not the alias).
is_admin: bool = Field(validation_alias="is_superuser", default=False)
model_config = {"from_attributes": True}
model_config = {"from_attributes": True, "populate_by_name": True}
# ── Admin-facing schemas ───────────────────────────────────────────────────────
class UserAdminOut(BaseModel):
"""Full user record returned to admin endpoints."""
id: str
email: str
full_name: str | None
is_active: bool
is_admin: bool = Field(validation_alias="is_superuser", default=False)
model_config = {"from_attributes": True, "populate_by_name": True}
class UserAdminCreate(UserCreate):
"""Admin creates a user and can optionally grant admin rights."""
is_admin: bool = False
class Token(BaseModel):
+12 -3
View File
@@ -16,18 +16,27 @@ TEST_NAME = "Test User"
async def seed() -> None:
async with AsyncSessionLocal() as db:
result = await db.execute(select(User).where(User.email == TEST_EMAIL))
if result.scalar_one_or_none():
print(f"[seed] test user already exists: {TEST_EMAIL}")
existing = result.scalar_one_or_none()
if existing:
# Ensure the dev test user is always an admin
if not existing.is_superuser:
existing.is_superuser = True
await db.commit()
print(f"[seed] promoted test user to admin: {TEST_EMAIL}")
else:
print(f"[seed] test user already exists: {TEST_EMAIL}")
return
user = User(
email=TEST_EMAIL,
hashed_password=hash_password(TEST_PASSWORD),
full_name=TEST_NAME,
is_superuser=True,
)
db.add(user)
await db.commit()
print(f"[seed] created test user — email: {TEST_EMAIL} pwd: {TEST_PASSWORD}")
print(f"[seed] created test admin — email: {TEST_EMAIL} pwd: {TEST_PASSWORD}")
if __name__ == "__main__":