lazy client initialization + client cleanup + http2 probe and fallback
This commit is contained in:
parent
653ed344be
commit
563efd957f
2 changed files with 133 additions and 32 deletions
|
|
@ -280,6 +280,7 @@ class AppClients:
|
|||
self.langflow_client = None
|
||||
self.langflow_http_client = None
|
||||
self._patched_async_client = None # Private attribute
|
||||
self._client_init_lock = __import__('threading').Lock() # Lock for thread-safe initialization
|
||||
self.converter = None
|
||||
|
||||
async def initialize(self):
|
||||
|
|
@ -321,18 +322,12 @@ class AppClients:
|
|||
# Initialize patched OpenAI client if API key is available
|
||||
# This allows the app to start even if OPENAI_API_KEY is not set yet
|
||||
# (e.g., when it will be provided during onboarding)
|
||||
# The property will handle lazy initialization if needed later
|
||||
if self._patched_async_client is None:
|
||||
try:
|
||||
openai_key = os.getenv("OPENAI_API_KEY")
|
||||
if openai_key:
|
||||
self._patched_async_client = patch_openai_with_mcp(AsyncOpenAI())
|
||||
logger.info("OpenAI client initialized with API key from environment")
|
||||
else:
|
||||
logger.info("OpenAI API key not found in environment - will be initialized on first use if needed")
|
||||
except Exception as e:
|
||||
logger.warning("Failed to initialize OpenAI client", error=str(e))
|
||||
self._patched_async_client = None
|
||||
# The property will handle lazy initialization with probe when first accessed
|
||||
openai_key = os.getenv("OPENAI_API_KEY")
|
||||
if openai_key:
|
||||
logger.info("OpenAI API key found in environment - will be initialized lazily on first use with HTTP/2 probe")
|
||||
else:
|
||||
logger.info("OpenAI API key not found in environment - will be initialized on first use if needed")
|
||||
|
||||
# Initialize document converter
|
||||
self.converter = create_document_converter(ocr_engine=DOCLING_OCR_ENGINE)
|
||||
|
|
@ -368,35 +363,139 @@ class AppClients:
|
|||
"""
|
||||
Property that ensures OpenAI client is initialized on first access.
|
||||
This allows lazy initialization so the app can start without an API key.
|
||||
|
||||
Note: The client is a long-lived singleton that should be closed via cleanup().
|
||||
Thread-safe via lock to prevent concurrent initialization attempts.
|
||||
"""
|
||||
# Quick check without lock
|
||||
if self._patched_async_client is not None:
|
||||
return self._patched_async_client
|
||||
|
||||
# Try to initialize the client on-demand
|
||||
# First check if OPENAI_API_KEY is in environment
|
||||
openai_key = os.getenv("OPENAI_API_KEY")
|
||||
# Use lock to ensure only one thread initializes
|
||||
with self._client_init_lock:
|
||||
# Double-check after acquiring lock
|
||||
if self._patched_async_client is not None:
|
||||
return self._patched_async_client
|
||||
|
||||
# Try to initialize the client on-demand
|
||||
# First check if OPENAI_API_KEY is in environment
|
||||
openai_key = os.getenv("OPENAI_API_KEY")
|
||||
|
||||
if not openai_key:
|
||||
# Try to get from config (in case it was set during onboarding)
|
||||
try:
|
||||
config = get_openrag_config()
|
||||
if config and config.provider and config.provider.api_key:
|
||||
openai_key = config.provider.api_key
|
||||
# Set it in environment so AsyncOpenAI can pick it up
|
||||
os.environ["OPENAI_API_KEY"] = openai_key
|
||||
logger.info("Loaded OpenAI API key from config file")
|
||||
except Exception as e:
|
||||
logger.debug("Could not load OpenAI key from config", error=str(e))
|
||||
|
||||
# Try to initialize the client - AsyncOpenAI() will read from environment
|
||||
# We'll try HTTP/2 first with a probe, then fall back to HTTP/1.1 if it times out
|
||||
import asyncio
|
||||
import concurrent.futures
|
||||
import threading
|
||||
|
||||
async def probe_and_initialize():
|
||||
# Try HTTP/2 first (default)
|
||||
client_http2 = patch_openai_with_mcp(AsyncOpenAI())
|
||||
logger.info("Probing OpenAI client with HTTP/2...")
|
||||
|
||||
try:
|
||||
# Probe with a small embedding and short timeout
|
||||
await asyncio.wait_for(
|
||||
client_http2.embeddings.create(
|
||||
model='text-embedding-3-small',
|
||||
input=['test']
|
||||
),
|
||||
timeout=5.0
|
||||
)
|
||||
logger.info("OpenAI client initialized with HTTP/2 (probe successful)")
|
||||
return client_http2
|
||||
except (asyncio.TimeoutError, Exception) as probe_error:
|
||||
logger.warning("HTTP/2 probe failed, falling back to HTTP/1.1", error=str(probe_error))
|
||||
# Close the HTTP/2 client
|
||||
try:
|
||||
await client_http2.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Fall back to HTTP/1.1 with explicit timeout settings
|
||||
http_client = httpx.AsyncClient(
|
||||
http2=False,
|
||||
timeout=httpx.Timeout(60.0, connect=10.0)
|
||||
)
|
||||
client_http1 = patch_openai_with_mcp(
|
||||
AsyncOpenAI(http_client=http_client)
|
||||
)
|
||||
logger.info("OpenAI client initialized with HTTP/1.1 (fallback)")
|
||||
return client_http1
|
||||
|
||||
def run_probe_in_thread():
|
||||
"""Run the async probe in a new thread with its own event loop"""
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
try:
|
||||
return loop.run_until_complete(probe_and_initialize())
|
||||
finally:
|
||||
loop.close()
|
||||
|
||||
if not openai_key:
|
||||
# Try to get from config (in case it was set during onboarding)
|
||||
try:
|
||||
config = get_openrag_config()
|
||||
if config and config.provider and config.provider.api_key:
|
||||
openai_key = config.provider.api_key
|
||||
# Set it in environment so AsyncOpenAI can pick it up
|
||||
os.environ["OPENAI_API_KEY"] = openai_key
|
||||
logger.info("Loaded OpenAI API key from config file")
|
||||
# Run the probe in a separate thread with its own event loop
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
|
||||
future = executor.submit(run_probe_in_thread)
|
||||
self._patched_async_client = future.result(timeout=15)
|
||||
logger.info("Successfully initialized OpenAI client")
|
||||
except Exception as e:
|
||||
logger.debug("Could not load OpenAI key from config", error=str(e))
|
||||
logger.error(f"Failed to initialize OpenAI client: {e.__class__.__name__}: {str(e)}")
|
||||
raise ValueError(f"Failed to initialize OpenAI client: {str(e)}. Please complete onboarding or set OPENAI_API_KEY environment variable.")
|
||||
|
||||
# Try to initialize the client - AsyncOpenAI() will read from environment
|
||||
try:
|
||||
self._patched_async_client = patch_openai_with_mcp(AsyncOpenAI())
|
||||
logger.info("OpenAI client initialized on-demand")
|
||||
except Exception as e:
|
||||
logger.error("Failed to initialize OpenAI client on-demand", error=str(e))
|
||||
raise ValueError(f"Failed to initialize OpenAI client: {str(e)}. Please complete onboarding or set OPENAI_API_KEY environment variable.")
|
||||
return self._patched_async_client
|
||||
|
||||
return self._patched_async_client
|
||||
async def cleanup(self):
|
||||
"""Cleanup resources - should be called on application shutdown"""
|
||||
# Close AsyncOpenAI client if it was created
|
||||
if self._patched_async_client is not None:
|
||||
try:
|
||||
await self._patched_async_client.close()
|
||||
logger.info("Closed AsyncOpenAI client")
|
||||
except Exception as e:
|
||||
logger.error("Failed to close AsyncOpenAI client", error=str(e))
|
||||
finally:
|
||||
self._patched_async_client = None
|
||||
|
||||
# Close Langflow HTTP client if it exists
|
||||
if self.langflow_http_client is not None:
|
||||
try:
|
||||
await self.langflow_http_client.aclose()
|
||||
logger.info("Closed Langflow HTTP client")
|
||||
except Exception as e:
|
||||
logger.error("Failed to close Langflow HTTP client", error=str(e))
|
||||
finally:
|
||||
self.langflow_http_client = None
|
||||
|
||||
# Close OpenSearch client if it exists
|
||||
if self.opensearch is not None:
|
||||
try:
|
||||
await self.opensearch.close()
|
||||
logger.info("Closed OpenSearch client")
|
||||
except Exception as e:
|
||||
logger.error("Failed to close OpenSearch client", error=str(e))
|
||||
finally:
|
||||
self.opensearch = None
|
||||
|
||||
# Close Langflow client if it exists (also an AsyncOpenAI client)
|
||||
if self.langflow_client is not None:
|
||||
try:
|
||||
await self.langflow_client.close()
|
||||
logger.info("Closed Langflow client")
|
||||
except Exception as e:
|
||||
logger.error("Failed to close Langflow client", error=str(e))
|
||||
finally:
|
||||
self.langflow_client = None
|
||||
|
||||
async def langflow_request(self, method: str, endpoint: str, **kwargs):
|
||||
"""Central method for all Langflow API requests"""
|
||||
|
|
|
|||
|
|
@ -1108,6 +1108,8 @@ async def create_app():
|
|||
@app.on_event("shutdown")
|
||||
async def shutdown_event():
|
||||
await cleanup_subscriptions_proper(services)
|
||||
# Cleanup async clients
|
||||
await clients.cleanup()
|
||||
|
||||
return app
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue