From c7173baf3ded7eac874bb59619a78ddb27e8c59a Mon Sep 17 00:00:00 2001
From: Raphaël MANSUY
Date: Thu, 4 Dec 2025 19:14:25 +0800
Subject: [PATCH] cherry-pick ec40b17e

---
 lightrag/llm/openai.py | 108 ++++++++++-------------------------------
 1 file changed, 25 insertions(+), 83 deletions(-)

diff --git a/lightrag/llm/openai.py b/lightrag/llm/openai.py
index a2bbfa23..f7b759ad 100644
--- a/lightrag/llm/openai.py
+++ b/lightrag/llm/openai.py
@@ -11,6 +11,7 @@ if not pm.is_installed("openai"):
     pm.install("openai")
 
 from openai import (
+    AsyncOpenAI,
     APIConnectionError,
     RateLimitError,
     APITimeoutError,
@@ -26,7 +27,6 @@ from lightrag.utils import (
     safe_unicode_decode,
     logger,
 )
-
 from lightrag.types import GPTKeywordExtractionFormat
 from lightrag.api import __api_version__
 
@@ -36,32 +36,6 @@ from typing import Any, Union
 from dotenv import load_dotenv
 
-# Try to import Langfuse for LLM observability (optional)
-# Falls back to standard OpenAI client if not available
-# Langfuse requires proper configuration to work correctly
-LANGFUSE_ENABLED = False
-try:
-    # Check if required Langfuse environment variables are set
-    langfuse_public_key = os.environ.get("LANGFUSE_PUBLIC_KEY")
-    langfuse_secret_key = os.environ.get("LANGFUSE_SECRET_KEY")
-
-    # Only enable Langfuse if both keys are configured
-    if langfuse_public_key and langfuse_secret_key:
-        from langfuse.openai import AsyncOpenAI
-
-        LANGFUSE_ENABLED = True
-        logger.info("Langfuse observability enabled for OpenAI client")
-    else:
-        from openai import AsyncOpenAI
-
-        logger.debug(
-            "Langfuse environment variables not configured, using standard OpenAI client"
-        )
-except ImportError:
-    from openai import AsyncOpenAI
-
-    logger.debug("Langfuse not available, using standard OpenAI client")
-
 # use the .env that is inside the current folder
 # allows to use different .env file for each lightrag instance
 # the OS environment variables take precedence over the .env file
@@ -138,9 +112,6 @@ async def openai_complete_if_cache(
     base_url: str | None = None,
     api_key: str | None = None,
     token_tracker: Any | None = None,
-    stream: bool | None = None,
-    timeout: int | None = None,
-    keyword_extraction: bool = False,
     **kwargs: Any,
 ) -> str:
     """Complete a prompt using OpenAI's API with caching support and Chain of Thought (COT) integration.
@@ -170,15 +141,13 @@ async def openai_complete_if_cache(
         api_key: Optional OpenAI API key. If None, uses the OPENAI_API_KEY environment variable.
         token_tracker: Optional token usage tracker for monitoring API usage.
         enable_cot: Whether to enable Chain of Thought (COT) processing. Default is False.
-        stream: Whether to stream the response. Default is False.
-        timeout: Request timeout in seconds. Default is None.
-        keyword_extraction: Whether to enable keyword extraction mode. When True, triggers
-            special response formatting for keyword extraction. Default is False.
         **kwargs: Additional keyword arguments to pass to the OpenAI API.
             Special kwargs:
             - openai_client_configs: Dict of configuration options for the AsyncOpenAI client.
               These will be passed to the client constructor but will be overridden by
              explicit parameters (api_key, base_url).
+            - hashing_kv: Will be removed from kwargs before passing to OpenAI.
+            - keyword_extraction: Will be removed from kwargs before passing to OpenAI.
 
     Returns:
         The completed text (with integrated COT content if available) or an async iterator
@@ -199,6 +168,7 @@ async def openai_complete_if_cache(
 
     # Remove special kwargs that shouldn't be passed to OpenAI
     kwargs.pop("hashing_kv", None)
+    kwargs.pop("keyword_extraction", None)
 
     # Extract client configuration options
     client_configs = kwargs.pop("openai_client_configs", {})
@@ -228,12 +198,6 @@ async def openai_complete_if_cache(
 
     messages = kwargs.pop("messages", messages)
 
-    # Add explicit parameters back to kwargs so they're passed to OpenAI API
-    if stream is not None:
-        kwargs["stream"] = stream
-    if timeout is not None:
-        kwargs["timeout"] = timeout
-
     try:
         # Don't use async with context manager, use client directly
         if "response_format" in kwargs:
@@ -406,23 +370,18 @@ async def openai_complete_if_cache(
         )
 
     # Ensure resources are released even if no exception occurs
-    # Note: Some wrapped clients (e.g., Langfuse) may not implement aclose() properly
-    if iteration_started and hasattr(response, "aclose"):
-        aclose_method = getattr(response, "aclose", None)
-        if callable(aclose_method):
-            try:
-                await response.aclose()
-                logger.debug("Successfully closed stream response")
-            except (AttributeError, TypeError) as close_error:
-                # Some wrapper objects may report hasattr(aclose) but fail when called
-                # This is expected behavior for certain client wrappers
-                logger.debug(
-                    f"Stream response cleanup not supported by client wrapper: {close_error}"
-                )
-            except Exception as close_error:
-                logger.warning(
-                    f"Unexpected error during stream response cleanup: {close_error}"
-                )
+    if (
+        iteration_started
+        and hasattr(response, "aclose")
+        and callable(getattr(response, "aclose", None))
+    ):
+        try:
+            await response.aclose()
+            logger.debug("Successfully closed stream response")
+        except Exception as close_error:
+            logger.warning(
+                f"Failed to close stream response in finally block: {close_error}"
+            )
     # This prevents resource leaks since the caller doesn't handle closing
 
     try:
@@ -522,6 +481,7 @@ async def openai_complete(
 ) -> Union[str, AsyncIterator[str]]:
     if history_messages is None:
         history_messages = []
+    keyword_extraction = kwargs.pop("keyword_extraction", None)
     if keyword_extraction:
         kwargs["response_format"] = "json"
     model_name = kwargs["hashing_kv"].global_config["llm_model_name"]
@@ -530,7 +490,6 @@ async def openai_complete(
     return await openai_complete_if_cache(
         model_name,
         prompt,
         system_prompt=system_prompt,
         history_messages=history_messages,
-        keyword_extraction=keyword_extraction,
         **kwargs,
     )
@@ -545,6 +504,7 @@ async def gpt_4o_complete(
 ) -> str:
     if history_messages is None:
         history_messages = []
+    keyword_extraction = kwargs.pop("keyword_extraction", None)
     if keyword_extraction:
         kwargs["response_format"] = GPTKeywordExtractionFormat
     return await openai_complete_if_cache(
@@ -553,7 +513,6 @@ async def gpt_4o_complete(
         "gpt-4o",
         prompt,
         system_prompt=system_prompt,
         history_messages=history_messages,
         enable_cot=enable_cot,
-        keyword_extraction=keyword_extraction,
         **kwargs,
     )
@@ -568,6 +527,7 @@ async def gpt_4o_mini_complete(
 ) -> str:
     if history_messages is None:
         history_messages = []
+    keyword_extraction = kwargs.pop("keyword_extraction", None)
     if keyword_extraction:
         kwargs["response_format"] = GPTKeywordExtractionFormat
     return await openai_complete_if_cache(
@@ -576,7 +536,6 @@ async def gpt_4o_mini_complete(
         "gpt-4o-mini",
         prompt,
         system_prompt=system_prompt,
         history_messages=history_messages,
         enable_cot=enable_cot,
-        keyword_extraction=keyword_extraction,
         **kwargs,
     )
@@ -591,13 +550,13 @@ async def nvidia_openai_complete(
 ) -> str:
     if history_messages is None:
         history_messages = []
+    kwargs.pop("keyword_extraction", None)
     result = await openai_complete_if_cache(
         "nvidia/llama-3.1-nemotron-70b-instruct",  # context length 128k
         prompt,
         system_prompt=system_prompt,
         history_messages=history_messages,
         enable_cot=enable_cot,
-        keyword_extraction=keyword_extraction,
         base_url="https://integrate.api.nvidia.com/v1",
         **kwargs,
     )
@@ -619,7 +578,6 @@ async def openai_embed(
     model: str = "text-embedding-3-small",
     base_url: str | None = None,
     api_key: str | None = None,
-    embedding_dim: int | None = None,
     client_configs: dict[str, Any] | None = None,
     token_tracker: Any | None = None,
 ) -> np.ndarray:
@@ -630,12 +588,6 @@ async def openai_embed(
         model: The OpenAI embedding model to use.
         base_url: Optional base URL for the OpenAI API.
         api_key: Optional OpenAI API key. If None, uses the OPENAI_API_KEY environment variable.
-        embedding_dim: Optional embedding dimension for dynamic dimension reduction.
-            **IMPORTANT**: This parameter is automatically injected by the EmbeddingFunc wrapper.
-            Do NOT manually pass this parameter when calling the function directly.
-            The dimension is controlled by the @wrap_embedding_func_with_attrs decorator.
-            Manually passing a different value will trigger a warning and be ignored.
-            When provided (by EmbeddingFunc), it will be passed to the OpenAI API for dimension reduction.
         client_configs: Additional configuration options for the AsyncOpenAI client.
             These will override any default configurations but will be overridden by
             explicit parameters (api_key, base_url).
@@ -655,27 +607,17 @@ async def openai_embed(
     )
 
     async with openai_async_client:
-        # Prepare API call parameters
-        api_params = {
-            "model": model,
-            "input": texts,
-            "encoding_format": "base64",
-        }
-
-        # Add dimensions parameter only if embedding_dim is provided
-        if embedding_dim is not None:
-            api_params["dimensions"] = embedding_dim
-
-        # Make API call
-        response = await openai_async_client.embeddings.create(**api_params)
+        response = await openai_async_client.embeddings.create(
+            model=model, input=texts, encoding_format="base64"
+        )
 
         if token_tracker and hasattr(response, "usage"):
            token_counts = {
                 "prompt_tokens": getattr(response.usage, "prompt_tokens", 0),
                 "total_tokens": getattr(response.usage, "total_tokens", 0),
             }
             token_tracker.add_usage(token_counts)
 
         return np.array(
             [
                 np.array(dp.embedding, dtype=np.float32)
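
A minimal caller-side sketch of the revised API, not part of the patch itself: after this change, keyword_extraction rides along in **kwargs and is popped internally (like hashing_kv) rather than being a named parameter, stream and timeout pass straight through **kwargs to the OpenAI SDK, and openai_embed no longer accepts embedding_dim. It assumes OPENAI_API_KEY is set in the environment; the model name and prompt text are illustrative only.

import asyncio

from lightrag.llm.openai import openai_complete_if_cache, openai_embed


async def main() -> None:
    # keyword_extraction is no longer a named parameter; it is popped from
    # **kwargs inside openai_complete_if_cache before the OpenAI call.
    text = await openai_complete_if_cache(
        "gpt-4o-mini",  # illustrative model name
        "Summarize LightRAG in one sentence.",
        keyword_extraction=False,  # stripped internally, never sent to OpenAI
        stream=False,              # forwarded to the OpenAI SDK via **kwargs
        timeout=30,                # forwarded to the OpenAI SDK via **kwargs
    )
    print(text)

    # openai_embed no longer takes embedding_dim; the model's native
    # embedding dimension is always used.
    vectors = await openai_embed(["hello", "world"])
    print(vectors.shape)


asyncio.run(main())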