Merge pull request #611 from langflow-ai/relax-langflow-key-validation

perf fix: relax blocking langflow key validation on every request
This commit is contained in:
Sebastián Estévez 2025-12-05 10:43:22 -05:00 committed by GitHub
commit b7d846cc2e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -1,3 +1,4 @@
import asyncio
import os import os
import time import time
@ -140,61 +141,29 @@ INDEX_BODY = {
LANGFLOW_BASE_URL = f"{LANGFLOW_URL}/api/v1" LANGFLOW_BASE_URL = f"{LANGFLOW_URL}/api/v1"
async def generate_langflow_api_key(modify: bool = False): async def get_langflow_api_key(force_regenerate: bool = False):
"""Generate Langflow API key using superuser credentials at startup""" """Get the Langflow API key, generating one if needed.
Args:
force_regenerate: If True, generates a new key even if one is cached.
Used when a request fails with 401/403 to get a fresh key.
"""
global LANGFLOW_KEY global LANGFLOW_KEY
logger.debug( logger.debug(
"generate_langflow_api_key called", current_key_present=bool(LANGFLOW_KEY) "get_langflow_api_key called",
current_key_present=bool(LANGFLOW_KEY),
force_regenerate=force_regenerate,
) )
# If key already provided via env, do not attempt generation # If we have a cached key and not forcing regeneration, return it
if LANGFLOW_KEY: if LANGFLOW_KEY and not force_regenerate:
if os.getenv("LANGFLOW_KEY"): return LANGFLOW_KEY
logger.info("Using LANGFLOW_KEY from environment; skipping generation")
return LANGFLOW_KEY # If forcing regeneration, clear the cached key
else: if force_regenerate and LANGFLOW_KEY:
# We have a cached key, but let's validate it first logger.info("Forcing Langflow API key regeneration due to auth failure")
logger.debug("Validating cached LANGFLOW_KEY", key_prefix=LANGFLOW_KEY[:8]) LANGFLOW_KEY = None
try:
validation_response = requests.get(
f"{LANGFLOW_URL}/api/v1/users/whoami",
headers={"x-api-key": LANGFLOW_KEY},
timeout=5,
)
if validation_response.status_code == 200:
logger.debug("Cached API key is valid", key_prefix=LANGFLOW_KEY[:8])
return LANGFLOW_KEY
elif validation_response.status_code in (401, 403):
logger.warning(
"Cached API key is unauthorized, generating fresh key",
status_code=validation_response.status_code,
)
LANGFLOW_KEY = None # Clear invalid key
else:
logger.warning(
"Cached API key validation returned non-access error; keeping existing key",
status_code=validation_response.status_code,
)
return LANGFLOW_KEY
except requests.exceptions.Timeout as e:
logger.warning(
"Cached API key validation timed out; keeping existing key",
error=str(e),
)
return LANGFLOW_KEY
except requests.exceptions.RequestException as e:
logger.warning(
"Cached API key validation failed due to request error; keeping existing key",
error=str(e),
)
return LANGFLOW_KEY
except Exception as e:
logger.warning(
"Unexpected error during cached API key validation; keeping existing key",
error=str(e),
)
return LANGFLOW_KEY
# Use default langflow/langflow credentials if auto-login is enabled and credentials not set # Use default langflow/langflow credentials if auto-login is enabled and credentials not set
username = LANGFLOW_SUPERUSER username = LANGFLOW_SUPERUSER
@ -216,72 +185,70 @@ async def generate_langflow_api_key(modify: bool = False):
max_attempts = int(os.getenv("LANGFLOW_KEY_RETRIES", "15")) max_attempts = int(os.getenv("LANGFLOW_KEY_RETRIES", "15"))
delay_seconds = float(os.getenv("LANGFLOW_KEY_RETRY_DELAY", "2.0")) delay_seconds = float(os.getenv("LANGFLOW_KEY_RETRY_DELAY", "2.0"))
for attempt in range(1, max_attempts + 1): async with httpx.AsyncClient(timeout=10.0) as client:
try: for attempt in range(1, max_attempts + 1):
# Login to get access token try:
login_response = requests.post( # Login to get access token
f"{LANGFLOW_URL}/api/v1/login", login_response = await client.post(
headers={"Content-Type": "application/x-www-form-urlencoded"}, f"{LANGFLOW_URL}/api/v1/login",
data={ headers={"Content-Type": "application/x-www-form-urlencoded"},
"username": username, data={
"password": password, "username": username,
}, "password": password,
timeout=10, },
)
login_response.raise_for_status()
access_token = login_response.json().get("access_token")
if not access_token:
raise KeyError("access_token")
# Create API key
api_key_response = requests.post(
f"{LANGFLOW_URL}/api/v1/api_key/",
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {access_token}",
},
json={"name": "openrag-auto-generated"},
timeout=10,
)
api_key_response.raise_for_status()
api_key = api_key_response.json().get("api_key")
if not api_key:
raise KeyError("api_key")
# Validate the API key works
validation_response = requests.get(
f"{LANGFLOW_URL}/api/v1/users/whoami",
headers={"x-api-key": api_key},
timeout=10,
)
if validation_response.status_code == 200:
LANGFLOW_KEY = api_key
logger.info(
"Successfully generated and validated Langflow API key",
key_prefix=api_key[:8],
) )
return api_key login_response.raise_for_status()
else: access_token = login_response.json().get("access_token")
logger.error( if not access_token:
"Generated API key validation failed", raise KeyError("access_token")
status_code=validation_response.status_code,
)
raise ValueError(
f"API key validation failed: {validation_response.status_code}"
)
except (requests.exceptions.RequestException, KeyError) as e:
logger.warning(
"Attempt to generate Langflow API key failed",
attempt=attempt,
max_attempts=max_attempts,
error=str(e),
)
if attempt < max_attempts:
time.sleep(delay_seconds)
else:
raise
except requests.exceptions.RequestException as e: # Create API key
api_key_response = await client.post(
f"{LANGFLOW_URL}/api/v1/api_key/",
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {access_token}",
},
json={"name": "openrag-auto-generated"},
)
api_key_response.raise_for_status()
api_key = api_key_response.json().get("api_key")
if not api_key:
raise KeyError("api_key")
# Validate the API key works
validation_response = await client.get(
f"{LANGFLOW_URL}/api/v1/users/whoami",
headers={"x-api-key": api_key},
)
if validation_response.status_code == 200:
LANGFLOW_KEY = api_key
logger.info(
"Successfully generated and validated Langflow API key",
key_prefix=api_key[:8],
)
return api_key
else:
logger.error(
"Generated API key validation failed",
status_code=validation_response.status_code,
)
raise ValueError(
f"API key validation failed: {validation_response.status_code}"
)
except (httpx.HTTPStatusError, httpx.RequestError, KeyError) as e:
logger.warning(
"Attempt to generate Langflow API key failed",
attempt=attempt,
max_attempts=max_attempts,
error=str(e),
)
if attempt < max_attempts:
await asyncio.sleep(delay_seconds)
else:
raise
except (httpx.HTTPStatusError, httpx.RequestError) as e:
logger.error("Failed to generate Langflow API key", error=str(e)) logger.error("Failed to generate Langflow API key", error=str(e))
return None return None
except KeyError as e: except KeyError as e:
@ -303,7 +270,7 @@ class AppClients:
async def initialize(self): async def initialize(self):
# Generate Langflow API key first # Generate Langflow API key first
await generate_langflow_api_key() await get_langflow_api_key()
# Initialize OpenSearch client # Initialize OpenSearch client
self.opensearch = AsyncOpenSearch( self.opensearch = AsyncOpenSearch(
@ -362,7 +329,7 @@ class AppClients:
if self.langflow_client is not None: if self.langflow_client is not None:
return self.langflow_client return self.langflow_client
# Try generating key again (with retries) # Try generating key again (with retries)
await generate_langflow_api_key() await get_langflow_api_key()
if LANGFLOW_KEY and self.langflow_client is None: if LANGFLOW_KEY and self.langflow_client is None:
try: try:
self.langflow_client = AsyncOpenAI( self.langflow_client = AsyncOpenAI(
@ -559,8 +526,11 @@ class AppClients:
self.langflow_client = None self.langflow_client = None
async def langflow_request(self, method: str, endpoint: str, **kwargs): async def langflow_request(self, method: str, endpoint: str, **kwargs):
"""Central method for all Langflow API requests""" """Central method for all Langflow API requests.
api_key = await generate_langflow_api_key()
Retries once with a fresh API key on auth failures (401/403).
"""
api_key = await get_langflow_api_key()
if not api_key: if not api_key:
raise ValueError("No Langflow API key available") raise ValueError("No Langflow API key available")
@ -575,57 +545,65 @@ class AppClients:
url = f"{LANGFLOW_URL}{endpoint}" url = f"{LANGFLOW_URL}{endpoint}"
return await self.langflow_http_client.request( response = await self.langflow_http_client.request(
method=method, url=url, headers=headers, **kwargs method=method, url=url, headers=headers, **kwargs
) )
# Retry once with a fresh API key on auth failure
if response.status_code in (401, 403):
logger.warning(
"Langflow request auth failed, regenerating API key and retrying",
status_code=response.status_code,
endpoint=endpoint,
)
api_key = await get_langflow_api_key(force_regenerate=True)
if api_key:
headers["x-api-key"] = api_key
response = await self.langflow_http_client.request(
method=method, url=url, headers=headers, **kwargs
)
return response
async def _create_langflow_global_variable( async def _create_langflow_global_variable(
self, name: str, value: str, modify: bool = False self, name: str, value: str, modify: bool = False
): ):
"""Create a global variable in Langflow via API""" """Create a global variable in Langflow via API"""
api_key = await generate_langflow_api_key()
if not api_key:
logger.warning(
"Cannot create Langflow global variable: No API key", variable_name=name
)
return
url = f"{LANGFLOW_URL}/api/v1/variables/"
payload = { payload = {
"name": name, "name": name,
"value": value, "value": value,
"default_fields": [], "default_fields": [],
"type": "Credential", "type": "Credential",
} }
headers = {"x-api-key": api_key, "Content-Type": "application/json"}
try: try:
async with httpx.AsyncClient() as client: response = await self.langflow_request(
response = await client.post(url, headers=headers, json=payload) "POST", "/api/v1/variables/", json=payload
)
if response.status_code in [200, 201]: if response.status_code in [200, 201]:
logger.info(
"Successfully created Langflow global variable",
variable_name=name,
)
elif response.status_code == 400 and "already exists" in response.text:
if modify:
logger.info( logger.info(
"Successfully created Langflow global variable", "Langflow global variable already exists, attempting to update",
variable_name=name, variable_name=name,
) )
elif response.status_code == 400 and "already exists" in response.text: await self._update_langflow_global_variable(name, value)
if modify:
logger.info(
"Langflow global variable already exists, attempting to update",
variable_name=name,
)
await self._update_langflow_global_variable(name, value)
else:
logger.info(
"Langflow global variable already exists",
variable_name=name,
)
else: else:
logger.warning( logger.info(
"Failed to create Langflow global variable", "Langflow global variable already exists",
variable_name=name, variable_name=name,
status_code=response.status_code,
) )
else:
logger.warning(
"Failed to create Langflow global variable",
variable_name=name,
status_code=response.status_code,
)
except Exception as e: except Exception as e:
logger.error( logger.error(
"Exception creating Langflow global variable", "Exception creating Langflow global variable",
@ -635,76 +613,62 @@ class AppClients:
async def _update_langflow_global_variable(self, name: str, value: str): async def _update_langflow_global_variable(self, name: str, value: str):
"""Update an existing global variable in Langflow via API""" """Update an existing global variable in Langflow via API"""
api_key = await generate_langflow_api_key()
if not api_key:
logger.warning(
"Cannot update Langflow global variable: No API key", variable_name=name
)
return
headers = {"x-api-key": api_key, "Content-Type": "application/json"}
try: try:
async with httpx.AsyncClient() as client: # First, get all variables to find the one with the matching name
# First, get all variables to find the one with the matching name get_response = await self.langflow_request("GET", "/api/v1/variables/")
get_response = await client.get(
f"{LANGFLOW_URL}/api/v1/variables/", headers=headers if get_response.status_code != 200:
logger.error(
"Failed to retrieve variables for update",
variable_name=name,
status_code=get_response.status_code,
) )
return
if get_response.status_code != 200: variables = get_response.json()
logger.error( target_variable = None
"Failed to retrieve variables for update",
variable_name=name,
status_code=get_response.status_code,
)
return
variables = get_response.json() # Find the variable with matching name
target_variable = None for variable in variables:
if variable.get("name") == name:
target_variable = variable
break
# Find the variable with matching name if not target_variable:
for variable in variables: logger.error("Variable not found for update", variable_name=name)
if variable.get("name") == name: return
target_variable = variable
break
if not target_variable: variable_id = target_variable.get("id")
logger.error("Variable not found for update", variable_name=name) if not variable_id:
return logger.error("Variable ID not found for update", variable_name=name)
return
variable_id = target_variable.get("id") # Update the variable using PATCH
if not variable_id: update_payload = {
logger.error("Variable ID not found for update", variable_name=name) "id": variable_id,
return "name": name,
"value": value,
"default_fields": target_variable.get("default_fields", []),
}
# Update the variable using PATCH patch_response = await self.langflow_request(
update_payload = { "PATCH", f"/api/v1/variables/{variable_id}", json=update_payload
"id": variable_id, )
"name": name,
"value": value,
"default_fields": target_variable.get("default_fields", []),
}
patch_response = await client.patch( if patch_response.status_code == 200:
f"{LANGFLOW_URL}/api/v1/variables/{variable_id}", logger.info(
headers=headers, "Successfully updated Langflow global variable",
json=update_payload, variable_name=name,
variable_id=variable_id,
)
else:
logger.warning(
"Failed to update Langflow global variable",
variable_name=name,
variable_id=variable_id,
status_code=patch_response.status_code,
response_text=patch_response.text,
) )
if patch_response.status_code == 200:
logger.info(
"Successfully updated Langflow global variable",
variable_name=name,
variable_id=variable_id,
)
else:
logger.warning(
"Failed to update Langflow global variable",
variable_name=name,
variable_id=variable_id,
status_code=patch_response.status_code,
response_text=patch_response.text,
)
except Exception as e: except Exception as e:
logger.error( logger.error(