diff --git a/backend/deps/auth.py b/backend/deps/auth.py new file mode 100644 index 0000000..7a48bf8 --- /dev/null +++ b/backend/deps/auth.py @@ -0,0 +1,92 @@ +""" +FastAPI authentication dependency chain for DocuVault. + +Provides two reusable FastAPI dependencies: + - get_current_user: validates the Bearer JWT and returns the User ORM object + - get_current_admin: requires user.role == 'admin' (T-02-07) + +Usage in route handlers: + from deps.auth import get_current_user, get_current_admin + from db.models import User + + @router.get("/me") + async def get_me(current_user: User = Depends(get_current_user)): + return {"id": str(current_user.id), "email": current_user.email} + + @router.get("/admin/users") + async def list_users( + _admin: User = Depends(get_current_admin), + session: AsyncSession = Depends(get_db), + ): + ... +""" +import uuid + +from fastapi import Depends, HTTPException, status +from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer +from sqlalchemy.ext.asyncio import AsyncSession + +from db.models import User +from deps.db import get_db +from services import auth as auth_service + +# HTTPBearer parses the Authorization: Bearer header. +# auto_error=True (default) raises 403 if no Authorization header is present. +security = HTTPBearer() + + +async def get_current_user( + credentials: HTTPAuthorizationCredentials = Depends(security), + session: AsyncSession = Depends(get_db), +) -> User: + """Validate the Bearer JWT and return the active User ORM object. + + Raises HTTP 401 if: + - Token is missing, expired, or tampered (handled by HTTPBearer + decode) + - User does not exist in the database + - User account is deactivated (is_active=False) + """ + try: + payload = auth_service.decode_access_token(credentials.credentials) + except ValueError as exc: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Invalid or expired token", + headers={"WWW-Authenticate": "Bearer"}, + ) from exc + + try: + user_uuid = uuid.UUID(payload["sub"]) + except (KeyError, ValueError) as exc: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Invalid token subject", + headers={"WWW-Authenticate": "Bearer"}, + ) from exc + + user = await session.get(User, user_uuid) + if user is None or not user.is_active: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="User not found or deactivated", + headers={"WWW-Authenticate": "Bearer"}, + ) + + return user + + +async def get_current_admin( + user: User = Depends(get_current_user), +) -> User: + """Require admin role; raises HTTP 403 otherwise (T-02-07). + + Admin impersonation is architecturally excluded (ADMIN-07, T-02-08): + no code path sets a JWT sub to a different user's id. This dependency + only checks that the authenticated user's role is 'admin'. + """ + if user.role != "admin": + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Admin access required", + ) + return user diff --git a/backend/tests/test_auth_deps.py b/backend/tests/test_auth_deps.py new file mode 100644 index 0000000..fd133fa --- /dev/null +++ b/backend/tests/test_auth_deps.py @@ -0,0 +1,160 @@ +""" +Tests for backend/deps/auth.py — FastAPI dependency chain. + +Tests verify: + - get_current_user returns the User ORM object when a valid Bearer token is provided + - get_current_user raises HTTP 401 for expired/tampered tokens + - get_current_user raises HTTP 401 when user.is_active is False + - get_current_admin raises HTTP 403 when user.role == "user" + - get_current_admin returns user when user.role == "admin" +""" +import uuid + +import pytest +import pytest_asyncio +from httpx import ASGITransport, AsyncClient +from fastapi import FastAPI, Depends +from sqlalchemy.ext.asyncio import AsyncSession + + +# ── Minimal test app with /test/me and /test/admin routes ───────────────────── + +def make_test_app(): + """Create a minimal FastAPI app that exercises the auth deps.""" + from deps.auth import get_current_user, get_current_admin + from db.models import User + + test_app = FastAPI() + + @test_app.get("/test/me") + async def get_me(current_user: User = Depends(get_current_user)): + return {"id": str(current_user.id), "role": current_user.role} + + @test_app.get("/test/admin") + async def admin_only(_admin: User = Depends(get_current_admin)): + return {"role": _admin.role} + + return test_app + + +@pytest_asyncio.fixture +async def auth_client(db_session: AsyncSession): + """Async HTTP test client for the auth-dep test app.""" + from deps.db import get_db + + app = make_test_app() + app.dependency_overrides[get_db] = lambda: db_session + + async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as c: + yield c + + app.dependency_overrides.clear() + + +async def _create_user(db_session, role: str = "user", is_active: bool = True): + """Helper: insert a minimal User row into the test DB.""" + from db.models import User + from services.auth import hash_password + + user = User( + id=uuid.uuid4(), + handle=f"user_{uuid.uuid4().hex[:6]}", + email=f"{uuid.uuid4().hex[:6]}@example.com", + password_hash=hash_password("testpassword"), + role=role, + is_active=is_active, + ) + db_session.add(user) + await db_session.commit() + return user + + +# ── Tests ──────────────────────────────────────────────────────────────────── + +@pytest.mark.asyncio +async def test_get_current_user_returns_user(auth_client, db_session): + """A valid Bearer token should return the user's id and role.""" + from services.auth import create_access_token + + user = await _create_user(db_session, role="user") + token = create_access_token(str(user.id), "user") + + resp = await auth_client.get( + "/test/me", headers={"Authorization": f"Bearer {token}"} + ) + assert resp.status_code == 200 + data = resp.json() + assert data["id"] == str(user.id) + assert data["role"] == "user" + + +@pytest.mark.asyncio +async def test_get_current_user_rejects_tampered_token(auth_client): + """A tampered Bearer token should return 401.""" + tampered = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJmYWtlIn0.invalidsig" + resp = await auth_client.get( + "/test/me", headers={"Authorization": f"Bearer {tampered}"} + ) + assert resp.status_code == 401 + + +@pytest.mark.asyncio +async def test_get_current_user_rejects_inactive_user(auth_client, db_session): + """A valid token for an inactive user should return 401.""" + from services.auth import create_access_token + + user = await _create_user(db_session, role="user", is_active=False) + token = create_access_token(str(user.id), "user") + + resp = await auth_client.get( + "/test/me", headers={"Authorization": f"Bearer {token}"} + ) + assert resp.status_code == 401 + + +@pytest.mark.asyncio +async def test_get_current_admin_rejects_non_admin(auth_client, db_session): + """A regular user (role='user') should get 403 on admin endpoint.""" + from services.auth import create_access_token + + user = await _create_user(db_session, role="user") + token = create_access_token(str(user.id), "user") + + resp = await auth_client.get( + "/test/admin", headers={"Authorization": f"Bearer {token}"} + ) + assert resp.status_code == 403 + assert "Admin" in resp.json()["detail"] + + +@pytest.mark.asyncio +async def test_get_current_admin_allows_admin(auth_client, db_session): + """An admin user (role='admin') should get 200 on admin endpoint.""" + from services.auth import create_access_token + + admin_user = await _create_user(db_session, role="admin") + token = create_access_token(str(admin_user.id), "admin") + + resp = await auth_client.get( + "/test/admin", headers={"Authorization": f"Bearer {token}"} + ) + assert resp.status_code == 200 + assert resp.json()["role"] == "admin" + + +@pytest.mark.asyncio +async def test_get_current_user_missing_token(auth_client): + """Missing Authorization header should return 401 or 403 (HTTPBearer default).""" + resp = await auth_client.get("/test/me") + assert resp.status_code in (401, 403) # HTTPBearer raises 403 in older fastapi, 401 in newer + + +def test_deps_auth_has_http_403(): + """deps/auth.py must contain an HTTPException(status.HTTP_403_FORBIDDEN) check.""" + import re + import os + path = os.path.join(os.path.dirname(__file__), "..", "deps", "auth.py") + with open(path) as f: + source = f.read() + assert re.search(r"HTTP_403_FORBIDDEN", source), \ + "deps/auth.py must raise HTTP 403 for non-admin access"