From 83715a3ac12f30948e708271366834ae65cbc090 Mon Sep 17 00:00:00 2001 From: yangdx Date: Wed, 5 Nov 2025 00:36:09 +0800 Subject: [PATCH] Implement two-stage pipeline for RAG evaluation with separate semaphores MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit • Split RAG gen and eval stages • Add rag_semaphore for stage 1 • Add eval_semaphore for stage 2 • Improve concurrency control • Update connection pool limits --- lightrag/evaluation/eval_rag_quality.py | 80 ++++++++++++++----------- 1 file changed, 45 insertions(+), 35 deletions(-) diff --git a/lightrag/evaluation/eval_rag_quality.py b/lightrag/evaluation/eval_rag_quality.py index 292b07f9..b95265b0 100644 --- a/lightrag/evaluation/eval_rag_quality.py +++ b/lightrag/evaluation/eval_rag_quality.py @@ -347,28 +347,30 @@ class RAGEvaluator: self, idx: int, test_case: Dict[str, str], - semaphore: asyncio.Semaphore, + rag_semaphore: asyncio.Semaphore, + eval_semaphore: asyncio.Semaphore, client: httpx.AsyncClient, progress_counter: Dict[str, int], ) -> Dict[str, Any]: """ - Evaluate a single test case with concurrency control + Evaluate a single test case with two-stage pipeline concurrency control Args: idx: Test case index (1-based) test_case: Test case dictionary with question and ground_truth - semaphore: Semaphore to control concurrency + rag_semaphore: Semaphore to control RAG generation concurrency (Stage 1) + eval_semaphore: Semaphore to control RAGAS evaluation concurrency (Stage 2) client: Shared httpx AsyncClient for connection pooling progress_counter: Shared dictionary for progress tracking Returns: Evaluation result dictionary """ - async with semaphore: - question = test_case["question"] - ground_truth = test_case["ground_truth"] + question = test_case["question"] + ground_truth = test_case["ground_truth"] - # Generate RAG response by calling actual LightRAG API + # Stage 1: Generate RAG response (controlled by rag_semaphore) + async with rag_semaphore: try: rag_response = await self.generate_rag_response( question=question, client=client @@ -385,32 +387,31 @@ class RAGEvaluator: "timestamp": datetime.now().isoformat(), } - # *** CRITICAL FIX: Use actual retrieved contexts, NOT ground_truth *** - retrieved_contexts = rag_response["contexts"] + # *** CRITICAL FIX: Use actual retrieved contexts, NOT ground_truth *** + retrieved_contexts = rag_response["contexts"] - # DEBUG: Print what was actually retrieved (only in debug mode) - logger.debug( - "📝 Test %s: Retrieved %s contexts", idx, len(retrieved_contexts) - ) + # DEBUG: Print what was actually retrieved (only in debug mode) + logger.debug("📝 Test %s: Retrieved %s contexts", idx, len(retrieved_contexts)) - # Prepare dataset for RAGAS evaluation with CORRECT contexts - eval_dataset = Dataset.from_dict( - { - "question": [question], - "answer": [rag_response["answer"]], - "contexts": [retrieved_contexts], - "ground_truth": [ground_truth], - } - ) + # Prepare dataset for RAGAS evaluation with CORRECT contexts + eval_dataset = Dataset.from_dict( + { + "question": [question], + "answer": [rag_response["answer"]], + "contexts": [retrieved_contexts], + "ground_truth": [ground_truth], + } + ) - # Run RAGAS evaluation - # IMPORTANT: Create fresh metric instances for each evaluation to avoid - # concurrent state conflicts when multiple tasks run in parallel + # Stage 2: Run RAGAS evaluation (controlled by eval_semaphore) + # IMPORTANT: Create fresh metric instances for each evaluation to avoid + # concurrent state conflicts when multiple tasks run in parallel + async with eval_semaphore: pbar = None try: # Create standard tqdm progress bar for RAGAS evaluation pbar = tqdm(total=4, desc=f"Eval-{idx}", leave=True) - + eval_results = evaluate( dataset=eval_dataset, metrics=[ @@ -485,21 +486,25 @@ class RAGEvaluator: async def evaluate_responses(self) -> List[Dict[str, Any]]: """ - Evaluate all test cases in parallel and return metrics + Evaluate all test cases in parallel with two-stage pipeline and return metrics Returns: List of evaluation results with metrics """ - # Get evaluation concurrency from environment (default to 1 for serial evaluation) + # Get evaluation concurrency from environment (default to 2 for parallel evaluation) max_async = int(os.getenv("EVAL_MAX_CONCURRENT", "2")) logger.info("%s", "=" * 70) logger.info("🚀 Starting RAGAS Evaluation of LightRAG System") - logger.info("🔧 Concurrent evaluations: %s", max_async) + logger.info("🔧 Two-Stage Pipeline Configuration:") + logger.info(" • RAGAS Evaluation (Stage 2): %s concurrent", max_async) logger.info("%s", "=" * 70) - # Create semaphore to limit concurrent evaluations - semaphore = asyncio.Semaphore(max_async) + # Create two-stage pipeline semaphores + # Stage 1: RAG generation - allow +1 concurrency to keep evaluation fed + rag_semaphore = asyncio.Semaphore(max_async + 1) + # Stage 2: RAGAS evaluation - primary bottleneck + eval_semaphore = asyncio.Semaphore(max_async) # Create progress counter (shared across all tasks) progress_counter = {"completed": 0} @@ -512,20 +517,25 @@ class RAGEvaluator: read=READ_TIMEOUT_SECONDS, ) limits = httpx.Limits( - max_connections=max_async * 2, # Allow some buffer - max_keepalive_connections=max_async, + max_connections=(max_async + 1) * 2, # Allow buffer for RAG stage + max_keepalive_connections=max_async + 1, ) async with httpx.AsyncClient(timeout=timeout, limits=limits) as client: # Create tasks for all test cases tasks = [ self.evaluate_single_case( - idx, test_case, semaphore, client, progress_counter + idx, + test_case, + rag_semaphore, + eval_semaphore, + client, + progress_counter, ) for idx, test_case in enumerate(self.test_cases, 1) ] - # Run all evaluations in parallel (limited by semaphore) + # Run all evaluations in parallel (limited by two-stage semaphores) results = await asyncio.gather(*tasks) return list(results)