diff --git a/tests/integration/test_api_endpoints.py b/tests/integration/test_api_endpoints.py index 6c28e51c..7f4855bf 100644 --- a/tests/integration/test_api_endpoints.py +++ b/tests/integration/test_api_endpoints.py @@ -204,9 +204,145 @@ async def test_upload_and_search_endpoint(tmp_path: Path, disable_langflow_inges pass +async def _wait_for_langflow_chat( + client: httpx.AsyncClient, payload: dict, timeout_s: float = 120.0 +) -> dict: + deadline = asyncio.get_event_loop().time() + timeout_s + last_payload = None + while asyncio.get_event_loop().time() < deadline: + resp = await client.post("/langflow", json=payload) + if resp.status_code == 200: + try: + data = resp.json() + except Exception: + last_payload = resp.text + else: + response_text = data.get("response") + if isinstance(response_text, str) and response_text.strip(): + return data + last_payload = data + else: + last_payload = resp.text + await asyncio.sleep(1.0) + raise AssertionError(f"/langflow never returned a usable response. Last payload: {last_payload}") + + +async def _wait_for_nudges( + client: httpx.AsyncClient, chat_id: str | None = None, timeout_s: float = 90.0 +) -> dict: + endpoint = "/nudges" if not chat_id else f"/nudges/{chat_id}" + deadline = asyncio.get_event_loop().time() + timeout_s + last_payload = None + while asyncio.get_event_loop().time() < deadline: + resp = await client.get(endpoint) + if resp.status_code == 200: + try: + data = resp.json() + except Exception: + last_payload = resp.text + else: + response_text = data.get("response") + if isinstance(response_text, str) and response_text.strip(): + return data + last_payload = data + else: + last_payload = resp.text + await asyncio.sleep(1.0) + raise AssertionError(f"{endpoint} never returned a usable response. Last payload: {last_payload}") + + +@pytest.mark.asyncio +async def test_langflow_chat_and_nudges_endpoints(): + """Exercise /langflow and /nudges endpoints against a live Langflow backend.""" + required_env = ["LANGFLOW_CHAT_FLOW_ID", "NUDGES_FLOW_ID"] + missing = [var for var in required_env if not os.getenv(var)] + assert not missing, f"Missing required Langflow configuration: {missing}" + + os.environ["DISABLE_INGEST_WITH_LANGFLOW"] = "true" + os.environ["DISABLE_STARTUP_INGEST"] = "true" + os.environ["GOOGLE_OAUTH_CLIENT_ID"] = "" + os.environ["GOOGLE_OAUTH_CLIENT_SECRET"] = "" + + import sys + + for mod in [ + "src.api.chat", + "api.chat", + "src.api.nudges", + "api.nudges", + "src.api.router", + "api.router", + "src.api.connector_router", + "api.connector_router", + "src.config.settings", + "config.settings", + "src.auth_middleware", + "auth_middleware", + "src.main", + "api", + "src.api", + "services", + "src.services", + "services.search_service", + "src.services.search_service", + "services.chat_service", + "src.services.chat_service", + ]: + sys.modules.pop(mod, None) + + from src.main import create_app, startup_tasks + from src.config.settings import clients, LANGFLOW_CHAT_FLOW_ID, NUDGES_FLOW_ID + + assert LANGFLOW_CHAT_FLOW_ID, "LANGFLOW_CHAT_FLOW_ID must be configured for integration test" + assert NUDGES_FLOW_ID, "NUDGES_FLOW_ID must be configured for integration test" + + await clients.initialize() + app = await create_app() + await startup_tasks(app.state.services) + + langflow_client = None + deadline = asyncio.get_event_loop().time() + 60.0 + while asyncio.get_event_loop().time() < deadline: + langflow_client = await clients.ensure_langflow_client() + if langflow_client is not None: + break + await asyncio.sleep(1.0) + assert langflow_client is not None, "Langflow client not initialized. Provide LANGFLOW_KEY or enable superuser auto-login." + + transport = httpx.ASGITransport(app=app) + try: + async with httpx.AsyncClient(transport=transport, base_url="http://testserver") as client: + await wait_for_service_ready(client) + + prompt = "Respond with a brief acknowledgement for the OpenRAG integration test." + langflow_payload = {"prompt": prompt, "limit": 5, "scoreThreshold": 0} + langflow_data = await _wait_for_langflow_chat(client, langflow_payload) + + assert isinstance(langflow_data.get("response"), str) + assert langflow_data["response"].strip() + + response_id = langflow_data.get("response_id") + + nudges_data = await _wait_for_nudges(client) + assert isinstance(nudges_data.get("response"), str) + assert nudges_data["response"].strip() + + if response_id: + nudges_thread_data = await _wait_for_nudges(client, response_id) + assert isinstance(nudges_thread_data.get("response"), str) + assert nudges_thread_data["response"].strip() + finally: + from src.config.settings import clients + + try: + await clients.close() + except Exception: + pass + + @pytest.mark.asyncio async def test_search_multi_embedding_models( - tmp_path: Path, monkeypatch: pytest.MonkeyPatch + tmp_path: Path ): """Ensure /search fans out across multiple embedding models when present.""" os.environ["DISABLE_INGEST_WITH_LANGFLOW"] = "true"