d98e3ab7a1
8 test files, 60 new tests (14 backend + 46 frontend). All green. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
183 lines
6.3 KiB
Python
183 lines
6.3 KiB
Python
"""
|
|
SEC-05: Every API response must include the mandatory security headers.
|
|
|
|
Tests verify that SecurityHeadersMiddleware injects:
|
|
- Content-Security-Policy
|
|
- X-Frame-Options: DENY
|
|
- X-Content-Type-Options: nosniff
|
|
|
|
These headers must be present on every response — GET and POST alike,
|
|
authenticated and unauthenticated.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
from datetime import datetime, timezone
|
|
|
|
import pytest
|
|
import pytest_asyncio
|
|
from httpx import ASGITransport, AsyncClient
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
|
|
# ── FakeRedis (needed by login endpoint) ─────────────────────────────────────
|
|
|
|
class _FakeRedis:
|
|
"""Minimal in-process Redis stand-in for login rate-limit checks."""
|
|
|
|
def __init__(self):
|
|
self._store: dict = {}
|
|
|
|
async def get(self, key):
|
|
entry = self._store.get(key)
|
|
if entry is None:
|
|
return None
|
|
val, exp = entry
|
|
if exp is not None and datetime.now(timezone.utc).timestamp() > exp:
|
|
del self._store[key]
|
|
return None
|
|
return val
|
|
|
|
async def incr(self, key):
|
|
entry = self._store.get(key)
|
|
if entry is None:
|
|
self._store[key] = (1, None)
|
|
return 1
|
|
val, exp = entry
|
|
new_val = val + 1
|
|
self._store[key] = (new_val, exp)
|
|
return new_val
|
|
|
|
async def expire(self, key, seconds):
|
|
if key in self._store:
|
|
val, _ = self._store[key]
|
|
self._store[key] = (val, datetime.now(timezone.utc).timestamp() + seconds)
|
|
|
|
async def set(self, key, value, ex=None):
|
|
deadline = None
|
|
if ex is not None:
|
|
deadline = datetime.now(timezone.utc).timestamp() + ex
|
|
self._store[key] = (value, deadline)
|
|
|
|
async def close(self):
|
|
pass
|
|
|
|
|
|
@pytest_asyncio.fixture
|
|
async def headers_client(db_session: AsyncSession):
|
|
"""Async test client with DB override AND FakeRedis on app.state.redis."""
|
|
from deps.db import get_db
|
|
from main import app
|
|
from api.auth import limiter as auth_limiter
|
|
|
|
app.dependency_overrides[get_db] = lambda: db_session
|
|
app.state.redis = _FakeRedis()
|
|
|
|
try:
|
|
auth_limiter._storage.reset()
|
|
except Exception:
|
|
pass
|
|
|
|
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as c:
|
|
yield c
|
|
|
|
app.dependency_overrides.clear()
|
|
|
|
|
|
# ── Helpers ───────────────────────────────────────────────────────────────────
|
|
|
|
REQUIRED_HEADERS = {
|
|
"content-security-policy": None, # any value — presence is the invariant
|
|
"x-frame-options": "DENY",
|
|
"x-content-type-options": "nosniff",
|
|
}
|
|
|
|
|
|
def _assert_security_headers(response) -> None:
|
|
"""Assert all required security headers are present with correct values."""
|
|
for header, expected_value in REQUIRED_HEADERS.items():
|
|
actual = response.headers.get(header)
|
|
assert actual is not None, (
|
|
f"Missing security header '{header}' — SEC-05 requires it on every response. "
|
|
f"Status: {response.status_code}, URL: {response.url}"
|
|
)
|
|
if expected_value is not None:
|
|
assert actual == expected_value, (
|
|
f"Security header '{header}' has wrong value: "
|
|
f"expected '{expected_value}', got '{actual}'"
|
|
)
|
|
|
|
|
|
# ── Tests ─────────────────────────────────────────────────────────────────────
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_security_headers_on_unauthenticated_get(async_client):
|
|
"""GET /api/auth/me (unauthenticated) response carries all three SEC-05 headers."""
|
|
resp = await async_client.get("/api/auth/me")
|
|
# Endpoint should 401 — we only care about headers, not status code
|
|
assert resp.status_code in (401, 403), (
|
|
f"Expected auth error, got {resp.status_code}"
|
|
)
|
|
_assert_security_headers(resp)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_security_headers_on_post_endpoint(headers_client):
|
|
"""POST /api/auth/login response carries all three SEC-05 headers."""
|
|
resp = await headers_client.post(
|
|
"/api/auth/login",
|
|
json={"email": "nobody@example.com", "password": "wrongpassword"},
|
|
)
|
|
# Should 401 or 429 — we care about the security headers, not auth outcome
|
|
_assert_security_headers(resp)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_csp_header_contains_default_src(async_client):
|
|
"""CSP header must include 'default-src' directive."""
|
|
resp = await async_client.get("/api/auth/me")
|
|
csp = resp.headers.get("content-security-policy", "")
|
|
assert "default-src" in csp, (
|
|
f"CSP header is present but missing 'default-src' directive. Got: '{csp}'"
|
|
)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_x_frame_options_is_deny(async_client):
|
|
"""X-Frame-Options must be exactly 'DENY', not SAMEORIGIN or any other value."""
|
|
resp = await async_client.get("/api/auth/me")
|
|
val = resp.headers.get("x-frame-options", "")
|
|
assert val == "DENY", (
|
|
f"X-Frame-Options must be 'DENY' for SEC-05, got '{val}'"
|
|
)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_security_headers_on_authenticated_route(headers_client, db_session):
|
|
"""GET /api/auth/me with valid token also returns the three SEC-05 headers."""
|
|
import uuid as _uuid
|
|
from db.models import User, Quota
|
|
from services.auth import hash_password, create_access_token
|
|
|
|
user_id = _uuid.uuid4()
|
|
user = User(
|
|
id=user_id,
|
|
handle=f"hdr_user_{user_id.hex[:6]}",
|
|
email=f"hdr_{user_id.hex[:6]}@example.com",
|
|
password_hash=hash_password("StrongPass12!"),
|
|
role="user",
|
|
is_active=True,
|
|
password_must_change=False,
|
|
)
|
|
quota = Quota(user_id=user_id, limit_bytes=104857600, used_bytes=0)
|
|
db_session.add(user)
|
|
db_session.add(quota)
|
|
await db_session.commit()
|
|
|
|
token = create_access_token(str(user_id), "user")
|
|
resp = await headers_client.get(
|
|
"/api/auth/me",
|
|
headers={"Authorization": f"Bearer {token}"},
|
|
)
|
|
assert resp.status_code == 200, f"Expected 200, got {resp.status_code}: {resp.text}"
|
|
_assert_security_headers(resp)
|