"""
Document API tests.

Sync tests (top section) — test current flat-file behavior; remain until
Plan 05 cuts over.
Async tests (bottom section, _async suffix) — xfail scaffolds for the
Plan 05 PostgreSQL+MinIO layer.
"""
from __future__ import annotations

import re

import pytest


def _post_upload(client, path, name, mime_type):
    """POST the file at *path* to the upload endpoint with auto-classify off.

    The file is sent as multipart field ``file`` under the given *name* and
    *mime_type*. Returns the raw response so each test can check the status
    code or decode the JSON body itself.
    """
    with open(path, "rb") as fh:
        return client.post(
            "/api/documents/upload",
            files={"file": (name, fh, mime_type)},
            data={"auto_classify": "false"},
        )


def test_upload_txt_no_classify(client, sample_txt):
    """A text upload without classification returns 200 and document metadata."""
    response = _post_upload(client, sample_txt, "sample.txt", "text/plain")
    assert response.status_code == 200

    body = response.json()
    assert body["original_name"] == "sample.txt"
    assert "extracted_text" in body
    text = body["extracted_text"]
    # NOTE(review): because of the second clause, any non-empty text passes.
    assert "invoices" in text.lower() or len(text) > 0
    assert body["topics"] == []
    assert "id" in body


def test_upload_pdf_no_classify(client, sample_pdf):
    """A PDF upload reports the PDF mime type and yields non-empty text."""
    response = _post_upload(client, sample_pdf, "sample.pdf", "application/pdf")
    assert response.status_code == 200

    body = response.json()
    assert body["mime_type"] == "application/pdf"
    assert len(body["extracted_text"]) > 0


def test_list_documents(client, sample_txt):
    """After one upload, the listing reports exactly one document."""
    _post_upload(client, sample_txt, "a.txt", "text/plain")

    listing = client.get("/api/documents")
    assert listing.status_code == 200

    body = listing.json()
    assert body["total"] == 1
    assert len(body["items"]) == 1


def test_list_documents_filter_by_topic(client, sample_txt):
    """The ``topic`` query parameter restricts the listing to matching documents."""
    uploaded = _post_upload(client, sample_txt, "a.txt", "text/plain").json()

    # Topics are set through the storage service directly, not via the API.
    import services.storage as storage

    storage.update_document_topics(uploaded["id"], ["finance"])

    finance = client.get("/api/documents?topic=finance")
    assert finance.json()["total"] == 1

    legal = client.get("/api/documents?topic=legal")
    assert legal.json()["total"] == 0


def test_get_document(client, sample_txt):
    """An uploaded document can be fetched by the id from the upload response."""
    uploaded = _post_upload(client, sample_txt, "a.txt", "text/plain").json()
    doc_id = uploaded["id"]

    fetched = client.get(f"/api/documents/{doc_id}")
    assert fetched.status_code == 200
    assert fetched.json()["id"] == doc_id


def test_get_document_not_found(client):
    """Fetching an unknown id returns 404."""
    response = client.get("/api/documents/nonexistent")
    assert response.status_code == 404


def test_delete_document(client, sample_txt):
    """Deleting a document reports success and a later GET returns 404."""
    uploaded = _post_upload(client, sample_txt, "a.txt", "text/plain").json()
    doc_url = f"/api/documents/{uploaded['id']}"

    deleted = client.delete(doc_url)
    assert deleted.status_code == 200
    assert deleted.json()["success"] is True

    refetched = client.get(doc_url)
    assert refetched.status_code == 404


def test_delete_document_not_found(client):
    """Deleting an unknown id returns 404."""
    response = client.delete("/api/documents/nonexistent")
    assert response.status_code == 404


def test_upload_empty_file(client):
    """Uploading zero bytes is rejected with 400."""
    response = client.post(
        "/api/documents/upload",
        files={"file": ("empty.txt", b"", "text/plain")},
        data={"auto_classify": "false"},
    )
    assert response.status_code == 400


# ── Async port (Plan 05 cutover) ─────────────────────────────────────────────
# Each test below is an async version of the corresponding sync test above.
# They use async_client (httpx.AsyncClient + ASGITransport) and are marked
# xfail until Plan 05 completes the PostgreSQL+MinIO storage rewrite.
# ─────────────────────────────────────────────────────────────────────────────


async def _post_upload_async(async_client, path, name, mime_type):
    """POST the file at *path* to the upload endpoint with auto-classify off.

    Async counterpart of the upload boilerplate used by the tests below: the
    file is sent as multipart field ``file`` under the given *name* and
    *mime_type*. Returns the raw response so each test can check the status
    code or decode the JSON body itself.
    """
    with open(path, "rb") as fh:
        return await async_client.post(
            "/api/documents/upload",
            files={"file": (name, fh, mime_type)},
            data={"auto_classify": "false"},
        )


@pytest.mark.xfail(strict=False, reason="async storage layer implemented in plan 05")
async def test_upload_txt_no_classify_async(async_client, sample_txt):
    """A text upload without classification returns 200 and document metadata."""
    resp = await _post_upload_async(
        async_client, sample_txt, "sample.txt", "text/plain"
    )
    assert resp.status_code == 200

    data = resp.json()
    assert data["original_name"] == "sample.txt"
    assert "extracted_text" in data
    text = data["extracted_text"]
    # NOTE(review): because of the second clause, any non-empty text passes.
    assert "invoices" in text.lower() or len(text) > 0
    assert data["topics"] == []
    assert "id" in data


@pytest.mark.xfail(strict=False, reason="async storage layer implemented in plan 05")
async def test_upload_pdf_no_classify_async(async_client, sample_pdf):
    """A PDF upload reports the PDF mime type and yields non-empty text."""
    resp = await _post_upload_async(
        async_client, sample_pdf, "sample.pdf", "application/pdf"
    )
    assert resp.status_code == 200

    data = resp.json()
    assert data["mime_type"] == "application/pdf"
    assert len(data["extracted_text"]) > 0


@pytest.mark.xfail(strict=False, reason="async storage layer implemented in plan 05")
async def test_list_documents_async(async_client, sample_txt):
    """After one upload, the listing reports exactly one document."""
    await _post_upload_async(async_client, sample_txt, "a.txt", "text/plain")

    resp = await async_client.get("/api/documents")
    assert resp.status_code == 200

    data = resp.json()
    assert data["total"] == 1
    assert len(data["items"]) == 1


@pytest.mark.xfail(strict=False, reason="async storage layer implemented in plan 05")
async def test_list_documents_filter_by_topic_async(async_client, db_session, sample_txt):
    """The ``topic`` query parameter restricts the listing to matching documents.

    Topics are written with a direct SQL UPDATE on ``db_session`` (this
    replaces the flat-file storage call used by the sync test). The test is
    skipped when the ORM imports are unavailable.
    """
    import uuid

    upload = (
        await _post_upload_async(async_client, sample_txt, "a.txt", "text/plain")
    ).json()

    # Only the imports are guarded: an ImportError raised later, during the
    # database round-trip, must not be reported as "db.models missing".
    try:
        from sqlalchemy import update

        from db.models import Document
    except ImportError:
        pytest.skip("db.models not yet implemented — plan 03")

    await db_session.execute(
        update(Document)
        .where(Document.id == uuid.UUID(upload["id"]))
        .values(topics=["finance"])
    )
    await db_session.commit()

    resp = await async_client.get("/api/documents?topic=finance")
    assert resp.json()["total"] == 1

    resp2 = await async_client.get("/api/documents?topic=legal")
    assert resp2.json()["total"] == 0


@pytest.mark.xfail(strict=False, reason="async storage layer implemented in plan 05")
async def test_get_document_async(async_client, sample_txt):
    """An uploaded document can be fetched by the id from the upload response."""
    upload = (
        await _post_upload_async(async_client, sample_txt, "a.txt", "text/plain")
    ).json()

    resp = await async_client.get(f"/api/documents/{upload['id']}")
    assert resp.status_code == 200
    assert resp.json()["id"] == upload["id"]


@pytest.mark.xfail(strict=False, reason="async storage layer implemented in plan 05")
async def test_get_document_not_found_async(async_client):
    """Fetching an unknown id returns 404."""
    resp = await async_client.get("/api/documents/nonexistent")
    assert resp.status_code == 404


@pytest.mark.xfail(strict=False, reason="async storage layer implemented in plan 05")
async def test_delete_document_async(async_client, sample_txt):
    """Deleting a document reports success and a later GET returns 404."""
    upload = (
        await _post_upload_async(async_client, sample_txt, "a.txt", "text/plain")
    ).json()

    resp = await async_client.delete(f"/api/documents/{upload['id']}")
    assert resp.status_code == 200
    assert resp.json()["success"] is True

    resp2 = await async_client.get(f"/api/documents/{upload['id']}")
    assert resp2.status_code == 404


@pytest.mark.xfail(strict=False, reason="async storage layer implemented in plan 05")
async def test_delete_document_not_found_async(async_client):
    """Deleting an unknown id returns 404."""
    resp = await async_client.delete("/api/documents/nonexistent")
    assert resp.status_code == 404


@pytest.mark.xfail(strict=False, reason="async storage layer implemented in plan 05")
async def test_upload_empty_file_async(async_client):
    """Uploading zero bytes is rejected with 400."""
    resp = await async_client.post(
        "/api/documents/upload",
        files={"file": ("empty.txt", b"", "text/plain")},
        data={"auto_classify": "false"},
    )
    assert resp.status_code == 400


@pytest.mark.xfail(strict=False, reason="async storage layer implemented in plan 05")
async def test_upload_persists_to_postgres_and_minio_async(async_client, sample_txt):
    """After a successful upload, document is persisted and queryable via GET (STORE-01, STORE-02)."""
    resp = await _post_upload_async(
        async_client, sample_txt, "sample.txt", "text/plain"
    )
    assert resp.status_code == 200
    data = resp.json()

    # Response must include a UUID-format id
    uuid_pattern = re.compile(
        r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$'
    )
    assert "id" in data, "Upload response missing 'id'"
    # fullmatch, not match: with match, `$` would also accept an id that is
    # followed by a trailing newline.
    assert uuid_pattern.fullmatch(data["id"]), f"id '{data['id']}' is not a UUID"

    # Metadata round-trips via GET
    doc_id = data["id"]
    get_resp = await async_client.get(f"/api/documents/{doc_id}")
    assert get_resp.status_code == 200
    get_data = get_resp.json()
    assert get_data["original_name"] == "sample.txt"