a5994d9ff4
Includes planning artifacts (03-CONTEXT, 03-DISCUSSION-LOG, 03-02-SUMMARY), integration test script, MinIO/auth/docker fixes, and local dev account reference. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
294 lines
13 KiB
Python
294 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
DocuVault Phase 3 integration tests.
|
|
Runs against the live stack on localhost:8000 / localhost:9000.
|
|
No pytest required — just: python test_integration.py
|
|
"""
|
|
import io
|
|
import sys
|
|
import uuid
|
|
import httpx
|
|
|
|
BASE = "http://localhost:8000"
|
|
MINIO_PUBLIC = "http://localhost:9000"
|
|
|
|
PASS = "\033[32m✓\033[0m"
|
|
FAIL = "\033[31m✗\033[0m"
|
|
|
|
results = []
|
|
|
|
def check(name: str, ok: bool, detail: str = "") -> None:
|
|
results.append(ok)
|
|
icon = PASS if ok else FAIL
|
|
print(f" {icon} {name}" + (f" [{detail}]" if detail else ""))
|
|
if not ok:
|
|
print(f" DETAIL: {detail}")
|
|
|
|
def section(title: str) -> None:
|
|
print(f"\n{'─'*60}\n {title}\n{'─'*60}")
|
|
|
|
|
|
# ── helpers ──────────────────────────────────────────────────────────────────
|
|
|
|
def login(client: httpx.Client, email: str, password: str):
|
|
r = client.post("/api/auth/login", json={"email": email, "password": password})
|
|
if r.status_code == 200:
|
|
return r.json().get("access_token")
|
|
return None
|
|
|
|
def auth_headers(token: str) -> dict:
|
|
return {"Authorization": f"Bearer {token}"}
|
|
|
|
|
|
# ── 0. Health ─────────────────────────────────────────────────────────────────
|
|
|
|
section("0. Health")
|
|
with httpx.Client(base_url=BASE, timeout=10) as c:
|
|
r = c.get("/health")
|
|
check("GET /health returns 200", r.status_code == 200)
|
|
body = r.json()
|
|
check("postgres healthy", body.get("checks", {}).get("postgres") == "ok", str(body))
|
|
check("minio healthy", body.get("checks", {}).get("minio") == "ok", str(body))
|
|
|
|
|
|
# ── 1. Admin login ────────────────────────────────────────────────────────────
|
|
|
|
section("1. Admin login")
|
|
with httpx.Client(base_url=BASE, timeout=10) as c:
|
|
admin_token = login(c, "admin@docuvault.example", "Admin1234!")
|
|
check("Admin login succeeds", admin_token is not None)
|
|
|
|
if admin_token:
|
|
r = c.get("/api/auth/me", headers=auth_headers(admin_token))
|
|
check("GET /api/auth/me role=admin", r.json().get("role") == "admin", str(r.json()))
|
|
|
|
|
|
# ── 2. Register regular user ──────────────────────────────────────────────────
|
|
|
|
section("2. Register regular user")
|
|
user_email = f"testuser_{uuid.uuid4().hex[:6]}@example.com"
|
|
_uid = uuid.uuid4().hex[:12]
|
|
user_password = f"Dv!{_uid}Z9x#" # unique per run — won't appear in HIBP
|
|
|
|
with httpx.Client(base_url=BASE, timeout=10) as c:
|
|
r = c.post("/api/auth/register", json={
|
|
"email": user_email,
|
|
"password": user_password,
|
|
"handle": f"tester_{uuid.uuid4().hex[:4]}",
|
|
})
|
|
check("POST /api/auth/register returns 201",
|
|
r.status_code == 201, f"{r.status_code} {r.text[:120]}")
|
|
|
|
user_token = login(c, user_email, user_password)
|
|
check("Regular user login succeeds", user_token is not None)
|
|
|
|
if user_token:
|
|
me = c.get("/api/auth/me", headers=auth_headers(user_token)).json()
|
|
check("Role is 'user'", me.get("role") == "user", str(me))
|
|
user_id = me.get("id")
|
|
|
|
|
|
# ── 3. Quota endpoint ─────────────────────────────────────────────────────────
|
|
|
|
section("3. Quota")
|
|
with httpx.Client(base_url=BASE, timeout=10) as c:
|
|
if user_token:
|
|
r = c.get("/api/auth/me/quota", headers=auth_headers(user_token))
|
|
check("GET /api/auth/me/quota returns 200", r.status_code == 200, r.text[:120])
|
|
q = r.json()
|
|
check("used_bytes starts at 0", q.get("used_bytes") == 0, str(q))
|
|
check("limit_bytes is 100 MB", q.get("limit_bytes") == 104857600, str(q))
|
|
|
|
# Admin cannot read quota (document endpoints blocked for admin)
|
|
if admin_token:
|
|
r = c.get("/api/auth/me/quota", headers=auth_headers(admin_token))
|
|
# Admin has a quota row too (they're a user in the DB), so 200 is also acceptable
|
|
check("GET /api/auth/me/quota returns 200 or 404", r.status_code in (200, 404), r.text)
|
|
|
|
|
|
# ── 4. Presigned upload flow ──────────────────────────────────────────────────
|
|
|
|
section("4. Three-step presigned upload")
|
|
doc_id = None
|
|
with httpx.Client(base_url=BASE, timeout=15) as c:
|
|
if not user_token:
|
|
print(" SKIP — no user token")
|
|
else:
|
|
# Step 1: get upload URL
|
|
r = c.post("/api/documents/upload-url",
|
|
headers=auth_headers(user_token),
|
|
json={"filename": "test.txt", "content_type": "text/plain"})
|
|
check("POST /api/documents/upload-url returns 200",
|
|
r.status_code == 200, f"{r.status_code} {r.text[:120]}")
|
|
|
|
if r.status_code == 200:
|
|
body = r.json()
|
|
upload_url = body.get("upload_url", "")
|
|
doc_id = body.get("document_id")
|
|
check("upload_url present", bool(upload_url), upload_url[:60] if upload_url else "missing")
|
|
check("document_id present", bool(doc_id), str(doc_id))
|
|
|
|
# Step 2: PUT file bytes directly to MinIO via presigned URL.
|
|
# The URL may contain the Docker-internal host (minio:9000); rewrite
|
|
# to localhost:9000 for the TCP connection but keep minio:9000 in the
|
|
# Host header so the HMAC signature remains valid.
|
|
file_content = b"Hello DocuVault integration test! " * 100 # ~3.4 KB
|
|
try:
|
|
put_url = upload_url
|
|
extra_headers = {"Content-Type": "text/plain"}
|
|
if "minio:9000" in put_url:
|
|
extra_headers["Host"] = "minio:9000"
|
|
put_url = put_url.replace("http://minio:9000", "http://localhost:9000", 1)
|
|
put_r = httpx.put(put_url, content=file_content,
|
|
headers=extra_headers, timeout=15)
|
|
check("PUT to MinIO presigned URL succeeds",
|
|
put_r.status_code in (200, 204),
|
|
f"{put_r.status_code} {put_r.text[:60]}")
|
|
except Exception as e:
|
|
check("PUT to MinIO presigned URL succeeds", False, str(e))
|
|
doc_id = None
|
|
|
|
# Step 3: confirm
|
|
if doc_id:
|
|
r2 = c.post(f"/api/documents/{doc_id}/confirm",
|
|
headers=auth_headers(user_token))
|
|
check("POST /api/documents/{id}/confirm returns 200",
|
|
r2.status_code == 200, f"{r2.status_code} {r2.text[:120]}")
|
|
if r2.status_code == 200:
|
|
conf = r2.json()
|
|
check("confirm returns size_bytes > 0",
|
|
conf.get("size_bytes", 0) > 0, str(conf))
|
|
check("confirm status=uploaded",
|
|
conf.get("status") == "uploaded", str(conf))
|
|
|
|
|
|
# ── 5. Quota updated after upload ─────────────────────────────────────────────
|
|
|
|
section("5. Quota updated after upload")
|
|
with httpx.Client(base_url=BASE, timeout=10) as c:
|
|
if user_token and doc_id:
|
|
q = c.get("/api/auth/me/quota", headers=auth_headers(user_token)).json()
|
|
check("used_bytes > 0 after upload", q.get("used_bytes", 0) > 0, str(q))
|
|
|
|
|
|
# ── 6. Document list + ownership ─────────────────────────────────────────────
|
|
|
|
section("6. Document list and ownership isolation")
|
|
with httpx.Client(base_url=BASE, timeout=10) as c:
|
|
if user_token:
|
|
r = c.get("/api/documents", headers=auth_headers(user_token))
|
|
check("GET /api/documents returns 200", r.status_code == 200, r.text[:60])
|
|
items = r.json() if r.status_code == 200 else []
|
|
if isinstance(items, dict):
|
|
items = items.get("items", items.get("documents", []))
|
|
check("Document list contains uploaded doc", len(items) >= 1, f"count={len(items)}")
|
|
|
|
# Second user cannot see first user's document
|
|
if doc_id:
|
|
user2_email = f"user2_{uuid.uuid4().hex[:6]}@example.com"
|
|
c.post("/api/auth/register", json={
|
|
"email": user2_email,
|
|
"password": user_password,
|
|
"handle": f"u2_{uuid.uuid4().hex[:4]}",
|
|
})
|
|
tok2 = login(c, user2_email, user_password)
|
|
if tok2:
|
|
r = c.get(f"/api/documents/{doc_id}", headers=auth_headers(tok2))
|
|
check("Cross-user GET returns 404 (SEC-04)", r.status_code == 404,
|
|
f"got {r.status_code}")
|
|
|
|
|
|
# ── 7. Admin blocked from documents ──────────────────────────────────────────
|
|
|
|
section("7. Admin 403 on document endpoints (SEC-04 / SC4)")
|
|
with httpx.Client(base_url=BASE, timeout=10) as c:
|
|
if admin_token:
|
|
r = c.get("/api/documents", headers=auth_headers(admin_token))
|
|
check("Admin GET /api/documents returns 403",
|
|
r.status_code == 403, f"got {r.status_code}")
|
|
|
|
r = c.post("/api/documents/upload-url",
|
|
headers=auth_headers(admin_token),
|
|
json={"filename": "x.txt", "content_type": "text/plain"})
|
|
check("Admin POST /api/documents/upload-url returns 403",
|
|
r.status_code == 403, f"got {r.status_code}")
|
|
|
|
|
|
# ── 8. Topics namespace ───────────────────────────────────────────────────────
|
|
|
|
section("8. Topics namespace isolation (DOC-04)")
|
|
with httpx.Client(base_url=BASE, timeout=10) as c:
|
|
if admin_token:
|
|
# Admin creates a system topic
|
|
r = c.post("/api/admin/topics",
|
|
headers=auth_headers(admin_token),
|
|
json={"name": "System Topic", "description": "visible to all"})
|
|
check("Admin POST /api/admin/topics returns 201",
|
|
r.status_code == 201, f"{r.status_code} {r.text[:80]}")
|
|
|
|
if user_token:
|
|
# User can see system topic
|
|
r = c.get("/api/topics", headers=auth_headers(user_token))
|
|
check("GET /api/topics returns 200", r.status_code == 200, r.text[:60])
|
|
topics = r.json() if r.status_code == 200 else []
|
|
if isinstance(topics, dict):
|
|
topics = topics.get("items", topics.get("topics", []))
|
|
system_visible = any(t.get("name") == "System Topic" for t in topics)
|
|
check("System topic visible to regular user", system_visible, str([t.get("name") for t in topics]))
|
|
|
|
# User creates own topic
|
|
r2 = c.post("/api/topics",
|
|
headers=auth_headers(user_token),
|
|
json={"name": "My Topic", "color": "#ff0000"})
|
|
check("User POST /api/topics returns 200 or 201",
|
|
r2.status_code in (200, 201), f"{r2.status_code} {r2.text[:80]}")
|
|
|
|
|
|
# ── 9. Unauthenticated blocked ────────────────────────────────────────────────
|
|
|
|
section("9. Unauthenticated requests blocked")
|
|
with httpx.Client(base_url=BASE, timeout=10) as c:
|
|
r = c.get("/api/documents")
|
|
check("GET /api/documents without token returns 401 or 403",
|
|
r.status_code in (401, 403), f"got {r.status_code}")
|
|
|
|
r = c.get("/api/topics")
|
|
check("GET /api/topics without token returns 401 or 403",
|
|
r.status_code in (401, 403), f"got {r.status_code}")
|
|
|
|
|
|
# ── 10. Settings endpoint removed ─────────────────────────────────────────────
|
|
|
|
section("10. /api/settings removed (D-12)")
|
|
with httpx.Client(base_url=BASE, timeout=10) as c:
|
|
r = c.get("/api/settings")
|
|
check("GET /api/settings returns 404", r.status_code == 404, f"got {r.status_code}")
|
|
|
|
|
|
# ── 11. Quota delete decrement ────────────────────────────────────────────────
|
|
|
|
section("11. Quota decrements on document delete (STORE-06)")
|
|
with httpx.Client(base_url=BASE, timeout=10) as c:
|
|
if user_token and doc_id:
|
|
q_before = c.get("/api/auth/me/quota", headers=auth_headers(user_token)).json()
|
|
used_before = q_before.get("used_bytes", 0)
|
|
|
|
r = c.delete(f"/api/documents/{doc_id}", headers=auth_headers(user_token))
|
|
check("DELETE /api/documents/{id} returns 200 or 204",
|
|
r.status_code in (200, 204), f"{r.status_code} {r.text[:60]}")
|
|
|
|
q_after = c.get("/api/auth/me/quota", headers=auth_headers(user_token)).json()
|
|
used_after = q_after.get("used_bytes", 0)
|
|
check("used_bytes decreased after delete",
|
|
used_after < used_before, f"{used_before} → {used_after}")
|
|
|
|
|
|
# ── Summary ───────────────────────────────────────────────────────────────────
|
|
|
|
passed = sum(results)
|
|
total = len(results)
|
|
print(f"\n{'═'*60}")
|
|
print(f" {'PASS' if passed == total else 'FAIL'} {passed}/{total} checks passed")
|
|
print(f"{'═'*60}\n")
|
|
sys.exit(0 if passed == total else 1)
|