d7831e9382
- test_totp_setup_returns_uri: GET /api/auth/totp/setup returns provisioning_uri + secret - test_totp_setup_already_enabled: returns 400 when totp_enabled=True - test_totp_setup_requires_auth: returns 401/403 without Bearer - test_password_reset_always_202_nonexistent: anti-enumeration for non-existent email - test_password_reset_always_202_existing: anti-enumeration for existing email - test_password_reset_confirm_invalid_token: returns 400 for bad token - test_password_reset_confirm_weak_password: returns 422 for weak password - test_password_reset_confirm_valid_no_autologin: returns 200 with no access_token (AUTH-05) - test_logout_all_revokes_tokens: returns 200 with revoked message - test_logout_all_requires_auth: returns 401/403 without Bearer - test_totp_enable_rate_limit: 11th call returns 429
299 lines
11 KiB
Python
299 lines
11 KiB
Python
"""
|
|
Tests for Plan 02-03 Task 1 — TOTP endpoints, password reset, logout-all.

Covers:
- GET /api/auth/totp/setup: returns provisioning_uri + secret; 400 if already enabled
- POST /api/auth/totp/enable: verifies code, returns backup_codes; rate limit 10/min
- DELETE /api/auth/totp: disables TOTP
- POST /api/auth/logout-all: revokes all refresh tokens
- POST /api/auth/password-reset: always returns 202 (anti-enumeration)
- POST /api/auth/password-reset/confirm: valid token + strong password → 200, no tokens

Uses async_client / authed_client fixtures from conftest.py.
FakeRedis is an in-process stand-in for app.state.redis.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import uuid
|
|
from datetime import datetime, timezone
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import pytest
|
|
import pytest_asyncio
|
|
from httpx import ASGITransport, AsyncClient
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from db.models import BackupCode, User
|
|
|
|
|
|
# ── Helpers ───────────────────────────────────────────────────────────────────
|
|
|
|
# Password sent in the registration and login requests made by this module.
# NOTE(review): presumably satisfies the API's password-strength policy — confirm.
VALID_PASSWORD = "StrongPass12!"
|
|
|
|
|
|
async def _register_and_login(async_client, handle="totpuser", email="totp@example.com"):
    """Register a user, log in, and return the access token.

    Args:
        async_client: HTTP client exposing an awaitable ``post(url, json=...)``.
        handle: Handle sent in the registration request.
        email: Email sent in both the registration and the login request.

    Returns:
        The ``access_token`` value from the login response body.

    Raises:
        AssertionError: If the login response body has no ``access_token``.
    """
    # The registration response is not checked here; a failed registration
    # shows up through the login assertion below.
    await async_client.post(
        "/api/auth/register",
        json={"handle": handle, "email": email, "password": VALID_PASSWORD},
    )
    login_resp = await async_client.post(
        "/api/auth/login",
        json={"email": email, "password": VALID_PASSWORD},
    )
    data = login_resp.json()
    # Report the server's answer instead of failing with an opaque KeyError.
    assert "access_token" in data, (
        f"Login did not return an access_token "
        f"(status {login_resp.status_code}): {login_resp.text}"
    )
    return data["access_token"]
|
|
|
|
|
|
# ── FakeRedis ─────────────────────────────────────────────────────────────────
|
|
|
|
class FakeRedis:
    """In-memory fake Redis for testing.

    Originally written to mirror the fake in test_auth_api.py. Each key maps
    to a ``(value, deadline)`` tuple where ``deadline`` is a UTC epoch
    timestamp or ``None`` for "never expires".

    Expired keys are treated as missing by ``get``, ``incr`` and ``expire``,
    so a counter whose window has elapsed restarts at 1 instead of carrying
    on from its stale value.
    """

    def __init__(self):
        """Create an empty store."""
        self._store: dict = {}

    @staticmethod
    def _now() -> float:
        """Return the current UTC time as an epoch timestamp."""
        return datetime.now(timezone.utc).timestamp()

    def _live_entry(self, key):
        """Return the ``(value, deadline)`` tuple for *key*, or ``None``.

        An entry whose deadline has passed is removed and reported as missing.
        """
        entry = self._store.get(key)
        if entry is None:
            return None
        _, deadline = entry
        if deadline is not None and self._now() > deadline:
            del self._store[key]
            return None
        return entry

    async def get(self, key):
        """Return the value stored at *key*, or ``None`` if absent or expired."""
        entry = self._live_entry(key)
        return None if entry is None else entry[0]

    async def incr(self, key):
        """Increment the counter at *key* and return the new value.

        A missing or expired key starts at 1 with no deadline; an existing
        key keeps its deadline.
        """
        entry = self._live_entry(key)
        if entry is None:
            self._store[key] = (1, None)
            return 1
        value, deadline = entry
        value += 1
        self._store[key] = (value, deadline)
        return value

    async def expire(self, key, seconds):
        """Set *key* to expire *seconds* from now; no-op if absent or expired."""
        entry = self._live_entry(key)
        if entry is not None:
            self._store[key] = (entry[0], self._now() + seconds)

    async def set(self, key, value, ex=None):
        """Store *value* at *key*, expiring after *ex* seconds when given."""
        deadline = None if ex is None else self._now() + ex
        self._store[key] = (value, deadline)

    async def close(self):
        """Do nothing; present so the fake can be closed like a real client."""
        pass
|
|
|
|
|
|
@pytest_asyncio.fixture
async def totp_client(db_session: AsyncSession):
    """Async HTTP test client with DB override AND fresh FakeRedis.

    Installs ``db_session`` as the ``get_db`` dependency override, puts a new
    ``FakeRedis`` on ``app.state.redis``, resets the auth rate-limiter storage,
    and yields an ``AsyncClient`` bound to the app through ``ASGITransport``.
    All dependency overrides are cleared on teardown.
    """
    from deps.db import get_db
    from main import app
    from api.auth import limiter as auth_limiter

    app.dependency_overrides[get_db] = lambda: db_session
    app.state.redis = FakeRedis()

    # Reset slowapi in-memory storage to prevent IP counter bleed between tests
    try:
        auth_limiter._storage.reset()
    except Exception:
        # Best-effort: relies on a private attribute, so a failure here must
        # not prevent the client from being created.
        pass

    try:
        async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as c:
            yield c
    finally:
        # Runs even if client teardown raises or the generator is closed at
        # the yield, so overrides never leak into later tests.
        app.dependency_overrides.clear()
|
|
|
|
|
|
# ── Tests: TOTP setup ──────────────────────────────────────────────────────────
|
|
|
|
@pytest.mark.asyncio
async def test_totp_setup_returns_uri(totp_client):
    """An authenticated GET /api/auth/totp/setup answers 200 with a URI and a secret."""
    access_token = await _register_and_login(totp_client)
    auth_headers = {"Authorization": f"Bearer {access_token}"}

    response = await totp_client.get("/api/auth/totp/setup", headers=auth_headers)

    assert response.status_code == 200, response.text
    payload = response.json()
    # Both fields must be present in the body.
    for field in ("provisioning_uri", "secret"):
        assert field in payload, f"Missing {field} in {payload}"
    assert "otpauth://" in payload["provisioning_uri"]
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_totp_setup_already_enabled(totp_client, db_session):
    """Setup is refused with 400 once the user's totp_enabled flag is True."""
    email = "u2@example.com"
    access_token = await _register_and_login(totp_client, handle="u2", email=email)

    # Flip the flag directly in the database rather than going through the API.
    lookup = await db_session.execute(select(User).where(User.email == email))
    account = lookup.scalar_one()
    account.totp_enabled = True
    await db_session.commit()

    response = await totp_client.get(
        "/api/auth/totp/setup",
        headers={"Authorization": f"Bearer {access_token}"},
    )
    assert response.status_code == 400, response.text
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_totp_setup_requires_auth(totp_client):
    """Calling the setup endpoint with no Authorization header yields 401 or 403."""
    response = await totp_client.get("/api/auth/totp/setup")
    status = response.status_code
    assert status in (401, 403), f"Expected 401/403, got {status}"
|
|
|
|
|
|
# ── Tests: password reset (always 202) ────────────────────────────────────────
|
|
|
|
@pytest.mark.asyncio
async def test_password_reset_always_202_nonexistent(totp_client):
    """A reset request for an unknown email still answers 202 (anti-enumeration)."""
    request_body = {"email": "nobody@nowhere.invalid"}
    response = await totp_client.post("/api/auth/password-reset", json=request_body)

    assert response.status_code == 202, response.text
    payload = response.json()
    assert "message" in payload
    assert "access_token" not in payload
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_password_reset_always_202_existing(totp_client):
    """POST /api/auth/password-reset with existing email also returns 202."""
    register_resp = await totp_client.post(
        "/api/auth/register",
        json={"handle": "resetuser", "email": "reset@example.com", "password": VALID_PASSWORD},
    )
    # Without this check a failed registration would silently turn the test
    # into a duplicate of the non-existent-email case.
    assert register_resp.status_code == 201, register_resp.text

    # Replace the email sender so the request does not trigger a real send.
    with patch("api.auth.send_reset_email") as mock_task:
        mock_task.delay = MagicMock()
        resp = await totp_client.post(
            "/api/auth/password-reset",
            json={"email": "reset@example.com"},
        )
    assert resp.status_code == 202, resp.text
|
|
|
|
|
|
# ── Tests: password reset confirm ─────────────────────────────────────────────
|
|
|
|
@pytest.mark.asyncio
async def test_password_reset_confirm_invalid_token(totp_client):
    """Confirming a reset with a malformed token is rejected with 400."""
    request_body = {"token": "not.a.valid.token", "new_password": VALID_PASSWORD}
    response = await totp_client.post("/api/auth/password-reset/confirm", json=request_body)
    assert response.status_code == 400, response.text
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_password_reset_confirm_weak_password(totp_client, db_session):
    """A valid reset token combined with a weak new password is rejected with 422."""
    from services.auth import create_password_reset_token

    # A registered user is needed so the token can be minted for its id.
    register_resp = await totp_client.post(
        "/api/auth/register",
        json={"handle": "pwr_weak", "email": "pwr_weak@example.com", "password": VALID_PASSWORD},
    )
    assert register_resp.status_code == 201
    reset_token = create_password_reset_token(register_resp.json()["id"])

    confirm_body = {"token": reset_token, "new_password": "weak"}
    confirm_resp = await totp_client.post("/api/auth/password-reset/confirm", json=confirm_body)
    assert confirm_resp.status_code == 422, confirm_resp.text
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_password_reset_confirm_valid_no_autologin(totp_client, db_session):
    """A valid token plus a strong password answers 200 and returns no access_token."""
    from services.auth import create_password_reset_token

    register_resp = await totp_client.post(
        "/api/auth/register",
        json={"handle": "pwr_ok", "email": "pwr_ok@example.com", "password": VALID_PASSWORD},
    )
    assert register_resp.status_code == 201
    reset_token = create_password_reset_token(register_resp.json()["id"])

    confirm_body = {"token": reset_token, "new_password": "AnotherStr0ng!Pass"}
    # The email sender stays patched for the duration of the confirm request.
    with patch("api.auth.send_reset_email"):
        confirm_resp = await totp_client.post(
            "/api/auth/password-reset/confirm",
            json=confirm_body,
        )

    assert confirm_resp.status_code == 200, confirm_resp.text
    payload = confirm_resp.json()
    assert "access_token" not in payload, "Confirm endpoint must not issue tokens (AUTH-05)"
    assert "Please sign in" in payload.get("message", "")
|
|
|
|
|
|
# ── Tests: logout-all ─────────────────────────────────────────────────────────
|
|
|
|
@pytest.mark.asyncio
async def test_logout_all_revokes_tokens(totp_client):
    """An authenticated POST /api/auth/logout-all answers 200 with a 'revoked' message."""
    access_token = await _register_and_login(
        totp_client,
        handle="logoutall",
        email="logoutall@example.com",
    )
    auth_headers = {"Authorization": f"Bearer {access_token}"}

    response = await totp_client.post("/api/auth/logout-all", headers=auth_headers)

    assert response.status_code == 200, response.text
    message = response.json().get("message", "")
    assert "revoked" in message.lower()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_logout_all_requires_auth(totp_client):
    """Calling logout-all with no Authorization header yields 401 or 403."""
    response = await totp_client.post("/api/auth/logout-all")
    status = response.status_code
    assert status in (401, 403), f"Expected 401/403, got {status}"
|
|
|
|
|
|
# ── Tests: TOTP enable rate limit ─────────────────────────────────────────────
|
|
|
|
@pytest.mark.asyncio
async def test_totp_enable_rate_limit(totp_client):
    """Ten enable attempts must not be throttled; the eleventh must answer 429."""
    access_token = await _register_and_login(totp_client, handle="ratelimit", email="rl@example.com")
    auth_headers = {"Authorization": f"Bearer {access_token}"}
    enable_url = "/api/auth/totp/enable"

    # Call setup first so a secret is provisioned for the user.
    await totp_client.get("/api/auth/totp/setup", headers=auth_headers)

    # Attempts 1..10 send the wrong codes 000000..000009; none may be throttled.
    for attempt in range(1, 11):
        wrong_code = f"{attempt - 1:06d}"
        response = await totp_client.post(
            enable_url,
            json={"code": wrong_code},
            headers=auth_headers,
        )
        assert response.status_code != 429, f"Got 429 too early at attempt {attempt}"

    # The 11th attempt is the one expected to hit the limit.
    response = await totp_client.post(
        enable_url,
        json={"code": "000000"},
        headers=auth_headers,
    )
    assert response.status_code == 429, f"Expected 429 on 11th call, got {response.status_code}"
|