test(phase-02): add Nyquist validation tests — fill SEC-05, AUTH-08, SEC-03 and frontend gaps
8 test files, 60 new tests (14 backend + 46 frontend). All green. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,239 @@
|
||||
"""
|
||||
AUTH-08: TOTP replay prevention — same valid code used twice within 90 seconds
|
||||
must be rejected on the second use.
|
||||
|
||||
This is a distinct behavioral gap from the rate-limit test (11 calls → 429).
|
||||
The replay test: one valid code → second use of the *same* code → rejected.
|
||||
Uses the FakeRedis from test_auth_totp.py pattern to keep tests hermetic.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime, timezone
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
import pytest_asyncio
|
||||
import pyotp
|
||||
from httpx import ASGITransport, AsyncClient
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from db.models import User
|
||||
|
||||
|
||||
# ── FakeRedis (mirrors test_auth_totp.py) ────────────────────────────────────
|
||||
|
||||
class FakeRedis:
|
||||
"""In-process fake Redis: stores keys with optional TTL expiry."""
|
||||
|
||||
def __init__(self):
|
||||
self._store: dict = {}
|
||||
|
||||
async def get(self, key):
|
||||
entry = self._store.get(key)
|
||||
if entry is None:
|
||||
return None
|
||||
val, exp = entry
|
||||
if exp is not None and datetime.now(timezone.utc).timestamp() > exp:
|
||||
del self._store[key]
|
||||
return None
|
||||
return val
|
||||
|
||||
async def incr(self, key):
|
||||
entry = self._store.get(key)
|
||||
if entry is None:
|
||||
self._store[key] = (1, None)
|
||||
return 1
|
||||
val, exp = entry
|
||||
new_val = val + 1
|
||||
self._store[key] = (new_val, exp)
|
||||
return new_val
|
||||
|
||||
async def expire(self, key, seconds):
|
||||
if key in self._store:
|
||||
val, _ = self._store[key]
|
||||
deadline = datetime.now(timezone.utc).timestamp() + seconds
|
||||
self._store[key] = (val, deadline)
|
||||
|
||||
async def set(self, key, value, ex=None):
|
||||
deadline = None
|
||||
if ex is not None:
|
||||
deadline = datetime.now(timezone.utc).timestamp() + ex
|
||||
self._store[key] = (value, deadline)
|
||||
|
||||
async def close(self):
|
||||
pass
|
||||
|
||||
|
||||
VALID_PASSWORD = "StrongPass12!"
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def totp_replay_client(db_session: AsyncSession):
|
||||
"""Async test client with DB override and fresh FakeRedis."""
|
||||
from deps.db import get_db
|
||||
from main import app
|
||||
from api.auth import limiter as auth_limiter
|
||||
|
||||
app.dependency_overrides[get_db] = lambda: db_session
|
||||
fake_redis = FakeRedis()
|
||||
app.state.redis = fake_redis
|
||||
|
||||
try:
|
||||
auth_limiter._storage.reset()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as c:
|
||||
yield c
|
||||
|
||||
app.dependency_overrides.clear()
|
||||
|
||||
|
||||
async def _setup_totp_user(client, db_session, handle="replayuser", email="replay@example.com"):
|
||||
"""Register a user, provision TOTP, and return (access_token, totp_secret, fake_redis).
|
||||
|
||||
Directly manipulates the DB to set totp_enabled=True without consuming any
|
||||
TOTP code via the API — this leaves the Redis replay-prevention store clean,
|
||||
so the first login attempt with a fresh code has never been marked used.
|
||||
"""
|
||||
from sqlalchemy import select as _select
|
||||
from services.auth import hash_password as _hash
|
||||
from db.models import User, Quota
|
||||
|
||||
import uuid as _uuid
|
||||
import pyotp as _pyotp
|
||||
|
||||
# Create user directly in the DB (avoids HIBP check in test environment)
|
||||
user_id = _uuid.uuid4()
|
||||
secret = _pyotp.random_base32()
|
||||
user = User(
|
||||
id=user_id,
|
||||
handle=handle,
|
||||
email=email,
|
||||
password_hash=_hash(VALID_PASSWORD),
|
||||
role="user",
|
||||
is_active=True,
|
||||
password_must_change=False,
|
||||
totp_enabled=True,
|
||||
totp_secret=secret,
|
||||
)
|
||||
quota = Quota(user_id=user_id, limit_bytes=104857600, used_bytes=0)
|
||||
db_session.add(user)
|
||||
db_session.add(quota)
|
||||
await db_session.commit()
|
||||
|
||||
# Obtain an access token for this user without going through login
|
||||
from services.auth import create_access_token as _create_token
|
||||
token = _create_token(str(user_id), "user")
|
||||
|
||||
return token, secret
|
||||
|
||||
|
||||
# ── Tests ─────────────────────────────────────────────────────────────────────
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_totp_replay_rejected_same_code(totp_replay_client, db_session):
|
||||
"""AUTH-08: Same TOTP code used twice must be rejected on second use.
|
||||
|
||||
First login with TOTP succeeds. The second login with the *identical*
|
||||
code within the same 90-second window must return 401 (code already used).
|
||||
"""
|
||||
client = totp_replay_client
|
||||
_token, secret = await _setup_totp_user(client, db_session)
|
||||
|
||||
# First login step: password only — get requires_totp challenge
|
||||
r1 = await client.post(
|
||||
"/api/auth/login",
|
||||
json={"email": "replay@example.com", "password": VALID_PASSWORD},
|
||||
)
|
||||
assert r1.status_code == 200, f"First login step failed: {r1.text}"
|
||||
# Should ask for TOTP
|
||||
assert r1.json().get("requires_totp") is True, (
|
||||
f"Expected TOTP challenge, got: {r1.json()}"
|
||||
)
|
||||
|
||||
# Get a valid TOTP code
|
||||
totp = pyotp.TOTP(secret)
|
||||
valid_code = totp.now()
|
||||
|
||||
# First use of the code: must succeed and return access_token
|
||||
r2 = await client.post(
|
||||
"/api/auth/login",
|
||||
json={
|
||||
"email": "replay@example.com",
|
||||
"password": VALID_PASSWORD,
|
||||
"totp_code": valid_code,
|
||||
},
|
||||
)
|
||||
assert r2.status_code == 200, f"First TOTP use failed: {r2.text}"
|
||||
assert "access_token" in r2.json(), (
|
||||
f"Expected access_token on first TOTP use, got: {r2.json()}"
|
||||
)
|
||||
|
||||
# Second use of the SAME code: must be rejected (Redis replay prevention)
|
||||
r3 = await client.post(
|
||||
"/api/auth/login",
|
||||
json={
|
||||
"email": "replay@example.com",
|
||||
"password": VALID_PASSWORD,
|
||||
"totp_code": valid_code, # identical code — replay attempt
|
||||
},
|
||||
)
|
||||
assert r3.status_code == 401, (
|
||||
f"AUTH-08 VIOLATED: Second use of same TOTP code within 90s window was accepted "
|
||||
f"(status {r3.status_code}). Redis replay prevention is not working. "
|
||||
f"Response: {r3.text}"
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_totp_replay_service_layer(db_session):
|
||||
"""AUTH-08: verify_totp() service returns False when same code presented twice.
|
||||
|
||||
Tests the service layer directly (not via HTTP) to isolate Redis-based
|
||||
replay prevention from the login rate limiter and other middleware.
|
||||
"""
|
||||
import uuid as _uuid
|
||||
from db.models import User, Quota
|
||||
from services.auth import hash_password, provision_totp, verify_totp
|
||||
|
||||
# Create a user
|
||||
user_id = _uuid.uuid4()
|
||||
user = User(
|
||||
id=user_id,
|
||||
handle=f"srv_replay_{user_id.hex[:6]}",
|
||||
email=f"srv_replay_{user_id.hex[:6]}@example.com",
|
||||
password_hash=hash_password(VALID_PASSWORD),
|
||||
role="user",
|
||||
is_active=True,
|
||||
password_must_change=False,
|
||||
)
|
||||
quota = Quota(user_id=user_id, limit_bytes=104857600, used_bytes=0)
|
||||
db_session.add(user)
|
||||
db_session.add(quota)
|
||||
await db_session.commit()
|
||||
|
||||
# Provision a TOTP secret
|
||||
secret, _ = await provision_totp(db_session, user_id)
|
||||
|
||||
# Fresh FakeRedis for this service-layer test
|
||||
fake_redis = FakeRedis()
|
||||
|
||||
# Generate a valid code
|
||||
totp = pyotp.TOTP(secret)
|
||||
code = totp.now()
|
||||
|
||||
# First verification must succeed
|
||||
result1 = await verify_totp(db_session, user_id, code, fake_redis)
|
||||
assert result1 is True, (
|
||||
f"First TOTP verification should succeed, got False. "
|
||||
f"Code: {code}, Secret: {secret}"
|
||||
)
|
||||
|
||||
# Second verification with same code must fail (replay prevention)
|
||||
result2 = await verify_totp(db_session, user_id, code, fake_redis)
|
||||
assert result2 is False, (
|
||||
f"AUTH-08 VIOLATED: verify_totp() accepted the same code a second time. "
|
||||
f"Redis key 'totp_used:{user_id}:{code}' should block the second use."
|
||||
)
|
||||
Reference in New Issue
Block a user