"""FastAPI dependencies for authentication and admin/plugin authorization."""

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.security import decode_access_token
from app.database import get_db
from app.models.user import User

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login")


async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: AsyncSession = Depends(get_db),
) -> User:
    """
    Resolve the bearer token to an active ``User``.

    The token is decoded with ``decode_access_token`` and the resulting id is
    looked up in the database.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when the
            token cannot be decoded, no matching user exists, or the user is
            not active.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        user_id = decode_access_token(token)
    except JWTError as exc:
        # Chain explicitly so the decoding failure stays visible in tracebacks.
        raise credentials_exception from exc

    # NOTE(review): presumably decode_access_token could yield None for a token
    # without a subject — confirm. Rejecting here gives the same 401 the lookup
    # below would produce, without issuing a "WHERE id IS NULL" query.
    if user_id is None:
        raise credentials_exception

    result = await db.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()
    if not user or not user.is_active:
        raise credentials_exception
    return user


async def get_current_admin(
    current_user: User = Depends(get_current_user),
) -> User:
    """
    Require the authenticated user to be a global superuser.

    Raises:
        HTTPException: 404 when the user is not a superuser.
    """
    if not current_user.is_superuser:
        # Return 404 instead of 403 — reveals neither the existence of the
        # endpoint nor that the caller lacks permission.
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Not found",
        )
    return current_user


def get_service_admin(service_id: str):
    """
    Dependency factory that grants access to service-specific admin endpoints.

    Access is granted if the user is a global superuser OR
    ``check_plugin_access`` approves the user for ``service_id`` (i.e. the
    user belongs to one of the groups listed in the service manifest's
    ``access.required_groups``).

    Returns 404 (not 403) to hide both the endpoint existence and the
    permission model.

    Usage:
        @router.get("/ai")
        async def get_ai_settings(_: User = Depends(get_service_admin("ai-service"))):
    """

    async def _dep(
        current_user: User = Depends(get_current_user),
        db: AsyncSession = Depends(get_db),
    ) -> User:
        # NOTE(review): global superusers are admitted before the manifest is
        # consulted, so a manifest with ``allow_superuser: false`` (or a
        # missing manifest) does not lock them out here — confirm this is the
        # intended policy.
        if current_user.is_superuser:
            return current_user
        if await check_plugin_access(service_id, current_user, db):
            return current_user
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Not found")

    return _dep


async def check_plugin_access(
    plugin_id: str,
    current_user: User,
    db: AsyncSession,
) -> bool:
    """
    Return True if the user may access the given plugin's settings.

    Access is granted when any of these conditions holds:
      1. The user is a superuser AND the manifest allows superuser access
         (``access.allow_superuser``, treated as True when absent).
      2. The user is a member of one of the groups listed in
         ``manifest.access.required_groups``.

    A plugin without a cached manifest is never accessible through this
    function. Returns False (not raises) so callers can decide how to respond.
    """
    # Imported lazily, as in the original; presumably to avoid a circular
    # import at module load time.
    from app.models.group import Group, GroupMembership
    from app.services.service_health import get_cached_manifest

    manifest = get_cached_manifest(plugin_id)
    if manifest is None:
        return False

    # ``or {}`` / ``or []`` also cover keys that are present but null, which
    # ``dict.get(key, default)`` alone would pass through as None.
    access = manifest.get("access") or {}
    if current_user.is_superuser and access.get("allow_superuser", True):
        return True

    required_groups = list(access.get("required_groups") or [])
    if not required_groups:
        return False

    # One query for all groups instead of one per group. LIMIT 1 + first()
    # only asks "does any membership exist", so duplicate membership rows or
    # duplicate group names cannot raise MultipleResultsFound.
    membership_query = (
        select(GroupMembership.group_id)
        .join(Group, Group.id == GroupMembership.group_id)
        .where(
            Group.name.in_(required_groups),
            GroupMembership.user_id == current_user.id,
        )
        .limit(1)
    )
    result = await db.execute(membership_query)
    return result.first() is not None