diff --git a/lightrag/evaluation/eval_rag_quality.py b/lightrag/evaluation/eval_rag_quality.py index b95265b0..0d9633b4 100644 --- a/lightrag/evaluation/eval_rag_quality.py +++ b/lightrag/evaluation/eval_rag_quality.py @@ -358,7 +358,7 @@ class RAGEvaluator: Args: idx: Test case index (1-based) test_case: Test case dictionary with question and ground_truth - rag_semaphore: Semaphore to control RAG generation concurrency (Stage 1) + rag_semaphore: Semaphore to control overall concurrency (covers entire function) eval_semaphore: Semaphore to control RAGAS evaluation concurrency (Stage 2) client: Shared httpx AsyncClient for connection pooling progress_counter: Shared dictionary for progress tracking @@ -366,11 +366,13 @@ class RAGEvaluator: Returns: Evaluation result dictionary """ - question = test_case["question"] - ground_truth = test_case["ground_truth"] - - # Stage 1: Generate RAG response (controlled by rag_semaphore) + # rag_semaphore controls the entire evaluation process to prevent + # all RAG responses from being generated at once when eval is slow async with rag_semaphore: + question = test_case["question"] + ground_truth = test_case["ground_truth"] + + # Stage 1: Generate RAG response try: rag_response = await self.generate_rag_response( question=question, client=client @@ -387,102 +389,104 @@ class RAGEvaluator: "timestamp": datetime.now().isoformat(), } - # *** CRITICAL FIX: Use actual retrieved contexts, NOT ground_truth *** - retrieved_contexts = rag_response["contexts"] + # *** CRITICAL FIX: Use actual retrieved contexts, NOT ground_truth *** + retrieved_contexts = rag_response["contexts"] - # DEBUG: Print what was actually retrieved (only in debug mode) - logger.debug("📝 Test %s: Retrieved %s contexts", idx, len(retrieved_contexts)) - - # Prepare dataset for RAGAS evaluation with CORRECT contexts - eval_dataset = Dataset.from_dict( - { - "question": [question], - "answer": [rag_response["answer"]], - "contexts": [retrieved_contexts], - "ground_truth": [ground_truth], - } - ) - - # Stage 2: Run RAGAS evaluation (controlled by eval_semaphore) - # IMPORTANT: Create fresh metric instances for each evaluation to avoid - # concurrent state conflicts when multiple tasks run in parallel - async with eval_semaphore: - pbar = None - try: - # Create standard tqdm progress bar for RAGAS evaluation - pbar = tqdm(total=4, desc=f"Eval-{idx}", leave=True) - - eval_results = evaluate( - dataset=eval_dataset, - metrics=[ - Faithfulness(), - AnswerRelevancy(), - ContextRecall(), - ContextPrecision(), - ], - llm=self.eval_llm, - embeddings=self.eval_embeddings, - _pbar=pbar, - ) - - # Convert to DataFrame (RAGAS v0.3+ API) - df = eval_results.to_pandas() - - # Extract scores from first row - scores_row = df.iloc[0] - - # Extract scores (RAGAS v0.3+ uses .to_pandas()) - result = { - "test_number": idx, - "question": question, - "answer": rag_response["answer"][:200] + "..." - if len(rag_response["answer"]) > 200 - else rag_response["answer"], - "ground_truth": ground_truth[:200] + "..." - if len(ground_truth) > 200 - else ground_truth, - "project": test_case.get("project", "unknown"), - "metrics": { - "faithfulness": float(scores_row.get("faithfulness", 0)), - "answer_relevance": float( - scores_row.get("answer_relevancy", 0) - ), - "context_recall": float(scores_row.get("context_recall", 0)), - "context_precision": float( - scores_row.get("context_precision", 0) - ), - }, - "timestamp": datetime.now().isoformat(), + # Prepare dataset for RAGAS evaluation with CORRECT contexts + eval_dataset = Dataset.from_dict( + { + "question": [question], + "answer": [rag_response["answer"]], + "contexts": [retrieved_contexts], + "ground_truth": [ground_truth], } + ) - # Calculate RAGAS score (average of all metrics, excluding NaN values) - metrics = result["metrics"] - valid_metrics = [v for v in metrics.values() if not _is_nan(v)] - ragas_score = ( - sum(valid_metrics) / len(valid_metrics) if valid_metrics else 0 - ) - result["ragas_score"] = round(ragas_score, 4) + # Stage 2: Run RAGAS evaluation (controlled by eval_semaphore) + # IMPORTANT: Create fresh metric instances for each evaluation to avoid + # concurrent state conflicts when multiple tasks run in parallel + async with eval_semaphore: + pbar = None + try: + # Create standard tqdm progress bar for RAGAS evaluation + pbar = tqdm(total=4, desc=f"Eval-{idx}", leave=True) - # Update progress counter - progress_counter["completed"] += 1 + eval_results = evaluate( + dataset=eval_dataset, + metrics=[ + Faithfulness(), + AnswerRelevancy(), + ContextRecall(), + ContextPrecision(), + ], + llm=self.eval_llm, + embeddings=self.eval_embeddings, + _pbar=pbar, + ) - return result - - except Exception as e: - logger.error("Error evaluating test %s: %s", idx, str(e)) - progress_counter["completed"] += 1 - return { - "test_number": idx, - "question": question, - "error": str(e), - "metrics": {}, - "ragas_score": 0, - "timestamp": datetime.now().isoformat(), - } - finally: - # Force close progress bar to ensure completion - if pbar is not None: pbar.close() + pbar = None + + # Convert to DataFrame (RAGAS v0.3+ API) + df = eval_results.to_pandas() + + # Extract scores from first row + scores_row = df.iloc[0] + + # Extract scores (RAGAS v0.3+ uses .to_pandas()) + result = { + "test_number": idx, + "question": question, + "answer": rag_response["answer"][:200] + "..." + if len(rag_response["answer"]) > 200 + else rag_response["answer"], + "ground_truth": ground_truth[:200] + "..." + if len(ground_truth) > 200 + else ground_truth, + "project": test_case.get("project", "unknown"), + "metrics": { + "faithfulness": float(scores_row.get("faithfulness", 0)), + "answer_relevance": float( + scores_row.get("answer_relevancy", 0) + ), + "context_recall": float( + scores_row.get("context_recall", 0) + ), + "context_precision": float( + scores_row.get("context_precision", 0) + ), + }, + "timestamp": datetime.now().isoformat(), + } + + # Calculate RAGAS score (average of all metrics, excluding NaN values) + metrics = result["metrics"] + valid_metrics = [v for v in metrics.values() if not _is_nan(v)] + ragas_score = ( + sum(valid_metrics) / len(valid_metrics) if valid_metrics else 0 + ) + result["ragas_score"] = round(ragas_score, 4) + + # Update progress counter + progress_counter["completed"] += 1 + + return result + + except Exception as e: + logger.error("Error evaluating test %s: %s", idx, str(e)) + progress_counter["completed"] += 1 + return { + "test_number": idx, + "question": question, + "error": str(e), + "metrics": {}, + "ragas_score": 0, + "timestamp": datetime.now().isoformat(), + } + finally: + # Force close progress bar to ensure completion + if pbar is not None: + pbar.close() async def evaluate_responses(self) -> List[Dict[str, Any]]: """ @@ -501,8 +505,8 @@ class RAGEvaluator: logger.info("%s", "=" * 70) # Create two-stage pipeline semaphores - # Stage 1: RAG generation - allow +1 concurrency to keep evaluation fed - rag_semaphore = asyncio.Semaphore(max_async + 1) + # Stage 1: RAG generation - allow x2 concurrency to keep evaluation fed + rag_semaphore = asyncio.Semaphore(max_async * 2) # Stage 2: RAGAS evaluation - primary bottleneck eval_semaphore = asyncio.Semaphore(max_async)