Files
kite/backend/tests/test_documents.py
T
curo1305 d856a2eaa9 test(01-02): extend test_health.py and port test_documents.py to async client
test_health.py:
  - Keep existing test_health(client) sync test unchanged (Plan 01 baseline)
  - Add test_health_checks_postgres_and_minio(async_client) xfail scaffold
    for extended /health response with postgres+minio checks (Plan 05, D-07)

test_documents.py:
  - Keep all 9 existing sync tests verbatim
  - Add async ports (_async suffix) for each: 9 xfail tests using async_client
  - Add test_upload_persists_to_postgres_and_minio_async (UUID id + GET
    round-trip assertion) — xfail until Plan 05 storage rewrite
  - Total: 10 new xfail async tests, 9 sync tests unchanged
2026-05-22 09:08:05 +02:00

282 lines
10 KiB
Python

"""
Document API tests.
Sync tests (top section) — test current flat-file behavior; remain until Plan 05 cuts over.
Async tests (bottom section, _async suffix) — xfail scaffolds for Plan 05 PostgreSQL+MinIO layer.
"""
from __future__ import annotations
import re
import pytest
def test_upload_txt_no_classify(client, sample_txt):
    """Upload a text file with auto_classify="false" and check the returned record."""
    form_fields = {"auto_classify": "false"}
    with open(sample_txt, "rb") as handle:
        response = client.post(
            "/api/documents/upload",
            files={"file": ("sample.txt", handle, "text/plain")},
            data=form_fields,
        )
    assert response.status_code == 200

    body = response.json()
    assert body["original_name"] == "sample.txt"
    assert "extracted_text" in body
    text = body["extracted_text"]
    # Because of the `or`, any non-empty extracted text satisfies this check.
    assert "invoices" in text.lower() or len(text) > 0
    assert body["topics"] == []
    assert "id" in body
def test_upload_pdf_no_classify(client, sample_pdf):
    """Upload a PDF with auto_classify="false"; expect its MIME type and some extracted text."""
    form_fields = {"auto_classify": "false"}
    with open(sample_pdf, "rb") as handle:
        response = client.post(
            "/api/documents/upload",
            files={"file": ("sample.pdf", handle, "application/pdf")},
            data=form_fields,
        )
    assert response.status_code == 200

    body = response.json()
    assert body["mime_type"] == "application/pdf"
    assert len(body["extracted_text"]) > 0
def test_list_documents(client, sample_txt):
    """After one upload, the listing endpoint reports exactly one document."""
    form_fields = {"auto_classify": "false"}
    with open(sample_txt, "rb") as handle:
        # The upload response itself is not inspected here.
        client.post(
            "/api/documents/upload",
            files={"file": ("a.txt", handle, "text/plain")},
            data=form_fields,
        )

    listing = client.get("/api/documents")
    assert listing.status_code == 200

    page = listing.json()
    assert page["total"] == 1
    assert len(page["items"]) == 1
def test_list_documents_filter_by_topic(client, sample_txt):
    """Listing with ?topic= returns only documents tagged with that topic."""
    form_fields = {"auto_classify": "false"}
    with open(sample_txt, "rb") as handle:
        upload_response = client.post(
            "/api/documents/upload",
            files={"file": ("a.txt", handle, "text/plain")},
            data=form_fields,
        )
    upload = upload_response.json()

    # Tag the uploaded document through the storage service rather than the API.
    import services.storage as storage

    storage.update_document_topics(upload["id"], ["finance"])

    matching = client.get("/api/documents?topic=finance")
    assert matching.json()["total"] == 1

    non_matching = client.get("/api/documents?topic=legal")
    assert non_matching.json()["total"] == 0
def test_get_document(client, sample_txt):
    """A document can be fetched by the id returned from its upload."""
    form_fields = {"auto_classify": "false"}
    with open(sample_txt, "rb") as handle:
        upload_response = client.post(
            "/api/documents/upload",
            files={"file": ("a.txt", handle, "text/plain")},
            data=form_fields,
        )
    upload = upload_response.json()

    fetched = client.get(f"/api/documents/{upload['id']}")
    assert fetched.status_code == 200
    assert fetched.json()["id"] == upload["id"]
def test_get_document_not_found(client):
    """Requesting the id "nonexistent" returns 404."""
    missing = client.get("/api/documents/nonexistent")
    assert missing.status_code == 404
def test_delete_document(client, sample_txt):
    """Deleting an uploaded document succeeds, and a later GET for it returns 404."""
    form_fields = {"auto_classify": "false"}
    with open(sample_txt, "rb") as handle:
        upload_response = client.post(
            "/api/documents/upload",
            files={"file": ("a.txt", handle, "text/plain")},
            data=form_fields,
        )
    upload = upload_response.json()
    document_url = f"/api/documents/{upload['id']}"

    deletion = client.delete(document_url)
    assert deletion.status_code == 200
    assert deletion.json()["success"] is True

    # The same URL must no longer resolve once the delete has gone through.
    lookup = client.get(document_url)
    assert lookup.status_code == 404
def test_delete_document_not_found(client):
    """Deleting the id "nonexistent" returns 404."""
    missing = client.delete("/api/documents/nonexistent")
    assert missing.status_code == 404
def test_upload_empty_file(client):
    """Uploading a zero-byte file is rejected with 400."""
    empty_upload = {"file": ("empty.txt", b"", "text/plain")}
    response = client.post(
        "/api/documents/upload",
        files=empty_upload,
        data={"auto_classify": "false"},
    )
    assert response.status_code == 400
# ── Async port (Plan 05 cutover) ─────────────────────────────────────────────
# Each test below is an async version of the corresponding sync test above.
# They use async_client (httpx.AsyncClient + ASGITransport) and are marked
# xfail until Plan 05 completes the PostgreSQL+MinIO storage rewrite.
# ─────────────────────────────────────────────────────────────────────────────
@pytest.mark.xfail(strict=False, reason="async storage layer implemented in plan 05")
async def test_upload_txt_no_classify_async(async_client, sample_txt):
    """Async port: upload a text file with auto_classify="false" and check the record."""
    form_fields = {"auto_classify": "false"}
    with open(sample_txt, "rb") as handle:
        response = await async_client.post(
            "/api/documents/upload",
            files={"file": ("sample.txt", handle, "text/plain")},
            data=form_fields,
        )
    assert response.status_code == 200

    body = response.json()
    assert body["original_name"] == "sample.txt"
    assert "extracted_text" in body
    text = body["extracted_text"]
    # Because of the `or`, any non-empty extracted text satisfies this check.
    assert "invoices" in text.lower() or len(text) > 0
    assert body["topics"] == []
    assert "id" in body
@pytest.mark.xfail(strict=False, reason="async storage layer implemented in plan 05")
async def test_upload_pdf_no_classify_async(async_client, sample_pdf):
    """Async port: upload a PDF with auto_classify="false"; expect MIME type and text."""
    form_fields = {"auto_classify": "false"}
    with open(sample_pdf, "rb") as handle:
        response = await async_client.post(
            "/api/documents/upload",
            files={"file": ("sample.pdf", handle, "application/pdf")},
            data=form_fields,
        )
    assert response.status_code == 200

    body = response.json()
    assert body["mime_type"] == "application/pdf"
    assert len(body["extracted_text"]) > 0
@pytest.mark.xfail(strict=False, reason="async storage layer implemented in plan 05")
async def test_list_documents_async(async_client, sample_txt):
    """Async port: after one upload, the listing endpoint reports exactly one document."""
    form_fields = {"auto_classify": "false"}
    with open(sample_txt, "rb") as handle:
        # The upload response itself is not inspected here.
        await async_client.post(
            "/api/documents/upload",
            files={"file": ("a.txt", handle, "text/plain")},
            data=form_fields,
        )

    listing = await async_client.get("/api/documents")
    assert listing.status_code == 200

    page = listing.json()
    assert page["total"] == 1
    assert len(page["items"]) == 1
@pytest.mark.xfail(strict=False, reason="async storage layer implemented in plan 05")
async def test_list_documents_filter_by_topic_async(async_client, db_session, sample_txt):
    """Async port: listing with ?topic= returns only documents tagged with that topic.

    The document's topics are set with an UPDATE issued on ``db_session``
    (replacing the flat-file storage call used by the sync test). The test is
    skipped when ``sqlalchemy`` or ``db.models`` cannot be imported.
    """
    import uuid

    with open(sample_txt, "rb") as f:
        upload = (await async_client.post(
            "/api/documents/upload",
            files={"file": ("a.txt", f, "text/plain")},
            data={"auto_classify": "false"},
        )).json()

    # Guard only the optional imports. Keeping the database calls outside the
    # try block means an ImportError raised while executing or committing is
    # not misreported as "db.models not yet implemented".
    try:
        from sqlalchemy import update
        from db.models import Document
    except ImportError:
        pytest.skip("db.models not yet implemented — plan 03")

    # Update topics via direct SQL on db_session (replaces flat-file call)
    await db_session.execute(
        update(Document)
        .where(Document.id == uuid.UUID(upload["id"]))
        .values(topics=["finance"])
    )
    await db_session.commit()

    resp = await async_client.get("/api/documents?topic=finance")
    assert resp.json()["total"] == 1
    resp2 = await async_client.get("/api/documents?topic=legal")
    assert resp2.json()["total"] == 0
@pytest.mark.xfail(strict=False, reason="async storage layer implemented in plan 05")
async def test_get_document_async(async_client, sample_txt):
    """Async port: a document can be fetched by the id returned from its upload."""
    form_fields = {"auto_classify": "false"}
    with open(sample_txt, "rb") as handle:
        upload_response = await async_client.post(
            "/api/documents/upload",
            files={"file": ("a.txt", handle, "text/plain")},
            data=form_fields,
        )
        upload = upload_response.json()

    fetched = await async_client.get(f"/api/documents/{upload['id']}")
    assert fetched.status_code == 200
    assert fetched.json()["id"] == upload["id"]
@pytest.mark.xfail(strict=False, reason="async storage layer implemented in plan 05")
async def test_get_document_not_found_async(async_client):
    """Async port: requesting the id "nonexistent" returns 404."""
    missing = await async_client.get("/api/documents/nonexistent")
    assert missing.status_code == 404
@pytest.mark.xfail(strict=False, reason="async storage layer implemented in plan 05")
async def test_delete_document_async(async_client, sample_txt):
    """Async port: deleting an uploaded document succeeds and a later GET returns 404."""
    form_fields = {"auto_classify": "false"}
    with open(sample_txt, "rb") as handle:
        upload_response = await async_client.post(
            "/api/documents/upload",
            files={"file": ("a.txt", handle, "text/plain")},
            data=form_fields,
        )
        upload = upload_response.json()
    document_url = f"/api/documents/{upload['id']}"

    deletion = await async_client.delete(document_url)
    assert deletion.status_code == 200
    assert deletion.json()["success"] is True

    # The same URL must no longer resolve once the delete has gone through.
    lookup = await async_client.get(document_url)
    assert lookup.status_code == 404
@pytest.mark.xfail(strict=False, reason="async storage layer implemented in plan 05")
async def test_delete_document_not_found_async(async_client):
    """Async port: deleting the id "nonexistent" returns 404."""
    missing = await async_client.delete("/api/documents/nonexistent")
    assert missing.status_code == 404
@pytest.mark.xfail(strict=False, reason="async storage layer implemented in plan 05")
async def test_upload_empty_file_async(async_client):
    """Async port: uploading a zero-byte file is rejected with 400."""
    empty_upload = {"file": ("empty.txt", b"", "text/plain")}
    response = await async_client.post(
        "/api/documents/upload",
        files=empty_upload,
        data={"auto_classify": "false"},
    )
    assert response.status_code == 400
@pytest.mark.xfail(strict=False, reason="async storage layer implemented in plan 05")
async def test_upload_persists_to_postgres_and_minio_async(async_client, sample_txt):
    """A successful upload can be read back via GET by its UUID id (STORE-01, STORE-02)."""
    form_fields = {"auto_classify": "false"}
    with open(sample_txt, "rb") as handle:
        upload_response = await async_client.post(
            "/api/documents/upload",
            files={"file": ("sample.txt", handle, "text/plain")},
            data=form_fields,
        )
    assert upload_response.status_code == 200
    data = upload_response.json()

    # The id must be a hyphenated, lowercase-hex UUID string.
    uuid_regex = re.compile(
        r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$'
    )
    assert "id" in data, "Upload response missing 'id'"
    assert uuid_regex.match(data["id"]), f"id '{data['id']}' is not a UUID"

    # Read the record back through the API and compare the stored file name.
    fetched = await async_client.get(f"/api/documents/{data['id']}")
    assert fetched.status_code == 200
    stored = fetched.json()
    assert stored["original_name"] == "sample.txt"