diff --git a/src/config/settings.py b/src/config/settings.py index c4544f50..f2e3be7e 100644 --- a/src/config/settings.py +++ b/src/config/settings.py @@ -29,7 +29,7 @@ LANGFLOW_PUBLIC_URL = os.getenv("LANGFLOW_PUBLIC_URL") _legacy_flow_id = os.getenv("FLOW_ID") LANGFLOW_CHAT_FLOW_ID = os.getenv("LANGFLOW_CHAT_FLOW_ID") or _legacy_flow_id -LANGFLOW_INGEST_FLOW_ID = os.getenv("LANGFLOW_INGEST_FLOW_ID") or _legacy_flow_id_ingest +LANGFLOW_INGEST_FLOW_ID = os.getenv("LANGFLOW_INGEST_FLOW_ID") if _legacy_flow_id and not os.getenv("LANGFLOW_CHAT_FLOW_ID"): logger.warning("FLOW_ID is deprecated. Please use LANGFLOW_CHAT_FLOW_ID instead") @@ -239,6 +239,7 @@ class AppClients: def __init__(self): self.opensearch = None self.langflow_client = None + self.langflow_http_client = None self.patched_async_client = None self.converter = None @@ -284,6 +285,11 @@ class AppClients: # Initialize document converter self.converter = DocumentConverter() + # Initialize Langflow HTTP client + self.langflow_http_client = httpx.AsyncClient( + base_url=LANGFLOW_URL, timeout=60.0 + ) + return self async def ensure_langflow_client(self): @@ -305,6 +311,23 @@ class AppClients: self.langflow_client = None return self.langflow_client + async def langflow_request(self, method: str, endpoint: str, **kwargs): + """Central method for all Langflow API requests""" + api_key = await generate_langflow_api_key() + if not api_key: + raise ValueError("No Langflow API key available") + + # Merge headers properly - passed headers take precedence over defaults + default_headers = {"x-api-key": api_key, "Content-Type": "application/json"} + existing_headers = kwargs.pop("headers", {}) + headers = {**default_headers, **existing_headers} + + url = f"{LANGFLOW_URL}{endpoint}" + + return await self.langflow_http_client.request( + method=method, url=url, headers=headers, **kwargs + ) + async def _create_langflow_global_variable(self, name: str, value: str): """Create a global variable in Langflow via API""" api_key = await generate_langflow_api_key()