feat(02-01): implement deps/auth.py FastAPI dependency chain with tests
- get_current_user: validates Bearer JWT via decode_access_token, loads User from DB raises HTTP 401 on invalid/expired token, missing user, or deactivated account - get_current_admin: wraps get_current_user, raises HTTP 403 on role != 'admin' (T-02-07) - Admin impersonation architecturally excluded (ADMIN-07, T-02-08) — no code path bypasses role check - tests/test_auth_deps.py: 7 tests covering happy path, tampered token, inactive user, 403 non-admin, 200 admin
This commit is contained in:
@@ -0,0 +1,92 @@
|
||||
"""
|
||||
FastAPI authentication dependency chain for DocuVault.
|
||||
|
||||
Provides two reusable FastAPI dependencies:
|
||||
- get_current_user: validates the Bearer JWT and returns the User ORM object
|
||||
- get_current_admin: requires user.role == 'admin' (T-02-07)
|
||||
|
||||
Usage in route handlers:
|
||||
from deps.auth import get_current_user, get_current_admin
|
||||
from db.models import User
|
||||
|
||||
@router.get("/me")
|
||||
async def get_me(current_user: User = Depends(get_current_user)):
|
||||
return {"id": str(current_user.id), "email": current_user.email}
|
||||
|
||||
@router.get("/admin/users")
|
||||
async def list_users(
|
||||
_admin: User = Depends(get_current_admin),
|
||||
session: AsyncSession = Depends(get_db),
|
||||
):
|
||||
...
|
||||
"""
|
||||
import uuid
|
||||
|
||||
from fastapi import Depends, HTTPException, status
|
||||
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from db.models import User
|
||||
from deps.db import get_db
|
||||
from services import auth as auth_service
|
||||
|
||||
# HTTPBearer parses the Authorization: Bearer <token> header.
|
||||
# auto_error=True (default) raises 403 if no Authorization header is present.
|
||||
security = HTTPBearer()
|
||||
|
||||
|
||||
async def get_current_user(
|
||||
credentials: HTTPAuthorizationCredentials = Depends(security),
|
||||
session: AsyncSession = Depends(get_db),
|
||||
) -> User:
|
||||
"""Validate the Bearer JWT and return the active User ORM object.
|
||||
|
||||
Raises HTTP 401 if:
|
||||
- Token is missing, expired, or tampered (handled by HTTPBearer + decode)
|
||||
- User does not exist in the database
|
||||
- User account is deactivated (is_active=False)
|
||||
"""
|
||||
try:
|
||||
payload = auth_service.decode_access_token(credentials.credentials)
|
||||
except ValueError as exc:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="Invalid or expired token",
|
||||
headers={"WWW-Authenticate": "Bearer"},
|
||||
) from exc
|
||||
|
||||
try:
|
||||
user_uuid = uuid.UUID(payload["sub"])
|
||||
except (KeyError, ValueError) as exc:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="Invalid token subject",
|
||||
headers={"WWW-Authenticate": "Bearer"},
|
||||
) from exc
|
||||
|
||||
user = await session.get(User, user_uuid)
|
||||
if user is None or not user.is_active:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="User not found or deactivated",
|
||||
headers={"WWW-Authenticate": "Bearer"},
|
||||
)
|
||||
|
||||
return user
|
||||
|
||||
|
||||
async def get_current_admin(
|
||||
user: User = Depends(get_current_user),
|
||||
) -> User:
|
||||
"""Require admin role; raises HTTP 403 otherwise (T-02-07).
|
||||
|
||||
Admin impersonation is architecturally excluded (ADMIN-07, T-02-08):
|
||||
no code path sets a JWT sub to a different user's id. This dependency
|
||||
only checks that the authenticated user's role is 'admin'.
|
||||
"""
|
||||
if user.role != "admin":
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN,
|
||||
detail="Admin access required",
|
||||
)
|
||||
return user
|
||||
Reference in New Issue
Block a user