Files
Business-Management/backend/app/deps.py
T
curo1305 c45236651b Add service admin groups, combined settings pages, single Settings button
- Auto-create {service-id}-admin groups at startup (group_bootstrap.py)
- get_service_admin() dep: grants access to superusers OR service group members
- /api/settings/ai and /api/settings/documents/limits now allow service admins
- AI service exposes /plugin/manifest (ai-service-admin access group)
- DocServiceSettingsPage: combined upload limits + watch directory on one page
- ServiceAdminRoute in frontend guards new /apps/documents/settings and /apps/ai/settings
- Single Settings button per app card (visible to admins and service group members)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-18 02:49:57 +02:00

112 lines
3.5 KiB
Python

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.security import decode_access_token
from app.database import get_db
from app.models.user import User
# Shared bearer-token scheme consumed by get_current_user via Depends();
# tokenUrl names the login route that issues the tokens.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login")
async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: AsyncSession = Depends(get_db),
) -> User:
    """
    Resolve the request's bearer token to an active ``User``.

    The token is decoded with ``decode_access_token`` and the resulting id
    is looked up in the database.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            the token fails to decode (``JWTError``), when no user matches
            the decoded id, or when the matching user is not active.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        subject_id = decode_access_token(token)
    except JWTError:
        raise unauthorized

    lookup = select(User).where(User.id == subject_id)
    account = (await db.execute(lookup)).scalar_one_or_none()

    # Unknown and deactivated accounts get the same response as a bad token,
    # so the caller cannot tell the three cases apart.
    if account and account.is_active:
        return account
    raise unauthorized
async def get_current_admin(
    current_user: User = Depends(get_current_user),
) -> User:
    """
    Require that the authenticated user is a global superuser.

    Returns the user unchanged when ``is_superuser`` is set.

    Raises:
        HTTPException: 404 "Not found" for every other authenticated user.
    """
    if current_user.is_superuser:
        return current_user

    # Deliberately 404 rather than 403: the response discloses neither that
    # the endpoint exists nor that the caller was refused for lack of rights.
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Not found",
    )
def get_service_admin(service_id: str):
    """
    Build a dependency that guards one service's admin endpoints.

    The returned dependency yields the authenticated user when that user is
    a global superuser, or when ``check_plugin_access(service_id, ...)``
    reports access for them. Anyone else receives a 404 (not a 403) so that
    neither the endpoint's existence nor the permission model is revealed.

    Usage:
        @router.get("/ai")
        async def get_ai_settings(_: User = Depends(get_service_admin("ai-service"))):
    """

    async def _require_service_admin(
        current_user: User = Depends(get_current_user),
        db: AsyncSession = Depends(get_db),
    ) -> User:
        # ``or`` short-circuits, so superusers never trigger the access lookup.
        permitted = current_user.is_superuser or await check_plugin_access(
            service_id, current_user, db
        )
        if not permitted:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Not found")
        return current_user

    return _require_service_admin
async def check_plugin_access(
    plugin_id: str,
    current_user: User,
    db: AsyncSession,
) -> bool:
    """
    Return True if the user may access the given plugin's settings.

    Access is granted when any of these conditions holds:
    1. The user is a superuser AND the manifest allows superuser access
       (``access.allow_superuser``, treated as True when the key is absent).
    2. The user is a member of at least one of the groups listed in
       ``manifest.access.required_groups``.

    A plugin without a cached manifest is never accessible.

    Returns False (not raises) so callers can decide how to respond.
    """
    # Function-scope imports are kept from the original implementation
    # (presumably to avoid an import cycle with the models/services packages).
    from app.models.group import Group, GroupMembership
    from app.services.service_health import get_cached_manifest

    manifest = get_cached_manifest(plugin_id)
    if manifest is None:
        return False

    # ``or {}`` also covers a manifest that carries an explicit null "access".
    access = manifest.get("access") or {}
    if current_user.is_superuser and access.get("allow_superuser", True):
        return True

    group_names = list(access.get("required_groups") or [])
    if not group_names:
        # Nothing to match against; skip the database round-trip.
        return False

    # One query for all candidate groups instead of one query per group.
    # LIMIT 1 + first() only asks "does any membership exist?", so several
    # matching rows (user in multiple required groups, or duplicate rows)
    # cannot raise MultipleResultsFound.
    membership = await db.execute(
        select(GroupMembership.group_id)
        .join(Group, Group.id == GroupMembership.group_id)
        .where(
            Group.name.in_(group_names),
            GroupMembership.user_id == current_user.id,
        )
        .limit(1)
    )
    return membership.first() is not None