From 6b0fe03dcf09f4c9f94b0d96faee875801b8877a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20MANSUY?= Date: Thu, 4 Dec 2025 19:19:23 +0800 Subject: [PATCH] cherry-pick 06b91d00 --- lightrag/evaluation/eval_rag_quality.py | 309 ++++++++++++++++-------- 1 file changed, 205 insertions(+), 104 deletions(-) diff --git a/lightrag/evaluation/eval_rag_quality.py b/lightrag/evaluation/eval_rag_quality.py index d8f95c7e..5c49b631 100644 --- a/lightrag/evaluation/eval_rag_quality.py +++ b/lightrag/evaluation/eval_rag_quality.py @@ -9,9 +9,22 @@ Evaluates RAG response quality using RAGAS metrics: - Context Precision: Is retrieved context clean without noise? Usage: + # Use defaults (sample_dataset.json, http://localhost:9621) python lightrag/evaluation/eval_rag_quality.py - python lightrag/evaluation/eval_rag_quality.py http://localhost:9621 - python lightrag/evaluation/eval_rag_quality.py http://your-rag-server.com:9621 + + # Specify custom dataset + python lightrag/evaluation/eval_rag_quality.py --dataset my_test.json + python lightrag/evaluation/eval_rag_quality.py -d my_test.json + + # Specify custom RAG endpoint + python lightrag/evaluation/eval_rag_quality.py --ragendpoint http://my-server.com:9621 + python lightrag/evaluation/eval_rag_quality.py -r http://my-server.com:9621 + + # Specify both + python lightrag/evaluation/eval_rag_quality.py -d my_test.json -r http://localhost:9621 + + # Get help + python lightrag/evaluation/eval_rag_quality.py --help Results are saved to: lightrag/evaluation/results/ - results_YYYYMMDD_HHMMSS.csv (CSV export for analysis) @@ -24,6 +37,7 @@ Technical Notes: - Deprecation warnings are suppressed for cleaner output """ +import argparse import asyncio import csv import json @@ -68,6 +82,7 @@ try: ) from ragas.llms import LangchainLLMWrapper from langchain_openai import ChatOpenAI, OpenAIEmbeddings + from tqdm.auto import tqdm RAGAS_AVAILABLE = True @@ -199,7 +214,9 @@ class RAGEvaluator: logger.info(" • Embedding Model: %s", self.eval_embedding_model) if self.eval_base_url: logger.info(" • Custom Endpoint: %s", self.eval_base_url) - logger.info(" • Bypass N-Parameter: Enabled (use LangchainLLMWrapperfor compatibility)") + logger.info( + " • Bypass N-Parameter: Enabled (use LangchainLLMWrapperfor compatibility)" + ) else: logger.info(" • Endpoint: OpenAI Official API") @@ -330,28 +347,36 @@ class RAGEvaluator: self, idx: int, test_case: Dict[str, str], - semaphore: asyncio.Semaphore, + rag_semaphore: asyncio.Semaphore, + eval_semaphore: asyncio.Semaphore, client: httpx.AsyncClient, progress_counter: Dict[str, int], + position_pool: asyncio.Queue, + pbar_creation_lock: asyncio.Lock, ) -> Dict[str, Any]: """ - Evaluate a single test case with concurrency control + Evaluate a single test case with two-stage pipeline concurrency control Args: idx: Test case index (1-based) test_case: Test case dictionary with question and ground_truth - semaphore: Semaphore to control concurrency + rag_semaphore: Semaphore to control overall concurrency (covers entire function) + eval_semaphore: Semaphore to control RAGAS evaluation concurrency (Stage 2) client: Shared httpx AsyncClient for connection pooling progress_counter: Shared dictionary for progress tracking + position_pool: Queue of available tqdm position indices + pbar_creation_lock: Lock to serialize tqdm creation and prevent race conditions Returns: Evaluation result dictionary """ - async with semaphore: + # rag_semaphore controls the entire evaluation process to prevent + # all RAG responses from being generated at once when eval is slow + async with rag_semaphore: question = test_case["question"] ground_truth = test_case["ground_truth"] - # Generate RAG response by calling actual LightRAG API + # Stage 1: Generate RAG response try: rag_response = await self.generate_rag_response( question=question, client=client @@ -371,11 +396,6 @@ class RAGEvaluator: # *** CRITICAL FIX: Use actual retrieved contexts, NOT ground_truth *** retrieved_contexts = rag_response["contexts"] - # DEBUG: Print what was actually retrieved (only in debug mode) - logger.debug( - "📝 Test %s: Retrieved %s contexts", idx, len(retrieved_contexts) - ) - # Prepare dataset for RAGAS evaluation with CORRECT contexts eval_dataset = Dataset.from_dict( { @@ -386,98 +406,142 @@ class RAGEvaluator: } ) - # Run RAGAS evaluation + # Stage 2: Run RAGAS evaluation (controlled by eval_semaphore) # IMPORTANT: Create fresh metric instances for each evaluation to avoid # concurrent state conflicts when multiple tasks run in parallel - try: - eval_results = evaluate( - dataset=eval_dataset, - metrics=[ - Faithfulness(), - AnswerRelevancy(), - ContextRecall(), - ContextPrecision(), - ], - llm=self.eval_llm, - embeddings=self.eval_embeddings, - ) + async with eval_semaphore: + pbar = None + position = None + try: + # Acquire a position from the pool for this tqdm progress bar + position = await position_pool.get() - # Convert to DataFrame (RAGAS v0.3+ API) - df = eval_results.to_pandas() + # Serialize tqdm creation to prevent race conditions + # Multiple tasks creating tqdm simultaneously can cause display conflicts + async with pbar_creation_lock: + # Create tqdm progress bar with assigned position to avoid overlapping + # leave=False ensures the progress bar is cleared after completion, + # preventing accumulation of completed bars and allowing position reuse + pbar = tqdm( + total=4, + desc=f"Eval-{idx:02d}", + position=position, + leave=False, + ) + # Give tqdm time to initialize and claim its screen position + await asyncio.sleep(0.05) - # Extract scores from first row - scores_row = df.iloc[0] + eval_results = evaluate( + dataset=eval_dataset, + metrics=[ + Faithfulness(), + AnswerRelevancy(), + ContextRecall(), + ContextPrecision(), + ], + llm=self.eval_llm, + embeddings=self.eval_embeddings, + _pbar=pbar, + ) - # Extract scores (RAGAS v0.3+ uses .to_pandas()) - result = { - "test_number": idx, - "question": question, - "answer": rag_response["answer"][:200] + "..." - if len(rag_response["answer"]) > 200 - else rag_response["answer"], - "ground_truth": ground_truth[:200] + "..." - if len(ground_truth) > 200 - else ground_truth, - "project": test_case.get("project", "unknown"), - "metrics": { - "faithfulness": float(scores_row.get("faithfulness", 0)), - "answer_relevance": float( - scores_row.get("answer_relevancy", 0) - ), - "context_recall": float(scores_row.get("context_recall", 0)), - "context_precision": float( - scores_row.get("context_precision", 0) - ), - }, - "timestamp": datetime.now().isoformat(), - } + # Convert to DataFrame (RAGAS v0.3+ API) + df = eval_results.to_pandas() - # Calculate RAGAS score (average of all metrics, excluding NaN values) - metrics = result["metrics"] - valid_metrics = [v for v in metrics.values() if not _is_nan(v)] - ragas_score = ( - sum(valid_metrics) / len(valid_metrics) if valid_metrics else 0 - ) - result["ragas_score"] = round(ragas_score, 4) + # Extract scores from first row + scores_row = df.iloc[0] - # Update progress counter - progress_counter["completed"] += 1 + # Extract scores (RAGAS v0.3+ uses .to_pandas()) + result = { + "test_number": idx, + "question": question, + "answer": rag_response["answer"][:200] + "..." + if len(rag_response["answer"]) > 200 + else rag_response["answer"], + "ground_truth": ground_truth[:200] + "..." + if len(ground_truth) > 200 + else ground_truth, + "project": test_case.get("project", "unknown"), + "metrics": { + "faithfulness": float(scores_row.get("faithfulness", 0)), + "answer_relevance": float( + scores_row.get("answer_relevancy", 0) + ), + "context_recall": float( + scores_row.get("context_recall", 0) + ), + "context_precision": float( + scores_row.get("context_precision", 0) + ), + }, + "timestamp": datetime.now().isoformat(), + } - return result + # Calculate RAGAS score (average of all metrics, excluding NaN values) + metrics = result["metrics"] + valid_metrics = [v for v in metrics.values() if not _is_nan(v)] + ragas_score = ( + sum(valid_metrics) / len(valid_metrics) if valid_metrics else 0 + ) + result["ragas_score"] = round(ragas_score, 4) - except Exception as e: - logger.error("Error evaluating test %s: %s", idx, str(e)) - progress_counter["completed"] += 1 - return { - "test_number": idx, - "question": question, - "error": str(e), - "metrics": {}, - "ragas_score": 0, - "timestamp": datetime.now().isoformat(), - } + # Update progress counter + progress_counter["completed"] += 1 + + return result + + except Exception as e: + logger.error("Error evaluating test %s: %s", idx, str(e)) + progress_counter["completed"] += 1 + return { + "test_number": idx, + "question": question, + "error": str(e), + "metrics": {}, + "ragas_score": 0, + "timestamp": datetime.now().isoformat(), + } + finally: + # Force close progress bar to ensure completion + if pbar is not None: + pbar.close() + # Release the position back to the pool for reuse + if position is not None: + await position_pool.put(position) async def evaluate_responses(self) -> List[Dict[str, Any]]: """ - Evaluate all test cases in parallel and return metrics + Evaluate all test cases in parallel with two-stage pipeline and return metrics Returns: List of evaluation results with metrics """ - # Get evaluation concurrency from environment (default to 1 for serial evaluation) - max_async = int(os.getenv("EVAL_MAX_CONCURRENT", "3")) + # Get evaluation concurrency from environment (default to 2 for parallel evaluation) + max_async = int(os.getenv("EVAL_MAX_CONCURRENT", "2")) logger.info("%s", "=" * 70) logger.info("🚀 Starting RAGAS Evaluation of LightRAG System") - logger.info("🔧 Concurrent evaluations: %s", max_async) + logger.info("🔧 RAGAS Evaluation (Stage 2): %s concurrent", max_async) logger.info("%s", "=" * 70) - # Create semaphore to limit concurrent evaluations - semaphore = asyncio.Semaphore(max_async) + # Create two-stage pipeline semaphores + # Stage 1: RAG generation - allow x2 concurrency to keep evaluation fed + rag_semaphore = asyncio.Semaphore(max_async * 2) + # Stage 2: RAGAS evaluation - primary bottleneck + eval_semaphore = asyncio.Semaphore(max_async) # Create progress counter (shared across all tasks) progress_counter = {"completed": 0} + # Create position pool for tqdm progress bars + # Positions range from 0 to max_async-1, ensuring no overlapping displays + position_pool = asyncio.Queue() + for i in range(max_async): + await position_pool.put(i) + + # Create lock to serialize tqdm creation and prevent race conditions + # This ensures progress bars are created one at a time, avoiding display conflicts + pbar_creation_lock = asyncio.Lock() + # Create shared HTTP client with connection pooling and proper timeouts # Timeout: 3 minutes for connect, 5 minutes for read (LLM can be slow) timeout = httpx.Timeout( @@ -486,20 +550,27 @@ class RAGEvaluator: read=READ_TIMEOUT_SECONDS, ) limits = httpx.Limits( - max_connections=max_async * 2, # Allow some buffer - max_keepalive_connections=max_async, + max_connections=(max_async + 1) * 2, # Allow buffer for RAG stage + max_keepalive_connections=max_async + 1, ) async with httpx.AsyncClient(timeout=timeout, limits=limits) as client: # Create tasks for all test cases tasks = [ self.evaluate_single_case( - idx, test_case, semaphore, client, progress_counter + idx, + test_case, + rag_semaphore, + eval_semaphore, + client, + progress_counter, + position_pool, + pbar_creation_lock, ) for idx, test_case in enumerate(self.test_cases, 1) ] - # Run all evaluations in parallel (limited by semaphore) + # Run all evaluations in parallel (limited by two-stage semaphores) results = await asyncio.gather(*tasks) return list(results) @@ -759,19 +830,6 @@ class RAGEvaluator: elapsed_time = time.time() - start_time - # Add a small delay to ensure all buffered output is completely written - await asyncio.sleep(0.5) - # Flush all output buffers to ensure RAGAS progress bars are fully displayed - sys.stdout.flush() - sys.stderr.flush() - sys.stdout.write("\n") - sys.stderr.write("\n") - sys.stdout.flush() - sys.stderr.flush() - - # Display results table - self._display_results_table(results) - # Calculate benchmark statistics benchmark_stats = self._calculate_benchmark_stats(results) @@ -791,6 +849,10 @@ class RAGEvaluator: ) with open(json_path, "w") as f: json.dump(summary, f, indent=2) + + # Display results table + self._display_results_table(results) + logger.info("✅ JSON results saved to: %s", json_path) # Export to CSV @@ -846,22 +908,61 @@ async def main(): """ Main entry point for RAGAS evaluation + Command-line arguments: + --dataset, -d: Path to test dataset JSON file (default: sample_dataset.json) + --ragendpoint, -r: LightRAG API endpoint URL (default: http://localhost:9621 or $LIGHTRAG_API_URL) + Usage: python lightrag/evaluation/eval_rag_quality.py - python lightrag/evaluation/eval_rag_quality.py http://localhost:9621 - python lightrag/evaluation/eval_rag_quality.py http://your-server.com:9621 + python lightrag/evaluation/eval_rag_quality.py --dataset my_test.json + python lightrag/evaluation/eval_rag_quality.py -d my_test.json -r http://localhost:9621 """ try: - # Get RAG API URL from command line or environment - rag_api_url = None - if len(sys.argv) > 1: - rag_api_url = sys.argv[1] + # Parse command-line arguments + parser = argparse.ArgumentParser( + description="RAGAS Evaluation Script for LightRAG System", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + # Use defaults + python lightrag/evaluation/eval_rag_quality.py + + # Specify custom dataset + python lightrag/evaluation/eval_rag_quality.py --dataset my_test.json + + # Specify custom RAG endpoint + python lightrag/evaluation/eval_rag_quality.py --ragendpoint http://my-server.com:9621 + + # Specify both + python lightrag/evaluation/eval_rag_quality.py -d my_test.json -r http://localhost:9621 + """, + ) + + parser.add_argument( + "--dataset", + "-d", + type=str, + default=None, + help="Path to test dataset JSON file (default: sample_dataset.json in evaluation directory)", + ) + + parser.add_argument( + "--ragendpoint", + "-r", + type=str, + default=None, + help="LightRAG API endpoint URL (default: http://localhost:9621 or $LIGHTRAG_API_URL environment variable)", + ) + + args = parser.parse_args() logger.info("%s", "=" * 70) logger.info("🔍 RAGAS Evaluation - Using Real LightRAG API") logger.info("%s", "=" * 70) - evaluator = RAGEvaluator(rag_api_url=rag_api_url) + evaluator = RAGEvaluator( + test_dataset_path=args.dataset, rag_api_url=args.ragendpoint + ) await evaluator.run() except Exception as e: logger.exception("❌ Error: %s", e)