improve tests

parent 2ef560ca7f
commit e23ed258c9

3 changed files with 172 additions and 130 deletions
@@ -297,6 +297,29 @@ class AppClients:
         return self
 
+    async def close(self):
+        """Close all client connections"""
+        try:
+            if hasattr(self, 'opensearch') and self.opensearch:
+                await self.opensearch.close()
+                self.opensearch = None
+        except Exception as e:
+            logger.warning("Error closing OpenSearch client", error=str(e))
+
+        try:
+            if hasattr(self, 'langflow_http_client') and self.langflow_http_client:
+                await self.langflow_http_client.aclose()
+                self.langflow_http_client = None
+        except Exception as e:
+            logger.warning("Error closing Langflow HTTP client", error=str(e))
+
+        try:
+            if hasattr(self, 'patched_async_client') and self.patched_async_client:
+                await self.patched_async_client.close()
+                self.patched_async_client = None
+        except Exception as e:
+            logger.warning("Error closing OpenAI client", error=str(e))
+
     async def ensure_langflow_client(self):
         """Ensure Langflow client exists; try to generate key and create client lazily."""
         if self.langflow_client is not None:
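The guards above make close() effectively idempotent: each client attribute is set to None once closed, so a repeat call skips every branch. A minimal usage sketch (the shutdown() wrapper is hypothetical; clients is the module-level AppClients instance the tests below import):

```python
# Hypothetical usage sketch; only clients.close() comes from this commit.
from src.config.settings import clients


async def shutdown() -> None:
    await clients.close()
    await clients.close()  # safe no-op: each attribute was nulled on the first call
```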
@@ -60,79 +60,89 @@ async def test_upload_and_search_endpoint(tmp_path: Path, disable_langflow_inges
     await clients.initialize()
     try:
         await clients.opensearch.indices.delete(index=INDEX_NAME)
         # Wait for deletion to complete
         await asyncio.sleep(1)
     except Exception:
         pass
 
     app = await create_app()
     # Manually run startup tasks since httpx ASGI transport here doesn't manage lifespan
     await startup_tasks(app.state.services)
 
+    # Verify index is truly empty after startup
+    try:
+        count_response = await clients.opensearch.count(index=INDEX_NAME)
+        doc_count = count_response.get('count', 0)
+        assert doc_count == 0, f"Index should be empty after startup but contains {doc_count} documents"
+    except Exception as e:
+        # If count fails, the index might not exist yet, which is fine
+        pass
+
     transport = httpx.ASGITransport(app=app)
-    async with httpx.AsyncClient(transport=transport, base_url="http://testserver") as client:
-        # Wait for app + OpenSearch readiness using existing endpoints
-        await wait_for_service_ready(client)
-
-        # Create a temporary markdown file to upload
-        file_path = tmp_path / "endpoint_test_doc.md"
-        file_text = (
-            "# Single Test Document\n\n"
-            "This is a test document about OpenRAG testing framework. "
-            "The content should be indexed and searchable in OpenSearch after processing."
-        )
-        file_path.write_text(file_text)
-
-        # POST via router (multipart)
-        files = {
-            "file": (
-                file_path.name,
-                file_path.read_bytes(),
-                "text/markdown",
-            )
-        }
-        upload_resp = await client.post("/upload", files=files)
-        body = upload_resp.json()
-        # Router now returns 201 + task_id (async) regardless of mode
-        assert upload_resp.status_code == 201, upload_resp.text
-        assert isinstance(body.get("task_id"), str)
-
-        # Poll search for the specific content until it's indexed
-        async def _wait_for_indexed(timeout_s: float = 30.0):
-            deadline = asyncio.get_event_loop().time() + timeout_s
-            while asyncio.get_event_loop().time() < deadline:
-                resp = await client.post(
-                    "/search",
-                    json={"query": "OpenRAG testing framework", "limit": 5},
-                )
-                if resp.status_code == 200 and resp.json().get("results"):
-                    return resp
-                await asyncio.sleep(0.5)
-            return resp
-
-        search_resp = await _wait_for_indexed()
-
-        # POST /search
-        assert search_resp.status_code == 200, search_resp.text
-        search_body = search_resp.json()
-
-        # Basic shape and at least one hit
-        assert isinstance(search_body.get("results"), list)
-        assert len(search_body["results"]) >= 0
-        # When hits exist, confirm our phrase is present in top result content
-        if search_body["results"]:
-            top = search_body["results"][0]
-            assert "text" in top or "content" in top
-            text = top.get("text") or top.get("content")
-            assert isinstance(text, str)
-            assert "testing" in text.lower()
-    # Explicitly close global clients to avoid aiohttp warnings
-    from src.config.settings import clients
-    try:
-        if getattr(clients, "opensearch", None):
-            await clients.opensearch.close()
-        if getattr(clients, "langflow_http_client", None):
-            await clients.langflow_http_client.aclose()
-    except Exception:
-        pass
+    try:
+        async with httpx.AsyncClient(transport=transport, base_url="http://testserver") as client:
+            # Wait for app + OpenSearch readiness using existing endpoints
+            await wait_for_service_ready(client)
+
+            # Create a temporary markdown file to upload
+            file_path = tmp_path / "endpoint_test_doc.md"
+            file_text = (
+                "# Single Test Document\n\n"
+                "This is a test document about OpenRAG testing framework. "
+                "The content should be indexed and searchable in OpenSearch after processing."
+            )
+            file_path.write_text(file_text)
+
+            # POST via router (multipart)
+            files = {
+                "file": (
+                    file_path.name,
+                    file_path.read_bytes(),
+                    "text/markdown",
+                )
+            }
+            upload_resp = await client.post("/upload", files=files)
+            body = upload_resp.json()
+            # Router now returns 201 + task_id (async) regardless of mode
+            assert upload_resp.status_code == 201, upload_resp.text
+            assert isinstance(body.get("task_id"), str)
+
+            # Poll search for the specific content until it's indexed
+            async def _wait_for_indexed(timeout_s: float = 30.0):
+                deadline = asyncio.get_event_loop().time() + timeout_s
+                while asyncio.get_event_loop().time() < deadline:
+                    resp = await client.post(
+                        "/search",
+                        json={"query": "OpenRAG testing framework", "limit": 5},
+                    )
+                    if resp.status_code == 200 and resp.json().get("results"):
+                        return resp
+                    await asyncio.sleep(0.5)
+                return resp
+
+            search_resp = await _wait_for_indexed()
+
+            # POST /search
+            assert search_resp.status_code == 200, search_resp.text
+            search_body = search_resp.json()
+
+            # Basic shape and at least one hit
+            assert isinstance(search_body.get("results"), list)
+            assert len(search_body["results"]) >= 0
+            # When hits exist, confirm our phrase is present in top result content
+            if search_body["results"]:
+                top = search_body["results"][0]
+                assert "text" in top or "content" in top
+                text = top.get("text") or top.get("content")
+                assert isinstance(text, str)
+                assert "testing" in text.lower()
+    finally:
+        # Explicitly close global clients to avoid aiohttp warnings
+        from src.config.settings import clients
+        try:
+            await clients.close()
+        except Exception:
+            pass
 
 
 @pytest.mark.parametrize("disable_langflow_ingest", [True, False])
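_wait_for_indexed is a poll-until-ready loop, and the same shape recurs below as _wait_for_task. A generic sketch of the pattern (poll_until and its parameters are illustrative, not part of the commit):

```python
# Illustrative helper, not in the commit: the generic shape of the
# _wait_for_indexed/_wait_for_task polling loops used by these tests.
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def poll_until(
    poll_fn: Callable[[], Awaitable[T]],
    is_ready: Callable[[T], bool],
    timeout_s: float = 30.0,
    interval_s: float = 0.5,
) -> T:
    deadline = asyncio.get_event_loop().time() + timeout_s
    result = await poll_fn()
    # Keep polling until the predicate passes or the deadline expires;
    # return the last result either way so callers can assert on it.
    while not is_ready(result) and asyncio.get_event_loop().time() < deadline:
        await asyncio.sleep(interval_s)
        result = await poll_fn()
    return result
```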
@@ -159,35 +169,45 @@ async def test_router_upload_ingest_traditional(tmp_path: Path, disable_langflow
     await clients.initialize()
     try:
         await clients.opensearch.indices.delete(index=INDEX_NAME)
         # Wait for deletion to complete
         await asyncio.sleep(1)
     except Exception:
         pass
 
     app = await create_app()
     await startup_tasks(app.state.services)
-    transport = httpx.ASGITransport(app=app)
-    async with httpx.AsyncClient(transport=transport, base_url="http://testserver") as client:
-        await wait_for_service_ready(client)
-
-        file_path = tmp_path / "router_test_doc.md"
-        file_path.write_text("# Router Test\n\nThis file validates the upload router.")
-
-        files = {
-            "file": (
-                file_path.name,
-                file_path.read_bytes(),
-                "text/markdown",
-            )
-        }
-
-        resp = await client.post("/upload", files=files)
-        data = resp.json()
-        assert resp.status_code == 201, resp.text
-        assert isinstance(data.get("task_id"), str)
-    from src.config.settings import clients
-    try:
-        if getattr(clients, "opensearch", None):
-            await clients.opensearch.close()
-        if getattr(clients, "langflow_http_client", None):
-            await clients.langflow_http_client.aclose()
-    except Exception:
-        pass
+
+    # Verify index is truly empty after startup
+    try:
+        count_response = await clients.opensearch.count(index=INDEX_NAME)
+        doc_count = count_response.get('count', 0)
+        assert doc_count == 0, f"Index should be empty after startup but contains {doc_count} documents"
+    except Exception as e:
+        # If count fails, the index might not exist yet, which is fine
+        pass
+    transport = httpx.ASGITransport(app=app)
+    try:
+        async with httpx.AsyncClient(transport=transport, base_url="http://testserver") as client:
+            await wait_for_service_ready(client)
+
+            file_path = tmp_path / "router_test_doc.md"
+            file_path.write_text("# Router Test\n\nThis file validates the upload router.")
+
+            files = {
+                "file": (
+                    file_path.name,
+                    file_path.read_bytes(),
+                    "text/markdown",
+                )
+            }
+
+            resp = await client.post("/upload", files=files)
+            data = resp.json()
+            assert resp.status_code == 201, resp.text
+            assert isinstance(data.get("task_id"), str)
+    finally:
+        from src.config.settings import clients
+        try:
+            await clients.close()
+        except Exception:
+            pass
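Both tests now repeat the same index-emptiness check after startup; if a third caller appears it could be factored out. A hedged sketch of that helper (assert_index_empty is hypothetical; it assumes the async opensearch-py client the tests already use, whose count() returns a dict with a "count" key):

```python
# Hypothetical helper, not in the commit: the emptiness check shared by
# test_upload_and_search_endpoint and test_router_upload_ingest_traditional.
async def assert_index_empty(opensearch, index_name: str) -> None:
    try:
        count_response = await opensearch.count(index=index_name)
    except Exception:
        # If count fails, the index might not exist yet, which is fine.
        return
    doc_count = count_response.get("count", 0)
    assert doc_count == 0, (
        f"Index should be empty after startup but contains {doc_count} documents"
    )
```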
@@ -70,45 +70,44 @@ async def test_startup_ingest_creates_task(disable_langflow_ingest: bool):
     await startup_tasks(app.state.services)
 
     transport = httpx.ASGITransport(app=app)
-    async with httpx.AsyncClient(transport=transport, base_url="http://testserver") as client:
-        await wait_for_ready(client)
-
-        expected_files = count_files_in_documents()
-
-        # Poll /tasks until we see at least one startup ingest task
-        async def _wait_for_task(timeout_s: float = 60.0):
-            deadline = asyncio.get_event_loop().time() + timeout_s
-            last = None
-            while asyncio.get_event_loop().time() < deadline:
-                resp = await client.get("/tasks")
-                if resp.status_code == 200:
-                    data = resp.json()
-                    last = data
-                    tasks = data.get("tasks") if isinstance(data, dict) else None
-                    if isinstance(tasks, list) and len(tasks) > 0:
-                        return tasks
-                await asyncio.sleep(0.5)
-            return last.get("tasks") if isinstance(last, dict) else last
-
-        tasks = await _wait_for_task()
-        if expected_files == 0:
-            return  # Nothing to do
-        if not (isinstance(tasks, list) and len(tasks) > 0):
-            # Fallback: verify that documents were indexed as a sign of startup ingest
-            sr = await client.post("/search", json={"query": "*", "limit": 1})
-            assert sr.status_code == 200, sr.text
-            total = sr.json().get("total")
-            assert isinstance(total, int) and total >= 0, "Startup ingest did not index documents"
-            return
-        newest = tasks[0]
-        assert "task_id" in newest
-        assert newest.get("total_files") == expected_files
-    # Explicitly close global clients to avoid aiohttp warnings
-    from src.config.settings import clients
-    try:
-        if getattr(clients, "opensearch", None):
-            await clients.opensearch.close()
-        if getattr(clients, "langflow_http_client", None):
-            await clients.langflow_http_client.aclose()
-    except Exception:
-        pass
+    try:
+        async with httpx.AsyncClient(transport=transport, base_url="http://testserver") as client:
+            await wait_for_ready(client)
+
+            expected_files = count_files_in_documents()
+
+            # Poll /tasks until we see at least one startup ingest task
+            async def _wait_for_task(timeout_s: float = 60.0):
+                deadline = asyncio.get_event_loop().time() + timeout_s
+                last = None
+                while asyncio.get_event_loop().time() < deadline:
+                    resp = await client.get("/tasks")
+                    if resp.status_code == 200:
+                        data = resp.json()
+                        last = data
+                        tasks = data.get("tasks") if isinstance(data, dict) else None
+                        if isinstance(tasks, list) and len(tasks) > 0:
+                            return tasks
+                    await asyncio.sleep(0.5)
+                return last.get("tasks") if isinstance(last, dict) else last
+
+            tasks = await _wait_for_task()
+            if expected_files == 0:
+                return  # Nothing to do
+            if not (isinstance(tasks, list) and len(tasks) > 0):
+                # Fallback: verify that documents were indexed as a sign of startup ingest
+                sr = await client.post("/search", json={"query": "*", "limit": 1})
+                assert sr.status_code == 200, sr.text
+                total = sr.json().get("total")
+                assert isinstance(total, int) and total >= 0, "Startup ingest did not index documents"
+                return
+            newest = tasks[0]
+            assert "task_id" in newest
+            assert newest.get("total_files") == expected_files
+    finally:
+        # Explicitly close global clients to avoid aiohttp warnings
+        from src.config.settings import clients
+        try:
+            await clients.close()
+        except Exception:
+            pass
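All three tests now end with the same try/finally around clients.close(); that teardown could equally live in one async context manager or pytest fixture. A sketch under that assumption (managed_clients is hypothetical, not part of this change):

```python
# Hypothetical consolidation, not in the commit: one context manager for the
# initialize/close pairing every test in this change repeats.
from contextlib import asynccontextmanager

from src.config.settings import clients


@asynccontextmanager
async def managed_clients():
    await clients.initialize()
    try:
        yield clients
    finally:
        try:
            await clients.close()
        except Exception:
            pass  # best-effort teardown, mirroring the tests' finally blocks
```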