feat(02-01): implement deps/auth.py FastAPI dependency chain with tests
- get_current_user: validates Bearer JWT via decode_access_token, loads User from DB raises HTTP 401 on invalid/expired token, missing user, or deactivated account - get_current_admin: wraps get_current_user, raises HTTP 403 on role != 'admin' (T-02-07) - Admin impersonation architecturally excluded (ADMIN-07, T-02-08) — no code path bypasses role check - tests/test_auth_deps.py: 7 tests covering happy path, tampered token, inactive user, 403 non-admin, 200 admin
This commit is contained in:
@@ -0,0 +1,92 @@
|
|||||||
|
"""
|
||||||
|
FastAPI authentication dependency chain for DocuVault.
|
||||||
|
|
||||||
|
Provides two reusable FastAPI dependencies:
|
||||||
|
- get_current_user: validates the Bearer JWT and returns the User ORM object
|
||||||
|
- get_current_admin: requires user.role == 'admin' (T-02-07)
|
||||||
|
|
||||||
|
Usage in route handlers:
|
||||||
|
from deps.auth import get_current_user, get_current_admin
|
||||||
|
from db.models import User
|
||||||
|
|
||||||
|
@router.get("/me")
|
||||||
|
async def get_me(current_user: User = Depends(get_current_user)):
|
||||||
|
return {"id": str(current_user.id), "email": current_user.email}
|
||||||
|
|
||||||
|
@router.get("/admin/users")
|
||||||
|
async def list_users(
|
||||||
|
_admin: User = Depends(get_current_admin),
|
||||||
|
session: AsyncSession = Depends(get_db),
|
||||||
|
):
|
||||||
|
...
|
||||||
|
"""
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
from fastapi import Depends, HTTPException, status
|
||||||
|
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
|
||||||
|
from sqlalchemy.ext.asyncio import AsyncSession
|
||||||
|
|
||||||
|
from db.models import User
|
||||||
|
from deps.db import get_db
|
||||||
|
from services import auth as auth_service
|
||||||
|
|
||||||
|
# HTTPBearer parses the Authorization: Bearer <token> header.
|
||||||
|
# auto_error=True (default) raises 403 if no Authorization header is present.
|
||||||
|
security = HTTPBearer()
|
||||||
|
|
||||||
|
|
||||||
|
async def get_current_user(
|
||||||
|
credentials: HTTPAuthorizationCredentials = Depends(security),
|
||||||
|
session: AsyncSession = Depends(get_db),
|
||||||
|
) -> User:
|
||||||
|
"""Validate the Bearer JWT and return the active User ORM object.
|
||||||
|
|
||||||
|
Raises HTTP 401 if:
|
||||||
|
- Token is missing, expired, or tampered (handled by HTTPBearer + decode)
|
||||||
|
- User does not exist in the database
|
||||||
|
- User account is deactivated (is_active=False)
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
payload = auth_service.decode_access_token(credentials.credentials)
|
||||||
|
except ValueError as exc:
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||||
|
detail="Invalid or expired token",
|
||||||
|
headers={"WWW-Authenticate": "Bearer"},
|
||||||
|
) from exc
|
||||||
|
|
||||||
|
try:
|
||||||
|
user_uuid = uuid.UUID(payload["sub"])
|
||||||
|
except (KeyError, ValueError) as exc:
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||||
|
detail="Invalid token subject",
|
||||||
|
headers={"WWW-Authenticate": "Bearer"},
|
||||||
|
) from exc
|
||||||
|
|
||||||
|
user = await session.get(User, user_uuid)
|
||||||
|
if user is None or not user.is_active:
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||||
|
detail="User not found or deactivated",
|
||||||
|
headers={"WWW-Authenticate": "Bearer"},
|
||||||
|
)
|
||||||
|
|
||||||
|
return user
|
||||||
|
|
||||||
|
|
||||||
|
async def get_current_admin(
|
||||||
|
user: User = Depends(get_current_user),
|
||||||
|
) -> User:
|
||||||
|
"""Require admin role; raises HTTP 403 otherwise (T-02-07).
|
||||||
|
|
||||||
|
Admin impersonation is architecturally excluded (ADMIN-07, T-02-08):
|
||||||
|
no code path sets a JWT sub to a different user's id. This dependency
|
||||||
|
only checks that the authenticated user's role is 'admin'.
|
||||||
|
"""
|
||||||
|
if user.role != "admin":
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=status.HTTP_403_FORBIDDEN,
|
||||||
|
detail="Admin access required",
|
||||||
|
)
|
||||||
|
return user
|
||||||
@@ -0,0 +1,160 @@
|
|||||||
|
"""
|
||||||
|
Tests for backend/deps/auth.py — FastAPI dependency chain.
|
||||||
|
|
||||||
|
Tests verify:
|
||||||
|
- get_current_user returns the User ORM object when a valid Bearer token is provided
|
||||||
|
- get_current_user raises HTTP 401 for expired/tampered tokens
|
||||||
|
- get_current_user raises HTTP 401 when user.is_active is False
|
||||||
|
- get_current_admin raises HTTP 403 when user.role == "user"
|
||||||
|
- get_current_admin returns user when user.role == "admin"
|
||||||
|
"""
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
import pytest_asyncio
|
||||||
|
from httpx import ASGITransport, AsyncClient
|
||||||
|
from fastapi import FastAPI, Depends
|
||||||
|
from sqlalchemy.ext.asyncio import AsyncSession
|
||||||
|
|
||||||
|
|
||||||
|
# ── Minimal test app with /test/me and /test/admin routes ─────────────────────
|
||||||
|
|
||||||
|
def make_test_app():
|
||||||
|
"""Create a minimal FastAPI app that exercises the auth deps."""
|
||||||
|
from deps.auth import get_current_user, get_current_admin
|
||||||
|
from db.models import User
|
||||||
|
|
||||||
|
test_app = FastAPI()
|
||||||
|
|
||||||
|
@test_app.get("/test/me")
|
||||||
|
async def get_me(current_user: User = Depends(get_current_user)):
|
||||||
|
return {"id": str(current_user.id), "role": current_user.role}
|
||||||
|
|
||||||
|
@test_app.get("/test/admin")
|
||||||
|
async def admin_only(_admin: User = Depends(get_current_admin)):
|
||||||
|
return {"role": _admin.role}
|
||||||
|
|
||||||
|
return test_app
|
||||||
|
|
||||||
|
|
||||||
|
@pytest_asyncio.fixture
|
||||||
|
async def auth_client(db_session: AsyncSession):
|
||||||
|
"""Async HTTP test client for the auth-dep test app."""
|
||||||
|
from deps.db import get_db
|
||||||
|
|
||||||
|
app = make_test_app()
|
||||||
|
app.dependency_overrides[get_db] = lambda: db_session
|
||||||
|
|
||||||
|
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as c:
|
||||||
|
yield c
|
||||||
|
|
||||||
|
app.dependency_overrides.clear()
|
||||||
|
|
||||||
|
|
||||||
|
async def _create_user(db_session, role: str = "user", is_active: bool = True):
|
||||||
|
"""Helper: insert a minimal User row into the test DB."""
|
||||||
|
from db.models import User
|
||||||
|
from services.auth import hash_password
|
||||||
|
|
||||||
|
user = User(
|
||||||
|
id=uuid.uuid4(),
|
||||||
|
handle=f"user_{uuid.uuid4().hex[:6]}",
|
||||||
|
email=f"{uuid.uuid4().hex[:6]}@example.com",
|
||||||
|
password_hash=hash_password("testpassword"),
|
||||||
|
role=role,
|
||||||
|
is_active=is_active,
|
||||||
|
)
|
||||||
|
db_session.add(user)
|
||||||
|
await db_session.commit()
|
||||||
|
return user
|
||||||
|
|
||||||
|
|
||||||
|
# ── Tests ────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_get_current_user_returns_user(auth_client, db_session):
|
||||||
|
"""A valid Bearer token should return the user's id and role."""
|
||||||
|
from services.auth import create_access_token
|
||||||
|
|
||||||
|
user = await _create_user(db_session, role="user")
|
||||||
|
token = create_access_token(str(user.id), "user")
|
||||||
|
|
||||||
|
resp = await auth_client.get(
|
||||||
|
"/test/me", headers={"Authorization": f"Bearer {token}"}
|
||||||
|
)
|
||||||
|
assert resp.status_code == 200
|
||||||
|
data = resp.json()
|
||||||
|
assert data["id"] == str(user.id)
|
||||||
|
assert data["role"] == "user"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_get_current_user_rejects_tampered_token(auth_client):
|
||||||
|
"""A tampered Bearer token should return 401."""
|
||||||
|
tampered = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJmYWtlIn0.invalidsig"
|
||||||
|
resp = await auth_client.get(
|
||||||
|
"/test/me", headers={"Authorization": f"Bearer {tampered}"}
|
||||||
|
)
|
||||||
|
assert resp.status_code == 401
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_get_current_user_rejects_inactive_user(auth_client, db_session):
|
||||||
|
"""A valid token for an inactive user should return 401."""
|
||||||
|
from services.auth import create_access_token
|
||||||
|
|
||||||
|
user = await _create_user(db_session, role="user", is_active=False)
|
||||||
|
token = create_access_token(str(user.id), "user")
|
||||||
|
|
||||||
|
resp = await auth_client.get(
|
||||||
|
"/test/me", headers={"Authorization": f"Bearer {token}"}
|
||||||
|
)
|
||||||
|
assert resp.status_code == 401
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_get_current_admin_rejects_non_admin(auth_client, db_session):
|
||||||
|
"""A regular user (role='user') should get 403 on admin endpoint."""
|
||||||
|
from services.auth import create_access_token
|
||||||
|
|
||||||
|
user = await _create_user(db_session, role="user")
|
||||||
|
token = create_access_token(str(user.id), "user")
|
||||||
|
|
||||||
|
resp = await auth_client.get(
|
||||||
|
"/test/admin", headers={"Authorization": f"Bearer {token}"}
|
||||||
|
)
|
||||||
|
assert resp.status_code == 403
|
||||||
|
assert "Admin" in resp.json()["detail"]
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_get_current_admin_allows_admin(auth_client, db_session):
|
||||||
|
"""An admin user (role='admin') should get 200 on admin endpoint."""
|
||||||
|
from services.auth import create_access_token
|
||||||
|
|
||||||
|
admin_user = await _create_user(db_session, role="admin")
|
||||||
|
token = create_access_token(str(admin_user.id), "admin")
|
||||||
|
|
||||||
|
resp = await auth_client.get(
|
||||||
|
"/test/admin", headers={"Authorization": f"Bearer {token}"}
|
||||||
|
)
|
||||||
|
assert resp.status_code == 200
|
||||||
|
assert resp.json()["role"] == "admin"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_get_current_user_missing_token(auth_client):
|
||||||
|
"""Missing Authorization header should return 401 or 403 (HTTPBearer default)."""
|
||||||
|
resp = await auth_client.get("/test/me")
|
||||||
|
assert resp.status_code in (401, 403) # HTTPBearer raises 403 in older fastapi, 401 in newer
|
||||||
|
|
||||||
|
|
||||||
|
def test_deps_auth_has_http_403():
|
||||||
|
"""deps/auth.py must contain an HTTPException(status.HTTP_403_FORBIDDEN) check."""
|
||||||
|
import re
|
||||||
|
import os
|
||||||
|
path = os.path.join(os.path.dirname(__file__), "..", "deps", "auth.py")
|
||||||
|
with open(path) as f:
|
||||||
|
source = f.read()
|
||||||
|
assert re.search(r"HTTP_403_FORBIDDEN", source), \
|
||||||
|
"deps/auth.py must raise HTTP 403 for non-admin access"
|
||||||
Reference in New Issue
Block a user