""" AUTH-08: TOTP replay prevention — same valid code used twice within 90 seconds must be rejected on the second use. This is a distinct behavioral gap from the rate-limit test (11 calls → 429). The replay test: one valid code → second use of the *same* code → rejected. Uses the FakeRedis from test_auth_totp.py pattern to keep tests hermetic. """ from __future__ import annotations from datetime import datetime, timezone from unittest.mock import MagicMock import pytest import pytest_asyncio import pyotp from httpx import ASGITransport, AsyncClient from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from db.models import User # ── FakeRedis (mirrors test_auth_totp.py) ──────────────────────────────────── class FakeRedis: """In-process fake Redis: stores keys with optional TTL expiry.""" def __init__(self): self._store: dict = {} async def get(self, key): entry = self._store.get(key) if entry is None: return None val, exp = entry if exp is not None and datetime.now(timezone.utc).timestamp() > exp: del self._store[key] return None return val async def incr(self, key): entry = self._store.get(key) if entry is None: self._store[key] = (1, None) return 1 val, exp = entry new_val = val + 1 self._store[key] = (new_val, exp) return new_val async def expire(self, key, seconds): if key in self._store: val, _ = self._store[key] deadline = datetime.now(timezone.utc).timestamp() + seconds self._store[key] = (val, deadline) async def set(self, key, value, ex=None): deadline = None if ex is not None: deadline = datetime.now(timezone.utc).timestamp() + ex self._store[key] = (value, deadline) async def close(self): pass VALID_PASSWORD = "StrongPass12!" @pytest_asyncio.fixture async def totp_replay_client(db_session: AsyncSession): """Async test client with DB override and fresh FakeRedis.""" from deps.db import get_db from main import app from api.auth import limiter as auth_limiter app.dependency_overrides[get_db] = lambda: db_session fake_redis = FakeRedis() app.state.redis = fake_redis try: auth_limiter._storage.reset() except Exception: pass async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as c: yield c app.dependency_overrides.clear() async def _setup_totp_user(client, db_session, handle="replayuser", email="replay@example.com"): """Register a user, provision TOTP, and return (access_token, totp_secret, fake_redis). Directly manipulates the DB to set totp_enabled=True without consuming any TOTP code via the API — this leaves the Redis replay-prevention store clean, so the first login attempt with a fresh code has never been marked used. """ from sqlalchemy import select as _select from services.auth import hash_password as _hash from db.models import User, Quota import uuid as _uuid import pyotp as _pyotp # Create user directly in the DB (avoids HIBP check in test environment) user_id = _uuid.uuid4() secret = _pyotp.random_base32() user = User( id=user_id, handle=handle, email=email, password_hash=_hash(VALID_PASSWORD), role="user", is_active=True, password_must_change=False, totp_enabled=True, totp_secret=secret, ) quota = Quota(user_id=user_id, limit_bytes=104857600, used_bytes=0) db_session.add(user) db_session.add(quota) await db_session.commit() # Obtain an access token for this user without going through login from services.auth import create_access_token as _create_token token = _create_token(str(user_id), "user") return token, secret # ── Tests ───────────────────────────────────────────────────────────────────── @pytest.mark.asyncio async def test_totp_replay_rejected_same_code(totp_replay_client, db_session): """AUTH-08: Same TOTP code used twice must be rejected on second use. First login with TOTP succeeds. The second login with the *identical* code within the same 90-second window must return 401 (code already used). """ client = totp_replay_client _token, secret = await _setup_totp_user(client, db_session) # First login step: password only — get requires_totp challenge r1 = await client.post( "/api/auth/login", json={"email": "replay@example.com", "password": VALID_PASSWORD}, ) assert r1.status_code == 200, f"First login step failed: {r1.text}" # Should ask for TOTP assert r1.json().get("requires_totp") is True, ( f"Expected TOTP challenge, got: {r1.json()}" ) # Get a valid TOTP code totp = pyotp.TOTP(secret) valid_code = totp.now() # First use of the code: must succeed and return access_token r2 = await client.post( "/api/auth/login", json={ "email": "replay@example.com", "password": VALID_PASSWORD, "totp_code": valid_code, }, ) assert r2.status_code == 200, f"First TOTP use failed: {r2.text}" assert "access_token" in r2.json(), ( f"Expected access_token on first TOTP use, got: {r2.json()}" ) # Second use of the SAME code: must be rejected (Redis replay prevention) r3 = await client.post( "/api/auth/login", json={ "email": "replay@example.com", "password": VALID_PASSWORD, "totp_code": valid_code, # identical code — replay attempt }, ) assert r3.status_code == 401, ( f"AUTH-08 VIOLATED: Second use of same TOTP code within 90s window was accepted " f"(status {r3.status_code}). Redis replay prevention is not working. " f"Response: {r3.text}" ) @pytest.mark.asyncio async def test_totp_replay_service_layer(db_session): """AUTH-08: verify_totp() service returns False when same code presented twice. Tests the service layer directly (not via HTTP) to isolate Redis-based replay prevention from the login rate limiter and other middleware. """ import uuid as _uuid from db.models import User, Quota from services.auth import hash_password, provision_totp, verify_totp # Create a user user_id = _uuid.uuid4() user = User( id=user_id, handle=f"srv_replay_{user_id.hex[:6]}", email=f"srv_replay_{user_id.hex[:6]}@example.com", password_hash=hash_password(VALID_PASSWORD), role="user", is_active=True, password_must_change=False, ) quota = Quota(user_id=user_id, limit_bytes=104857600, used_bytes=0) db_session.add(user) db_session.add(quota) await db_session.commit() # Provision a TOTP secret secret, _ = await provision_totp(db_session, user_id) # Fresh FakeRedis for this service-layer test fake_redis = FakeRedis() # Generate a valid code totp = pyotp.TOTP(secret) code = totp.now() # First verification must succeed result1 = await verify_totp(db_session, user_id, code, fake_redis) assert result1 is True, ( f"First TOTP verification should succeed, got False. " f"Code: {code}, Secret: {secret}" ) # Second verification with same code must fail (replay prevention) result2 = await verify_totp(db_session, user_id, code, fake_redis) assert result2 is False, ( f"AUTH-08 VIOLATED: verify_totp() accepted the same code a second time. " f"Redis key 'totp_used:{user_id}:{code}' should block the second use." )