From 10f6e6955f2ccc7369ba1c0a3ba69f1deb699b08 Mon Sep 17 00:00:00 2001
From: yangdx
Date: Mon, 3 Nov 2025 13:09:45 +0800
Subject: [PATCH] Improve Langfuse integration and stream response cleanup
 handling
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

• Check env vars before enabling Langfuse
• Move imports after env check logic
• Handle wrapper client aclose() issues
• Add debug logs for cleanup failures
---
 lightrag/llm/openai.py | 67 +++++++++++++++++++++++++++---------------
 1 file changed, 43 insertions(+), 24 deletions(-)

diff --git a/lightrag/llm/openai.py b/lightrag/llm/openai.py
index 1eaf3bd5..66c3bfe4 100644
--- a/lightrag/llm/openai.py
+++ b/lightrag/llm/openai.py
@@ -27,18 +27,6 @@ from lightrag.utils import (
     logger,
 )
 
-# Try to import Langfuse for LLM observability (optional)
-# Falls back to standard OpenAI client if not available
-try:
-    from langfuse.openai import AsyncOpenAI
-
-    LANGFUSE_ENABLED = True
-    logger.info("Langfuse observability enabled for OpenAI client")
-except ImportError:
-    from openai import AsyncOpenAI
-
-    LANGFUSE_ENABLED = False
-    logger.debug("Langfuse not available, using standard OpenAI client")
 from lightrag.types import GPTKeywordExtractionFormat
 from lightrag.api import __api_version__
 
@@ -48,6 +36,32 @@
 from typing import Any, Union
 from dotenv import load_dotenv
 
+# Try to import Langfuse for LLM observability (optional)
+# Falls back to standard OpenAI client if not available
+# Langfuse requires proper configuration to work correctly
+LANGFUSE_ENABLED = False
+try:
+    # Check if required Langfuse environment variables are set
+    langfuse_public_key = os.environ.get("LANGFUSE_PUBLIC_KEY")
+    langfuse_secret_key = os.environ.get("LANGFUSE_SECRET_KEY")
+
+    # Only enable Langfuse if both keys are configured
+    if langfuse_public_key and langfuse_secret_key:
+        from langfuse.openai import AsyncOpenAI
+
+        LANGFUSE_ENABLED = True
+        logger.info("Langfuse observability enabled for OpenAI client")
+    else:
+        from openai import AsyncOpenAI
+
+        logger.debug(
+            "Langfuse environment variables not configured, using standard OpenAI client"
+        )
+except ImportError:
+    from openai import AsyncOpenAI
+
+    logger.debug("Langfuse not available, using standard OpenAI client")
+
 # use the .env that is inside the current folder
 # allows to use different .env file for each lightrag instance
 # the OS environment variables take precedence over the .env file
@@ -382,18 +396,23 @@
         )

         # Ensure resources are released even if no exception occurs
-        if (
-            iteration_started
-            and hasattr(response, "aclose")
-            and callable(getattr(response, "aclose", None))
-        ):
-            try:
-                await response.aclose()
-                logger.debug("Successfully closed stream response")
-            except Exception as close_error:
-                logger.warning(
-                    f"Failed to close stream response in finally block: {close_error}"
-                )
+        # Note: Some wrapped clients (e.g., Langfuse) may not implement aclose() properly
+        if iteration_started and hasattr(response, "aclose"):
+            aclose_method = getattr(response, "aclose", None)
+            if callable(aclose_method):
+                try:
+                    await response.aclose()
+                    logger.debug("Successfully closed stream response")
+                except (AttributeError, TypeError) as close_error:
+                    # Some wrapper objects may report hasattr(aclose) but fail when called
+                    # This is expected behavior for certain client wrappers
+                    logger.debug(
+                        f"Stream response cleanup not supported by client wrapper: {close_error}"
+                    )
+                except Exception as close_error:
+                    logger.warning(
+                        f"Unexpected error during stream response cleanup: {close_error}"
+                    )

         # This prevents resource leaks since the caller doesn't handle closing
         try: