From 89246ed1ae86ef3fd3f883f7a119fb3c631d23c3 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Fri, 5 Sep 2025 11:33:37 -0300 Subject: [PATCH] Refactor Langflow API integration in AppClients This commit updates the settings for LANGFLOW_INGEST_FLOW_ID to remove reliance on a deprecated environment variable. It introduces a new Langflow HTTP client for making API requests and adds a centralized method for handling Langflow API requests, improving code organization and maintainability. The changes enhance the robustness of the async code while ensuring proper documentation practices are followed. --- src/config/settings.py | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/src/config/settings.py b/src/config/settings.py index c4544f50..f2e3be7e 100644 --- a/src/config/settings.py +++ b/src/config/settings.py @@ -29,7 +29,7 @@ LANGFLOW_PUBLIC_URL = os.getenv("LANGFLOW_PUBLIC_URL") _legacy_flow_id = os.getenv("FLOW_ID") LANGFLOW_CHAT_FLOW_ID = os.getenv("LANGFLOW_CHAT_FLOW_ID") or _legacy_flow_id -LANGFLOW_INGEST_FLOW_ID = os.getenv("LANGFLOW_INGEST_FLOW_ID") or _legacy_flow_id_ingest +LANGFLOW_INGEST_FLOW_ID = os.getenv("LANGFLOW_INGEST_FLOW_ID") if _legacy_flow_id and not os.getenv("LANGFLOW_CHAT_FLOW_ID"): logger.warning("FLOW_ID is deprecated. Please use LANGFLOW_CHAT_FLOW_ID instead") @@ -239,6 +239,7 @@ class AppClients: def __init__(self): self.opensearch = None self.langflow_client = None + self.langflow_http_client = None self.patched_async_client = None self.converter = None @@ -284,6 +285,11 @@ class AppClients: # Initialize document converter self.converter = DocumentConverter() + # Initialize Langflow HTTP client + self.langflow_http_client = httpx.AsyncClient( + base_url=LANGFLOW_URL, timeout=60.0 + ) + return self async def ensure_langflow_client(self): @@ -305,6 +311,23 @@ class AppClients: self.langflow_client = None return self.langflow_client + async def langflow_request(self, method: str, endpoint: str, **kwargs): + """Central method for all Langflow API requests""" + api_key = await generate_langflow_api_key() + if not api_key: + raise ValueError("No Langflow API key available") + + # Merge headers properly - passed headers take precedence over defaults + default_headers = {"x-api-key": api_key, "Content-Type": "application/json"} + existing_headers = kwargs.pop("headers", {}) + headers = {**default_headers, **existing_headers} + + url = f"{LANGFLOW_URL}{endpoint}" + + return await self.langflow_http_client.request( + method=method, url=url, headers=headers, **kwargs + ) + async def _create_langflow_global_variable(self, name: str, value: str): """Create a global variable in Langflow via API""" api_key = await generate_langflow_api_key()