diff --git a/src/config/settings.py b/src/config/settings.py index 75b09f09..ad6f845e 100644 --- a/src/config/settings.py +++ b/src/config/settings.py @@ -140,61 +140,29 @@ INDEX_BODY = { LANGFLOW_BASE_URL = f"{LANGFLOW_URL}/api/v1" -async def generate_langflow_api_key(modify: bool = False): - """Generate Langflow API key using superuser credentials at startup""" +async def get_langflow_api_key(force_regenerate: bool = False): + """Get the Langflow API key, generating one if needed. + + Args: + force_regenerate: If True, generates a new key even if one is cached. + Used when a request fails with 401/403 to get a fresh key. + """ global LANGFLOW_KEY logger.debug( - "generate_langflow_api_key called", current_key_present=bool(LANGFLOW_KEY) + "get_langflow_api_key called", + current_key_present=bool(LANGFLOW_KEY), + force_regenerate=force_regenerate, ) - # If key already provided via env, do not attempt generation - if LANGFLOW_KEY: - if os.getenv("LANGFLOW_KEY"): - logger.info("Using LANGFLOW_KEY from environment; skipping generation") - return LANGFLOW_KEY - else: - # We have a cached key, but let's validate it first - logger.debug("Validating cached LANGFLOW_KEY", key_prefix=LANGFLOW_KEY[:8]) - try: - validation_response = requests.get( - f"{LANGFLOW_URL}/api/v1/users/whoami", - headers={"x-api-key": LANGFLOW_KEY}, - timeout=5, - ) - if validation_response.status_code == 200: - logger.debug("Cached API key is valid", key_prefix=LANGFLOW_KEY[:8]) - return LANGFLOW_KEY - elif validation_response.status_code in (401, 403): - logger.warning( - "Cached API key is unauthorized, generating fresh key", - status_code=validation_response.status_code, - ) - LANGFLOW_KEY = None # Clear invalid key - else: - logger.warning( - "Cached API key validation returned non-access error; keeping existing key", - status_code=validation_response.status_code, - ) - return LANGFLOW_KEY - except requests.exceptions.Timeout as e: - logger.warning( - "Cached API key validation timed out; keeping existing key", - error=str(e), - ) - return LANGFLOW_KEY - except requests.exceptions.RequestException as e: - logger.warning( - "Cached API key validation failed due to request error; keeping existing key", - error=str(e), - ) - return LANGFLOW_KEY - except Exception as e: - logger.warning( - "Unexpected error during cached API key validation; keeping existing key", - error=str(e), - ) - return LANGFLOW_KEY + # If we have a cached key and not forcing regeneration, return it + if LANGFLOW_KEY and not force_regenerate: + return LANGFLOW_KEY + + # If forcing regeneration, clear the cached key + if force_regenerate and LANGFLOW_KEY: + logger.info("Forcing Langflow API key regeneration due to auth failure") + LANGFLOW_KEY = None # Use default langflow/langflow credentials if auto-login is enabled and credentials not set username = LANGFLOW_SUPERUSER @@ -216,72 +184,70 @@ async def generate_langflow_api_key(modify: bool = False): max_attempts = int(os.getenv("LANGFLOW_KEY_RETRIES", "15")) delay_seconds = float(os.getenv("LANGFLOW_KEY_RETRY_DELAY", "2.0")) - for attempt in range(1, max_attempts + 1): - try: - # Login to get access token - login_response = requests.post( - f"{LANGFLOW_URL}/api/v1/login", - headers={"Content-Type": "application/x-www-form-urlencoded"}, - data={ - "username": username, - "password": password, - }, - timeout=10, - ) - login_response.raise_for_status() - access_token = login_response.json().get("access_token") - if not access_token: - raise KeyError("access_token") - - # Create API key - api_key_response = requests.post( - f"{LANGFLOW_URL}/api/v1/api_key/", - headers={ - "Content-Type": "application/json", - "Authorization": f"Bearer {access_token}", - }, - json={"name": "openrag-auto-generated"}, - timeout=10, - ) - api_key_response.raise_for_status() - api_key = api_key_response.json().get("api_key") - if not api_key: - raise KeyError("api_key") - - # Validate the API key works - validation_response = requests.get( - f"{LANGFLOW_URL}/api/v1/users/whoami", - headers={"x-api-key": api_key}, - timeout=10, - ) - if validation_response.status_code == 200: - LANGFLOW_KEY = api_key - logger.info( - "Successfully generated and validated Langflow API key", - key_prefix=api_key[:8], + async with httpx.AsyncClient(timeout=10.0) as client: + for attempt in range(1, max_attempts + 1): + try: + # Login to get access token + login_response = await client.post( + f"{LANGFLOW_URL}/api/v1/login", + headers={"Content-Type": "application/x-www-form-urlencoded"}, + data={ + "username": username, + "password": password, + }, ) - return api_key - else: - logger.error( - "Generated API key validation failed", - status_code=validation_response.status_code, - ) - raise ValueError( - f"API key validation failed: {validation_response.status_code}" - ) - except (requests.exceptions.RequestException, KeyError) as e: - logger.warning( - "Attempt to generate Langflow API key failed", - attempt=attempt, - max_attempts=max_attempts, - error=str(e), - ) - if attempt < max_attempts: - time.sleep(delay_seconds) - else: - raise + login_response.raise_for_status() + access_token = login_response.json().get("access_token") + if not access_token: + raise KeyError("access_token") - except requests.exceptions.RequestException as e: + # Create API key + api_key_response = await client.post( + f"{LANGFLOW_URL}/api/v1/api_key/", + headers={ + "Content-Type": "application/json", + "Authorization": f"Bearer {access_token}", + }, + json={"name": "openrag-auto-generated"}, + ) + api_key_response.raise_for_status() + api_key = api_key_response.json().get("api_key") + if not api_key: + raise KeyError("api_key") + + # Validate the API key works + validation_response = await client.get( + f"{LANGFLOW_URL}/api/v1/users/whoami", + headers={"x-api-key": api_key}, + ) + if validation_response.status_code == 200: + LANGFLOW_KEY = api_key + logger.info( + "Successfully generated and validated Langflow API key", + key_prefix=api_key[:8], + ) + return api_key + else: + logger.error( + "Generated API key validation failed", + status_code=validation_response.status_code, + ) + raise ValueError( + f"API key validation failed: {validation_response.status_code}" + ) + except (httpx.HTTPStatusError, httpx.RequestError, KeyError) as e: + logger.warning( + "Attempt to generate Langflow API key failed", + attempt=attempt, + max_attempts=max_attempts, + error=str(e), + ) + if attempt < max_attempts: + await asyncio.sleep(delay_seconds) + else: + raise + + except (httpx.HTTPStatusError, httpx.RequestError) as e: logger.error("Failed to generate Langflow API key", error=str(e)) return None except KeyError as e: @@ -303,7 +269,7 @@ class AppClients: async def initialize(self): # Generate Langflow API key first - await generate_langflow_api_key() + await get_langflow_api_key() # Initialize OpenSearch client self.opensearch = AsyncOpenSearch( @@ -362,7 +328,7 @@ class AppClients: if self.langflow_client is not None: return self.langflow_client # Try generating key again (with retries) - await generate_langflow_api_key() + await get_langflow_api_key() if LANGFLOW_KEY and self.langflow_client is None: try: self.langflow_client = AsyncOpenAI( @@ -559,8 +525,11 @@ class AppClients: self.langflow_client = None async def langflow_request(self, method: str, endpoint: str, **kwargs): - """Central method for all Langflow API requests""" - api_key = await generate_langflow_api_key() + """Central method for all Langflow API requests. + + Retries once with a fresh API key on auth failures (401/403). + """ + api_key = await get_langflow_api_key() if not api_key: raise ValueError("No Langflow API key available") @@ -575,15 +544,31 @@ class AppClients: url = f"{LANGFLOW_URL}{endpoint}" - return await self.langflow_http_client.request( + response = await self.langflow_http_client.request( method=method, url=url, headers=headers, **kwargs ) + # Retry once with a fresh API key on auth failure + if response.status_code in (401, 403): + logger.warning( + "Langflow request auth failed, regenerating API key and retrying", + status_code=response.status_code, + endpoint=endpoint, + ) + api_key = await get_langflow_api_key(force_regenerate=True) + if api_key: + headers["x-api-key"] = api_key + response = await self.langflow_http_client.request( + method=method, url=url, headers=headers, **kwargs + ) + + return response + async def _create_langflow_global_variable( self, name: str, value: str, modify: bool = False ): """Create a global variable in Langflow via API""" - api_key = await generate_langflow_api_key() + api_key = await get_langflow_api_key() if not api_key: logger.warning( "Cannot create Langflow global variable: No API key", variable_name=name @@ -635,7 +620,7 @@ class AppClients: async def _update_langflow_global_variable(self, name: str, value: str): """Update an existing global variable in Langflow via API""" - api_key = await generate_langflow_api_key() + api_key = await get_langflow_api_key() if not api_key: logger.warning( "Cannot update Langflow global variable: No API key", variable_name=name