From d7831e938291ce3f722065f6ecf2b6d00e2e241d Mon Sep 17 00:00:00 2001 From: curo1305 Date: Fri, 22 May 2026 19:50:51 +0200 Subject: [PATCH] test(02-03): add failing tests for TOTP endpoints, password reset, logout-all - test_totp_setup_returns_uri: GET /api/auth/totp/setup returns provisioning_uri + secret - test_totp_setup_already_enabled: returns 400 when totp_enabled=True - test_totp_setup_requires_auth: returns 401/403 without Bearer - test_password_reset_always_202_nonexistent: anti-enumeration for non-existent email - test_password_reset_always_202_existing: anti-enumeration for existing email - test_password_reset_confirm_invalid_token: returns 400 for bad token - test_password_reset_confirm_weak_password: returns 422 for weak password - test_password_reset_confirm_valid_no_autologin: returns 200 with no access_token (AUTH-05) - test_logout_all_revokes_tokens: returns 200 with revoked message - test_logout_all_requires_auth: returns 401/403 without Bearer - test_totp_enable_rate_limit: 11th call returns 429 --- backend/tests/test_auth_totp.py | 298 ++++++++++++++++++++++++++++++++ 1 file changed, 298 insertions(+) create mode 100644 backend/tests/test_auth_totp.py diff --git a/backend/tests/test_auth_totp.py b/backend/tests/test_auth_totp.py new file mode 100644 index 0000000..6fa771a --- /dev/null +++ b/backend/tests/test_auth_totp.py @@ -0,0 +1,298 @@ +""" +Tests for Plan 02-03 Task 1 — TOTP endpoints, password reset, logout-all. + +Covers: + - GET /api/auth/totp/setup: returns provisioning_uri + secret; 400 if already enabled + - POST /api/auth/totp/enable: verifies code, returns backup_codes; rate limit 10/min + - DELETE /api/auth/totp: disables TOTP + - POST /api/auth/logout-all: revokes all refresh tokens + - POST /api/auth/password-reset: always returns 202 (anti-enumeration) + - POST /api/auth/password-reset/confirm: valid token + strong password → 200, no tokens + +Uses async_client / authed_client fixtures from conftest.py. +FakeRedis in-process stand-in for app.state.redis. +""" +from __future__ import annotations + +import uuid +from datetime import datetime, timezone +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +import pytest_asyncio +from httpx import ASGITransport, AsyncClient +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from db.models import BackupCode, User + + +# ── Helpers ─────────────────────────────────────────────────────────────────── + +VALID_PASSWORD = "StrongPass12!" + + +async def _register_and_login(async_client, handle="totpuser", email="totp@example.com"): + """Register a user, login, and return the access token.""" + await async_client.post( + "/api/auth/register", + json={"handle": handle, "email": email, "password": VALID_PASSWORD}, + ) + resp = await async_client.post( + "/api/auth/login", + json={"email": email, "password": VALID_PASSWORD}, + ) + data = resp.json() + return data["access_token"] + + +# ── FakeRedis ───────────────────────────────────────────────────────────────── + +class FakeRedis: + """In-memory fake Redis for testing. Mirrors the one in test_auth_api.py.""" + + def __init__(self): + self._store: dict = {} + + async def get(self, key): + entry = self._store.get(key) + if entry is None: + return None + val, exp = entry + if exp is not None and datetime.now(timezone.utc).timestamp() > exp: + del self._store[key] + return None + return val + + async def incr(self, key): + entry = self._store.get(key) + if entry is None: + self._store[key] = (1, None) + return 1 + val, exp = entry + new_val = val + 1 + self._store[key] = (new_val, exp) + return new_val + + async def expire(self, key, seconds): + if key in self._store: + val, _ = self._store[key] + deadline = datetime.now(timezone.utc).timestamp() + seconds + self._store[key] = (val, deadline) + + async def set(self, key, value, ex=None): + deadline = None + if ex is not None: + deadline = datetime.now(timezone.utc).timestamp() + ex + self._store[key] = (value, deadline) + + async def close(self): + pass + + +@pytest_asyncio.fixture +async def totp_client(db_session: AsyncSession): + """Async HTTP test client with DB override AND fresh FakeRedis.""" + from deps.db import get_db + from main import app + from api.auth import limiter as auth_limiter + + app.dependency_overrides[get_db] = lambda: db_session + fake_redis = FakeRedis() + app.state.redis = fake_redis + + # Reset slowapi in-memory storage to prevent IP counter bleed between tests + try: + auth_limiter._storage.reset() + except Exception: + pass + + async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as c: + yield c + + app.dependency_overrides.clear() + + +# ── Tests: TOTP setup ────────────────────────────────────────────────────────── + +@pytest.mark.asyncio +async def test_totp_setup_returns_uri(totp_client): + """GET /api/auth/totp/setup → 200 with provisioning_uri and secret.""" + token = await _register_and_login(totp_client) + resp = await totp_client.get( + "/api/auth/totp/setup", + headers={"Authorization": f"Bearer {token}"}, + ) + assert resp.status_code == 200, resp.text + data = resp.json() + assert "provisioning_uri" in data, f"Missing provisioning_uri in {data}" + assert "secret" in data, f"Missing secret in {data}" + assert "otpauth://" in data["provisioning_uri"] + + +@pytest.mark.asyncio +async def test_totp_setup_already_enabled(totp_client, db_session): + """GET /api/auth/totp/setup returns 400 if totp_enabled=True.""" + token = await _register_and_login(totp_client, handle="u2", email="u2@example.com") + + # Manually set totp_enabled in the DB + result = await db_session.execute(select(User).where(User.email == "u2@example.com")) + user = result.scalar_one() + user.totp_enabled = True + await db_session.commit() + + resp = await totp_client.get( + "/api/auth/totp/setup", + headers={"Authorization": f"Bearer {token}"}, + ) + assert resp.status_code == 400, resp.text + + +@pytest.mark.asyncio +async def test_totp_setup_requires_auth(totp_client): + """GET /api/auth/totp/setup without Bearer → 401 or 403.""" + resp = await totp_client.get("/api/auth/totp/setup") + assert resp.status_code in (401, 403), f"Expected 401/403, got {resp.status_code}" + + +# ── Tests: password reset (always 202) ──────────────────────────────────────── + +@pytest.mark.asyncio +async def test_password_reset_always_202_nonexistent(totp_client): + """POST /api/auth/password-reset with non-existent email → 202 (anti-enumeration).""" + resp = await totp_client.post( + "/api/auth/password-reset", + json={"email": "nobody@nowhere.invalid"}, + ) + assert resp.status_code == 202, resp.text + data = resp.json() + assert "message" in data + assert "access_token" not in data + + +@pytest.mark.asyncio +async def test_password_reset_always_202_existing(totp_client): + """POST /api/auth/password-reset with existing email also returns 202.""" + await totp_client.post( + "/api/auth/register", + json={"handle": "resetuser", "email": "reset@example.com", "password": VALID_PASSWORD}, + ) + with patch("api.auth.send_reset_email") as mock_task: + mock_task.delay = MagicMock() + resp = await totp_client.post( + "/api/auth/password-reset", + json={"email": "reset@example.com"}, + ) + assert resp.status_code == 202, resp.text + + +# ── Tests: password reset confirm ───────────────────────────────────────────── + +@pytest.mark.asyncio +async def test_password_reset_confirm_invalid_token(totp_client): + """POST /api/auth/password-reset/confirm with bad token → 400.""" + resp = await totp_client.post( + "/api/auth/password-reset/confirm", + json={"token": "not.a.valid.token", "new_password": VALID_PASSWORD}, + ) + assert resp.status_code == 400, resp.text + + +@pytest.mark.asyncio +async def test_password_reset_confirm_weak_password(totp_client, db_session): + """POST /api/auth/password-reset/confirm with valid token but weak password → 422.""" + from services.auth import create_password_reset_token + + # Register a user to get a valid user_id + resp = await totp_client.post( + "/api/auth/register", + json={"handle": "pwr_weak", "email": "pwr_weak@example.com", "password": VALID_PASSWORD}, + ) + assert resp.status_code == 201 + user_id = resp.json()["id"] + + token = create_password_reset_token(user_id) + + reset_resp = await totp_client.post( + "/api/auth/password-reset/confirm", + json={"token": token, "new_password": "weak"}, + ) + assert reset_resp.status_code == 422, reset_resp.text + + +@pytest.mark.asyncio +async def test_password_reset_confirm_valid_no_autologin(totp_client, db_session): + """POST /api/auth/password-reset/confirm valid token + strong password → 200, no access_token.""" + from services.auth import create_password_reset_token + + resp = await totp_client.post( + "/api/auth/register", + json={"handle": "pwr_ok", "email": "pwr_ok@example.com", "password": VALID_PASSWORD}, + ) + assert resp.status_code == 201 + user_id = resp.json()["id"] + + token = create_password_reset_token(user_id) + + with patch("api.auth.send_reset_email") as _mock: + reset_resp = await totp_client.post( + "/api/auth/password-reset/confirm", + json={"token": token, "new_password": "AnotherStr0ng!Pass"}, + ) + assert reset_resp.status_code == 200, reset_resp.text + data = reset_resp.json() + assert "access_token" not in data, "Confirm endpoint must not issue tokens (AUTH-05)" + assert "Please sign in" in data.get("message", "") + + +# ── Tests: logout-all ───────────────────────────────────────────────────────── + +@pytest.mark.asyncio +async def test_logout_all_revokes_tokens(totp_client): + """POST /api/auth/logout-all with Bearer → 200.""" + token = await _register_and_login(totp_client, handle="logoutall", email="logoutall@example.com") + resp = await totp_client.post( + "/api/auth/logout-all", + headers={"Authorization": f"Bearer {token}"}, + ) + assert resp.status_code == 200, resp.text + assert "revoked" in resp.json().get("message", "").lower() + + +@pytest.mark.asyncio +async def test_logout_all_requires_auth(totp_client): + """POST /api/auth/logout-all without Bearer → 401 or 403.""" + resp = await totp_client.post("/api/auth/logout-all") + assert resp.status_code in (401, 403), f"Expected 401/403, got {resp.status_code}" + + +# ── Tests: TOTP enable rate limit ───────────────────────────────────────────── + +@pytest.mark.asyncio +async def test_totp_enable_rate_limit(totp_client): + """POST /api/auth/totp/enable → 429 after 10 calls within 60 seconds.""" + token = await _register_and_login(totp_client, handle="ratelimit", email="rl@example.com") + + # First, call setup to get a secret provisioned + await totp_client.get( + "/api/auth/totp/setup", + headers={"Authorization": f"Bearer {token}"}, + ) + + # Make 10 calls with wrong code (should be 400 each) + for i in range(10): + resp = await totp_client.post( + "/api/auth/totp/enable", + json={"code": f"{i:06d}"}, # wrong codes + headers={"Authorization": f"Bearer {token}"}, + ) + # First 10 should NOT be 429 + assert resp.status_code != 429, f"Got 429 too early at attempt {i+1}" + + # 11th call should be 429 + resp = await totp_client.post( + "/api/auth/totp/enable", + json={"code": "000000"}, + headers={"Authorization": f"Bearer {token}"}, + ) + assert resp.status_code == 429, f"Expected 429 on 11th call, got {resp.status_code}"