From 41c26a3677997fd04a2166cfd5e70f2c390bf4b3 Mon Sep 17 00:00:00 2001 From: yangdx Date: Tue, 4 Nov 2025 21:40:27 +0800 Subject: [PATCH 1/6] feat: add command-line args to RAG evaluation script - Add --dataset and --ragendpoint flags - Support short forms -d and -r - Update README with usage examples --- lightrag/evaluation/README.md | 66 ++++++++++++++- lightrag/evaluation/eval_rag_quality.py | 102 ++++++++++++++++++------ 2 files changed, 141 insertions(+), 27 deletions(-) diff --git a/lightrag/evaluation/README.md b/lightrag/evaluation/README.md index 7beed38e..0296e305 100644 --- a/lightrag/evaluation/README.md +++ b/lightrag/evaluation/README.md @@ -60,15 +60,30 @@ pip install -e ".[offline-llm]" ### 2. Run Evaluation +**Basic usage (uses defaults):** ```bash cd /path/to/LightRAG -python -m lightrag.evaluation.eval_rag_quality +python lightrag/evaluation/eval_rag_quality.py ``` -Or directly: - +**Specify custom dataset:** ```bash -python lightrag/evaluation/eval_rag_quality.py +python lightrag/evaluation/eval_rag_quality.py --dataset my_test.json +``` + +**Specify custom RAG endpoint:** +```bash +python lightrag/evaluation/eval_rag_quality.py --ragendpoint http://my-server.com:9621 +``` + +**Specify both (short form):** +```bash +python lightrag/evaluation/eval_rag_quality.py -d my_test.json -r http://localhost:9621 +``` + +**Get help:** +```bash +python lightrag/evaluation/eval_rag_quality.py --help ``` ### 3. View Results @@ -89,6 +104,49 @@ results/ --- +## 📋 Command-Line Arguments + +The evaluation script supports command-line arguments for easy configuration: + +| Argument | Short | Default | Description | +|----------|-------|---------|-------------| +| `--dataset` | `-d` | `sample_dataset.json` | Path to test dataset JSON file | +| `--ragendpoint` | `-r` | `http://localhost:9621` or `$LIGHTRAG_API_URL` | LightRAG API endpoint URL | + +### Usage Examples + +**Use default dataset and endpoint:** +```bash +python lightrag/evaluation/eval_rag_quality.py +``` + +**Custom dataset with default endpoint:** +```bash +python lightrag/evaluation/eval_rag_quality.py --dataset path/to/my_dataset.json +``` + +**Default dataset with custom endpoint:** +```bash +python lightrag/evaluation/eval_rag_quality.py --ragendpoint http://my-server.com:9621 +``` + +**Custom dataset and endpoint:** +```bash +python lightrag/evaluation/eval_rag_quality.py -d my_dataset.json -r http://localhost:9621 +``` + +**Absolute path to dataset:** +```bash +python lightrag/evaluation/eval_rag_quality.py -d /path/to/custom_dataset.json +``` + +**Show help message:** +```bash +python lightrag/evaluation/eval_rag_quality.py --help +``` + +--- + ## ⚙️ Configuration ### Environment Variables diff --git a/lightrag/evaluation/eval_rag_quality.py b/lightrag/evaluation/eval_rag_quality.py index d8f95c7e..b5008ea0 100644 --- a/lightrag/evaluation/eval_rag_quality.py +++ b/lightrag/evaluation/eval_rag_quality.py @@ -9,9 +9,22 @@ Evaluates RAG response quality using RAGAS metrics: - Context Precision: Is retrieved context clean without noise? 
Usage: + # Use defaults (sample_dataset.json, http://localhost:9621) python lightrag/evaluation/eval_rag_quality.py - python lightrag/evaluation/eval_rag_quality.py http://localhost:9621 - python lightrag/evaluation/eval_rag_quality.py http://your-rag-server.com:9621 + + # Specify custom dataset + python lightrag/evaluation/eval_rag_quality.py --dataset my_test.json + python lightrag/evaluation/eval_rag_quality.py -d my_test.json + + # Specify custom RAG endpoint + python lightrag/evaluation/eval_rag_quality.py --ragendpoint http://my-server.com:9621 + python lightrag/evaluation/eval_rag_quality.py -r http://my-server.com:9621 + + # Specify both + python lightrag/evaluation/eval_rag_quality.py -d my_test.json -r http://localhost:9621 + + # Get help + python lightrag/evaluation/eval_rag_quality.py --help Results are saved to: lightrag/evaluation/results/ - results_YYYYMMDD_HHMMSS.csv (CSV export for analysis) @@ -24,6 +37,7 @@ Technical Notes: - Deprecation warnings are suppressed for cleaner output """ +import argparse import asyncio import csv import json @@ -199,7 +213,9 @@ class RAGEvaluator: logger.info(" • Embedding Model: %s", self.eval_embedding_model) if self.eval_base_url: logger.info(" • Custom Endpoint: %s", self.eval_base_url) - logger.info(" • Bypass N-Parameter: Enabled (use LangchainLLMWrapperfor compatibility)") + logger.info( + " • Bypass N-Parameter: Enabled (use LangchainLLMWrapperfor compatibility)" + ) else: logger.info(" • Endpoint: OpenAI Official API") @@ -759,19 +775,6 @@ class RAGEvaluator: elapsed_time = time.time() - start_time - # Add a small delay to ensure all buffered output is completely written - await asyncio.sleep(0.5) - # Flush all output buffers to ensure RAGAS progress bars are fully displayed - sys.stdout.flush() - sys.stderr.flush() - sys.stdout.write("\n") - sys.stderr.write("\n") - sys.stdout.flush() - sys.stderr.flush() - - # Display results table - self._display_results_table(results) - # Calculate benchmark statistics benchmark_stats = self._calculate_benchmark_stats(results) @@ -791,6 +794,20 @@ class RAGEvaluator: ) with open(json_path, "w") as f: json.dump(summary, f, indent=2) + + # Add a small delay to ensure all buffered output is completely written + await asyncio.sleep(0.8) + # Flush all output buffers to ensure RAGAS progress bars are fully displayed + sys.stdout.flush() + sys.stderr.flush() + sys.stdout.write("\n") + sys.stderr.write("\n") + sys.stdout.flush() + sys.stderr.flush() + + # Display results table + self._display_results_table(results) + logger.info("✅ JSON results saved to: %s", json_path) # Export to CSV @@ -846,22 +863,61 @@ async def main(): """ Main entry point for RAGAS evaluation + Command-line arguments: + --dataset, -d: Path to test dataset JSON file (default: sample_dataset.json) + --ragendpoint, -r: LightRAG API endpoint URL (default: http://localhost:9621 or $LIGHTRAG_API_URL) + Usage: python lightrag/evaluation/eval_rag_quality.py - python lightrag/evaluation/eval_rag_quality.py http://localhost:9621 - python lightrag/evaluation/eval_rag_quality.py http://your-server.com:9621 + python lightrag/evaluation/eval_rag_quality.py --dataset my_test.json + python lightrag/evaluation/eval_rag_quality.py -d my_test.json -r http://localhost:9621 """ try: - # Get RAG API URL from command line or environment - rag_api_url = None - if len(sys.argv) > 1: - rag_api_url = sys.argv[1] + # Parse command-line arguments + parser = argparse.ArgumentParser( + description="RAGAS Evaluation Script for LightRAG System", + 
formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + # Use defaults + python lightrag/evaluation/eval_rag_quality.py + + # Specify custom dataset + python lightrag/evaluation/eval_rag_quality.py --dataset my_test.json + + # Specify custom RAG endpoint + python lightrag/evaluation/eval_rag_quality.py --ragendpoint http://my-server.com:9621 + + # Specify both + python lightrag/evaluation/eval_rag_quality.py -d my_test.json -r http://localhost:9621 + """, + ) + + parser.add_argument( + "--dataset", + "-d", + type=str, + default=None, + help="Path to test dataset JSON file (default: sample_dataset.json in evaluation directory)", + ) + + parser.add_argument( + "--ragendpoint", + "-r", + type=str, + default=None, + help="LightRAG API endpoint URL (default: http://localhost:9621 or $LIGHTRAG_API_URL environment variable)", + ) + + args = parser.parse_args() logger.info("%s", "=" * 70) logger.info("🔍 RAGAS Evaluation - Using Real LightRAG API") logger.info("%s", "=" * 70) - evaluator = RAGEvaluator(rag_api_url=rag_api_url) + evaluator = RAGEvaluator( + test_dataset_path=args.dataset, rag_api_url=args.ragendpoint + ) await evaluator.run() except Exception as e: logger.exception("❌ Error: %s", e) From c358f405a9f3dfd688cd3da4caaaeb3f7b5fcb47 Mon Sep 17 00:00:00 2001 From: yangdx Date: Tue, 4 Nov 2025 22:17:17 +0800 Subject: [PATCH 2/6] Update evaluation defaults and expand sample dataset MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit • Lower concurrent evals from 3 to 2 • Standardize project names in samples • Add 3 new evaluation questions • Expand ground truth detail coverage • Improve dataset comprehensiveness --- env.example | 5 ++--- lightrag/evaluation/eval_rag_quality.py | 2 +- lightrag/evaluation/sample_dataset.json | 23 +++++++++++++++++++---- 3 files changed, 22 insertions(+), 8 deletions(-) diff --git a/env.example b/env.example index be214ae9..cf9d472a 100644 --- a/env.example +++ b/env.example @@ -407,10 +407,9 @@ MEMGRAPH_DATABASE=memgraph ### Custom endpoint for evaluation models (optional, for OpenAI-compatible services) # EVAL_LLM_BINDING_HOST=https://api.openai.com/v1 -### Evaluation concurrency and rate limiting -### Number of concurrent test case evaluations (default: 1 for serial evaluation) +### Number of concurrent test case evaluations ### Lower values reduce API rate limit issues but increase evaluation time -# EVAL_MAX_CONCURRENT=3 +# EVAL_MAX_CONCURRENT=2 ### TOP_K query parameter of LightRAG (default: 10) ### Number of entities or relations retrieved from KG # EVAL_QUERY_TOP_K=10 diff --git a/lightrag/evaluation/eval_rag_quality.py b/lightrag/evaluation/eval_rag_quality.py index b5008ea0..21f9b770 100644 --- a/lightrag/evaluation/eval_rag_quality.py +++ b/lightrag/evaluation/eval_rag_quality.py @@ -481,7 +481,7 @@ class RAGEvaluator: List of evaluation results with metrics """ # Get evaluation concurrency from environment (default to 1 for serial evaluation) - max_async = int(os.getenv("EVAL_MAX_CONCURRENT", "3")) + max_async = int(os.getenv("EVAL_MAX_CONCURRENT", "2")) logger.info("%s", "=" * 70) logger.info("🚀 Starting RAGAS Evaluation of LightRAG System") diff --git a/lightrag/evaluation/sample_dataset.json b/lightrag/evaluation/sample_dataset.json index 1968df23..0e452467 100644 --- a/lightrag/evaluation/sample_dataset.json +++ b/lightrag/evaluation/sample_dataset.json @@ -3,17 +3,32 @@ { "question": "How does LightRAG solve the hallucination problem in large language models?", "ground_truth": 
"LightRAG solves the hallucination problem by combining large language models with external knowledge retrieval. The framework ensures accurate responses by grounding LLM outputs in actual documents. LightRAG provides contextual responses that reduce hallucinations significantly.", - "project": "lightrag_overview" + "project": "lightrag_evaluation_sample" }, { "question": "What are the three main components required in a RAG system?", "ground_truth": "A RAG system requires three main components: a retrieval system (vector database or search engine) to find relevant documents, an embedding model to convert text into vector representations for similarity search, and a large language model (LLM) to generate responses based on retrieved context.", - "project": "rag_architecture" + "project": "lightrag_evaluation_sample" }, { "question": "How does LightRAG's retrieval performance compare to traditional RAG approaches?", - "ground_truth": "LightRAG delivers faster retrieval performance than traditional RAG approaches. The framework optimizes document retrieval operations for speed, while traditional RAG systems often suffer from slow query response times. LightRAG achieves high quality results with improved performance.", - "project": "lightrag_improvements" + "ground_truth": "LightRAG delivers faster retrieval performance than traditional RAG approaches. The framework optimizes document retrieval operations for speed. Traditional RAG systems often suffer from slow query response times. LightRAG achieves high quality results with improved performance. The framework combines speed with accuracy in retrieval operations, prioritizing ease of use without sacrificing quality.", + "project": "lightrag_evaluation_sample" + }, + { + "question": "What vector databases does LightRAG support and what are their key characteristics?", + "ground_truth": "LightRAG supports multiple vector databases including ChromaDB for simple deployment and efficient similarity search, Neo4j for graph-based knowledge representation with vector capabilities, Milvus for high-performance vector search at scale, Qdrant for fast similarity search with filtering and production-ready infrastructure, MongoDB Atlas for combined document storage and vector search, Redis for in-memory low-latency vector search, and a built-in nano-vectordb that eliminates external dependencies for small projects. This multi-database support enables developers to choose appropriate backends based on scale, performance, and infrastructure requirements.", + "project": "lightrag_evaluation_sample" + }, + { + "question": "What are the four key metrics for evaluating RAG system quality and what does each metric measure?", + "ground_truth": "RAG system quality is measured through four key metrics: Faithfulness measures whether answers are factually grounded in retrieved context and detects hallucinations. Answer Relevance measures how well answers address the user question and evaluates response appropriateness. Context Recall measures completeness of retrieval and whether all relevant information was retrieved from documents. 
Context Precision measures quality and relevance of retrieved documents without noise or irrelevant content.", + "project": "lightrag_evaluation_sample" + }, + { + "question": "What are the core benefits of LightRAG and how does it improve upon traditional RAG systems?", + "ground_truth": "LightRAG offers five core benefits: accuracy through document-grounded responses, up-to-date information without model retraining, domain expertise through specialized document collections, cost-effectiveness by avoiding expensive fine-tuning, and transparency by showing source documents. Compared to traditional RAG systems, LightRAG provides a simpler API with intuitive interfaces, faster retrieval performance with optimized operations, better integration with multiple vector database backends for flexible selection, and optimized prompting strategies with refined templates. LightRAG prioritizes ease of use while maintaining quality and combines speed with accuracy.", + "project": "lightrag_evaluation_sample" } ] } From d36be1f4995bc4a4c93f2250da30a81dff1e50fc Mon Sep 17 00:00:00 2001 From: yangdx Date: Wed, 5 Nov 2025 00:16:02 +0800 Subject: [PATCH 3/6] Improve RAGAS evaluation progress tracking and clean up output handling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit • Add tqdm progress bar for eval steps • Pass progress bar to RAGAS evaluate • Ensure progress bar cleanup in finally • Remove redundant output buffer flushes --- lightrag/evaluation/eval_rag_quality.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/lightrag/evaluation/eval_rag_quality.py b/lightrag/evaluation/eval_rag_quality.py index 21f9b770..292b07f9 100644 --- a/lightrag/evaluation/eval_rag_quality.py +++ b/lightrag/evaluation/eval_rag_quality.py @@ -82,6 +82,7 @@ try: ) from ragas.llms import LangchainLLMWrapper from langchain_openai import ChatOpenAI, OpenAIEmbeddings + from tqdm import tqdm RAGAS_AVAILABLE = True @@ -405,7 +406,11 @@ class RAGEvaluator: # Run RAGAS evaluation # IMPORTANT: Create fresh metric instances for each evaluation to avoid # concurrent state conflicts when multiple tasks run in parallel + pbar = None try: + # Create standard tqdm progress bar for RAGAS evaluation + pbar = tqdm(total=4, desc=f"Eval-{idx}", leave=True) + eval_results = evaluate( dataset=eval_dataset, metrics=[ @@ -416,6 +421,7 @@ class RAGEvaluator: ], llm=self.eval_llm, embeddings=self.eval_embeddings, + _pbar=pbar, ) # Convert to DataFrame (RAGAS v0.3+ API) @@ -472,6 +478,10 @@ class RAGEvaluator: "ragas_score": 0, "timestamp": datetime.now().isoformat(), } + finally: + # Force close progress bar to ensure completion + if pbar is not None: + pbar.close() async def evaluate_responses(self) -> List[Dict[str, Any]]: """ @@ -795,16 +805,6 @@ class RAGEvaluator: with open(json_path, "w") as f: json.dump(summary, f, indent=2) - # Add a small delay to ensure all buffered output is completely written - await asyncio.sleep(0.8) - # Flush all output buffers to ensure RAGAS progress bars are fully displayed - sys.stdout.flush() - sys.stderr.flush() - sys.stdout.write("\n") - sys.stderr.write("\n") - sys.stdout.flush() - sys.stderr.flush() - # Display results table self._display_results_table(results) From 83715a3ac12f30948e708271366834ae65cbc090 Mon Sep 17 00:00:00 2001 From: yangdx Date: Wed, 5 Nov 2025 00:36:09 +0800 Subject: [PATCH 4/6] Implement two-stage pipeline for RAG evaluation with separate semaphores MIME-Version: 1.0 Content-Type: text/plain; 
charset=UTF-8 Content-Transfer-Encoding: 8bit • Split RAG gen and eval stages • Add rag_semaphore for stage 1 • Add eval_semaphore for stage 2 • Improve concurrency control • Update connection pool limits --- lightrag/evaluation/eval_rag_quality.py | 80 ++++++++++++++----------- 1 file changed, 45 insertions(+), 35 deletions(-) diff --git a/lightrag/evaluation/eval_rag_quality.py b/lightrag/evaluation/eval_rag_quality.py index 292b07f9..b95265b0 100644 --- a/lightrag/evaluation/eval_rag_quality.py +++ b/lightrag/evaluation/eval_rag_quality.py @@ -347,28 +347,30 @@ class RAGEvaluator: self, idx: int, test_case: Dict[str, str], - semaphore: asyncio.Semaphore, + rag_semaphore: asyncio.Semaphore, + eval_semaphore: asyncio.Semaphore, client: httpx.AsyncClient, progress_counter: Dict[str, int], ) -> Dict[str, Any]: """ - Evaluate a single test case with concurrency control + Evaluate a single test case with two-stage pipeline concurrency control Args: idx: Test case index (1-based) test_case: Test case dictionary with question and ground_truth - semaphore: Semaphore to control concurrency + rag_semaphore: Semaphore to control RAG generation concurrency (Stage 1) + eval_semaphore: Semaphore to control RAGAS evaluation concurrency (Stage 2) client: Shared httpx AsyncClient for connection pooling progress_counter: Shared dictionary for progress tracking Returns: Evaluation result dictionary """ - async with semaphore: - question = test_case["question"] - ground_truth = test_case["ground_truth"] + question = test_case["question"] + ground_truth = test_case["ground_truth"] - # Generate RAG response by calling actual LightRAG API + # Stage 1: Generate RAG response (controlled by rag_semaphore) + async with rag_semaphore: try: rag_response = await self.generate_rag_response( question=question, client=client @@ -385,32 +387,31 @@ class RAGEvaluator: "timestamp": datetime.now().isoformat(), } - # *** CRITICAL FIX: Use actual retrieved contexts, NOT ground_truth *** - retrieved_contexts = rag_response["contexts"] + # *** CRITICAL FIX: Use actual retrieved contexts, NOT ground_truth *** + retrieved_contexts = rag_response["contexts"] - # DEBUG: Print what was actually retrieved (only in debug mode) - logger.debug( - "📝 Test %s: Retrieved %s contexts", idx, len(retrieved_contexts) - ) + # DEBUG: Print what was actually retrieved (only in debug mode) + logger.debug("📝 Test %s: Retrieved %s contexts", idx, len(retrieved_contexts)) - # Prepare dataset for RAGAS evaluation with CORRECT contexts - eval_dataset = Dataset.from_dict( - { - "question": [question], - "answer": [rag_response["answer"]], - "contexts": [retrieved_contexts], - "ground_truth": [ground_truth], - } - ) + # Prepare dataset for RAGAS evaluation with CORRECT contexts + eval_dataset = Dataset.from_dict( + { + "question": [question], + "answer": [rag_response["answer"]], + "contexts": [retrieved_contexts], + "ground_truth": [ground_truth], + } + ) - # Run RAGAS evaluation - # IMPORTANT: Create fresh metric instances for each evaluation to avoid - # concurrent state conflicts when multiple tasks run in parallel + # Stage 2: Run RAGAS evaluation (controlled by eval_semaphore) + # IMPORTANT: Create fresh metric instances for each evaluation to avoid + # concurrent state conflicts when multiple tasks run in parallel + async with eval_semaphore: pbar = None try: # Create standard tqdm progress bar for RAGAS evaluation pbar = tqdm(total=4, desc=f"Eval-{idx}", leave=True) - + eval_results = evaluate( dataset=eval_dataset, metrics=[ @@ -485,21 +486,25 
@@ class RAGEvaluator: async def evaluate_responses(self) -> List[Dict[str, Any]]: """ - Evaluate all test cases in parallel and return metrics + Evaluate all test cases in parallel with two-stage pipeline and return metrics Returns: List of evaluation results with metrics """ - # Get evaluation concurrency from environment (default to 1 for serial evaluation) + # Get evaluation concurrency from environment (default to 2 for parallel evaluation) max_async = int(os.getenv("EVAL_MAX_CONCURRENT", "2")) logger.info("%s", "=" * 70) logger.info("🚀 Starting RAGAS Evaluation of LightRAG System") - logger.info("🔧 Concurrent evaluations: %s", max_async) + logger.info("🔧 Two-Stage Pipeline Configuration:") + logger.info(" • RAGAS Evaluation (Stage 2): %s concurrent", max_async) logger.info("%s", "=" * 70) - # Create semaphore to limit concurrent evaluations - semaphore = asyncio.Semaphore(max_async) + # Create two-stage pipeline semaphores + # Stage 1: RAG generation - allow +1 concurrency to keep evaluation fed + rag_semaphore = asyncio.Semaphore(max_async + 1) + # Stage 2: RAGAS evaluation - primary bottleneck + eval_semaphore = asyncio.Semaphore(max_async) # Create progress counter (shared across all tasks) progress_counter = {"completed": 0} @@ -512,20 +517,25 @@ class RAGEvaluator: read=READ_TIMEOUT_SECONDS, ) limits = httpx.Limits( - max_connections=max_async * 2, # Allow some buffer - max_keepalive_connections=max_async, + max_connections=(max_async + 1) * 2, # Allow buffer for RAG stage + max_keepalive_connections=max_async + 1, ) async with httpx.AsyncClient(timeout=timeout, limits=limits) as client: # Create tasks for all test cases tasks = [ self.evaluate_single_case( - idx, test_case, semaphore, client, progress_counter + idx, + test_case, + rag_semaphore, + eval_semaphore, + client, + progress_counter, ) for idx, test_case in enumerate(self.test_cases, 1) ] - # Run all evaluations in parallel (limited by semaphore) + # Run all evaluations in parallel (limited by two-stage semaphores) results = await asyncio.gather(*tasks) return list(results) From e5abe9dd3dd7e782acd0a75c8e256e5b06cb57ac Mon Sep 17 00:00:00 2001 From: yangdx Date: Wed, 5 Nov 2025 01:07:53 +0800 Subject: [PATCH 5/6] Restructure semaphore control to manage entire evaluation pipeline MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit • Move rag_semaphore to wrap full function • Increase RAG concurrency to 2x eval limit • Prevent memory buildup from slow evals • Keep eval_semaphore for RAGAS control --- lightrag/evaluation/eval_rag_quality.py | 198 ++++++++++++------------ 1 file changed, 101 insertions(+), 97 deletions(-) diff --git a/lightrag/evaluation/eval_rag_quality.py b/lightrag/evaluation/eval_rag_quality.py index b95265b0..0d9633b4 100644 --- a/lightrag/evaluation/eval_rag_quality.py +++ b/lightrag/evaluation/eval_rag_quality.py @@ -358,7 +358,7 @@ class RAGEvaluator: Args: idx: Test case index (1-based) test_case: Test case dictionary with question and ground_truth - rag_semaphore: Semaphore to control RAG generation concurrency (Stage 1) + rag_semaphore: Semaphore to control overall concurrency (covers entire function) eval_semaphore: Semaphore to control RAGAS evaluation concurrency (Stage 2) client: Shared httpx AsyncClient for connection pooling progress_counter: Shared dictionary for progress tracking @@ -366,11 +366,13 @@ class RAGEvaluator: Returns: Evaluation result dictionary """ - question = test_case["question"] - ground_truth = test_case["ground_truth"] - - # Stage 
1: Generate RAG response (controlled by rag_semaphore) + # rag_semaphore controls the entire evaluation process to prevent + # all RAG responses from being generated at once when eval is slow async with rag_semaphore: + question = test_case["question"] + ground_truth = test_case["ground_truth"] + + # Stage 1: Generate RAG response try: rag_response = await self.generate_rag_response( question=question, client=client @@ -387,102 +389,104 @@ class RAGEvaluator: "timestamp": datetime.now().isoformat(), } - # *** CRITICAL FIX: Use actual retrieved contexts, NOT ground_truth *** - retrieved_contexts = rag_response["contexts"] + # *** CRITICAL FIX: Use actual retrieved contexts, NOT ground_truth *** + retrieved_contexts = rag_response["contexts"] - # DEBUG: Print what was actually retrieved (only in debug mode) - logger.debug("📝 Test %s: Retrieved %s contexts", idx, len(retrieved_contexts)) - - # Prepare dataset for RAGAS evaluation with CORRECT contexts - eval_dataset = Dataset.from_dict( - { - "question": [question], - "answer": [rag_response["answer"]], - "contexts": [retrieved_contexts], - "ground_truth": [ground_truth], - } - ) - - # Stage 2: Run RAGAS evaluation (controlled by eval_semaphore) - # IMPORTANT: Create fresh metric instances for each evaluation to avoid - # concurrent state conflicts when multiple tasks run in parallel - async with eval_semaphore: - pbar = None - try: - # Create standard tqdm progress bar for RAGAS evaluation - pbar = tqdm(total=4, desc=f"Eval-{idx}", leave=True) - - eval_results = evaluate( - dataset=eval_dataset, - metrics=[ - Faithfulness(), - AnswerRelevancy(), - ContextRecall(), - ContextPrecision(), - ], - llm=self.eval_llm, - embeddings=self.eval_embeddings, - _pbar=pbar, - ) - - # Convert to DataFrame (RAGAS v0.3+ API) - df = eval_results.to_pandas() - - # Extract scores from first row - scores_row = df.iloc[0] - - # Extract scores (RAGAS v0.3+ uses .to_pandas()) - result = { - "test_number": idx, - "question": question, - "answer": rag_response["answer"][:200] + "..." - if len(rag_response["answer"]) > 200 - else rag_response["answer"], - "ground_truth": ground_truth[:200] + "..." 
- if len(ground_truth) > 200 - else ground_truth, - "project": test_case.get("project", "unknown"), - "metrics": { - "faithfulness": float(scores_row.get("faithfulness", 0)), - "answer_relevance": float( - scores_row.get("answer_relevancy", 0) - ), - "context_recall": float(scores_row.get("context_recall", 0)), - "context_precision": float( - scores_row.get("context_precision", 0) - ), - }, - "timestamp": datetime.now().isoformat(), + # Prepare dataset for RAGAS evaluation with CORRECT contexts + eval_dataset = Dataset.from_dict( + { + "question": [question], + "answer": [rag_response["answer"]], + "contexts": [retrieved_contexts], + "ground_truth": [ground_truth], } + ) - # Calculate RAGAS score (average of all metrics, excluding NaN values) - metrics = result["metrics"] - valid_metrics = [v for v in metrics.values() if not _is_nan(v)] - ragas_score = ( - sum(valid_metrics) / len(valid_metrics) if valid_metrics else 0 - ) - result["ragas_score"] = round(ragas_score, 4) + # Stage 2: Run RAGAS evaluation (controlled by eval_semaphore) + # IMPORTANT: Create fresh metric instances for each evaluation to avoid + # concurrent state conflicts when multiple tasks run in parallel + async with eval_semaphore: + pbar = None + try: + # Create standard tqdm progress bar for RAGAS evaluation + pbar = tqdm(total=4, desc=f"Eval-{idx}", leave=True) - # Update progress counter - progress_counter["completed"] += 1 + eval_results = evaluate( + dataset=eval_dataset, + metrics=[ + Faithfulness(), + AnswerRelevancy(), + ContextRecall(), + ContextPrecision(), + ], + llm=self.eval_llm, + embeddings=self.eval_embeddings, + _pbar=pbar, + ) - return result - - except Exception as e: - logger.error("Error evaluating test %s: %s", idx, str(e)) - progress_counter["completed"] += 1 - return { - "test_number": idx, - "question": question, - "error": str(e), - "metrics": {}, - "ragas_score": 0, - "timestamp": datetime.now().isoformat(), - } - finally: - # Force close progress bar to ensure completion - if pbar is not None: pbar.close() + pbar = None + + # Convert to DataFrame (RAGAS v0.3+ API) + df = eval_results.to_pandas() + + # Extract scores from first row + scores_row = df.iloc[0] + + # Extract scores (RAGAS v0.3+ uses .to_pandas()) + result = { + "test_number": idx, + "question": question, + "answer": rag_response["answer"][:200] + "..." + if len(rag_response["answer"]) > 200 + else rag_response["answer"], + "ground_truth": ground_truth[:200] + "..." 
+ if len(ground_truth) > 200 + else ground_truth, + "project": test_case.get("project", "unknown"), + "metrics": { + "faithfulness": float(scores_row.get("faithfulness", 0)), + "answer_relevance": float( + scores_row.get("answer_relevancy", 0) + ), + "context_recall": float( + scores_row.get("context_recall", 0) + ), + "context_precision": float( + scores_row.get("context_precision", 0) + ), + }, + "timestamp": datetime.now().isoformat(), + } + + # Calculate RAGAS score (average of all metrics, excluding NaN values) + metrics = result["metrics"] + valid_metrics = [v for v in metrics.values() if not _is_nan(v)] + ragas_score = ( + sum(valid_metrics) / len(valid_metrics) if valid_metrics else 0 + ) + result["ragas_score"] = round(ragas_score, 4) + + # Update progress counter + progress_counter["completed"] += 1 + + return result + + except Exception as e: + logger.error("Error evaluating test %s: %s", idx, str(e)) + progress_counter["completed"] += 1 + return { + "test_number": idx, + "question": question, + "error": str(e), + "metrics": {}, + "ragas_score": 0, + "timestamp": datetime.now().isoformat(), + } + finally: + # Force close progress bar to ensure completion + if pbar is not None: + pbar.close() async def evaluate_responses(self) -> List[Dict[str, Any]]: """ @@ -501,8 +505,8 @@ class RAGEvaluator: logger.info("%s", "=" * 70) # Create two-stage pipeline semaphores - # Stage 1: RAG generation - allow +1 concurrency to keep evaluation fed - rag_semaphore = asyncio.Semaphore(max_async + 1) + # Stage 1: RAG generation - allow x2 concurrency to keep evaluation fed + rag_semaphore = asyncio.Semaphore(max_async * 2) # Stage 2: RAGAS evaluation - primary bottleneck eval_semaphore = asyncio.Semaphore(max_async) From 2823f92fb6ee63125dc8ab037828883b45c149b6 Mon Sep 17 00:00:00 2001 From: yangdx Date: Wed, 5 Nov 2025 02:04:13 +0800 Subject: [PATCH 6/6] Fix tqdm progress bar conflicts in concurrent RAG evaluation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit • Add position pool for tqdm bars • Serialize tqdm creation with lock • Set leave=False to clear completed bars • Pass position/lock to eval tasks • Import tqdm.auto for better display --- lightrag/evaluation/eval_rag_quality.py | 41 +++++++++++++++++++++---- 1 file changed, 35 insertions(+), 6 deletions(-) diff --git a/lightrag/evaluation/eval_rag_quality.py b/lightrag/evaluation/eval_rag_quality.py index 0d9633b4..0338a4f0 100644 --- a/lightrag/evaluation/eval_rag_quality.py +++ b/lightrag/evaluation/eval_rag_quality.py @@ -82,7 +82,7 @@ try: ) from ragas.llms import LangchainLLMWrapper from langchain_openai import ChatOpenAI, OpenAIEmbeddings - from tqdm import tqdm + from tqdm.auto import tqdm RAGAS_AVAILABLE = True @@ -351,6 +351,8 @@ class RAGEvaluator: eval_semaphore: asyncio.Semaphore, client: httpx.AsyncClient, progress_counter: Dict[str, int], + position_pool: asyncio.Queue, + pbar_creation_lock: asyncio.Lock, ) -> Dict[str, Any]: """ Evaluate a single test case with two-stage pipeline concurrency control @@ -362,6 +364,8 @@ class RAGEvaluator: eval_semaphore: Semaphore to control RAGAS evaluation concurrency (Stage 2) client: Shared httpx AsyncClient for connection pooling progress_counter: Shared dictionary for progress tracking + position_pool: Queue of available tqdm position indices + pbar_creation_lock: Lock to serialize tqdm creation and prevent race conditions Returns: Evaluation result dictionary @@ -407,9 +411,22 @@ class RAGEvaluator: # concurrent state conflicts when multiple 
tasks run in parallel async with eval_semaphore: pbar = None + position = None try: - # Create standard tqdm progress bar for RAGAS evaluation - pbar = tqdm(total=4, desc=f"Eval-{idx}", leave=True) + # Acquire a position from the pool for this tqdm progress bar + position = await position_pool.get() + + # Serialize tqdm creation to prevent race conditions + # Multiple tasks creating tqdm simultaneously can cause display conflicts + async with pbar_creation_lock: + # Create tqdm progress bar with assigned position to avoid overlapping + # leave=False ensures the progress bar is cleared after completion, + # preventing accumulation of completed bars and allowing position reuse + pbar = tqdm( + total=4, desc=f"Eval-{idx}", position=position, leave=False + ) + # Give tqdm time to initialize and claim its screen position + await asyncio.sleep(0.05) eval_results = evaluate( dataset=eval_dataset, @@ -424,9 +441,6 @@ class RAGEvaluator: _pbar=pbar, ) - pbar.close() - pbar = None - # Convert to DataFrame (RAGAS v0.3+ API) df = eval_results.to_pandas() @@ -487,6 +501,9 @@ class RAGEvaluator: # Force close progress bar to ensure completion if pbar is not None: pbar.close() + # Release the position back to the pool for reuse + if position is not None: + await position_pool.put(position) async def evaluate_responses(self) -> List[Dict[str, Any]]: """ @@ -513,6 +530,16 @@ class RAGEvaluator: # Create progress counter (shared across all tasks) progress_counter = {"completed": 0} + # Create position pool for tqdm progress bars + # Positions range from 0 to max_async-1, ensuring no overlapping displays + position_pool = asyncio.Queue() + for i in range(max_async): + await position_pool.put(i) + + # Create lock to serialize tqdm creation and prevent race conditions + # This ensures progress bars are created one at a time, avoiding display conflicts + pbar_creation_lock = asyncio.Lock() + # Create shared HTTP client with connection pooling and proper timeouts # Timeout: 3 minutes for connect, 5 minutes for read (LLM can be slow) timeout = httpx.Timeout( @@ -535,6 +562,8 @@ class RAGEvaluator: eval_semaphore, client, progress_counter, + position_pool, + pbar_creation_lock, ) for idx, test_case in enumerate(self.test_cases, 1) ]
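
Taken together, patches 4–6 leave `evaluate_single_case` with a specific concurrency shape: an outer semaphore (sized at twice `EVAL_MAX_CONCURRENT`) bounds how many test cases are in flight at all, an inner semaphore bounds the expensive RAGAS evaluation stage, and a queue of tqdm positions plus a creation lock keeps the per-case progress bars from overwriting each other. The sketch below is a reduced, self-contained illustration of that pattern, not the patched module itself: `fake_rag_query` and `fake_ragas_eval` are placeholder coroutines standing in for the LightRAG HTTP query and the four RAGAS metrics, and `max_async` plays the role of `EVAL_MAX_CONCURRENT`.

```python
# Standalone sketch of the two-stage pipeline from patches 4-6.
# The work functions are placeholders; only the concurrency scaffolding
# (semaphores, tqdm position pool, creation lock) mirrors the patches.
import asyncio
import random

from tqdm.auto import tqdm


async def fake_rag_query(idx: int) -> str:
    """Placeholder for the HTTP call to the LightRAG /query endpoint."""
    await asyncio.sleep(random.uniform(0.2, 0.6))
    return f"answer-{idx}"


async def fake_ragas_eval(idx: int, pbar: tqdm) -> float:
    """Placeholder for the four RAGAS metrics; ticks the bar once per metric."""
    for _ in range(4):
        await asyncio.sleep(random.uniform(0.2, 0.5))
        pbar.update(1)
    return random.random()


async def evaluate_case(
    idx: int,
    rag_semaphore: asyncio.Semaphore,
    eval_semaphore: asyncio.Semaphore,
    position_pool: asyncio.Queue,
    pbar_lock: asyncio.Lock,
) -> float:
    # Outer semaphore bounds the whole case, so RAG answers do not pile up
    # in memory while the slower evaluation stage drains (patch 5).
    async with rag_semaphore:
        answer = await fake_rag_query(idx)  # noqa: F841  (unused in the sketch)

        # Inner semaphore is the real bottleneck: the evaluation-LLM calls (patch 4).
        async with eval_semaphore:
            position = await position_pool.get()
            pbar = None
            try:
                # Serialize bar creation so concurrent tasks do not fight over
                # terminal rows; leave=False frees the row on close (patch 6).
                async with pbar_lock:
                    pbar = tqdm(
                        total=4, desc=f"Eval-{idx}", position=position, leave=False
                    )
                return await fake_ragas_eval(idx, pbar)
            finally:
                if pbar is not None:
                    pbar.close()
                # Return the screen row to the pool for the next case.
                await position_pool.put(position)


async def main() -> None:
    max_async = 2  # mirrors EVAL_MAX_CONCURRENT
    rag_semaphore = asyncio.Semaphore(max_async * 2)  # Stage 1: keep Stage 2 fed
    eval_semaphore = asyncio.Semaphore(max_async)     # Stage 2: primary bottleneck

    # Pool of tqdm positions, one per concurrent evaluation slot.
    position_pool: asyncio.Queue = asyncio.Queue()
    for i in range(max_async):
        position_pool.put_nowait(i)
    pbar_lock = asyncio.Lock()

    scores = await asyncio.gather(
        *(
            evaluate_case(i, rag_semaphore, eval_semaphore, position_pool, pbar_lock)
            for i in range(1, 7)
        )
    )
    print("mean score:", sum(scores) / len(scores))


if __name__ == "__main__":
    asyncio.run(main())
```

In the real script the bar is handed to `ragas.evaluate()` through its `_pbar` argument rather than updated by hand, and the result processing extracts the four metric columns from the returned DataFrame; everything else in the sketch, including the 2× outer semaphore, the position queue sized to `max_async`, the creation lock, and `leave=False`, follows the patches directly. Running it shows at most `max_async` bars on screen at a time, with rows reused as earlier cases finish, which is the display behaviour the final commit is after.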