---
phase: 02-users-authentication
plan: 01
type: execute
wave: 1
depends_on: []
files_modified:
- backend/config.py
- backend/requirements.txt
- backend/deps/auth.py
- backend/services/auth.py
- backend/db/models.py
- backend/migrations/versions/
- backend/tasks/email_tasks.py
- .env.example
autonomous: true
requirements:
- AUTH-01
- AUTH-02
- AUTH-07
- SEC-03
- SEC-06
must_haves:
truths:
- "User passwords are Argon2-hashed and verified in constant time"
- "JWT access token creation and decode function reject tampered tokens"
- "Refresh token rotation revokes the entire family on reuse, and a security alert email is enqueued when a revoked family token is presented"
- "Users can receive and later redeem single-use backup codes"
- "All auth and email configuration reads from environment variables at startup without falling back to hard-coded values"
artifacts:
- path: "backend/services/auth.py"
provides: "Auth service: hash_password, verify_password, create_access_token, decode_access_token, create_refresh_token, rotate_refresh_token, revoke_all_refresh_tokens, provision_totp, verify_totp, verify_backup_code, generate_backup_codes, bootstrap_admin"
exports:
- "hash_password"
- "verify_password"
- "create_access_token"
- "decode_access_token"
- "rotate_refresh_token"
- "revoke_all_refresh_tokens"
- path: "backend/deps/auth.py"
provides: "FastAPI dependencies: get_current_user, get_current_admin"
exports:
- "get_current_user"
- "get_current_admin"
- path: "backend/db/models.py"
provides: "BackupCode ORM model added, password_must_change field on User"
contains: "class BackupCode"
- path: "backend/tasks/email_tasks.py"
provides: "Celery tasks: send_reset_email, send_security_alert_email"
exports:
- "send_reset_email"
- "send_security_alert_email"
key_links:
- from: "backend/deps/auth.py"
to: "backend/services/auth.py"
via: "decode_access_token(credentials.credentials)"
pattern: "decode_access_token"
- from: "backend/services/auth.py"
to: "backend/db/models.py"
via: "RefreshToken, BackupCode ORM queries"
pattern: "session\\.get.*RefreshToken|BackupCode"
- from: "backend/services/auth.py"
to: "backend/tasks/email_tasks.py"
via: "send_security_alert_email.delay(user_id) on family revocation"
pattern: "send_security_alert_email\\.delay"
---
Build the auth service layer and database foundations that all subsequent plans depend on. This plan produces no user-visible endpoints — it creates the pure-Python services, FastAPI dependency chain, and a migration adding the `backup_codes` table and `password_must_change` field.
Purpose: Every auth endpoint in Plans 02–06 imports from this plan's outputs. Getting contracts right here prevents rework across all later plans.
Output: backend/services/auth.py (full auth logic), backend/deps/auth.py (FastAPI deps), BackupCode ORM model, password_must_change column on User, Alembic migration, updated requirements.txt and config.py, send_security_alert_email Celery task.
@/Users/nik/.claude/get-shit-done/workflows/execute-plan.md
@/Users/nik/.claude/get-shit-done/templates/summary.md
@.planning/PROJECT.md
@.planning/ROADMAP.md
@.planning/STATE.md
@.planning/phases/02-users-authentication/02-CONTEXT.md
@.planning/phases/02-users-authentication/02-PATTERNS.md
From backend/db/models.py (existing ORM models for reference):
class User(Base):
__tablename__ = "users"
id: Mapped[uuid.UUID] # primary key
handle: Mapped[str] # unique
email: Mapped[str] # unique
password_hash: Mapped[str]
totp_secret: Mapped[Optional[str]]
totp_enabled: Mapped[bool] # default False
role: Mapped[str] # "user" | "admin"
is_active: Mapped[bool] # default True
ai_provider: Mapped[Optional[str]]
ai_model: Mapped[Optional[str]]
created_at: Mapped[datetime]
class RefreshToken(Base):
__tablename__ = "refresh_tokens"
id: Mapped[uuid.UUID]
user_id: Mapped[uuid.UUID] # FK users.id CASCADE
token_hash: Mapped[str] # unique
expires_at: Mapped[datetime]
revoked: Mapped[bool] # default False
created_at: Mapped[datetime]
class Quota(Base):
__tablename__ = "quotas"
user_id: Mapped[uuid.UUID] # PK, FK users.id CASCADE
limit_bytes: Mapped[int] # default 104857600
used_bytes: Mapped[int] # default 0
From backend/config.py (current Settings fields):
database_url: str
database_migrate_url: str
minio_endpoint, minio_access_key, minio_secret_key, minio_bucket: str
redis_url: str = "redis://:changeme_redis@redis:6379/0"
secret_key: str = "CHANGEME"
From backend/deps/db.py (dependency pattern):
async def get_db() -> AsyncGenerator[AsyncSession, None]:
async with AsyncSessionLocal() as session:
try:
yield session
finally:
await session.close()
Task 1: Add BackupCode ORM model, password_must_change field, Alembic migration, and extend Settings
backend/db/models.py,
backend/config.py,
backend/requirements.txt,
.env.example
- backend/db/models.py (full file — understand existing model patterns before adding BackupCode and password_must_change)
- backend/config.py (full file — extend Settings class in place, do not recreate)
- backend/requirements.txt (full file — append new packages, preserve existing)
- .env.example (full file — append new vars with comments)
- backend/celery_app.py (task_routes pattern — add email queue route)
- BackupCode model stores code_hash (Argon2), user_id FK, used_at nullable timestamp
- BackupCode.used_at is None when unused; set to now() on first use (single-use enforcement)
- User model gains: password_must_change: Mapped[bool] = mapped_column(Boolean, server_default="false") — added immediately after is_active column
- Settings.cors_origins parses comma-separated env var as list[str] via pydantic-settings JSON parse
- Settings loads access_token_expire_minutes=15, refresh_token_expire_days=30
- requirements.txt gains: PyJWT>=2.8.0, pwdlib[argon2]>=0.2.1, pyotp>=2.9.0, httpx (already present for HIBP check), aioredis>=2.0.0, slowapi>=0.1.9
Add BackupCode to backend/db/models.py. The model needs: id (UUID PK), user_id (UUID FK users.id CASCADE NOT NULL), code_hash (Text NOT NULL), used_at (TIMESTAMP timezone=True nullable). Add Index on user_id. Place after the RefreshToken class.
Add password_must_change to the User model (per ADMIN-01): password_must_change: Mapped[bool] = mapped_column(Boolean, server_default="false"). Insert this field immediately after the is_active field in the User class body.
Extend backend/config.py Settings class (do not recreate the file) by appending these fields inside the Settings class body — per D-01, D-04, D-05, D-06, D-09:
access_token_expire_minutes: int = 15
refresh_token_expire_days: int = 30
smtp_host: str = ""
smtp_port: int = 587
smtp_user: str = ""
smtp_password: str = ""
smtp_from: str = "noreply@docuvault.local"
admin_email: str = ""
admin_password: str = ""
cors_origins: list[str] = ["http://localhost:5173"]
In SettingsConfigDict, add env_list_separator="," to handle cors_origins env var as comma-separated string.
Append to requirements.txt:
PyJWT>=2.8.0
pwdlib[argon2]>=0.2.1
pyotp>=2.9.0
aioredis>=2.0.0
slowapi>=0.1.9
Append to .env.example with comments explaining each var (D-01, D-04, D-09):
SECRET_KEY, ADMIN_EMAIL, ADMIN_PASSWORD, SMTP_HOST, SMTP_PORT, SMTP_USER, SMTP_PASSWORD, SMTP_FROM, CORS_ORIGINS.
Generate an Alembic migration (run: cd backend && alembic revision --autogenerate -m "add_backup_codes_and_password_must_change") then verify and clean the generated file. The migration must: create backup_codes table with columns (id UUID PK, user_id UUID FK NOT NULL, code_hash TEXT NOT NULL, used_at TIMESTAMPTZ nullable); add column password_must_change BOOLEAN NOT NULL DEFAULT false to users table. Do NOT add NOT NULL to documents.user_id in this migration (per D-03 from Phase 1).
cd /Users/nik/Documents/Progamming/document_scanner/backend && python -c "from db.models import BackupCode; print('BackupCode OK')"
cd /Users/nik/Documents/Progamming/document_scanner/backend && python -c "from db.models import User; assert hasattr(User, 'password_must_change'); print('password_must_change OK')"
cd /Users/nik/Documents/Progamming/document_scanner/backend && python -c "from config import settings; assert hasattr(settings, 'cors_origins'); assert hasattr(settings, 'admin_email'); print('Settings OK')"
- backend/db/models.py contains class BackupCode with __tablename__ = "backup_codes", columns: id, user_id, code_hash, used_at
- backend/db/models.py User class contains password_must_change column with server_default="false"
- backend/config.py Settings class contains: cors_origins, admin_email, admin_password, smtp_host, access_token_expire_minutes, refresh_token_expire_days
- A new migration file exists in backend/migrations/versions/ with "backup_codes" in the filename
- The migration adds both the backup_codes table AND the password_must_change column to users
- requirements.txt contains lines matching: PyJWT, pwdlib, pyotp, aioredis, slowapi
- .env.example contains lines for SECRET_KEY, ADMIN_EMAIL, SMTP_HOST, CORS_ORIGINS
- `python -c "from config import settings"` exits 0
BackupCode model exists, password_must_change field added to User, migration generated and runnable, Settings extended with all Phase 2 env vars, requirements.txt updated.
Task 2: Implement services/auth.py — full auth service layer
backend/services/auth.py,
backend/tasks/email_tasks.py
- backend/services/classifier.py (pure-Python service pattern — module docstring, async functions, no FastAPI imports, no HTTPException)
- backend/db/models.py (User, RefreshToken, Quota, BackupCode — after Task 1 adds BackupCode and password_must_change)
- backend/config.py (settings.secret_key, settings.access_token_expire_minutes, settings.refresh_token_expire_days)
- backend/celery_app.py (task_routes dict — add email queue route for send_security_alert_email)
- backend/tasks/document_tasks.py (Celery task pattern with asyncio.run + deferred imports)
- .planning/phases/02-users-authentication/02-CONTEXT.md (specifics section: PyOTP valid_window=1, reset token format, TOTP replay prevention)
- .planning/phases/02-users-authentication/02-PATTERNS.md (services/auth.py section — function signatures)
- hash_password("plain") returns Argon2 hash string (pwdlib PasswordHash with argon2 scheme)
- verify_password("plain", "hash") uses hmac.compare_digest internally (constant-time, SEC-06)
- create_access_token(user_id, role) returns signed JWT with sub=str(user_id), role=role, typ="access", exp=now+15min
- decode_access_token(token) returns payload dict; raises ValueError on expired/tampered/wrong typ
- create_refresh_token(session, user_id) inserts RefreshToken row with token_hash=sha256(raw), returns raw token string
- rotate_refresh_token(session, raw_token): if revoked=True, revoke ALL tokens in family (where user_id=this.user_id), enqueue send_security_alert_email.delay(str(user_id)) via Celery, and raise ValueError("token_family_revoked"); if valid, revoke old, create new, return (new_raw, user_id_str)
- revoke_all_refresh_tokens(session, user_id) sets revoked=True on all user's active tokens, returns count
- provision_totp(session, user_id) generates pyotp.random_base32() secret, stores in users.totp_secret (not enabled yet), returns (secret, provisioning_uri) where uri uses pyotp.totp.TOTP(secret).provisioning_uri(email, issuer_name="DocuVault")
- verify_totp(session, user_id, code, redis_client): get user.totp_secret, check pyotp.TOTP(secret).verify(code, valid_window=1); if valid, store used code in Redis key "totp_used:{user_id}:{code}" with TTL=90s; return False if key already exists (replay prevention, AUTH-08)
- generate_backup_codes(n=10) returns list of 10 random 8-char alphanumeric strings (secrets.token_hex(4).upper() format)
- store_backup_codes(session, user_id, codes) inserts BackupCode rows with code_hash=hash_password(code) for each code, deletes any existing unused codes first
- verify_backup_code(session, user_id, code): fetch all unused BackupCode rows for user; iterate and hmac.compare_digest-based check via verify_password; on match set used_at=now(), commit, return True; return False if none match (constant-time: always iterate all codes)
- check_hibp(password) calls https://api.pwnedpasswords.com/range/{sha1_prefix} and returns True if count > 0 (breach found) — async, uses httpx.AsyncClient
- create_password_reset_token(user_id) returns signed JWT with sub=str(user_id), typ="password-reset", exp=now+3600s
- decode_password_reset_token(token) returns user_id str; raises ValueError if expired/wrong typ
- bootstrap_admin(session) is an idempotent async function: if users table is empty AND settings.admin_email AND settings.admin_password are non-empty, create User(role="admin", password_must_change=False) + Quota(limit_bytes=104857600) rows; log WARNING if vars not set (D-04, D-05, D-06)
Create backend/services/auth.py as a pure-Python module (no FastAPI imports, no HTTPException — per the classifier.py pattern).
Imports needed: hashlib, hmac, secrets, uuid, logging, datetime, from typing, from sqlalchemy.ext.asyncio import AsyncSession, from sqlalchemy import select, from jose/jwt (use PyJWT: import jwt), from pwdlib import PasswordHash, from pwdlib.hashers.argon2 import Argon2Hasher, import pyotp, import httpx, from db.models import User, RefreshToken, Quota, BackupCode, from config import settings.
Password hasher: _pwd = PasswordHash([Argon2Hasher()]). Use _pwd.hash(plain) and _pwd.verify(plain, hashed).
JWT: use jwt.encode(payload, settings.secret_key, algorithm="HS256") and jwt.decode(token, settings.secret_key, algorithms=["HS256"]). In decode_access_token, verify payload["typ"] == "access" after decoding.
Refresh token: raw = secrets.token_urlsafe(32). token_hash = hashlib.sha256(raw.encode()).hexdigest(). Store token_hash in RefreshToken.token_hash. Never store raw token in DB.
Refresh family revocation (AUTH-07): add family_id column is NOT in the schema — use user_id as the family proxy. When reuse detected (token exists but revoked=True), call revoke_all_refresh_tokens(session, this_token.user_id), then import and enqueue send_security_alert_email.delay(str(this_token.user_id)), then raise ValueError("token_family_revoked").
TOTP replay: Redis key = f"totp_used:{user_id}:{code}", TTL = 90 (covers valid_window=1 which spans ±30s = 90s total window). Use aioredis (or redis.asyncio — check which is available) for async Redis operations. Accept redis_client as parameter to allow injection in tests.
HIBP check: k-anonymity model — sha1(password.upper()).hexdigest()[:5] as prefix, check response lines for matching suffix. Function is async, uses httpx.AsyncClient with timeout=5. Returns True (pwned) / False (not found or network error — fail open, log warning on error).
All functions that raise on bad input raise plain ValueError, never HTTPException.
Also create backend/tasks/email_tasks.py following document_tasks.py pattern. Add two tasks:
1. "tasks.email_tasks.send_reset_email" — calls send_password_reset_email(to_address, reset_link) from services.email
2. "tasks.email_tasks.send_security_alert_email" — sends a security alert to the user identified by user_id (fetch email from DB inside the task using asyncio.run; if SMTP not configured, log WARNING "Security alert for user {user_id}: suspicious refresh token reuse detected"). No top-level config or model imports (deferred import pattern). Add both task routes to celery_app.conf.task_routes under "tasks.email_tasks.*": {"queue": "email"}.
cd /Users/nik/Documents/Progamming/document_scanner/backend && python -c "from services.auth import hash_password, verify_password, create_access_token, decode_access_token; h = hash_password('TestPass123!'); assert verify_password('TestPass123!', h); t = create_access_token('test-id', 'user'); p = decode_access_token(t); assert p['sub'] == 'test-id'; print('auth service OK')"
cd /Users/nik/Documents/Progamming/document_scanner/backend && python -c "from tasks.email_tasks import send_reset_email, send_security_alert_email; print('email tasks OK')"
- `from services.auth import hash_password` imports without error
- hash_password("test") returns a string starting with "$argon2"
- verify_password("correct", hash_password("correct")) returns True
- verify_password("wrong", hash_password("correct")) returns False
- create_access_token("uid", "user") returns a string with 2 dots (JWT format)
- decode_access_token with a tampered token raises ValueError
- generate_backup_codes(10) returns a list of 10 strings each len 8
- create_password_reset_token("uid") returns JWT; decode_password_reset_token on it returns "uid"
- decode_password_reset_token on an access token raises ValueError (wrong typ)
- No import of FastAPI, HTTPException, or any fastapi module in services/auth.py
- grep -c "HTTPException" backend/services/auth.py returns 0
- rotate_refresh_token enqueues send_security_alert_email.delay when family_revoked=True — grep -c "send_security_alert_email" backend/services/auth.py returns at least 1
- POST /api/auth/refresh with a revoked family token triggers send_security_alert_email Celery task (assert task enqueued in test using mock or celery task_always_eager=True)
- backend/tasks/email_tasks.py exists with both send_reset_email and send_security_alert_email Celery tasks
services/auth.py implements all auth primitives: Argon2 hashing, constant-time verify, JWT creation/decode, refresh token lifecycle with security alert on family revocation, TOTP provisioning and replay-prevention, backup code generation/storage/verification, HIBP check, admin bootstrap. email_tasks.py has both send_reset_email and send_security_alert_email tasks.
Task 3: Implement deps/auth.py — FastAPI dependency chain
backend/deps/auth.py,
backend/tests/test_auth_deps.py
- backend/deps/db.py (dependency function pattern — how get_db is structured)
- backend/services/auth.py (decode_access_token signature — after Task 2)
- backend/db/models.py (User model fields: id, role, is_active)
- .planning/phases/02-users-authentication/02-PATTERNS.md (deps/auth.py section — get_current_user, get_current_admin code)
Create backend/deps/auth.py with:
Imports: uuid, from fastapi import Depends, HTTPException, status, from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials, from sqlalchemy.ext.asyncio import AsyncSession, from deps.db import get_db, from services import auth as auth_service, from db.models import User.
security = HTTPBearer()
async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security), session: AsyncSession = Depends(get_db)) -> User:
Calls auth_service.decode_access_token(credentials.credentials). On ValueError raises HTTPException(401, "Invalid or expired token"). Loads User via session.get(User, uuid.UUID(payload["sub"])). Raises 401 if user is None or not user.is_active.
async def get_current_admin(user: User = Depends(get_current_user)) -> User:
Raises HTTPException(403, "Admin access required") if user.role != "admin". Returns user.
Create backend/tests/test_auth_deps.py with two tests:
test_get_current_user_returns_user: override get_db with db_session; insert a User row; create a valid access token; call the endpoint with the Bearer token; assert 200 (use a minimal /api/auth/me stub if needed — or test the dep directly by calling it in an app context).
test_get_current_admin_rejects_non_admin: create user with role="user"; assert 403 when get_current_admin is called.
Test pattern: use the async_client fixture + app.dependency_overrides[get_db] = lambda: db_session from conftest.py.
cd /Users/nik/Documents/Progamming/document_scanner/backend && python -c "from deps.auth import get_current_user, get_current_admin; print('deps/auth OK')"
cd /Users/nik/Documents/Progamming/document_scanner/backend && python -m pytest tests/test_auth_deps.py -x -q 2>&1 | tail -5
- `from deps.auth import get_current_user, get_current_admin` imports without error
- get_current_user raises HTTPException(401) when token is expired or tampered
- get_current_user raises HTTPException(401) when user.is_active is False
- get_current_admin raises HTTPException(403) when user.role == "user"
- get_current_admin returns user when user.role == "admin"
- grep -c "HTTPException(status.HTTP_403" backend/deps/auth.py returns at least 1
- tests/test_auth_deps.py exists and pytest exits 0
FastAPI dependency chain complete: get_current_user validates Bearer JWT and returns User ORM object; get_current_admin enforces role="admin" or raises 403.
## Trust Boundaries
| Boundary | Description |
|----------|-------------|
| client→API (auth service) | Untrusted email, password, handle in JSON body |
| API→Redis | Token replay keys written/read; Redis on internal network only |
| API→HIBP external | Password SHA-1 prefix sent to third-party; no full password leaves the system |
## STRIDE Threat Register
| Threat ID | Category | Component | Disposition | Mitigation Plan |
|-----------|----------|-----------|-------------|-----------------|
| T-02-01 | Spoofing | JWT decode in decode_access_token | mitigate | Verify `typ` claim after decode to prevent password-reset tokens being used as access tokens |
| T-02-02 | Spoofing | refresh token reuse | mitigate | Family revocation: if revoked token is presented, revoke all tokens for that user_id and enqueue security alert email; RFC 9700 per AUTH-07 |
| T-02-03 | Tampering | backup code storage | mitigate | Store Argon2 hash of each backup code (never plaintext); verify via constant-time compare (SEC-06) |
| T-02-04 | Repudiation | bootstrap_admin idempotency | mitigate | bootstrap_admin checks `users table empty` before insert; logs WARNING when env vars missing; idempotent |
| T-02-05 | Information Disclosure | HIBP check | mitigate | k-anonymity model: only SHA-1 prefix (5 chars) sent to HIBP API; full hash never leaves the system |
| T-02-06 | Denial of Service | HIBP network call | accept | httpx timeout=5s; on network error function returns False (fail-open, log warning) — auth proceeds |
| T-02-07 | Elevation of Privilege | get_current_admin | mitigate | Explicit role != "admin" check raises 403; no path to skip this dependency |
| T-02-08 | Elevation of Privilege | Admin impersonation | mitigate | Architectural exclusion: no endpoint or code path that sets a JWT sub to a different user exists; ADMIN-07 |
| T-02-SC | Tampering | npm/pip installs (PyJWT, pwdlib, pyotp, slowapi) | mitigate | Packages added to requirements.txt; legitimacy verified: PyJWT (jpadilla, 70M+/mo), pwdlib (frankie567, auth library), pyotp (0.9.0, crypto vetted), slowapi (laurentS, 1M+/mo) — all [VERIFIED] |
1. `python -c "from services.auth import hash_password, verify_password, create_access_token, decode_access_token, create_refresh_token, rotate_refresh_token, revoke_all_refresh_tokens, provision_totp, verify_totp, verify_backup_code, generate_backup_codes, check_hibp, bootstrap_admin"` — exits 0
2. `python -c "from deps.auth import get_current_user, get_current_admin"` — exits 0
3. `python -c "from config import settings; assert settings.cors_origins == ['http://localhost:5173']"` — exits 0
4. `grep -c 'class BackupCode' backend/db/models.py` — returns 1
5. `grep -c 'password_must_change' backend/db/models.py` — returns at least 1
6. Migration file exists: `ls backend/migrations/versions/ | grep backup_codes`
7. `pytest tests/test_auth_deps.py -x -q` passes
8. `grep -c 'send_security_alert_email' backend/services/auth.py` — returns at least 1
- All auth primitives implemented in services/auth.py with no FastAPI coupling
- rotate_refresh_token enqueues send_security_alert_email.delay on family revocation (AUTH-07)
- FastAPI dependency chain in deps/auth.py with get_current_user and get_current_admin
- BackupCode ORM model and Alembic migration in place
- password_must_change column added to User model and included in migration
- Settings class extended with all Phase 2 env vars
- requirements.txt includes PyJWT, pwdlib[argon2], pyotp, aioredis, slowapi
- Test file for deps/auth.py passes