""" SEC-05: Every API response must include the mandatory security headers. Tests verify that SecurityHeadersMiddleware injects: - Content-Security-Policy - X-Frame-Options: DENY - X-Content-Type-Options: nosniff These headers must be present on every response — GET and POST alike, authenticated and unauthenticated. """ from __future__ import annotations from datetime import datetime, timezone import pytest import pytest_asyncio from httpx import ASGITransport, AsyncClient from sqlalchemy.ext.asyncio import AsyncSession # ── FakeRedis (needed by login endpoint) ───────────────────────────────────── class _FakeRedis: """Minimal in-process Redis stand-in for login rate-limit checks.""" def __init__(self): self._store: dict = {} async def get(self, key): entry = self._store.get(key) if entry is None: return None val, exp = entry if exp is not None and datetime.now(timezone.utc).timestamp() > exp: del self._store[key] return None return val async def incr(self, key): entry = self._store.get(key) if entry is None: self._store[key] = (1, None) return 1 val, exp = entry new_val = val + 1 self._store[key] = (new_val, exp) return new_val async def expire(self, key, seconds): if key in self._store: val, _ = self._store[key] self._store[key] = (val, datetime.now(timezone.utc).timestamp() + seconds) async def set(self, key, value, ex=None): deadline = None if ex is not None: deadline = datetime.now(timezone.utc).timestamp() + ex self._store[key] = (value, deadline) async def close(self): pass @pytest_asyncio.fixture async def headers_client(db_session: AsyncSession): """Async test client with DB override AND FakeRedis on app.state.redis.""" from deps.db import get_db from main import app from api.auth import limiter as auth_limiter app.dependency_overrides[get_db] = lambda: db_session app.state.redis = _FakeRedis() try: auth_limiter._storage.reset() except Exception: pass async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as c: yield c app.dependency_overrides.clear() # ── Helpers ─────────────────────────────────────────────────────────────────── REQUIRED_HEADERS = { "content-security-policy": None, # any value — presence is the invariant "x-frame-options": "DENY", "x-content-type-options": "nosniff", } def _assert_security_headers(response) -> None: """Assert all required security headers are present with correct values.""" for header, expected_value in REQUIRED_HEADERS.items(): actual = response.headers.get(header) assert actual is not None, ( f"Missing security header '{header}' — SEC-05 requires it on every response. " f"Status: {response.status_code}, URL: {response.url}" ) if expected_value is not None: assert actual == expected_value, ( f"Security header '{header}' has wrong value: " f"expected '{expected_value}', got '{actual}'" ) # ── Tests ───────────────────────────────────────────────────────────────────── @pytest.mark.asyncio async def test_security_headers_on_unauthenticated_get(async_client): """GET /api/auth/me (unauthenticated) response carries all three SEC-05 headers.""" resp = await async_client.get("/api/auth/me") # Endpoint should 401 — we only care about headers, not status code assert resp.status_code in (401, 403), ( f"Expected auth error, got {resp.status_code}" ) _assert_security_headers(resp) @pytest.mark.asyncio async def test_security_headers_on_post_endpoint(headers_client): """POST /api/auth/login response carries all three SEC-05 headers.""" resp = await headers_client.post( "/api/auth/login", json={"email": "nobody@example.com", "password": "wrongpassword"}, ) # Should 401 or 429 — we care about the security headers, not auth outcome _assert_security_headers(resp) @pytest.mark.asyncio async def test_csp_header_contains_default_src(async_client): """CSP header must include 'default-src' directive.""" resp = await async_client.get("/api/auth/me") csp = resp.headers.get("content-security-policy", "") assert "default-src" in csp, ( f"CSP header is present but missing 'default-src' directive. Got: '{csp}'" ) @pytest.mark.asyncio async def test_x_frame_options_is_deny(async_client): """X-Frame-Options must be exactly 'DENY', not SAMEORIGIN or any other value.""" resp = await async_client.get("/api/auth/me") val = resp.headers.get("x-frame-options", "") assert val == "DENY", ( f"X-Frame-Options must be 'DENY' for SEC-05, got '{val}'" ) @pytest.mark.asyncio async def test_security_headers_on_authenticated_route(headers_client, db_session): """GET /api/auth/me with valid token also returns the three SEC-05 headers.""" import uuid as _uuid from db.models import User, Quota from services.auth import hash_password, create_access_token user_id = _uuid.uuid4() user = User( id=user_id, handle=f"hdr_user_{user_id.hex[:6]}", email=f"hdr_{user_id.hex[:6]}@example.com", password_hash=hash_password("StrongPass12!"), role="user", is_active=True, password_must_change=False, ) quota = Quota(user_id=user_id, limit_bytes=104857600, used_bytes=0) db_session.add(user) db_session.add(quota) await db_session.commit() token = create_access_token(str(user_id), "user") resp = await headers_client.get( "/api/auth/me", headers={"Authorization": f"Bearer {token}"}, ) assert resp.status_code == 200, f"Expected 200, got {resp.status_code}: {resp.text}" _assert_security_headers(resp)