Merge pull request #338 from langflow-ai/tui-optional-openai-key
Tui optional OpenAI key
This commit is contained in:
commit
a35235eb59
6 changed files with 169 additions and 11 deletions
4
Makefile
4
Makefile
|
|
@ -2,10 +2,12 @@
|
||||||
# Provides easy commands for development workflow
|
# Provides easy commands for development workflow
|
||||||
|
|
||||||
# Load variables from .env if present so `make` commands pick them up
|
# Load variables from .env if present so `make` commands pick them up
|
||||||
|
# Strip quotes from values to avoid issues with tools that don't handle them like python-dotenv does
|
||||||
ifneq (,$(wildcard .env))
|
ifneq (,$(wildcard .env))
|
||||||
include .env
|
include .env
|
||||||
# Export all simple KEY=VALUE pairs to the environment for child processes
|
|
||||||
export $(shell sed -n 's/^\([A-Za-z_][A-Za-z0-9_]*\)=.*/\1/p' .env)
|
export $(shell sed -n 's/^\([A-Za-z_][A-Za-z0-9_]*\)=.*/\1/p' .env)
|
||||||
|
# Strip single quotes from all exported variables
|
||||||
|
$(foreach var,$(shell sed -n 's/^\([A-Za-z_][A-Za-z0-9_]*\)=.*/\1/p' .env),$(eval $(var):=$(shell echo $($(var)) | sed "s/^'//;s/'$$//")))
|
||||||
endif
|
endif
|
||||||
|
|
||||||
.PHONY: help dev dev-cpu dev-local infra stop clean build logs shell-backend shell-frontend install \
|
.PHONY: help dev dev-cpu dev-local infra stop clean build logs shell-backend shell-frontend install \
|
||||||
|
|
|
||||||
|
|
@ -279,7 +279,8 @@ class AppClients:
|
||||||
self.opensearch = None
|
self.opensearch = None
|
||||||
self.langflow_client = None
|
self.langflow_client = None
|
||||||
self.langflow_http_client = None
|
self.langflow_http_client = None
|
||||||
self.patched_async_client = None
|
self._patched_async_client = None # Private attribute
|
||||||
|
self._client_init_lock = __import__('threading').Lock() # Lock for thread-safe initialization
|
||||||
self.converter = None
|
self.converter = None
|
||||||
|
|
||||||
async def initialize(self):
|
async def initialize(self):
|
||||||
|
|
@ -318,8 +319,15 @@ class AppClients:
|
||||||
"No Langflow client initialized yet, will attempt later on first use"
|
"No Langflow client initialized yet, will attempt later on first use"
|
||||||
)
|
)
|
||||||
|
|
||||||
# Initialize patched OpenAI client
|
# Initialize patched OpenAI client if API key is available
|
||||||
self.patched_async_client = patch_openai_with_mcp(AsyncOpenAI())
|
# This allows the app to start even if OPENAI_API_KEY is not set yet
|
||||||
|
# (e.g., when it will be provided during onboarding)
|
||||||
|
# The property will handle lazy initialization with probe when first accessed
|
||||||
|
openai_key = os.getenv("OPENAI_API_KEY")
|
||||||
|
if openai_key:
|
||||||
|
logger.info("OpenAI API key found in environment - will be initialized lazily on first use with HTTP/2 probe")
|
||||||
|
else:
|
||||||
|
logger.info("OpenAI API key not found in environment - will be initialized on first use if needed")
|
||||||
|
|
||||||
# Initialize document converter
|
# Initialize document converter
|
||||||
self.converter = create_document_converter(ocr_engine=DOCLING_OCR_ENGINE)
|
self.converter = create_document_converter(ocr_engine=DOCLING_OCR_ENGINE)
|
||||||
|
|
@ -350,6 +358,145 @@ class AppClients:
|
||||||
self.langflow_client = None
|
self.langflow_client = None
|
||||||
return self.langflow_client
|
return self.langflow_client
|
||||||
|
|
||||||
|
@property
|
||||||
|
def patched_async_client(self):
|
||||||
|
"""
|
||||||
|
Property that ensures OpenAI client is initialized on first access.
|
||||||
|
This allows lazy initialization so the app can start without an API key.
|
||||||
|
|
||||||
|
Note: The client is a long-lived singleton that should be closed via cleanup().
|
||||||
|
Thread-safe via lock to prevent concurrent initialization attempts.
|
||||||
|
"""
|
||||||
|
# Quick check without lock
|
||||||
|
if self._patched_async_client is not None:
|
||||||
|
return self._patched_async_client
|
||||||
|
|
||||||
|
# Use lock to ensure only one thread initializes
|
||||||
|
with self._client_init_lock:
|
||||||
|
# Double-check after acquiring lock
|
||||||
|
if self._patched_async_client is not None:
|
||||||
|
return self._patched_async_client
|
||||||
|
|
||||||
|
# Try to initialize the client on-demand
|
||||||
|
# First check if OPENAI_API_KEY is in environment
|
||||||
|
openai_key = os.getenv("OPENAI_API_KEY")
|
||||||
|
|
||||||
|
if not openai_key:
|
||||||
|
# Try to get from config (in case it was set during onboarding)
|
||||||
|
try:
|
||||||
|
config = get_openrag_config()
|
||||||
|
if config and config.provider and config.provider.api_key:
|
||||||
|
openai_key = config.provider.api_key
|
||||||
|
# Set it in environment so AsyncOpenAI can pick it up
|
||||||
|
os.environ["OPENAI_API_KEY"] = openai_key
|
||||||
|
logger.info("Loaded OpenAI API key from config file")
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug("Could not load OpenAI key from config", error=str(e))
|
||||||
|
|
||||||
|
# Try to initialize the client - AsyncOpenAI() will read from environment
|
||||||
|
# We'll try HTTP/2 first with a probe, then fall back to HTTP/1.1 if it times out
|
||||||
|
import asyncio
|
||||||
|
import concurrent.futures
|
||||||
|
import threading
|
||||||
|
|
||||||
|
async def probe_and_initialize():
|
||||||
|
# Try HTTP/2 first (default)
|
||||||
|
client_http2 = patch_openai_with_mcp(AsyncOpenAI())
|
||||||
|
logger.info("Probing OpenAI client with HTTP/2...")
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Probe with a small embedding and short timeout
|
||||||
|
await asyncio.wait_for(
|
||||||
|
client_http2.embeddings.create(
|
||||||
|
model='text-embedding-3-small',
|
||||||
|
input=['test']
|
||||||
|
),
|
||||||
|
timeout=5.0
|
||||||
|
)
|
||||||
|
logger.info("OpenAI client initialized with HTTP/2 (probe successful)")
|
||||||
|
return client_http2
|
||||||
|
except (asyncio.TimeoutError, Exception) as probe_error:
|
||||||
|
logger.warning("HTTP/2 probe failed, falling back to HTTP/1.1", error=str(probe_error))
|
||||||
|
# Close the HTTP/2 client
|
||||||
|
try:
|
||||||
|
await client_http2.close()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Fall back to HTTP/1.1 with explicit timeout settings
|
||||||
|
http_client = httpx.AsyncClient(
|
||||||
|
http2=False,
|
||||||
|
timeout=httpx.Timeout(60.0, connect=10.0)
|
||||||
|
)
|
||||||
|
client_http1 = patch_openai_with_mcp(
|
||||||
|
AsyncOpenAI(http_client=http_client)
|
||||||
|
)
|
||||||
|
logger.info("OpenAI client initialized with HTTP/1.1 (fallback)")
|
||||||
|
return client_http1
|
||||||
|
|
||||||
|
def run_probe_in_thread():
|
||||||
|
"""Run the async probe in a new thread with its own event loop"""
|
||||||
|
loop = asyncio.new_event_loop()
|
||||||
|
asyncio.set_event_loop(loop)
|
||||||
|
try:
|
||||||
|
return loop.run_until_complete(probe_and_initialize())
|
||||||
|
finally:
|
||||||
|
loop.close()
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Run the probe in a separate thread with its own event loop
|
||||||
|
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
|
||||||
|
future = executor.submit(run_probe_in_thread)
|
||||||
|
self._patched_async_client = future.result(timeout=15)
|
||||||
|
logger.info("Successfully initialized OpenAI client")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to initialize OpenAI client: {e.__class__.__name__}: {str(e)}")
|
||||||
|
raise ValueError(f"Failed to initialize OpenAI client: {str(e)}. Please complete onboarding or set OPENAI_API_KEY environment variable.")
|
||||||
|
|
||||||
|
return self._patched_async_client
|
||||||
|
|
||||||
|
async def cleanup(self):
|
||||||
|
"""Cleanup resources - should be called on application shutdown"""
|
||||||
|
# Close AsyncOpenAI client if it was created
|
||||||
|
if self._patched_async_client is not None:
|
||||||
|
try:
|
||||||
|
await self._patched_async_client.close()
|
||||||
|
logger.info("Closed AsyncOpenAI client")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Failed to close AsyncOpenAI client", error=str(e))
|
||||||
|
finally:
|
||||||
|
self._patched_async_client = None
|
||||||
|
|
||||||
|
# Close Langflow HTTP client if it exists
|
||||||
|
if self.langflow_http_client is not None:
|
||||||
|
try:
|
||||||
|
await self.langflow_http_client.aclose()
|
||||||
|
logger.info("Closed Langflow HTTP client")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Failed to close Langflow HTTP client", error=str(e))
|
||||||
|
finally:
|
||||||
|
self.langflow_http_client = None
|
||||||
|
|
||||||
|
# Close OpenSearch client if it exists
|
||||||
|
if self.opensearch is not None:
|
||||||
|
try:
|
||||||
|
await self.opensearch.close()
|
||||||
|
logger.info("Closed OpenSearch client")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Failed to close OpenSearch client", error=str(e))
|
||||||
|
finally:
|
||||||
|
self.opensearch = None
|
||||||
|
|
||||||
|
# Close Langflow client if it exists (also an AsyncOpenAI client)
|
||||||
|
if self.langflow_client is not None:
|
||||||
|
try:
|
||||||
|
await self.langflow_client.close()
|
||||||
|
logger.info("Closed Langflow client")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Failed to close Langflow client", error=str(e))
|
||||||
|
finally:
|
||||||
|
self.langflow_client = None
|
||||||
|
|
||||||
async def langflow_request(self, method: str, endpoint: str, **kwargs):
|
async def langflow_request(self, method: str, endpoint: str, **kwargs):
|
||||||
"""Central method for all Langflow API requests"""
|
"""Central method for all Langflow API requests"""
|
||||||
api_key = await generate_langflow_api_key()
|
api_key = await generate_langflow_api_key()
|
||||||
|
|
|
||||||
|
|
@ -21,7 +21,7 @@ class ConnectorService:
|
||||||
task_service=None,
|
task_service=None,
|
||||||
session_manager=None,
|
session_manager=None,
|
||||||
):
|
):
|
||||||
self.openai_client = patched_async_client
|
self.clients = patched_async_client # Store the clients object to access the property
|
||||||
self.process_pool = process_pool
|
self.process_pool = process_pool
|
||||||
self.embed_model = embed_model
|
self.embed_model = embed_model
|
||||||
self.index_name = index_name
|
self.index_name = index_name
|
||||||
|
|
|
||||||
|
|
@ -470,7 +470,7 @@ async def initialize_services():
|
||||||
session_manager=session_manager,
|
session_manager=session_manager,
|
||||||
)
|
)
|
||||||
openrag_connector_service = ConnectorService(
|
openrag_connector_service = ConnectorService(
|
||||||
patched_async_client=clients.patched_async_client,
|
patched_async_client=clients, # Pass the clients object itself
|
||||||
process_pool=process_pool,
|
process_pool=process_pool,
|
||||||
embed_model=get_embedding_model(),
|
embed_model=get_embedding_model(),
|
||||||
index_name=INDEX_NAME,
|
index_name=INDEX_NAME,
|
||||||
|
|
@ -1108,6 +1108,8 @@ async def create_app():
|
||||||
@app.on_event("shutdown")
|
@app.on_event("shutdown")
|
||||||
async def shutdown_event():
|
async def shutdown_event():
|
||||||
await cleanup_subscriptions_proper(services)
|
await cleanup_subscriptions_proper(services)
|
||||||
|
# Cleanup async clients
|
||||||
|
await clients.cleanup()
|
||||||
|
|
||||||
return app
|
return app
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -170,8 +170,9 @@ class EnvManager:
|
||||||
"""
|
"""
|
||||||
self.config.validation_errors.clear()
|
self.config.validation_errors.clear()
|
||||||
|
|
||||||
# Always validate OpenAI API key
|
# OpenAI API key is now optional (can be provided during onboarding)
|
||||||
if not validate_openai_api_key(self.config.openai_api_key):
|
# Only validate format if a key is provided
|
||||||
|
if self.config.openai_api_key and not validate_openai_api_key(self.config.openai_api_key):
|
||||||
self.config.validation_errors["openai_api_key"] = (
|
self.config.validation_errors["openai_api_key"] = (
|
||||||
"Invalid OpenAI API key format (should start with sk-)"
|
"Invalid OpenAI API key format (should start with sk-)"
|
||||||
)
|
)
|
||||||
|
|
@ -268,7 +269,9 @@ class EnvManager:
|
||||||
f.write(f"LANGFLOW_URL_INGEST_FLOW_ID={self._quote_env_value(self.config.langflow_url_ingest_flow_id)}\n")
|
f.write(f"LANGFLOW_URL_INGEST_FLOW_ID={self._quote_env_value(self.config.langflow_url_ingest_flow_id)}\n")
|
||||||
f.write(f"NUDGES_FLOW_ID={self._quote_env_value(self.config.nudges_flow_id)}\n")
|
f.write(f"NUDGES_FLOW_ID={self._quote_env_value(self.config.nudges_flow_id)}\n")
|
||||||
f.write(f"OPENSEARCH_PASSWORD={self._quote_env_value(self.config.opensearch_password)}\n")
|
f.write(f"OPENSEARCH_PASSWORD={self._quote_env_value(self.config.opensearch_password)}\n")
|
||||||
f.write(f"OPENAI_API_KEY={self._quote_env_value(self.config.openai_api_key)}\n")
|
# Only write OpenAI API key if provided (can be set during onboarding instead)
|
||||||
|
if self.config.openai_api_key:
|
||||||
|
f.write(f"OPENAI_API_KEY={self._quote_env_value(self.config.openai_api_key)}\n")
|
||||||
f.write(
|
f.write(
|
||||||
f"OPENRAG_DOCUMENTS_PATHS={self._quote_env_value(self.config.openrag_documents_paths)}\n"
|
f"OPENRAG_DOCUMENTS_PATHS={self._quote_env_value(self.config.openrag_documents_paths)}\n"
|
||||||
)
|
)
|
||||||
|
|
@ -345,7 +348,7 @@ class EnvManager:
|
||||||
def get_no_auth_setup_fields(self) -> List[tuple[str, str, str, bool]]:
|
def get_no_auth_setup_fields(self) -> List[tuple[str, str, str, bool]]:
|
||||||
"""Get fields required for no-auth setup mode. Returns (field_name, display_name, placeholder, can_generate)."""
|
"""Get fields required for no-auth setup mode. Returns (field_name, display_name, placeholder, can_generate)."""
|
||||||
return [
|
return [
|
||||||
("openai_api_key", "OpenAI API Key", "sk-...", False),
|
("openai_api_key", "OpenAI API Key", "sk-... or leave empty", False),
|
||||||
(
|
(
|
||||||
"opensearch_password",
|
"opensearch_password",
|
||||||
"OpenSearch Password",
|
"OpenSearch Password",
|
||||||
|
|
|
||||||
|
|
@ -203,12 +203,16 @@ class ConfigScreen(Screen):
|
||||||
yield Static(" ")
|
yield Static(" ")
|
||||||
|
|
||||||
# OpenAI API Key
|
# OpenAI API Key
|
||||||
yield Label("OpenAI API Key *")
|
yield Label("OpenAI API Key")
|
||||||
# Where to create OpenAI keys (helper above the box)
|
# Where to create OpenAI keys (helper above the box)
|
||||||
yield Static(
|
yield Static(
|
||||||
Text("Get a key: https://platform.openai.com/api-keys", style="dim"),
|
Text("Get a key: https://platform.openai.com/api-keys", style="dim"),
|
||||||
classes="helper-text",
|
classes="helper-text",
|
||||||
)
|
)
|
||||||
|
yield Static(
|
||||||
|
Text("Can also be provided during onboarding", style="dim italic"),
|
||||||
|
classes="helper-text",
|
||||||
|
)
|
||||||
current_value = getattr(self.env_manager.config, "openai_api_key", "")
|
current_value = getattr(self.env_manager.config, "openai_api_key", "")
|
||||||
with Horizontal(id="openai-key-row"):
|
with Horizontal(id="openai-key-row"):
|
||||||
input_widget = Input(
|
input_widget = Input(
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue