""" Document API tests — async only (Plan 05 cutover). Legacy sync tests (using the flat-file storage layer) were deleted in Plan 05. All tests here use async_client (httpx.AsyncClient + ASGITransport + in-memory SQLite). """ from __future__ import annotations import re import pytest @pytest.mark.xfail(strict=False, reason="POST /api/documents/upload removed in Plan 03-02 — replaced by upload-url + confirm flow") async def test_upload_txt_no_classify(async_client, sample_txt): """Legacy multipart upload test — endpoint removed in Plan 03-02 (D-04). Replaced by test_upload_url_endpoint + test_confirm_endpoint. """ with open(sample_txt, "rb") as f: resp = await async_client.post( "/api/documents/upload", files={"file": ("sample.txt", f, "text/plain")}, data={"auto_classify": "false"}, ) assert resp.status_code == 200 data = resp.json() assert data["original_name"] == "sample.txt" @pytest.mark.xfail(strict=False, reason="POST /api/documents/upload removed in Plan 03-02 — replaced by upload-url + confirm flow") async def test_upload_pdf_no_classify(async_client, sample_pdf): """Legacy multipart upload test — endpoint removed in Plan 03-02 (D-04).""" with open(sample_pdf, "rb") as f: resp = await async_client.post( "/api/documents/upload", files={"file": ("sample.pdf", f, "application/pdf")}, data={"auto_classify": "false"}, ) assert resp.status_code == 200 async def test_list_documents(async_client, auth_user): """GET /api/documents returns an empty list when no documents exist.""" resp = await async_client.get("/api/documents", headers=auth_user["headers"]) assert resp.status_code == 200 data = resp.json() assert data["total"] == 0 assert data["items"] == [] async def test_list_documents_filter_by_topic(async_client, auth_user, db_session): """GET /api/documents?topic=finance returns only matching documents.""" import uuid as _uuid from db.models import Document from services import storage # Create a document directly via ORM (bypasses the upload endpoint) doc_id = 
_uuid.uuid4() doc = Document( id=doc_id, user_id=auth_user["user"].id, filename="test.txt", content_type="text/plain", size_bytes=100, storage_backend="minio", status="uploaded", object_key=f"{auth_user['user'].id}/{doc_id}/{_uuid.uuid4()}.txt", ) db_session.add(doc) await db_session.commit() await storage.update_document_topics(db_session, str(doc_id), ["finance"]) resp = await async_client.get("/api/documents?topic=finance", headers=auth_user["headers"]) assert resp.json()["total"] == 1 resp2 = await async_client.get("/api/documents?topic=legal", headers=auth_user["headers"]) assert resp2.json()["total"] == 0 async def test_get_document(async_client, auth_user, db_session): """GET /api/documents/{id} returns metadata for an existing document.""" import uuid as _uuid from db.models import Document doc_id = _uuid.uuid4() doc = Document( id=doc_id, user_id=auth_user["user"].id, filename="test.txt", content_type="text/plain", size_bytes=100, storage_backend="minio", status="uploaded", object_key=f"{auth_user['user'].id}/{doc_id}/{_uuid.uuid4()}.txt", ) db_session.add(doc) await db_session.commit() resp = await async_client.get(f"/api/documents/{doc_id}", headers=auth_user["headers"]) assert resp.status_code == 200 assert resp.json()["id"] == str(doc_id) async def test_get_document_not_found(async_client, auth_user): resp = await async_client.get("/api/documents/nonexistent", headers=auth_user["headers"]) assert resp.status_code == 404 async def test_delete_document(async_client, auth_user, db_session, monkeypatch): """DELETE /api/documents/{id} removes the document.""" import uuid as _uuid from db.models import Document from unittest.mock import AsyncMock # Mock MinIO delete so we don't need a live MinIO monkeypatch.setattr("services.storage._backend", lambda: type("B", (), {"delete_object": AsyncMock()})()) doc_id = _uuid.uuid4() doc = Document( id=doc_id, user_id=auth_user["user"].id, filename="test.txt", content_type="text/plain", size_bytes=0, 
storage_backend="minio", status="uploaded", object_key=f"{auth_user['user'].id}/{doc_id}/{_uuid.uuid4()}.txt", ) db_session.add(doc) await db_session.commit() resp = await async_client.delete(f"/api/documents/{doc_id}", headers=auth_user["headers"]) assert resp.status_code == 200 assert resp.json()["success"] is True resp2 = await async_client.get(f"/api/documents/{doc_id}", headers=auth_user["headers"]) assert resp2.status_code == 404 async def test_delete_document_not_found(async_client, auth_user): resp = await async_client.delete("/api/documents/nonexistent", headers=auth_user["headers"]) assert resp.status_code == 404 @pytest.mark.xfail(strict=False, reason="POST /api/documents/upload removed in Plan 03-02 — replaced by upload-url + confirm flow") async def test_upload_empty_file(async_client): """Legacy empty file test — endpoint removed in Plan 03-02 (D-04).""" resp = await async_client.post( "/api/documents/upload", files={"file": ("empty.txt", b"", "text/plain")}, data={"auto_classify": "false"}, ) assert resp.status_code == 400 @pytest.mark.xfail(strict=False, reason="POST /api/documents/upload removed in Plan 03-02 — replaced by upload-url + confirm flow") async def test_upload_persists_to_postgres_and_minio(async_client, sample_txt): """Legacy upload+persist test — endpoint removed in Plan 03-02 (D-04). Replaced by the upload-url + confirm flow tested in test_upload_url_endpoint and test_confirm_endpoint. 
""" with open(sample_txt, "rb") as f: resp = await async_client.post( "/api/documents/upload", files={"file": ("sample.txt", f, "text/plain")}, data={"auto_classify": "false"}, ) assert resp.status_code == 200 # --------------------------------------------------------------------------- # Wave 0 xfail stubs for Phase 3 document endpoint tests — Plans 03-02 / 03-03 # --------------------------------------------------------------------------- async def test_upload_url_endpoint(async_client, auth_user, mock_minio_presigned): """POST /api/documents/upload-url returns {upload_url, document_id} and creates a Document row with status='pending'. D-05: two-step upload flow — step 1 creates the pending Document row and returns the presigned PUT URL (15-min TTL). Quota is NOT reserved here. """ resp = await async_client.post( "/api/documents/upload-url", json={"filename": "report.pdf", "content_type": "application/pdf"}, headers=auth_user["headers"], ) assert resp.status_code == 200, resp.text data = resp.json() assert "upload_url" in data, f"Missing upload_url: {data}" assert "document_id" in data, f"Missing document_id: {data}" assert "presigned" in data["upload_url"] or "localhost" in data["upload_url"], ( f"Expected a presigned URL: {data['upload_url']}" ) # Verify mock was called assert mock_minio_presigned.called, "generate_presigned_put_url was not called" async def test_confirm_endpoint( async_client, auth_user, mock_minio_presigned, mock_minio_stat, monkeypatch ): """POST /api/documents/{id}/confirm calls stat_object once, updates Document.size_bytes from the stat return value, sets Document.status='uploaded', and runs atomic quota. D-05: step 3 of the presigned upload flow. stat_object provides the authoritative file size (D-07). The atomic quota UPDATE runs unconditionally here (STORE-03, Plan 03-03+). SQLite note: The raw SQL quota UPDATE uses :uid in dashed UUID format, which does not match SQLite's CHAR(32) undashed storage. 
This test xfails on SQLite and xpasses on PostgreSQL (run with INTEGRATION=1). Same as test_quota.py pattern. """ from unittest.mock import MagicMock # Patch out the Celery delay call — no Redis in unit test environment mock_delay = MagicMock() monkeypatch.setattr("api.documents.extract_and_classify.delay", mock_delay) mock_minio_stat.return_value = 2048 # Step 1: get upload URL resp = await async_client.post( "/api/documents/upload-url", json={"filename": "doc.txt", "content_type": "text/plain"}, headers=auth_user["headers"], ) assert resp.status_code == 200, resp.text doc_id = resp.json()["document_id"] # Step 2: confirm — quota runs unconditionally (Plan 03-03+, no Wave 2 guard) conf_resp = await async_client.post( f"/api/documents/{doc_id}/confirm", headers=auth_user["headers"], ) assert conf_resp.status_code == 200, conf_resp.text conf_data = conf_resp.json() assert conf_data["id"] == doc_id assert conf_data["size_bytes"] == 2048 assert conf_data["status"] == "uploaded" # stat_object was called once assert mock_minio_stat.called, "stat_object was not called" # Celery task was dispatched assert mock_delay.called, "extract_and_classify.delay was not called" async def test_get_quota(async_client, auth_user): """GET /api/auth/me/quota returns {used_bytes: 0, limit_bytes: 104857600}. STORE-04: quota usage bar endpoint. Returns current usage and limit for the authenticated user. Newly created users start at used_bytes=0. 
""" resp = await async_client.get( "/api/auth/me/quota", headers=auth_user["headers"], ) assert resp.status_code == 200, resp.text data = resp.json() assert "used_bytes" in data, f"Missing used_bytes: {data}" assert "limit_bytes" in data, f"Missing limit_bytes: {data}" assert data["used_bytes"] == 0, f"Expected 0 used_bytes for new user: {data}" assert data["limit_bytes"] == 104_857_600, f"Expected 100 MB limit: {data}" async def test_cross_user_access_404(async_client, auth_user, db_session): """User B's request for GET /api/documents/{A_doc_id} returns 404. SEC-04: cross-user access returns 404 (not 403) to avoid information leakage (CONTEXT.md D-16). An attacker cannot distinguish between 'document does not exist' and 'document belongs to someone else'. """ import uuid as _uuid from db.models import Document, User, Quota from services.auth import hash_password, create_access_token # Create User A's document directly via ORM doc_id = _uuid.uuid4() doc = Document( id=doc_id, user_id=auth_user["user"].id, filename="user_a_doc.txt", content_type="text/plain", size_bytes=100, storage_backend="minio", status="uploaded", object_key=f"{auth_user['user'].id}/{doc_id}/{_uuid.uuid4()}.txt", ) db_session.add(doc) # Create User B user_b_id = _uuid.uuid4() user_b = User( id=user_b_id, handle=f"user_b_{user_b_id.hex[:8]}", email=f"user_b_{user_b_id.hex[:8]}@example.com", password_hash=hash_password("Testpassword123!"), role="user", is_active=True, password_must_change=False, ) quota_b = Quota(user_id=user_b_id, limit_bytes=104857600, used_bytes=0) db_session.add(user_b) db_session.add(quota_b) await db_session.commit() token_b = create_access_token(str(user_b_id), "user") headers_b = {"Authorization": f"Bearer {token_b}"} # User B attempts to access User A's document — must get 404 (not 403) resp = await async_client.get(f"/api/documents/{doc_id}", headers=headers_b) assert resp.status_code == 404, ( f"Expected 404 for cross-user access, got {resp.status_code}: {resp.text}" ) 
async def test_admin_cannot_access_documents(async_client, admin_user):
    """GET /api/documents using admin_user.headers returns 403.

    SEC-04 SC4: admin accounts cannot access document content (CLAUDE.md +
    CONTEXT.md D-16). The get_regular_user dependency enforces this for all
    /api/documents/* handlers.
    """
    resp = await async_client.get("/api/documents", headers=admin_user["headers"])
    assert resp.status_code == 403, (
        f"Expected 403 for admin on document endpoints, got {resp.status_code}: {resp.text}"
    )


async def test_documents_require_auth(async_client):
    """Anonymous GET /api/documents (no Authorization header) returns 401 or 403.

    D-16: all /api/documents/* endpoints require authentication via get_current_user
    (Phase 2 D-07 fulfilled in Phase 3).
    """
    resp = await async_client.get("/api/documents")
    assert resp.status_code in (401, 403), f"Expected 401 or 403, got {resp.status_code}"


# ---------------------------------------------------------------------------
# Phase 4 DOC-02 proxy / content-stream tests
# ---------------------------------------------------------------------------


def _add_pdf_document(
    db_session,
    owner_id,
    *,
    size_bytes: int,
    filename: str = "test.pdf",
    storage_backend: str = "minio",
):
    """Stage an 'uploaded' application/pdf Document row for the content tests.

    The row is only passed to ``db_session.add``; flushing/committing is left
    to the caller.

    Args:
        db_session: Session the row is added to.
        owner_id: Value stored in ``Document.user_id`` and used as the first
            segment of the object key.
        size_bytes: Stored size (the tests pass the length of the fake payload).
        filename: Stored filename.
        storage_backend: Stored backend name.

    Returns:
        The freshly generated document UUID. The UUID is returned instead of
        the ORM object so callers never need to read ``doc.id`` after a commit.
    """
    import uuid as _uuid

    from db.models import Document

    doc_id = _uuid.uuid4()
    db_session.add(
        Document(
            id=doc_id,
            user_id=owner_id,
            filename=filename,
            content_type="application/pdf",
            size_bytes=size_bytes,
            storage_backend=storage_backend,
            status="uploaded",
            object_key=f"{owner_id}/{doc_id}/{_uuid.uuid4()}.pdf",
        )
    )
    return doc_id


def _patch_minio_get_object(monkeypatch, payload: bytes):
    """Replace MinIOBackend.get_object with an AsyncMock returning *payload*.

    ``raising=False`` is kept from the original tests so the patch applies even
    if the attribute is not present on the class.

    Returns:
        The installed AsyncMock, so a test can assert on its calls.
    """
    from unittest.mock import AsyncMock

    from storage.minio_backend import MinIOBackend

    get_object_mock = AsyncMock(return_value=payload)
    monkeypatch.setattr(MinIOBackend, "get_object", get_object_mock, raising=False)
    return get_object_mock


async def test_content_stream_200(async_client, auth_user, db_session, monkeypatch):
    """GET /api/documents/{id}/content returns 200 with correct Content-Type and Content-Disposition: inline."""
    file_bytes = b"Hello, PDF content!"
    _patch_minio_get_object(monkeypatch, file_bytes)

    doc_id = _add_pdf_document(db_session, auth_user["user"].id, size_bytes=len(file_bytes))
    await db_session.commit()

    resp = await async_client.get(
        f"/api/documents/{doc_id}/content",
        headers=auth_user["headers"],
    )
    assert resp.status_code == 200
    assert resp.content == file_bytes
    assert resp.headers["content-type"].startswith("application/pdf")
    assert "inline" in resp.headers.get("content-disposition", "")


async def test_content_stream_206_range(async_client, auth_user, db_session, monkeypatch):
    """GET /api/documents/{id}/content with Range header returns 206 and Content-Range header."""
    file_bytes = b"0123456789ABCDEF"  # 16 bytes
    _patch_minio_get_object(monkeypatch, file_bytes)

    doc_id = _add_pdf_document(db_session, auth_user["user"].id, size_bytes=len(file_bytes))
    await db_session.commit()

    resp = await async_client.get(
        f"/api/documents/{doc_id}/content",
        headers={**auth_user["headers"], "Range": "bytes=0-7"},
    )
    assert resp.status_code == 206
    assert resp.content == b"01234567"
    assert "content-range" in resp.headers
    assert resp.headers["content-range"] == "bytes 0-7/16"


async def test_content_stream_admin_403(async_client, admin_user, db_session, monkeypatch):
    """GET /api/documents/{id}/content with admin JWT returns 403."""
    file_bytes = b"admin should not see this"
    _patch_minio_get_object(monkeypatch, file_bytes)

    # The document is owned by the admin itself; the request must still be refused.
    doc_id = _add_pdf_document(db_session, admin_user["user"].id, size_bytes=len(file_bytes))
    await db_session.commit()

    resp = await async_client.get(
        f"/api/documents/{doc_id}/content",
        headers=admin_user["headers"],
    )
    assert resp.status_code == 403


async def test_content_stream_no_presigned_url(async_client, auth_user, db_session, monkeypatch):
    """GET /api/documents/{id}/content response does not call presigned_get_url."""
    from unittest.mock import AsyncMock

    from storage.minio_backend import MinIOBackend

    file_bytes = b"document content"
    get_object_mock = _patch_minio_get_object(monkeypatch, file_bytes)
    presigned_mock = AsyncMock(return_value="http://minio/presigned?X-Amz-Signature=FAKE")
    monkeypatch.setattr(MinIOBackend, "presigned_get_url", presigned_mock, raising=False)

    doc_id = _add_pdf_document(db_session, auth_user["user"].id, size_bytes=len(file_bytes))
    await db_session.commit()

    resp = await async_client.get(
        f"/api/documents/{doc_id}/content",
        headers=auth_user["headers"],
    )
    assert resp.status_code == 200
    # presigned_get_url must NEVER be called
    presigned_mock.assert_not_called()
    # get_object must be called (direct fetch)
    get_object_mock.assert_called_once()


async def test_content_stream_share_recipient_200(async_client, auth_user, admin_user, db_session, monkeypatch):
    """Share recipient can access document content via GET /api/documents/{id}/content."""
    import uuid as _uuid

    from db.models import Quota, Share, User
    from services.auth import create_access_token, hash_password

    file_bytes = b"shared document content"
    _patch_minio_get_object(monkeypatch, file_bytes)

    # Sharing needs two regular users: auth_user owns the document and a second
    # regular user is created here as the recipient. The admin_user fixture is
    # requested by the signature but not referenced in the body.
    recipient_id = _uuid.uuid4()
    recipient = User(
        id=recipient_id,
        handle=f"recipient_{recipient_id.hex[:8]}",
        email=f"recipient_{recipient_id.hex[:8]}@example.com",
        password_hash=hash_password("Testpassword123!"),
        role="user",
        is_active=True,
        password_must_change=False,
    )
    recipient_quota = Quota(user_id=recipient_id, limit_bytes=104857600, used_bytes=0)
    db_session.add(recipient)
    db_session.add(recipient_quota)
    await db_session.flush()

    doc_id = _add_pdf_document(
        db_session,
        auth_user["user"].id,
        size_bytes=len(file_bytes),
        filename="shared.pdf",
    )
    # Flush before adding the Share row that references both the user and the document.
    await db_session.flush()

    share = Share(
        document_id=doc_id,
        owner_id=auth_user["user"].id,
        recipient_id=recipient_id,
        permission="view",
    )
    db_session.add(share)
    await db_session.commit()

    recipient_token = create_access_token(str(recipient_id), "user")
    recipient_headers = {"Authorization": f"Bearer {recipient_token}"}

    resp = await async_client.get(
        f"/api/documents/{doc_id}/content",
        headers=recipient_headers,
    )
    assert resp.status_code == 200
    assert resp.content == file_bytes


async def test_content_stream_not_found(async_client, auth_user):
    """GET /api/documents/{id}/content returns 404 for unknown document ID."""
    import uuid as _uuid

    resp = await async_client.get(
        f"/api/documents/{_uuid.uuid4()}/content",
        headers=auth_user["headers"],
    )
    assert resp.status_code == 404


async def test_content_stream_invalid_id(async_client, auth_user):
    """GET /api/documents/{id}/content returns 404 for invalid UUID."""
    resp = await async_client.get(
        "/api/documents/not-a-uuid/content",
        headers=auth_user["headers"],
    )
    assert resp.status_code == 404


async def test_parse_range_416(async_client, auth_user, db_session, monkeypatch):
    """GET /api/documents/{id}/content with invalid Range returns 416."""
    file_bytes = b"short"
    _patch_minio_get_object(monkeypatch, file_bytes)

    doc_id = _add_pdf_document(db_session, auth_user["user"].id, size_bytes=len(file_bytes))
    await db_session.commit()

    # The requested range starts well past the end of the 5-byte payload.
    resp = await async_client.get(
        f"/api/documents/{doc_id}/content",
        headers={**auth_user["headers"], "Range": "bytes=100-200"},
    )
    assert resp.status_code == 416


async def test_stream_document_content_cloud_backend_error(async_client, auth_user, db_session, monkeypatch):
    """GET /api/documents/{id}/content returns 502 when cloud backend raises a non-CloudConnectionError exception.

    Plan 05-12 gap closure: broad except-clause catches RuntimeError, timeout, etc.
    and returns a user-friendly 502 instead of an opaque 500.
    """
    doc_id = _add_pdf_document(
        db_session,
        auth_user["user"].id,
        size_bytes=1024,
        filename="cloud_doc.pdf",
        storage_backend="google_drive",
    )
    await db_session.commit()

    async def raise_runtime_error(*args, **kwargs):
        raise RuntimeError("connection timeout")

    # Make backend resolution itself fail, whatever arguments the handler passes.
    monkeypatch.setattr("api.documents.get_storage_backend_for_document", raise_runtime_error)

    resp = await async_client.get(
        f"/api/documents/{doc_id}/content",
        headers=auth_user["headers"],
    )
    assert resp.status_code == 502, f"Expected 502, got {resp.status_code}: {resp.text}"
    assert "Cloud backend unreachable" in resp.json()["detail"]


# ---------------------------------------------------------------------------
# Phase 6.2 Wave 0 xfail stubs — cloud document delete
# ---------------------------------------------------------------------------


async def test_delete_cloud_document_propagates(async_client, auth_user, db_session):
    """DELETE /api/documents/{id} for a cloud doc calls cloud backend delete_object (D-01)"""
    pytest.xfail("Phase 6.2 — not implemented yet")


async def test_delete_cloud_document_failure(async_client, auth_user, db_session):
    """DELETE /api/documents/{id} returns cloud_delete_failed=True when provider raises (D-03)"""
    pytest.xfail("Phase 6.2 — not implemented yet")


async def test_delete_cloud_remove_only(async_client, auth_user, db_session):
    """DELETE /api/documents/{id}?remove_only=true skips cloud delete, removes DB row only (D-02)"""
    pytest.xfail("Phase 6.2 — not implemented yet")