From e23ed258c932c72ca67518fcafaf0665c6156a7c Mon Sep 17 00:00:00 2001
From: phact
Date: Fri, 12 Sep 2025 11:54:28 -0400
Subject: [PATCH] improve tests

---
 src/config/settings.py                   |  23 +++
 tests/integration/test_api_endpoints.py  | 198 +++++++++++++----------
 tests/integration/test_startup_ingest.py |  81 +++++-----
 3 files changed, 172 insertions(+), 130 deletions(-)

diff --git a/src/config/settings.py b/src/config/settings.py
index ace9d5cb..dc9a6e23 100644
--- a/src/config/settings.py
+++ b/src/config/settings.py
@@ -297,6 +297,29 @@ class AppClients:
         return self
 
+    async def close(self):
+        """Close all client connections"""
+        try:
+            if hasattr(self, 'opensearch') and self.opensearch:
+                await self.opensearch.close()
+                self.opensearch = None
+        except Exception as e:
+            logger.warning("Error closing OpenSearch client", error=str(e))
+
+        try:
+            if hasattr(self, 'langflow_http_client') and self.langflow_http_client:
+                await self.langflow_http_client.aclose()
+                self.langflow_http_client = None
+        except Exception as e:
+            logger.warning("Error closing Langflow HTTP client", error=str(e))
+
+        try:
+            if hasattr(self, 'patched_async_client') and self.patched_async_client:
+                await self.patched_async_client.close()
+                self.patched_async_client = None
+        except Exception as e:
+            logger.warning("Error closing OpenAI client", error=str(e))
+
     async def ensure_langflow_client(self):
         """Ensure Langflow client exists; try to generate key and create client lazily."""
         if self.langflow_client is not None:
diff --git a/tests/integration/test_api_endpoints.py b/tests/integration/test_api_endpoints.py
index e2ae3c18..60810563 100644
--- a/tests/integration/test_api_endpoints.py
+++ b/tests/integration/test_api_endpoints.py
@@ -60,79 +60,89 @@ async def test_upload_and_search_endpoint(tmp_path: Path, disable_langflow_ingest: bool):
     await clients.initialize()
     try:
         await clients.opensearch.indices.delete(index=INDEX_NAME)
+        # Wait for deletion to complete
+        await asyncio.sleep(1)
     except Exception:
         pass
 
     app = await create_app()
     # Manually run startup tasks since httpx ASGI transport here doesn't manage lifespan
     await startup_tasks(app.state.services)
+
+    # Verify index is truly empty after startup
+    try:
+        count_response = await clients.opensearch.count(index=INDEX_NAME)
+        doc_count = count_response.get('count', 0)
+    except Exception:
+        # If count fails, the index might not exist yet, which is fine
+        doc_count = 0
+    assert doc_count == 0, f"Index should be empty after startup but contains {doc_count} documents"
 
     transport = httpx.ASGITransport(app=app)
-    async with httpx.AsyncClient(transport=transport, base_url="http://testserver") as client:
-        # Wait for app + OpenSearch readiness using existing endpoints
-        await wait_for_service_ready(client)
-
-        # Create a temporary markdown file to upload
-        file_path = tmp_path / "endpoint_test_doc.md"
-        file_text = (
-            "# Single Test Document\n\n"
-            "This is a test document about OpenRAG testing framework. "
-            "The content should be indexed and searchable in OpenSearch after processing."
-        )
-        file_path.write_text(file_text)
-
-        # POST via router (multipart)
-        files = {
-            "file": (
-                file_path.name,
-                file_path.read_bytes(),
-                "text/markdown",
-            )
-        }
-        upload_resp = await client.post("/upload", files=files)
-        body = upload_resp.json()
-        # Router now returns 201 + task_id (async) regardless of mode
-        assert upload_resp.status_code == 201, upload_resp.text
-        assert isinstance(body.get("task_id"), str)
-
-        # Poll search for the specific content until it's indexed
-        async def _wait_for_indexed(timeout_s: float = 30.0):
-            deadline = asyncio.get_event_loop().time() + timeout_s
-            while asyncio.get_event_loop().time() < deadline:
-                resp = await client.post(
-                    "/search",
-                    json={"query": "OpenRAG testing framework", "limit": 5},
-                )
-                if resp.status_code == 200 and resp.json().get("results"):
-                    return resp
-                await asyncio.sleep(0.5)
-            return resp
-
-        search_resp = await _wait_for_indexed()
-
-        # POST /search
-        assert search_resp.status_code == 200, search_resp.text
-        search_body = search_resp.json()
-
-        # Basic shape and at least one hit
-        assert isinstance(search_body.get("results"), list)
-        assert len(search_body["results"]) >= 0
-        # When hits exist, confirm our phrase is present in top result content
-        if search_body["results"]:
-            top = search_body["results"][0]
-            assert "text" in top or "content" in top
-            text = top.get("text") or top.get("content")
-            assert isinstance(text, str)
-            assert "testing" in text.lower()
-    # Explicitly close global clients to avoid aiohttp warnings
-    from src.config.settings import clients
     try:
-        if getattr(clients, "opensearch", None):
-            await clients.opensearch.close()
-        if getattr(clients, "langflow_http_client", None):
-            await clients.langflow_http_client.aclose()
-    except Exception:
-        pass
+        async with httpx.AsyncClient(transport=transport, base_url="http://testserver") as client:
+            # Wait for app + OpenSearch readiness using existing endpoints
+            await wait_for_service_ready(client)
+
+            # Create a temporary markdown file to upload
+            file_path = tmp_path / "endpoint_test_doc.md"
+            file_text = (
+                "# Single Test Document\n\n"
+                "This is a test document about OpenRAG testing framework. "
+                "The content should be indexed and searchable in OpenSearch after processing."
+            )
+            file_path.write_text(file_text)
+
+            # POST via router (multipart)
+            files = {
+                "file": (
+                    file_path.name,
+                    file_path.read_bytes(),
+                    "text/markdown",
+                )
+            }
+            upload_resp = await client.post("/upload", files=files)
+            body = upload_resp.json()
+            # Router now returns 201 + task_id (async) regardless of mode
+            assert upload_resp.status_code == 201, upload_resp.text
+            assert isinstance(body.get("task_id"), str)
+
+            # Poll search for the specific content until it's indexed
+            async def _wait_for_indexed(timeout_s: float = 30.0):
+                deadline = asyncio.get_event_loop().time() + timeout_s
+                while asyncio.get_event_loop().time() < deadline:
+                    resp = await client.post(
+                        "/search",
+                        json={"query": "OpenRAG testing framework", "limit": 5},
+                    )
+                    if resp.status_code == 200 and resp.json().get("results"):
+                        return resp
+                    await asyncio.sleep(0.5)
+                return resp
+
+            search_resp = await _wait_for_indexed()
+
+            # POST /search
+            assert search_resp.status_code == 200, search_resp.text
+            search_body = search_resp.json()
+
+            # Basic shape and at least one hit
+            assert isinstance(search_body.get("results"), list)
+            assert len(search_body["results"]) >= 0
+            # When hits exist, confirm our phrase is present in top result content
+            if search_body["results"]:
+                top = search_body["results"][0]
+                assert "text" in top or "content" in top
+                text = top.get("text") or top.get("content")
+                assert isinstance(text, str)
+                assert "testing" in text.lower()
+    finally:
+        # Explicitly close global clients to avoid aiohttp warnings
+        from src.config.settings import clients
+        try:
+            await clients.close()
+        except Exception:
+            pass
 
 
 @pytest.mark.parametrize("disable_langflow_ingest", [True, False])
@@ -159,35 +169,45 @@ async def test_router_upload_ingest_traditional(tmp_path: Path, disable_langflow_ingest: bool):
     await clients.initialize()
     try:
         await clients.opensearch.indices.delete(index=INDEX_NAME)
+        # Wait for deletion to complete
+        await asyncio.sleep(1)
     except Exception:
         pass
 
     app = await create_app()
     await startup_tasks(app.state.services)
-    transport = httpx.ASGITransport(app=app)
-    async with httpx.AsyncClient(transport=transport, base_url="http://testserver") as client:
-        await wait_for_service_ready(client)
-
-        file_path = tmp_path / "router_test_doc.md"
-        file_path.write_text("# Router Test\n\nThis file validates the upload router.")
-
-        files = {
-            "file": (
-                file_path.name,
-                file_path.read_bytes(),
-                "text/markdown",
-            )
-        }
-
-        resp = await client.post("/upload", files=files)
-        data = resp.json()
-        assert resp.status_code == 201, resp.text
-        assert isinstance(data.get("task_id"), str)
-    from src.config.settings import clients
+
+    # Verify index is truly empty after startup
     try:
-        if getattr(clients, "opensearch", None):
-            await clients.opensearch.close()
-        if getattr(clients, "langflow_http_client", None):
-            await clients.langflow_http_client.aclose()
+        count_response = await clients.opensearch.count(index=INDEX_NAME)
+        doc_count = count_response.get('count', 0)
     except Exception:
-        pass
+        # If count fails, the index might not exist yet, which is fine
+        doc_count = 0
+    assert doc_count == 0, f"Index should be empty after startup but contains {doc_count} documents"
+    transport = httpx.ASGITransport(app=app)
+    try:
+        async with httpx.AsyncClient(transport=transport, base_url="http://testserver") as client:
+            await wait_for_service_ready(client)
+
+            file_path = tmp_path / "router_test_doc.md"
+            file_path.write_text("# Router Test\n\nThis file validates the upload router.")
+
+            files = {
+                "file": (
+                    file_path.name,
+                    file_path.read_bytes(),
+                    "text/markdown",
+                )
+            }
+
+            resp = await client.post("/upload", files=files)
+            data = resp.json()
+            assert resp.status_code == 201, resp.text
+            assert isinstance(data.get("task_id"), str)
+    finally:
+        from src.config.settings import clients
+        try:
+            await clients.close()
+        except Exception:
+            pass
diff --git a/tests/integration/test_startup_ingest.py b/tests/integration/test_startup_ingest.py
index 5ce62a94..436c4d28 100644
--- a/tests/integration/test_startup_ingest.py
+++ b/tests/integration/test_startup_ingest.py
@@ -70,45 +70,44 @@ async def test_startup_ingest_creates_task(disable_langflow_ingest: bool):
     await startup_tasks(app.state.services)
 
     transport = httpx.ASGITransport(app=app)
-    async with httpx.AsyncClient(transport=transport, base_url="http://testserver") as client:
-        await wait_for_ready(client)
-
-        expected_files = count_files_in_documents()
-
-        # Poll /tasks until we see at least one startup ingest task
-        async def _wait_for_task(timeout_s: float = 60.0):
-            deadline = asyncio.get_event_loop().time() + timeout_s
-            last = None
-            while asyncio.get_event_loop().time() < deadline:
-                resp = await client.get("/tasks")
-                if resp.status_code == 200:
-                    data = resp.json()
-                    last = data
-                    tasks = data.get("tasks") if isinstance(data, dict) else None
-                    if isinstance(tasks, list) and len(tasks) > 0:
-                        return tasks
-                await asyncio.sleep(0.5)
-            return last.get("tasks") if isinstance(last, dict) else last
-
-        tasks = await _wait_for_task()
-        if expected_files == 0:
-            return  # Nothing to do
-        if not (isinstance(tasks, list) and len(tasks) > 0):
-            # Fallback: verify that documents were indexed as a sign of startup ingest
-            sr = await client.post("/search", json={"query": "*", "limit": 1})
-            assert sr.status_code == 200, sr.text
-            total = sr.json().get("total")
-            assert isinstance(total, int) and total >= 0, "Startup ingest did not index documents"
-            return
-        newest = tasks[0]
-        assert "task_id" in newest
-        assert newest.get("total_files") == expected_files
-    # Explicitly close global clients to avoid aiohttp warnings
-    from src.config.settings import clients
     try:
-        if getattr(clients, "opensearch", None):
-            await clients.opensearch.close()
-        if getattr(clients, "langflow_http_client", None):
-            await clients.langflow_http_client.aclose()
-    except Exception:
-        pass
+        async with httpx.AsyncClient(transport=transport, base_url="http://testserver") as client:
+            await wait_for_ready(client)
+
+            expected_files = count_files_in_documents()
+
+            # Poll /tasks until we see at least one startup ingest task
+            async def _wait_for_task(timeout_s: float = 60.0):
+                deadline = asyncio.get_event_loop().time() + timeout_s
+                last = None
+                while asyncio.get_event_loop().time() < deadline:
+                    resp = await client.get("/tasks")
+                    if resp.status_code == 200:
+                        data = resp.json()
+                        last = data
+                        tasks = data.get("tasks") if isinstance(data, dict) else None
+                        if isinstance(tasks, list) and len(tasks) > 0:
+                            return tasks
+                    await asyncio.sleep(0.5)
+                return last.get("tasks") if isinstance(last, dict) else last
+
+            tasks = await _wait_for_task()
+            if expected_files == 0:
+                return  # Nothing to do
+            if not (isinstance(tasks, list) and len(tasks) > 0):
+                # Fallback: verify that documents were indexed as a sign of startup ingest
+                sr = await client.post("/search", json={"query": "*", "limit": 1})
+                assert sr.status_code == 200, sr.text
+                total = sr.json().get("total")
+                assert isinstance(total, int) and total >= 0, "Startup ingest did not index documents"
+                return
+            newest = tasks[0]
"task_id" in newest + assert newest.get("total_files") == expected_files + finally: + # Explicitly close global clients to avoid aiohttp warnings + from src.config.settings import clients + try: + await clients.close() + except Exception: + pass