Files
kite/backend/tests/test_auth_totp.py
T
curo1305 43e1d0145e feat(02-03): add TOTP setup/enable/disable, password reset, and frontend_url to config
- GET /api/auth/totp/setup: returns provisioning_uri + secret (400 if already enabled)
- POST /api/auth/totp/enable: rate-limited 10/min, verifies TOTP code with Redis replay prevention, returns 10 backup codes
- DELETE /api/auth/totp: disables TOTP, clears secret, deletes backup codes
- POST /api/auth/password-reset: always returns 202 (anti-enumeration), enqueues Celery email task
- POST /api/auth/password-reset/confirm: validates token, strength, HIBP; updates password; no auto-login (AUTH-05)
- config.py: added frontend_url setting for password reset link construction
- test_auth_totp.py: all 11 tests passing (GREEN)
2026-05-22 19:52:36 +02:00

304 lines
12 KiB
Python

"""
Tests for Plan 02-03 Task 1 — TOTP endpoints, password reset, logout-all.
Covers:
- GET /api/auth/totp/setup: returns provisioning_uri + secret; 400 if already enabled
- POST /api/auth/totp/enable: verifies code, returns backup_codes; rate limit 10/min (only the rate limit is tested here)
- DELETE /api/auth/totp: disables TOTP (no test in this file yet)
- POST /api/auth/logout-all: revokes all refresh tokens
- POST /api/auth/password-reset: always returns 202 (anti-enumeration)
- POST /api/auth/password-reset/confirm: valid token + strong password → 200, no tokens
Uses the totp_client fixture defined in this module (built on the db_session fixture).
FakeRedis is an in-process stand-in for app.state.redis.
"""
from __future__ import annotations
import uuid
from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
import pytest_asyncio
from httpx import ASGITransport, AsyncClient
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from db.models import BackupCode, User
# ── Helpers ───────────────────────────────────────────────────────────────────
# Password shared by every user these tests register and log in with.
# NOTE(review): presumably chosen to satisfy the API's password-strength rules — confirm.
VALID_PASSWORD = "StrongPass12!"
async def _register_and_login(async_client, handle="totpuser", email="totp@example.com"):
    """Register a user, log in, and return the access token.

    Args:
        async_client: HTTP client exposing an awaitable ``post(url, json=...)``.
        handle: Handle to register the user under.
        email: Email used for both registration and login.

    Returns:
        The ``access_token`` value from the login response body.

    Raises:
        AssertionError: If registration does not return 201 or the login
            response carries no ``access_token``. The message includes the
            status code and response body so the failing step is obvious.
    """
    register_resp = await async_client.post(
        "/api/auth/register",
        json={"handle": handle, "email": email, "password": VALID_PASSWORD},
    )
    # Fail at the step that actually broke instead of with a bare KeyError
    # on the login payload further down.
    assert register_resp.status_code == 201, (
        f"Registration failed ({register_resp.status_code}): {register_resp.text}"
    )

    login_resp = await async_client.post(
        "/api/auth/login",
        json={"email": email, "password": VALID_PASSWORD},
    )
    data = login_resp.json()
    assert "access_token" in data, (
        f"Login failed ({login_resp.status_code}): {login_resp.text}"
    )
    return data["access_token"]
# ── FakeRedis ─────────────────────────────────────────────────────────────────
class FakeRedis:
    """In-memory fake Redis for testing.

    Originally mirrored from the fake in test_auth_api.py; this copy applies
    key expiry consistently in ``get``, ``incr`` and ``expire``.

    Entries are stored as ``(value, deadline)`` pairs, where ``deadline`` is a
    UTC POSIX timestamp or ``None`` for keys without a TTL. Expired keys are
    dropped lazily the next time they are touched.
    """

    def __init__(self):
        self._store: dict = {}

    @staticmethod
    def _now() -> float:
        """Return the current UTC time as a POSIX timestamp."""
        return datetime.now(timezone.utc).timestamp()

    def _live_entry(self, key):
        """Return the ``(value, deadline)`` pair for *key*, or ``None``.

        A key whose deadline has passed is removed from the store and treated
        as absent, so every command sees the same view of expiry.
        """
        entry = self._store.get(key)
        if entry is None:
            return None
        _, deadline = entry
        if deadline is not None and self._now() > deadline:
            del self._store[key]
            return None
        return entry

    async def get(self, key):
        """Return the stored value for *key*, or ``None`` if missing or expired."""
        entry = self._live_entry(key)
        return None if entry is None else entry[0]

    async def incr(self, key):
        """Increment the counter at *key* and return the new value.

        A missing or expired key starts again at 1 with no TTL; an existing
        key keeps its current deadline.
        """
        entry = self._live_entry(key)
        if entry is None:
            self._store[key] = (1, None)
            return 1
        value, deadline = entry
        new_value = value + 1
        self._store[key] = (new_value, deadline)
        return new_value

    async def expire(self, key, seconds):
        """Set *key* to expire *seconds* from now; no-op for missing/expired keys."""
        entry = self._live_entry(key)
        if entry is None:
            return
        value, _ = entry
        self._store[key] = (value, self._now() + seconds)

    async def set(self, key, value, ex=None):
        """Store *value* at *key*, optionally expiring after *ex* seconds."""
        deadline = None if ex is None else self._now() + ex
        self._store[key] = (value, deadline)

    async def close(self):
        """Do nothing; present so callers can close the fake like a real client."""
@pytest_asyncio.fixture
async def totp_client(db_session: AsyncSession):
    """Async HTTP test client with DB override AND fresh FakeRedis.

    Setup:
        * overrides the ``get_db`` dependency so the app uses ``db_session``;
        * installs a new ``FakeRedis`` as ``app.state.redis``;
        * best-effort reset of the auth limiter's storage so request counters
          do not bleed between tests.

    Yields:
        An ``AsyncClient`` bound to the app through ``ASGITransport``.

    Teardown:
        Clears the dependency overrides and puts ``app.state.redis`` back to
        whatever it was before the fixture ran, so the fake does not leak
        into later tests.
    """
    import logging

    from deps.db import get_db
    from main import app
    from api.auth import limiter as auth_limiter

    # Remember the pre-existing redis (if any) so teardown can restore it.
    missing = object()
    previous_redis = getattr(app.state, "redis", missing)

    app.dependency_overrides[get_db] = lambda: db_session
    app.state.redis = FakeRedis()

    # Reset slowapi in-memory storage to prevent IP counter bleed between tests.
    # ``_storage`` is a private attribute, so this stays best-effort, but a
    # failure is logged because it can make rate-limit tests flaky.
    try:
        auth_limiter._storage.reset()
    except Exception:
        logging.getLogger(__name__).warning(
            "Could not reset auth limiter storage; rate-limit counters may "
            "leak between tests",
            exc_info=True,
        )

    try:
        async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as c:
            yield c
    finally:
        # Runs even if closing the client raises.
        app.dependency_overrides.clear()
        if previous_redis is not missing:
            app.state.redis = previous_redis
        elif hasattr(app.state, "redis"):
            del app.state.redis
# ── Tests: TOTP setup ──────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_totp_setup_returns_uri(totp_client):
    """Authenticated GET /api/auth/totp/setup answers 200 with an otpauth URI and a secret."""
    access_token = await _register_and_login(totp_client)
    auth_headers = {"Authorization": f"Bearer {access_token}"}

    resp = await totp_client.get("/api/auth/totp/setup", headers=auth_headers)

    assert resp.status_code == 200, resp.text
    payload = resp.json()
    # Both fields must be present, and the URI must use the otpauth scheme.
    assert "provisioning_uri" in payload, f"Missing provisioning_uri in {payload}"
    assert "secret" in payload, f"Missing secret in {payload}"
    assert "otpauth://" in payload["provisioning_uri"]
@pytest.mark.asyncio
async def test_totp_setup_already_enabled(totp_client, db_session):
    """Setup is rejected with 400 once the user's totp_enabled flag is True."""
    access_token = await _register_and_login(
        totp_client, handle="u2", email="u2@example.com"
    )

    # Flip the flag straight in the database rather than going through the API.
    lookup = select(User).where(User.email == "u2@example.com")
    account = (await db_session.execute(lookup)).scalar_one()
    account.totp_enabled = True
    await db_session.commit()

    auth_headers = {"Authorization": f"Bearer {access_token}"}
    resp = await totp_client.get("/api/auth/totp/setup", headers=auth_headers)
    assert resp.status_code == 400, resp.text
@pytest.mark.asyncio
async def test_totp_setup_requires_auth(totp_client):
    """Calling the setup endpoint with no Bearer header yields 401 or 403."""
    resp = await totp_client.get("/api/auth/totp/setup")
    status = resp.status_code
    assert status in (401, 403), f"Expected 401/403, got {status}"
# ── Tests: password reset (always 202) ────────────────────────────────────────
@pytest.mark.asyncio
async def test_password_reset_always_202_nonexistent(totp_client):
    """A reset request for an unknown email still answers 202 (anti-enumeration)."""
    # Valid domain, but no user was registered with this address.
    request_body = {"email": "nobody@example.com"}

    resp = await totp_client.post("/api/auth/password-reset", json=request_body)

    assert resp.status_code == 202, resp.text
    payload = resp.json()
    assert "message" in payload
    assert "access_token" not in payload
@pytest.mark.asyncio
async def test_password_reset_always_202_existing(totp_client):
    """POST /api/auth/password-reset with existing email also returns 202."""
    register_resp = await totp_client.post(
        "/api/auth/register",
        json={"handle": "resetuser", "email": "reset@example.com", "password": VALID_PASSWORD},
    )
    # Without this check a failed registration would silently turn the test
    # into a duplicate of the non-existent-email case.
    assert register_resp.status_code == 201, register_resp.text

    # Patch the Celery task to avoid actually sending email in tests.
    # The deferred import in the endpoint uses tasks.email_tasks.send_reset_email,
    # so we patch it at its module source.
    with patch("tasks.email_tasks.send_reset_email") as mock_task:
        mock_task.delay = MagicMock()
        resp = await totp_client.post(
            "/api/auth/password-reset",
            json={"email": "reset@example.com"},
        )
    assert resp.status_code == 202, resp.text
# ── Tests: password reset confirm ─────────────────────────────────────────────
@pytest.mark.asyncio
async def test_password_reset_confirm_invalid_token(totp_client):
    """Confirming a reset with a malformed token is rejected with 400."""
    request_body = {"token": "not.a.valid.token", "new_password": VALID_PASSWORD}
    resp = await totp_client.post(
        "/api/auth/password-reset/confirm", json=request_body
    )
    assert resp.status_code == 400, resp.text
@pytest.mark.asyncio
async def test_password_reset_confirm_weak_password(totp_client, db_session):
    """A valid reset token combined with a weak new password is rejected with 422."""
    from services.auth import create_password_reset_token

    # A registered user is needed so the token refers to a real user id.
    registration = await totp_client.post(
        "/api/auth/register",
        json={"handle": "pwr_weak", "email": "pwr_weak@example.com", "password": VALID_PASSWORD},
    )
    assert registration.status_code == 201
    reset_token = create_password_reset_token(registration.json()["id"])

    confirm_body = {"token": reset_token, "new_password": "weak"}
    confirmation = await totp_client.post(
        "/api/auth/password-reset/confirm", json=confirm_body
    )
    assert confirmation.status_code == 422, confirmation.text
@pytest.mark.asyncio
async def test_password_reset_confirm_valid_no_autologin(totp_client, db_session):
    """Valid token + strong password gives 200 and a sign-in prompt, but no access_token."""
    from services.auth import create_password_reset_token

    registration = await totp_client.post(
        "/api/auth/register",
        json={"handle": "pwr_ok", "email": "pwr_ok@example.com", "password": VALID_PASSWORD},
    )
    assert registration.status_code == 201
    reset_token = create_password_reset_token(registration.json()["id"])

    confirm_body = {"token": reset_token, "new_password": "AnotherStr0ng!Pass"}
    confirmation = await totp_client.post(
        "/api/auth/password-reset/confirm", json=confirm_body
    )
    assert confirmation.status_code == 200, confirmation.text

    payload = confirmation.json()
    # The user has to log in again explicitly after a reset.
    assert "access_token" not in payload, "Confirm endpoint must not issue tokens (AUTH-05)"
    assert "Please sign in" in payload.get("message", "")
# ── Tests: logout-all ─────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_logout_all_revokes_tokens(totp_client):
    """Authenticated POST /api/auth/logout-all answers 200 with a session/revoked message."""
    access_token = await _register_and_login(
        totp_client, handle="logoutall", email="logoutall@example.com"
    )
    auth_headers = {"Authorization": f"Bearer {access_token}"}

    resp = await totp_client.post("/api/auth/logout-all", headers=auth_headers)
    assert resp.status_code == 200, resp.text

    # Accept either wording: "Signed out of X session(s)" or "All sessions revoked".
    message = resp.json().get("message", "").lower()
    mentions_expected_word = any(word in message for word in ("session", "revoked"))
    assert mentions_expected_word, f"Unexpected message: {message}"
@pytest.mark.asyncio
async def test_logout_all_requires_auth(totp_client):
    """Calling logout-all with no Bearer header yields 401 or 403."""
    resp = await totp_client.post("/api/auth/logout-all")
    status = resp.status_code
    assert status in (401, 403), f"Expected 401/403, got {status}"
# ── Tests: TOTP enable rate limit ─────────────────────────────────────────────
@pytest.mark.asyncio
async def test_totp_enable_rate_limit(totp_client):
    """The 11th POST /api/auth/totp/enable in a row is answered with 429."""
    access_token = await _register_and_login(
        totp_client, handle="ratelimit", email="rl@example.com"
    )
    auth_headers = {"Authorization": f"Bearer {access_token}"}
    enable_url = "/api/auth/totp/enable"

    # Provision a secret first so the enable endpoint has something to verify.
    await totp_client.get("/api/auth/totp/setup", headers=auth_headers)

    # Ten attempts with wrong codes: none of them may be rate-limited yet.
    for attempt, n in enumerate(range(10), start=1):
        resp = await totp_client.post(
            enable_url,
            json={"code": f"{n:06d}"},
            headers=auth_headers,
        )
        assert resp.status_code != 429, f"Got 429 too early at attempt {attempt}"

    # One more request pushes the caller over the limit.
    resp = await totp_client.post(
        enable_url,
        json={"code": "000000"},
        headers=auth_headers,
    )
    assert resp.status_code == 429, f"Expected 429 on 11th call, got {resp.status_code}"