feat(02-03): add TOTP setup/enable/disable, password reset, and frontend_url to config

- GET /api/auth/totp/setup: returns provisioning_uri + secret (400 if already enabled)
- POST /api/auth/totp/enable: rate-limited 10/min, verifies TOTP code with Redis replay prevention, returns 10 backup codes
- DELETE /api/auth/totp: disables TOTP, clears secret, deletes backup codes
- POST /api/auth/password-reset: always returns 202 (anti-enumeration), enqueues Celery email task
- POST /api/auth/password-reset/confirm: validates token, strength, HIBP; updates password; no auto-login (AUTH-05)
- config.py: added frontend_url setting for password reset link construction
- test_auth_totp.py: all 11 tests passing (GREEN)
This commit is contained in:
curo1305
2026-05-22 19:52:36 +02:00
parent d7831e9382
commit 43e1d0145e
3 changed files with 201 additions and 8 deletions
+185
View File
@@ -35,6 +35,7 @@ from deps.db import get_db
from services import auth as auth_service
from slowapi import Limiter
from slowapi.util import get_remote_address
from sqlalchemy import delete
router = APIRouter(prefix="/api/auth", tags=["auth"])
@@ -428,3 +429,187 @@ async def change_password(
await session.commit()
return {"message": "Password updated"}
# ── Request models for new endpoints ─────────────────────────────────────────
class TotpEnableRequest(BaseModel):
code: str
class PasswordResetRequest(BaseModel):
email: EmailStr
class PasswordResetConfirmRequest(BaseModel):
token: str
new_password: str
# ── GET /api/auth/totp/setup ──────────────────────────────────────────────────
@router.get("/totp/setup")
async def totp_setup(
session: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Provision a TOTP secret for the current user.
If TOTP is already enabled, returns 400.
Returns { provisioning_uri, secret } — the provisioning_uri is suitable
for QR code generation. The secret is the base32-encoded TOTP secret.
"""
if current_user.totp_enabled:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="TOTP already enabled",
)
secret, provisioning_uri = await auth_service.provision_totp(session, current_user.id)
return {"provisioning_uri": provisioning_uri, "secret": secret}
# ── POST /api/auth/totp/enable ───────────────────────────────────────────────
@router.post("/totp/enable")
@limiter.limit("10/minute")
async def enable_totp(
request: Request,
body: TotpEnableRequest,
session: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Enable TOTP for the current user.
Rate-limited to 10 attempts/minute per IP (SEC-02 / T-02-25).
Verifies the submitted 6-digit code (with Redis replay prevention, AUTH-08).
On success: marks TOTP enabled, generates and returns 10 one-time backup codes.
The backup codes are ONLY returned here — they are stored as Argon2 hashes
in the DB and never returned again (T-02-19).
"""
redis_client = request.app.state.redis
ok = await auth_service.verify_totp(session, current_user.id, body.code, redis_client)
if not ok:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Incorrect or expired code",
)
# Mark TOTP as enabled
user = await session.get(User, current_user.id)
user.totp_enabled = True
await session.flush()
# Generate and store 10 backup codes; return plaintext to user (one-time, T-02-19)
plain_codes = auth_service.generate_backup_codes(10)
await auth_service.store_backup_codes(session, current_user.id, plain_codes)
return {"backup_codes": plain_codes}
# ── DELETE /api/auth/totp ─────────────────────────────────────────────────────
@router.delete("/totp")
async def disable_totp(
session: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Disable TOTP for the current user.
Clears totp_secret, sets totp_enabled=False, and deletes all backup codes.
"""
user = await session.get(User, current_user.id)
user.totp_enabled = False
user.totp_secret = None
# Delete all backup codes for this user (including unused ones)
await session.execute(delete(BackupCode).where(BackupCode.user_id == current_user.id))
await session.commit()
return {"message": "TOTP disabled"}
# ── POST /api/auth/password-reset ─────────────────────────────────────────────
@router.post("/password-reset", status_code=status.HTTP_202_ACCEPTED)
@limiter.limit("5/hour")
async def password_reset_request(
request: Request,
body: PasswordResetRequest,
session: AsyncSession = Depends(get_db),
):
"""Request a password reset email.
Always returns 202 regardless of whether the email exists (anti-enumeration, T-02-22).
If the user is found, a signed reset token (1-hour JWT) is generated and a Celery
task is enqueued to send the email (D-02, D-03).
"""
from sqlalchemy import select as _select # noqa: PLC0415 (already imported above)
result = await session.execute(_select(User).where(User.email == str(body.email)))
user: Optional[User] = result.scalar_one_or_none()
if user is not None:
token = auth_service.create_password_reset_token(str(user.id))
reset_link = f"{settings.frontend_url}/password-reset/confirm?token={token}"
# Deferred import to avoid circular import; Celery task is fire-and-forget
from tasks.email_tasks import send_reset_email # noqa: PLC0415
send_reset_email.delay(user.email, reset_link)
# Always return 202 (anti-enumeration — never reveal whether email exists)
return {
"message": (
"If an account exists for that email, you will receive a reset link shortly."
)
}
# ── POST /api/auth/password-reset/confirm ────────────────────────────────────
@router.post("/password-reset/confirm")
async def password_reset_confirm(
body: PasswordResetConfirmRequest,
session: AsyncSession = Depends(get_db),
):
"""Confirm a password reset using the token from the email link.
Validates the reset token, enforces password strength + HIBP check, updates
the password, and revokes all refresh tokens. Does NOT issue new tokens —
the user must sign in again through /login (AUTH-05, T-02-21).
"""
try:
user_id_str = auth_service.decode_password_reset_token(body.token)
except ValueError:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid or expired reset link",
)
# Password strength validation
if not _validate_password_strength(body.new_password):
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=_PASSWORD_DETAIL,
)
# HIBP breach check (SEC-03)
if await auth_service.check_hibp(body.new_password):
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="This password has appeared in a data breach. Choose a different password.",
)
# Load user
user = await session.get(User, uuid.UUID(user_id_str))
if user is None or not user.is_active:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid or expired reset link",
)
# Update password and revoke all sessions (forces re-auth through TOTP if enabled)
user.password_hash = auth_service.hash_password(body.new_password)
await auth_service.revoke_all_refresh_tokens(session, user.id)
await session.commit()
# Do NOT issue tokens (AUTH-05 — user must pass TOTP gate on next login)
return {"message": "Password updated. Please sign in."}