Refactor Langflow API integration in AppClients

This commit updates the settings for LANGFLOW_INGEST_FLOW_ID to remove reliance on a deprecated environment variable. It introduces a new Langflow HTTP client for making API requests and adds a centralized method for handling Langflow API requests, improving code organization and maintainability. The changes enhance the robustness of the async code while ensuring proper documentation practices are followed.
This commit is contained in:
Gabriel Luiz Freitas Almeida 2025-09-05 11:33:37 -03:00
parent 531ff8b6ac
commit 89246ed1ae

View file

@ -29,7 +29,7 @@ LANGFLOW_PUBLIC_URL = os.getenv("LANGFLOW_PUBLIC_URL")
_legacy_flow_id = os.getenv("FLOW_ID")
LANGFLOW_CHAT_FLOW_ID = os.getenv("LANGFLOW_CHAT_FLOW_ID") or _legacy_flow_id
LANGFLOW_INGEST_FLOW_ID = os.getenv("LANGFLOW_INGEST_FLOW_ID") or _legacy_flow_id_ingest
LANGFLOW_INGEST_FLOW_ID = os.getenv("LANGFLOW_INGEST_FLOW_ID")
if _legacy_flow_id and not os.getenv("LANGFLOW_CHAT_FLOW_ID"):
logger.warning("FLOW_ID is deprecated. Please use LANGFLOW_CHAT_FLOW_ID instead")
@ -239,6 +239,7 @@ class AppClients:
def __init__(self):
self.opensearch = None
self.langflow_client = None
self.langflow_http_client = None
self.patched_async_client = None
self.converter = None
@ -284,6 +285,11 @@ class AppClients:
# Initialize document converter
self.converter = DocumentConverter()
# Initialize Langflow HTTP client
self.langflow_http_client = httpx.AsyncClient(
base_url=LANGFLOW_URL, timeout=60.0
)
return self
async def ensure_langflow_client(self):
@ -305,6 +311,23 @@ class AppClients:
self.langflow_client = None
return self.langflow_client
async def langflow_request(self, method: str, endpoint: str, **kwargs):
"""Central method for all Langflow API requests"""
api_key = await generate_langflow_api_key()
if not api_key:
raise ValueError("No Langflow API key available")
# Merge headers properly - passed headers take precedence over defaults
default_headers = {"x-api-key": api_key, "Content-Type": "application/json"}
existing_headers = kwargs.pop("headers", {})
headers = {**default_headers, **existing_headers}
url = f"{LANGFLOW_URL}{endpoint}"
return await self.langflow_http_client.request(
method=method, url=url, headers=headers, **kwargs
)
async def _create_langflow_global_variable(self, name: str, value: str):
"""Create a global variable in Langflow via API"""
api_key = await generate_langflow_api_key()