From 2debc9288dd1052552cdc868b7731fd3a6e98c12 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Rapha=C3=ABl=20MANSUY?=
Date: Thu, 4 Dec 2025 19:19:00 +0800
Subject: [PATCH] cherry-pick 6d61f70b

---
 lightrag/evaluation/eval_rag_quality.py | 411 ++++++++++++++++++------
 1 file changed, 319 insertions(+), 92 deletions(-)

diff --git a/lightrag/evaluation/eval_rag_quality.py b/lightrag/evaluation/eval_rag_quality.py
index d05005c8..d1889f34 100644
--- a/lightrag/evaluation/eval_rag_quality.py
+++ b/lightrag/evaluation/eval_rag_quality.py
@@ -16,6 +16,12 @@ Usage:
 Results are saved to: lightrag/evaluation/results/
 - results_YYYYMMDD_HHMMSS.csv (CSV export for analysis)
 - results_YYYYMMDD_HHMMSS.json (Full results with details)
+
+Note on Custom OpenAI-Compatible Endpoints:
+    This script enables bypass_n=True on the RAGAS LLM wrapper so that the
+    answer_relevancy metric works with custom endpoints that may not support
+    OpenAI's 'n' parameter for multiple completions. Multiple outputs are
+    generated through repeated prompts instead, preserving evaluation quality.
 """

 import asyncio
@@ -36,46 +42,31 @@ from lightrag.utils import logger
 # Add parent directory to path
 sys.path.insert(0, str(Path(__file__).parent.parent.parent))

-# Load .env from project root
-project_root = Path(__file__).parent.parent.parent
-load_dotenv(project_root / ".env")
-
-# Setup OpenAI API key (required for RAGAS evaluation)
-# Use LLM_BINDING_API_KEY when running with the OpenAI binding
-
-llm_binding = os.getenv("LLM_BINDING", "").lower()
-llm_binding_key = os.getenv("LLM_BINDING_API_KEY")
-
-# Validate LLM_BINDING is set to openai
-if llm_binding != "openai":
-    logger.error(
-        "❌ LLM_BINDING must be set to 'openai'. Current value: '%s'",
-        llm_binding or "(not set)",
-    )
-    sys.exit(1)
-
-# Validate LLM_BINDING_API_KEY exists
-if not llm_binding_key:
-    logger.error("❌ LLM_BINDING_API_KEY is not set. Cannot run RAGAS evaluation.")
-    sys.exit(1)
-
-# Set OPENAI_API_KEY from LLM_BINDING_API_KEY
-os.environ["OPENAI_API_KEY"] = llm_binding_key
-logger.info("✅ LLM_BINDING: openai")
+# Use the .env file in the current working directory. This allows a different
+# .env file for each LightRAG instance. OS environment variables take
+# precedence over the .env file.
+load_dotenv(dotenv_path=".env", override=False)

+# Conditional imports - flag availability so a clear error can be raised later
 try:
     from datasets import Dataset
     from ragas import evaluate
     from ragas.metrics import (
-        answer_relevancy,
-        context_precision,
-        context_recall,
-        faithfulness,
+        AnswerRelevancy,
+        ContextPrecision,
+        ContextRecall,
+        Faithfulness,
     )
-except ImportError as e:
-    logger.error("❌ RAGAS import error: %s", e)
-    logger.error("   Install with: pip install ragas datasets")
-    sys.exit(1)
+    from ragas.llms import LangchainLLMWrapper
+    from langchain_openai import ChatOpenAI, OpenAIEmbeddings
+
+    RAGAS_AVAILABLE = True
+
+except ImportError:
+    RAGAS_AVAILABLE = False
+    Dataset = None
+    evaluate = None
+    LangchainLLMWrapper = None

 CONNECT_TIMEOUT_SECONDS = 180.0
@@ -99,7 +90,75 @@ class RAGEvaluator:
             test_dataset_path: Path to test dataset JSON file
             rag_api_url: Base URL of LightRAG API (e.g., http://localhost:9621)
                          If None, will try to read from environment or use default
+
+        Environment Variables:
+            EVAL_LLM_MODEL: LLM model for evaluation (default: gpt-4.1)
+            EVAL_EMBEDDING_MODEL: Embedding model for evaluation (default: text-embedding-3-large)
+            EVAL_LLM_BINDING_API_KEY: API key for evaluation models (fallback to OPENAI_API_KEY)
+            EVAL_LLM_BINDING_HOST: Custom endpoint URL for evaluation models (optional)
+
+        Raises:
+            ImportError: If ragas or datasets packages are not installed
+            EnvironmentError: If neither EVAL_LLM_BINDING_API_KEY nor OPENAI_API_KEY is set
         """
+        # Validate RAGAS dependencies are installed
+        if not RAGAS_AVAILABLE:
+            raise ImportError(
+                "RAGAS dependencies not installed. "
+                "Install with: pip install ragas datasets"
+            )
+
+        # Configure evaluation models (for RAGAS scoring)
+        eval_api_key = os.getenv("EVAL_LLM_BINDING_API_KEY") or os.getenv(
+            "OPENAI_API_KEY"
+        )
+        if not eval_api_key:
+            raise EnvironmentError(
+                "EVAL_LLM_BINDING_API_KEY or OPENAI_API_KEY is required for evaluation. "
+                "Set EVAL_LLM_BINDING_API_KEY to use a custom API key, "
+                "or ensure OPENAI_API_KEY is set."
+            )
+
+        eval_model = os.getenv("EVAL_LLM_MODEL", "gpt-4.1")
+        eval_embedding_model = os.getenv(
+            "EVAL_EMBEDDING_MODEL", "text-embedding-3-large"
+        )
+        eval_base_url = os.getenv("EVAL_LLM_BINDING_HOST")
+
+        # Create LLM and Embeddings instances for RAGAS
+        llm_kwargs = {
+            "model": eval_model,
+            "api_key": eval_api_key,
+            "max_retries": int(os.getenv("EVAL_LLM_MAX_RETRIES", "5")),
+            "request_timeout": int(os.getenv("EVAL_LLM_TIMEOUT", "180")),
+        }
+        embedding_kwargs = {"model": eval_embedding_model, "api_key": eval_api_key}
+
+        if eval_base_url:
+            llm_kwargs["base_url"] = eval_base_url
+            embedding_kwargs["base_url"] = eval_base_url
+
+        # Create base LangChain LLM
+        base_llm = ChatOpenAI(**llm_kwargs)
+        self.eval_embeddings = OpenAIEmbeddings(**embedding_kwargs)
+
+        # Wrap the LLM with LangchainLLMWrapper and enable bypass_n mode so that
+        # endpoints that don't support the 'n' parameter still work: multiple
+        # outputs are generated through repeated prompts instead of via 'n'
+        try:
+            self.eval_llm = LangchainLLMWrapper(
+                langchain_llm=base_llm,
+                bypass_n=True,  # Avoid passing 'n' to the OpenAI-compatible API
+            )
+            logger.debug("Successfully configured bypass_n mode for LLM wrapper")
+        except Exception as e:
+            logger.warning(
+                "Could not configure LangchainLLMWrapper with bypass_n: %s. "
+                "Using base LLM directly, which may cause warnings with custom endpoints.",
+                e,
+            )
+            self.eval_llm = base_llm
+
         if test_dataset_path is None:
             test_dataset_path = Path(__file__).parent / "sample_dataset.json"

@@ -114,6 +173,41 @@ class RAGEvaluator:
         # Load test dataset
         self.test_cases = self._load_test_dataset()

+        # Store configuration values for display
+        self.eval_model = eval_model
+        self.eval_embedding_model = eval_embedding_model
+        self.eval_base_url = eval_base_url
+        self.eval_max_retries = llm_kwargs["max_retries"]
+        self.eval_timeout = llm_kwargs["request_timeout"]
+
+        # Display configuration
+        self._display_configuration()
+
+    def _display_configuration(self):
+        """Display all evaluation configuration settings"""
+        logger.info("EVALUATION CONFIGURATION")
+
+        logger.info("  Evaluation Models:")
+        logger.info("    • LLM Model: %s", self.eval_model)
+        logger.info("    • Embedding Model: %s", self.eval_embedding_model)
+        if self.eval_base_url:
+            logger.info("    • Custom Endpoint: %s", self.eval_base_url)
+            logger.info("    • Bypass N-Parameter: Enabled (for compatibility)")
+        else:
+            logger.info("    • Endpoint: OpenAI Official API")
+
+        logger.info("  Concurrency & Rate Limiting:")
+        query_top_k = int(os.getenv("EVAL_QUERY_TOP_K", "10"))
+        logger.info("    • Query Top-K: %s Entities/Relations", query_top_k)
+        logger.info("    • LLM Max Retries: %s", self.eval_max_retries)
+        logger.info("    • LLM Timeout: %s seconds", self.eval_timeout)
+
+        logger.info("  Test Configuration:")
+        logger.info("    • Total Test Cases: %s", len(self.test_cases))
+        logger.info("    • Test Dataset: %s", self.test_dataset_path.name)
+        logger.info("    • LightRAG API: %s", self.rag_api_url)
+        logger.info("    • Results Directory: %s", self.results_dir.name)
+
     def _load_test_dataset(self) -> List[Dict[str, str]]:
         """Load test cases from JSON file"""
         if not self.test_dataset_path.exists():
@@ -150,13 +244,22 @@ class RAGEvaluator:
             "include_references": True,
             "include_chunk_content": True,  # NEW: Request chunk content in references
             "response_type": "Multiple Paragraphs",
-            "top_k": 10,
+            "top_k": int(os.getenv("EVAL_QUERY_TOP_K", "10")),
         }

+        # Get API key from environment for authentication
+        api_key = os.getenv("LIGHTRAG_API_KEY")
+
+        # Prepare headers with optional authentication
+        headers = {}
+        if api_key:
+            headers["X-API-Key"] = api_key
+
         # Single optimized API call - gets both answer AND chunk content
         response = await client.post(
             f"{self.rag_api_url}/query",
             json=payload,
+            headers=headers if headers else None,
         )
         response.raise_for_status()
         result = response.json()
@@ -222,6 +325,7 @@ class RAGEvaluator:
         test_case: Dict[str, str],
         semaphore: asyncio.Semaphore,
         client: httpx.AsyncClient,
+        progress_counter: Dict[str, int],
     ) -> Dict[str, Any]:
         """
         Evaluate a single test case with concurrency control
@@ -231,34 +335,39 @@
             test_case: Test case dictionary with question and ground_truth
             semaphore: Semaphore to control concurrency
             client: Shared httpx AsyncClient for connection pooling
+            progress_counter: Shared dictionary for progress tracking

         Returns:
             Evaluation result dictionary
         """
-        total_cases = len(self.test_cases)
-
         async with semaphore:
             question = test_case["question"]
             ground_truth = test_case["ground_truth"]

-            logger.info("[%s/%s] Evaluating: %s...", idx, total_cases, question[:60])
-
             # Generate RAG response by calling actual LightRAG API
-            rag_response = await self.generate_rag_response(
-                question=question, client=client
-            )
+            try:
+                rag_response = await self.generate_rag_response(
+                    question=question, client=client
+                )
+            except Exception as e:
+                logger.error("Error generating response for test %s: %s", idx, str(e))
+                progress_counter["completed"] += 1
+                return {
+                    "test_number": idx,
+                    "question": question,
+                    "error": str(e),
+                    "metrics": {},
+                    "ragas_score": 0,
+                    "timestamp": datetime.now().isoformat(),
+                }

             # *** CRITICAL FIX: Use actual retrieved contexts, NOT ground_truth ***
             retrieved_contexts = rag_response["contexts"]

-            # DEBUG: Print what was actually retrieved
-            logger.debug("📝 Retrieved %s contexts", len(retrieved_contexts))
-            if retrieved_contexts:
-                logger.debug(
-                    "📄 First context preview: %s...", retrieved_contexts[0][:100]
-                )
-            else:
-                logger.warning("⚠️ No contexts retrieved!")
+            # DEBUG: Print what was actually retrieved (only in debug mode)
+            logger.debug(
+                "📝 Test %s: Retrieved %s contexts", idx, len(retrieved_contexts)
+            )

             # Prepare dataset for RAGAS evaluation with CORRECT contexts
             eval_dataset = Dataset.from_dict(
@@ -271,15 +380,19 @@ class RAGEvaluator:
             )

             # Run RAGAS evaluation
+            # IMPORTANT: Create fresh metric instances for each evaluation to avoid
+            # concurrent state conflicts when multiple tasks run in parallel
             try:
                 eval_results = evaluate(
                     dataset=eval_dataset,
                     metrics=[
-                        faithfulness,
-                        answer_relevancy,
-                        context_recall,
-                        context_precision,
+                        Faithfulness(),
+                        AnswerRelevancy(),
+                        ContextRecall(),
+                        ContextPrecision(),
                     ],
+                    llm=self.eval_llm,
+                    embeddings=self.eval_embeddings,
                 )

                 # Convert to DataFrame (RAGAS v0.3+ API)
@@ -290,6 +403,7 @@

                 # Extract scores (RAGAS v0.3+ uses .to_pandas())
                 result = {
+                    "test_number": idx,
                     "question": question,
                     "answer": rag_response["answer"][:200] + "..."
                     if len(rag_response["answer"]) > 200
@@ -297,7 +411,7 @@
                     "ground_truth": ground_truth[:200] + "..."
                    if len(ground_truth) > 200
                    else ground_truth,
-                    "project": test_case.get("project_context", "unknown"),
+                    "project": test_case.get("project", "unknown"),
                     "metrics": {
                         "faithfulness": float(scores_row.get("faithfulness", 0)),
                         "answer_relevance": float(
@@ -311,22 +425,24 @@
                     "timestamp": datetime.now().isoformat(),
                 }

-                # Calculate RAGAS score (average of all metrics)
+                # Calculate RAGAS score (average of all metrics, excluding NaN values)
                 metrics = result["metrics"]
-                ragas_score = sum(metrics.values()) / len(metrics) if metrics else 0
+                valid_metrics = [v for v in metrics.values() if not _is_nan(v)]
+                ragas_score = (
+                    sum(valid_metrics) / len(valid_metrics) if valid_metrics else 0
+                )
                 result["ragas_score"] = round(ragas_score, 4)

-                logger.info("✅ Faithfulness: %.4f", metrics["faithfulness"])
-                logger.info("✅ Answer Relevance: %.4f", metrics["answer_relevance"])
-                logger.info("✅ Context Recall: %.4f", metrics["context_recall"])
-                logger.info("✅ Context Precision: %.4f", metrics["context_precision"])
-                logger.info("📊 RAGAS Score: %.4f", result["ragas_score"])
+                # Update progress counter
+                progress_counter["completed"] += 1

                 return result

             except Exception as e:
-                logger.exception("❌ Error evaluating: %s", e)
+                logger.error("Error evaluating test %s: %s", idx, str(e))
+                progress_counter["completed"] += 1
                 return {
+                    "test_number": idx,
                     "question": question,
                     "error": str(e),
                     "metrics": {},
@@ -341,18 +457,21 @@ class RAGEvaluator:
         Returns:
             List of evaluation results with metrics
         """
-        # Get MAX_ASYNC from environment (default to 4 if not set)
-        max_async = int(os.getenv("MAX_ASYNC", "4"))
+        # Get evaluation concurrency from environment (default: 3)
+        max_async = int(os.getenv("EVAL_MAX_CONCURRENT", "3"))

         logger.info("")
         logger.info("%s", "=" * 70)
         logger.info("🚀 Starting RAGAS Evaluation of Portfolio RAG System")
-        logger.info("🔧 Parallel evaluations: %s", max_async)
+        logger.info("🔧 Concurrent evaluations: %s", max_async)
         logger.info("%s", "=" * 70)

         # Create semaphore to limit concurrent evaluations
         semaphore = asyncio.Semaphore(max_async)

+        # Create progress counter (shared across all tasks)
+        progress_counter = {"completed": 0}
+
         # Create shared HTTP client with connection pooling and proper timeouts
         # Timeout: 3 minutes for connect, 5 minutes for read (LLM can be slow)
         timeout = httpx.Timeout(
@@ -368,7 +487,9 @@ class RAGEvaluator:
         async with httpx.AsyncClient(timeout=timeout, limits=limits) as client:
             # Create tasks for all test cases
             tasks = [
-                self.evaluate_single_case(idx, test_case, semaphore, client)
+                self.evaluate_single_case(
+                    idx, test_case, semaphore, client, progress_counter
+                )
                 for idx, test_case in enumerate(self.test_cases, 1)
             ]

@@ -437,6 +558,95 @@ class RAGEvaluator:

         return csv_path

+    def _format_metric(self, value: float, width: int = 6) -> str:
+        """
+        Format a metric value for display, handling NaN gracefully
+
+        Args:
+            value: The metric value to format
+            width: The width of the formatted string
+
+        Returns:
+            Formatted string (e.g., "0.8523" or " N/A  ")
+        """
+        if _is_nan(value):
+            return "N/A".center(width)
+        return f"{value:.4f}".rjust(width)
+
+    def _display_results_table(self, results: List[Dict[str, Any]]):
+        """
+        Display evaluation results in a formatted table
+
+        Args:
+            results: List of evaluation results
+        """
+        logger.info("")
+        logger.info("%s", "=" * 115)
+        logger.info("📊 EVALUATION RESULTS SUMMARY")
+        logger.info("%s", "=" * 115)
+
+        # Table header
+        logger.info(
+            "%-4s | %-50s | %6s | %7s | %6s | %7s | %6s | %6s",
+            "#",
+            "Question",
+            "Faith",
+            "AnswRel",
+            "CtxRec",
+            "CtxPrec",
+            "RAGAS",
+            "Status",
+        )
+        logger.info("%s", "-" * 115)
+
+        # Table rows
+        for result in results:
+            test_num = result.get("test_number", 0)
+            question = result.get("question", "")
+            # Truncate question to 50 chars
+            question_display = (
+                (question[:47] + "...") if len(question) > 50 else question
+            )
+
+            metrics = result.get("metrics", {})
+            if metrics:
+                # Success case - format each metric, handling NaN values
+                faith = metrics.get("faithfulness", 0)
+                ans_rel = metrics.get("answer_relevance", 0)
+                ctx_rec = metrics.get("context_recall", 0)
+                ctx_prec = metrics.get("context_precision", 0)
+                ragas = result.get("ragas_score", 0)
+                status = "✓"
+
+                logger.info(
+                    "%-4d | %-50s | %s | %s | %s | %s | %s | %6s",
+                    test_num,
+                    question_display,
+                    self._format_metric(faith, 6),
+                    self._format_metric(ans_rel, 7),
+                    self._format_metric(ctx_rec, 6),
+                    self._format_metric(ctx_prec, 7),
+                    self._format_metric(ragas, 6),
+                    status,
+                )
+            else:
+                # Error case
+                error = result.get("error", "Unknown error")
+                error_display = (error[:20] + "...") if len(error) > 23 else error
+                logger.info(
+                    "%-4d | %-50s | %6s | %7s | %6s | %7s | %6s | ✗ %s",
+                    test_num,
+                    question_display,
+                    "N/A",
+                    "N/A",
+                    "N/A",
+                    "N/A",
+                    "N/A",
+                    error_display,
+                )
+
+        logger.info("%s", "=" * 115)
+
     def _calculate_benchmark_stats(
         self, results: List[Dict[str, Any]]
     ) -> Dict[str, Any]:
@@ -463,45 +673,55 @@ class RAGEvaluator:
                 "success_rate": 0.0,
             }

-        # Calculate averages for each metric (handling NaN values)
-        metrics_sum = {
-            "faithfulness": 0.0,
-            "answer_relevance": 0.0,
-            "context_recall": 0.0,
-            "context_precision": 0.0,
-            "ragas_score": 0.0,
+        # Calculate averages for each metric (handling NaN values correctly)
+        # Track both sum and count for each metric to handle NaN values properly
+        metrics_data = {
+            "faithfulness": {"sum": 0.0, "count": 0},
+            "answer_relevance": {"sum": 0.0, "count": 0},
+            "context_recall": {"sum": 0.0, "count": 0},
+            "context_precision": {"sum": 0.0, "count": 0},
+            "ragas_score": {"sum": 0.0, "count": 0},
         }

         for result in valid_results:
             metrics = result.get("metrics", {})
-            # Skip NaN values when summing
+
+            # For each metric, sum non-NaN values and count them
             faithfulness = metrics.get("faithfulness", 0)
             if not _is_nan(faithfulness):
-                metrics_sum["faithfulness"] += faithfulness
+                metrics_data["faithfulness"]["sum"] += faithfulness
+                metrics_data["faithfulness"]["count"] += 1

             answer_relevance = metrics.get("answer_relevance", 0)
             if not _is_nan(answer_relevance):
-                metrics_sum["answer_relevance"] += answer_relevance
+                metrics_data["answer_relevance"]["sum"] += answer_relevance
+                metrics_data["answer_relevance"]["count"] += 1

             context_recall = metrics.get("context_recall", 0)
             if not _is_nan(context_recall):
-                metrics_sum["context_recall"] += context_recall
+                metrics_data["context_recall"]["sum"] += context_recall
+                metrics_data["context_recall"]["count"] += 1

             context_precision = metrics.get("context_precision", 0)
             if not _is_nan(context_precision):
-                metrics_sum["context_precision"] += context_precision
+                metrics_data["context_precision"]["sum"] += context_precision
+                metrics_data["context_precision"]["count"] += 1

             ragas_score = result.get("ragas_score", 0)
             if not _is_nan(ragas_score):
-                metrics_sum["ragas_score"] += ragas_score
+                metrics_data["ragas_score"]["sum"] += ragas_score
+                metrics_data["ragas_score"]["count"] += 1

-        # Calculate averages
-        n = len(valid_results)
+        # Calculate averages using the actual count for each metric
         avg_metrics = {}
-        for k, v in metrics_sum.items():
-            avg_val = v / n if n > 0 else 0
-            # Handle NaN in average
-            avg_metrics[k] = round(avg_val, 4) if not _is_nan(avg_val) else 0.0
+        for metric_name, data in metrics_data.items():
+            if data["count"] > 0:
+                avg_val = data["sum"] / data["count"]
+                avg_metrics[metric_name] = (
+                    round(avg_val, 4) if not _is_nan(avg_val) else 0.0
+                )
+            else:
+                avg_metrics[metric_name] = 0.0

         # Find min and max RAGAS scores (filter out NaN)
         ragas_scores = []
@@ -534,6 +754,19 @@

         elapsed_time = time.time() - start_time

+        # Add a small delay to ensure all buffered output is completely written
+        await asyncio.sleep(0.2)
+        # Flush all output buffers to ensure RAGAS progress bars are fully displayed
+        sys.stdout.flush()
+        sys.stderr.flush()
+
+        await asyncio.sleep(0.2)
+        sys.stderr.write("\n")
+        sys.stderr.flush()
+
+        # Display results table
+        self._display_results_table(results)
+
         # Calculate benchmark statistics
         benchmark_stats = self._calculate_benchmark_stats(results)

@@ -619,15 +852,9 @@ async def main():
     if len(sys.argv) > 1:
         rag_api_url = sys.argv[1]

-    logger.info("")
     logger.info("%s", "=" * 70)
     logger.info("🔍 RAGAS Evaluation - Using Real LightRAG API")
     logger.info("%s", "=" * 70)
-    if rag_api_url:
-        logger.info("📡 RAG API URL: %s", rag_api_url)
-    else:
-        logger.info("📡 RAG API URL: http://localhost:9621 (default)")
-    logger.info("%s", "=" * 70)

     evaluator = RAGEvaluator(rag_api_url=rag_api_url)
     await evaluator.run()
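
For reviewers who want to exercise the endpoint-compatibility wiring outside the script, below is a minimal standalone sketch of the evaluation-model setup this patch introduces. It is not the authoritative implementation: it assumes a ragas version that provides LangchainLLMWrapper with the bypass_n keyword (which the patch itself guards with try/except) plus langchain-openai, and the EVAL_* variables are the ones the patch reads.

# Minimal sketch, mirroring the patch's __init__ wiring under the assumptions above.
import os

from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from ragas.llms import LangchainLLMWrapper

api_key = os.getenv("EVAL_LLM_BINDING_API_KEY") or os.getenv("OPENAI_API_KEY")
base_url = os.getenv("EVAL_LLM_BINDING_HOST")  # optional custom endpoint

llm_kwargs = {"model": os.getenv("EVAL_LLM_MODEL", "gpt-4.1"), "api_key": api_key}
embedding_kwargs = {
    "model": os.getenv("EVAL_EMBEDDING_MODEL", "text-embedding-3-large"),
    "api_key": api_key,
}
if base_url:
    llm_kwargs["base_url"] = base_url
    embedding_kwargs["base_url"] = base_url

base_llm = ChatOpenAI(**llm_kwargs)
embeddings = OpenAIEmbeddings(**embedding_kwargs)

try:
    # bypass_n asks RAGAS to issue repeated prompts instead of OpenAI's 'n'
    # parameter, which some OpenAI-compatible endpoints reject.
    eval_llm = LangchainLLMWrapper(langchain_llm=base_llm, bypass_n=True)
except Exception:
    eval_llm = base_llm  # fall back to the bare LLM, as the patch does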
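
The per-metric sum/count bookkeeping in _calculate_benchmark_stats exists because RAGAS can return NaN for an individual metric on an individual test case, so dividing every sum by len(valid_results) would understate the averages. A compact equivalent is sketched below; the _is_nan re-definition only assumes the contract of the helper already defined in eval_rag_quality.py (the helper itself is not part of this diff).

import math

def _is_nan(value) -> bool:
    # Assumed contract of the existing helper: True only for a float NaN.
    return isinstance(value, float) and math.isnan(value)

def nan_aware_mean(values) -> float:
    # Average only the non-NaN entries; report 0.0 when nothing valid remains.
    valid = [v for v in values if not _is_nan(v)]
    return round(sum(valid) / len(valid), 4) if valid else 0.0

# Example: the NaN entry is excluded from both the sum and the denominator.
assert nan_aware_mean([0.8, float("nan"), 0.6]) == 0.7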