From c459caed2645cac475267d0016bab87ed81d47fc Mon Sep 17 00:00:00 2001 From: yangdx Date: Wed, 5 Nov 2025 00:36:09 +0800 Subject: [PATCH] Implement two-stage pipeline for RAG evaluation with separate semaphores MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit • Split RAG gen and eval stages • Add rag_semaphore for stage 1 • Add eval_semaphore for stage 2 • Improve concurrency control • Update connection pool limits (cherry picked from commit 83715a3ac12f30948e708271366834ae65cbc090) --- lightrag/evaluation/eval_rag_quality.py | 699 ++++++++++++++++++------ 1 file changed, 518 insertions(+), 181 deletions(-) diff --git a/lightrag/evaluation/eval_rag_quality.py b/lightrag/evaluation/eval_rag_quality.py index 35947615..b95265b0 100644 --- a/lightrag/evaluation/eval_rag_quality.py +++ b/lightrag/evaluation/eval_rag_quality.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 """ -RAGAS Evaluation Script for Portfolio RAG System +RAGAS Evaluation Script for LightRAG System Evaluates RAG response quality using RAGAS metrics: - Faithfulness: Is the answer factually accurate based on context? @@ -9,56 +9,98 @@ Evaluates RAG response quality using RAGAS metrics: - Context Precision: Is retrieved context clean without noise? Usage: + # Use defaults (sample_dataset.json, http://localhost:9621) python lightrag/evaluation/eval_rag_quality.py - python lightrag/evaluation/eval_rag_quality.py http://localhost:9621 - python lightrag/evaluation/eval_rag_quality.py http://your-rag-server.com:9621 + + # Specify custom dataset + python lightrag/evaluation/eval_rag_quality.py --dataset my_test.json + python lightrag/evaluation/eval_rag_quality.py -d my_test.json + + # Specify custom RAG endpoint + python lightrag/evaluation/eval_rag_quality.py --ragendpoint http://my-server.com:9621 + python lightrag/evaluation/eval_rag_quality.py -r http://my-server.com:9621 + + # Specify both + python lightrag/evaluation/eval_rag_quality.py -d my_test.json -r http://localhost:9621 + + # Get help + python lightrag/evaluation/eval_rag_quality.py --help Results are saved to: lightrag/evaluation/results/ - results_YYYYMMDD_HHMMSS.csv (CSV export for analysis) - results_YYYYMMDD_HHMMSS.json (Full results with details) + +Technical Notes: + - Uses stable RAGAS API (LangchainLLMWrapper) for maximum compatibility + - Supports custom OpenAI-compatible endpoints via EVAL_LLM_BINDING_HOST + - Enables bypass_n mode for endpoints that don't support 'n' parameter + - Deprecation warnings are suppressed for cleaner output """ +import argparse import asyncio import csv import json +import math import os import sys import time +import warnings from datetime import datetime from pathlib import Path from typing import Any, Dict, List import httpx from dotenv import load_dotenv +from lightrag.utils import logger + +# Suppress LangchainLLMWrapper deprecation warning +# We use LangchainLLMWrapper for stability and compatibility with all RAGAS versions +warnings.filterwarnings( + "ignore", + message=".*LangchainLLMWrapper is deprecated.*", + category=DeprecationWarning, +) # Add parent directory to path sys.path.insert(0, str(Path(__file__).parent.parent.parent)) -# Load .env from project root -project_root = Path(__file__).parent.parent.parent -load_dotenv(project_root / ".env") - -# Setup OpenAI API key (required for RAGAS evaluation) -# Use LLM_BINDING_API_KEY if OPENAI_API_KEY is not set -if "OPENAI_API_KEY" not in os.environ: - if "LLM_BINDING_API_KEY" in os.environ: - os.environ["OPENAI_API_KEY"] = 
os.environ["LLM_BINDING_API_KEY"] - else: - os.environ["OPENAI_API_KEY"] = input("Enter your OpenAI API key: ") +# use the .env that is inside the current folder +# allows to use different .env file for each lightrag instance +# the OS environment variables take precedence over the .env file +load_dotenv(dotenv_path=".env", override=False) +# Conditional imports - will raise ImportError if dependencies not installed try: from datasets import Dataset from ragas import evaluate from ragas.metrics import ( - answer_relevancy, - context_precision, - context_recall, - faithfulness, + AnswerRelevancy, + ContextPrecision, + ContextRecall, + Faithfulness, ) -except ImportError as e: - print(f"❌ RAGAS import error: {e}") - print(" Install with: pip install ragas datasets") - sys.exit(1) + from ragas.llms import LangchainLLMWrapper + from langchain_openai import ChatOpenAI, OpenAIEmbeddings + from tqdm import tqdm + + RAGAS_AVAILABLE = True + +except ImportError: + RAGAS_AVAILABLE = False + Dataset = None + evaluate = None + LangchainLLMWrapper = None + + +CONNECT_TIMEOUT_SECONDS = 180.0 +READ_TIMEOUT_SECONDS = 300.0 +TOTAL_TIMEOUT_SECONDS = 180.0 + + +def _is_nan(value: Any) -> bool: + """Return True when value is a float NaN.""" + return isinstance(value, float) and math.isnan(value) class RAGEvaluator: @@ -72,7 +114,75 @@ class RAGEvaluator: test_dataset_path: Path to test dataset JSON file rag_api_url: Base URL of LightRAG API (e.g., http://localhost:9621) If None, will try to read from environment or use default + + Environment Variables: + EVAL_LLM_MODEL: LLM model for evaluation (default: gpt-4o-mini) + EVAL_EMBEDDING_MODEL: Embedding model for evaluation (default: text-embedding-3-small) + EVAL_LLM_BINDING_API_KEY: API key for evaluation models (fallback to OPENAI_API_KEY) + EVAL_LLM_BINDING_HOST: Custom endpoint URL for evaluation models (optional) + + Raises: + ImportError: If ragas or datasets packages are not installed + EnvironmentError: If EVAL_LLM_BINDING_API_KEY and OPENAI_API_KEY are both not set """ + # Validate RAGAS dependencies are installed + if not RAGAS_AVAILABLE: + raise ImportError( + "RAGAS dependencies not installed. " + "Install with: pip install ragas datasets" + ) + + # Configure evaluation models (for RAGAS scoring) + eval_api_key = os.getenv("EVAL_LLM_BINDING_API_KEY") or os.getenv( + "OPENAI_API_KEY" + ) + if not eval_api_key: + raise EnvironmentError( + "EVAL_LLM_BINDING_API_KEY or OPENAI_API_KEY is required for evaluation. " + "Set EVAL_LLM_BINDING_API_KEY to use a custom API key, " + "or ensure OPENAI_API_KEY is set." 
+            )
+
+        eval_model = os.getenv("EVAL_LLM_MODEL", "gpt-4o-mini")
+        eval_embedding_model = os.getenv(
+            "EVAL_EMBEDDING_MODEL", "text-embedding-3-large"
+        )
+        eval_base_url = os.getenv("EVAL_LLM_BINDING_HOST")
+
+        # Create LLM and Embeddings instances for RAGAS
+        llm_kwargs = {
+            "model": eval_model,
+            "api_key": eval_api_key,
+            "max_retries": int(os.getenv("EVAL_LLM_MAX_RETRIES", "5")),
+            "request_timeout": int(os.getenv("EVAL_LLM_TIMEOUT", "180")),
+        }
+        embedding_kwargs = {"model": eval_embedding_model, "api_key": eval_api_key}
+
+        if eval_base_url:
+            llm_kwargs["base_url"] = eval_base_url
+            embedding_kwargs["base_url"] = eval_base_url
+
+        # Create base LangChain LLM
+        base_llm = ChatOpenAI(**llm_kwargs)
+        self.eval_embeddings = OpenAIEmbeddings(**embedding_kwargs)
+
+        # Wrap LLM with LangchainLLMWrapper and enable bypass_n mode for custom endpoints
+        # This ensures compatibility with endpoints that don't support the 'n' parameter
+        # by generating multiple outputs through repeated prompts instead of using the 'n' parameter
+        try:
+            self.eval_llm = LangchainLLMWrapper(
+                langchain_llm=base_llm,
+                bypass_n=True,  # Enable bypass_n to avoid passing 'n' to OpenAI API
+            )
+            logger.debug("Successfully configured bypass_n mode for LLM wrapper")
+        except Exception as e:
+            logger.warning(
+                "Could not configure LangchainLLMWrapper with bypass_n: %s. "
+                "Using base LLM directly, which may cause warnings with custom endpoints.",
+                e,
+            )
+            self.eval_llm = base_llm
+
         if test_dataset_path is None:
             test_dataset_path = Path(__file__).parent / "sample_dataset.json"
 
@@ -87,6 +197,41 @@ class RAGEvaluator:
         # Load test dataset
         self.test_cases = self._load_test_dataset()
+        # Store configuration values for display
+        self.eval_model = eval_model
+        self.eval_embedding_model = eval_embedding_model
+        self.eval_base_url = eval_base_url
+        self.eval_max_retries = llm_kwargs["max_retries"]
+        self.eval_timeout = llm_kwargs["request_timeout"]
+
+        # Display configuration
+        self._display_configuration()
+
+    def _display_configuration(self):
+        """Display all evaluation configuration settings"""
+        logger.info("Evaluation Models:")
+        logger.info("  • LLM Model: %s", self.eval_model)
+        logger.info("  • Embedding Model: %s", self.eval_embedding_model)
+        if self.eval_base_url:
+            logger.info("  • Custom Endpoint: %s", self.eval_base_url)
+            logger.info(
+                "  • Bypass N-Parameter: Enabled (use LangchainLLMWrapper for compatibility)"
+            )
+        else:
+            logger.info("  • Endpoint: OpenAI Official API")
+
+        logger.info("Concurrency & Rate Limiting:")
+        query_top_k = int(os.getenv("EVAL_QUERY_TOP_K", "10"))
+        logger.info("  • Query Top-K: %s Entities/Relations", query_top_k)
+        logger.info("  • LLM Max Retries: %s", self.eval_max_retries)
+        logger.info("  • LLM Timeout: %s seconds", self.eval_timeout)
+
+        logger.info("Test Configuration:")
+        logger.info("  • Total Test Cases: %s", len(self.test_cases))
+        logger.info("  • Test Dataset: %s", self.test_dataset_path.name)
+        logger.info("  • LightRAG API: %s", self.rag_api_url)
+        logger.info("  • Results Directory: %s", self.results_dir.name)
 
     def _load_test_dataset(self) -> List[Dict[str, str]]:
         """Load test cases from JSON file"""
         if not self.test_dataset_path.exists():
@@ -123,13 +268,22 @@ class RAGEvaluator:
             "include_references": True,
             "include_chunk_content": True,  # NEW: Request chunk content in references
             "response_type": "Multiple Paragraphs",
-            "top_k": 10,
+            "top_k": int(os.getenv("EVAL_QUERY_TOP_K", "10")),
         }
 
+        # Get API key from environment for authentication
+        api_key = 
os.getenv("LIGHTRAG_API_KEY") + + # Prepare headers with optional authentication + headers = {} + if api_key: + headers["X-API-Key"] = api_key + # Single optimized API call - gets both answer AND chunk content response = await client.post( f"{self.rag_api_url}/query", json=payload, + headers=headers if headers else None, ) response.raise_for_status() result = response.json() @@ -138,17 +292,31 @@ class RAGEvaluator: references = result.get("references", []) # DEBUG: Inspect the API response - print(f" 🔍 References Count: {len(references)}") + logger.debug("🔍 References Count: %s", len(references)) if references: first_ref = references[0] - print(f" 🔍 First Reference Keys: {list(first_ref.keys())}") + logger.debug("🔍 First Reference Keys: %s", list(first_ref.keys())) if "content" in first_ref: - print(f" 🔍 Content Preview: {first_ref['content'][:100]}...") + content_preview = first_ref["content"] + if isinstance(content_preview, list) and content_preview: + logger.debug( + "🔍 Content Preview (first chunk): %s...", + content_preview[0][:100], + ) + elif isinstance(content_preview, str): + logger.debug("🔍 Content Preview: %s...", content_preview[:100]) # Extract chunk content from enriched references - contexts = [ - ref.get("content", "") for ref in references if ref.get("content") - ] + # Note: content is now a list of chunks per reference (one file may have multiple chunks) + contexts = [] + for ref in references: + content = ref.get("content", []) + if isinstance(content, list): + # Flatten the list: each chunk becomes a separate context + contexts.extend(content) + elif isinstance(content, str): + # Backward compatibility: if content is still a string (shouldn't happen) + contexts.append(content) return { "answer": answer, @@ -179,62 +347,82 @@ class RAGEvaluator: self, idx: int, test_case: Dict[str, str], - semaphore: asyncio.Semaphore, + rag_semaphore: asyncio.Semaphore, + eval_semaphore: asyncio.Semaphore, client: httpx.AsyncClient, + progress_counter: Dict[str, int], ) -> Dict[str, Any]: """ - Evaluate a single test case with concurrency control + Evaluate a single test case with two-stage pipeline concurrency control Args: idx: Test case index (1-based) test_case: Test case dictionary with question and ground_truth - semaphore: Semaphore to control concurrency + rag_semaphore: Semaphore to control RAG generation concurrency (Stage 1) + eval_semaphore: Semaphore to control RAGAS evaluation concurrency (Stage 2) client: Shared httpx AsyncClient for connection pooling + progress_counter: Shared dictionary for progress tracking Returns: Evaluation result dictionary """ - async with semaphore: - question = test_case["question"] - ground_truth = test_case["ground_truth"] + question = test_case["question"] + ground_truth = test_case["ground_truth"] - print(f"[{idx}/{len(self.test_cases)}] Evaluating: {question[:60]}...") - - # Generate RAG response by calling actual LightRAG API - rag_response = await self.generate_rag_response( - question=question, client=client - ) - - # *** CRITICAL FIX: Use actual retrieved contexts, NOT ground_truth *** - retrieved_contexts = rag_response["contexts"] - - # DEBUG: Print what was actually retrieved - print(f" 📝 Retrieved {len(retrieved_contexts)} contexts") - if retrieved_contexts: - print(f" 📄 First context preview: {retrieved_contexts[0][:100]}...") - else: - print(" ⚠️ WARNING: No contexts retrieved!") - - # Prepare dataset for RAGAS evaluation with CORRECT contexts - eval_dataset = Dataset.from_dict( - { - "question": [question], - "answer": 
[rag_response["answer"]], - "contexts": [retrieved_contexts], - "ground_truth": [ground_truth], - } - ) - - # Run RAGAS evaluation + # Stage 1: Generate RAG response (controlled by rag_semaphore) + async with rag_semaphore: try: + rag_response = await self.generate_rag_response( + question=question, client=client + ) + except Exception as e: + logger.error("Error generating response for test %s: %s", idx, str(e)) + progress_counter["completed"] += 1 + return { + "test_number": idx, + "question": question, + "error": str(e), + "metrics": {}, + "ragas_score": 0, + "timestamp": datetime.now().isoformat(), + } + + # *** CRITICAL FIX: Use actual retrieved contexts, NOT ground_truth *** + retrieved_contexts = rag_response["contexts"] + + # DEBUG: Print what was actually retrieved (only in debug mode) + logger.debug("📝 Test %s: Retrieved %s contexts", idx, len(retrieved_contexts)) + + # Prepare dataset for RAGAS evaluation with CORRECT contexts + eval_dataset = Dataset.from_dict( + { + "question": [question], + "answer": [rag_response["answer"]], + "contexts": [retrieved_contexts], + "ground_truth": [ground_truth], + } + ) + + # Stage 2: Run RAGAS evaluation (controlled by eval_semaphore) + # IMPORTANT: Create fresh metric instances for each evaluation to avoid + # concurrent state conflicts when multiple tasks run in parallel + async with eval_semaphore: + pbar = None + try: + # Create standard tqdm progress bar for RAGAS evaluation + pbar = tqdm(total=4, desc=f"Eval-{idx}", leave=True) + eval_results = evaluate( dataset=eval_dataset, metrics=[ - faithfulness, - answer_relevancy, - context_recall, - context_precision, + Faithfulness(), + AnswerRelevancy(), + ContextRecall(), + ContextPrecision(), ], + llm=self.eval_llm, + embeddings=self.eval_embeddings, + _pbar=pbar, ) # Convert to DataFrame (RAGAS v0.3+ API) @@ -245,6 +433,7 @@ class RAGEvaluator: # Extract scores (RAGAS v0.3+ uses .to_pandas()) result = { + "test_number": idx, "question": question, "answer": rag_response["answer"][:200] + "..." if len(rag_response["answer"]) > 200 @@ -252,7 +441,7 @@ class RAGEvaluator: "ground_truth": ground_truth[:200] + "..." 
if len(ground_truth) > 200 else ground_truth, - "project": test_case.get("project_context", "unknown"), + "project": test_case.get("project", "unknown"), "metrics": { "faithfulness": float(scores_row.get("faithfulness", 0)), "answer_relevance": float( @@ -266,67 +455,87 @@ class RAGEvaluator: "timestamp": datetime.now().isoformat(), } - # Calculate RAGAS score (average of all metrics) + # Calculate RAGAS score (average of all metrics, excluding NaN values) metrics = result["metrics"] - ragas_score = sum(metrics.values()) / len(metrics) if metrics else 0 + valid_metrics = [v for v in metrics.values() if not _is_nan(v)] + ragas_score = ( + sum(valid_metrics) / len(valid_metrics) if valid_metrics else 0 + ) result["ragas_score"] = round(ragas_score, 4) - # Print metrics - print(f" ✅ Faithfulness: {metrics['faithfulness']:.4f}") - print(f" ✅ Answer Relevance: {metrics['answer_relevance']:.4f}") - print(f" ✅ Context Recall: {metrics['context_recall']:.4f}") - print(f" ✅ Context Precision: {metrics['context_precision']:.4f}") - print(f" 📊 RAGAS Score: {result['ragas_score']:.4f}\n") + # Update progress counter + progress_counter["completed"] += 1 return result except Exception as e: - import traceback - - print(f" ❌ Error evaluating: {str(e)}") - print(f" 🔍 Full traceback:\n{traceback.format_exc()}\n") + logger.error("Error evaluating test %s: %s", idx, str(e)) + progress_counter["completed"] += 1 return { + "test_number": idx, "question": question, "error": str(e), "metrics": {}, "ragas_score": 0, "timestamp": datetime.now().isoformat(), } + finally: + # Force close progress bar to ensure completion + if pbar is not None: + pbar.close() async def evaluate_responses(self) -> List[Dict[str, Any]]: """ - Evaluate all test cases in parallel and return metrics + Evaluate all test cases in parallel with two-stage pipeline and return metrics Returns: List of evaluation results with metrics """ - # Get MAX_ASYNC from environment (default to 4 if not set) - max_async = int(os.getenv("MAX_ASYNC", "4")) + # Get evaluation concurrency from environment (default to 2 for parallel evaluation) + max_async = int(os.getenv("EVAL_MAX_CONCURRENT", "2")) - print("\n" + "=" * 70) - print("🚀 Starting RAGAS Evaluation of Portfolio RAG System") - print(f"🔧 Parallel evaluations: {max_async}") - print("=" * 70 + "\n") + logger.info("%s", "=" * 70) + logger.info("🚀 Starting RAGAS Evaluation of LightRAG System") + logger.info("🔧 Two-Stage Pipeline Configuration:") + logger.info(" • RAGAS Evaluation (Stage 2): %s concurrent", max_async) + logger.info("%s", "=" * 70) - # Create semaphore to limit concurrent evaluations - semaphore = asyncio.Semaphore(max_async) + # Create two-stage pipeline semaphores + # Stage 1: RAG generation - allow +1 concurrency to keep evaluation fed + rag_semaphore = asyncio.Semaphore(max_async + 1) + # Stage 2: RAGAS evaluation - primary bottleneck + eval_semaphore = asyncio.Semaphore(max_async) + + # Create progress counter (shared across all tasks) + progress_counter = {"completed": 0} # Create shared HTTP client with connection pooling and proper timeouts # Timeout: 3 minutes for connect, 5 minutes for read (LLM can be slow) - timeout = httpx.Timeout(180.0, connect=180.0, read=300.0) + timeout = httpx.Timeout( + TOTAL_TIMEOUT_SECONDS, + connect=CONNECT_TIMEOUT_SECONDS, + read=READ_TIMEOUT_SECONDS, + ) limits = httpx.Limits( - max_connections=max_async * 2, # Allow some buffer - max_keepalive_connections=max_async, + max_connections=(max_async + 1) * 2, # Allow buffer for RAG stage + 
max_keepalive_connections=max_async + 1, ) async with httpx.AsyncClient(timeout=timeout, limits=limits) as client: # Create tasks for all test cases tasks = [ - self.evaluate_single_case(idx, test_case, semaphore, client) + self.evaluate_single_case( + idx, + test_case, + rag_semaphore, + eval_semaphore, + client, + progress_counter, + ) for idx, test_case in enumerate(self.test_cases, 1) ] - # Run all evaluations in parallel (limited by semaphore) + # Run all evaluations in parallel (limited by two-stage semaphores) results = await asyncio.gather(*tasks) return list(results) @@ -391,6 +600,94 @@ class RAGEvaluator: return csv_path + def _format_metric(self, value: float, width: int = 6) -> str: + """ + Format a metric value for display, handling NaN gracefully + + Args: + value: The metric value to format + width: The width of the formatted string + + Returns: + Formatted string (e.g., "0.8523" or " N/A ") + """ + if _is_nan(value): + return "N/A".center(width) + return f"{value:.4f}".rjust(width) + + def _display_results_table(self, results: List[Dict[str, Any]]): + """ + Display evaluation results in a formatted table + + Args: + results: List of evaluation results + """ + logger.info("%s", "=" * 115) + logger.info("📊 EVALUATION RESULTS SUMMARY") + logger.info("%s", "=" * 115) + + # Table header + logger.info( + "%-4s | %-50s | %6s | %7s | %6s | %7s | %6s | %6s", + "#", + "Question", + "Faith", + "AnswRel", + "CtxRec", + "CtxPrec", + "RAGAS", + "Status", + ) + logger.info("%s", "-" * 115) + + # Table rows + for result in results: + test_num = result.get("test_number", 0) + question = result.get("question", "") + # Truncate question to 50 chars + question_display = ( + (question[:47] + "...") if len(question) > 50 else question + ) + + metrics = result.get("metrics", {}) + if metrics: + # Success case - format each metric, handling NaN values + faith = metrics.get("faithfulness", 0) + ans_rel = metrics.get("answer_relevance", 0) + ctx_rec = metrics.get("context_recall", 0) + ctx_prec = metrics.get("context_precision", 0) + ragas = result.get("ragas_score", 0) + status = "✓" + + logger.info( + "%-4d | %-50s | %s | %s | %s | %s | %s | %6s", + test_num, + question_display, + self._format_metric(faith, 6), + self._format_metric(ans_rel, 7), + self._format_metric(ctx_rec, 6), + self._format_metric(ctx_prec, 7), + self._format_metric(ragas, 6), + status, + ) + else: + # Error case + error = result.get("error", "Unknown error") + error_display = (error[:20] + "...") if len(error) > 23 else error + logger.info( + "%-4d | %-50s | %6s | %7s | %6s | %7s | %6s | ✗ %s", + test_num, + question_display, + "N/A", + "N/A", + "N/A", + "N/A", + "N/A", + error_display, + ) + + logger.info("%s", "=" * 115) + def _calculate_benchmark_stats( self, results: List[Dict[str, Any]] ) -> Dict[str, Any]: @@ -417,69 +714,61 @@ class RAGEvaluator: "success_rate": 0.0, } - # Calculate averages for each metric (handling NaN values) - import math - - metrics_sum = { - "faithfulness": 0.0, - "answer_relevance": 0.0, - "context_recall": 0.0, - "context_precision": 0.0, - "ragas_score": 0.0, + # Calculate averages for each metric (handling NaN values correctly) + # Track both sum and count for each metric to handle NaN values properly + metrics_data = { + "faithfulness": {"sum": 0.0, "count": 0}, + "answer_relevance": {"sum": 0.0, "count": 0}, + "context_recall": {"sum": 0.0, "count": 0}, + "context_precision": {"sum": 0.0, "count": 0}, + "ragas_score": {"sum": 0.0, "count": 0}, } for result in valid_results: metrics = 
result.get("metrics", {}) - # Skip NaN values when summing + + # For each metric, sum non-NaN values and count them faithfulness = metrics.get("faithfulness", 0) - if ( - not math.isnan(faithfulness) - if isinstance(faithfulness, float) - else True - ): - metrics_sum["faithfulness"] += faithfulness + if not _is_nan(faithfulness): + metrics_data["faithfulness"]["sum"] += faithfulness + metrics_data["faithfulness"]["count"] += 1 answer_relevance = metrics.get("answer_relevance", 0) - if ( - not math.isnan(answer_relevance) - if isinstance(answer_relevance, float) - else True - ): - metrics_sum["answer_relevance"] += answer_relevance + if not _is_nan(answer_relevance): + metrics_data["answer_relevance"]["sum"] += answer_relevance + metrics_data["answer_relevance"]["count"] += 1 context_recall = metrics.get("context_recall", 0) - if ( - not math.isnan(context_recall) - if isinstance(context_recall, float) - else True - ): - metrics_sum["context_recall"] += context_recall + if not _is_nan(context_recall): + metrics_data["context_recall"]["sum"] += context_recall + metrics_data["context_recall"]["count"] += 1 context_precision = metrics.get("context_precision", 0) - if ( - not math.isnan(context_precision) - if isinstance(context_precision, float) - else True - ): - metrics_sum["context_precision"] += context_precision + if not _is_nan(context_precision): + metrics_data["context_precision"]["sum"] += context_precision + metrics_data["context_precision"]["count"] += 1 ragas_score = result.get("ragas_score", 0) - if not math.isnan(ragas_score) if isinstance(ragas_score, float) else True: - metrics_sum["ragas_score"] += ragas_score + if not _is_nan(ragas_score): + metrics_data["ragas_score"]["sum"] += ragas_score + metrics_data["ragas_score"]["count"] += 1 - # Calculate averages - n = len(valid_results) + # Calculate averages using actual counts for each metric avg_metrics = {} - for k, v in metrics_sum.items(): - avg_val = v / n if n > 0 else 0 - # Handle NaN in average - avg_metrics[k] = round(avg_val, 4) if not math.isnan(avg_val) else 0.0 + for metric_name, data in metrics_data.items(): + if data["count"] > 0: + avg_val = data["sum"] / data["count"] + avg_metrics[metric_name] = ( + round(avg_val, 4) if not _is_nan(avg_val) else 0.0 + ) + else: + avg_metrics[metric_name] = 0.0 # Find min and max RAGAS scores (filter out NaN) ragas_scores = [] for r in valid_results: score = r.get("ragas_score", 0) - if isinstance(score, float) and math.isnan(score): + if _is_nan(score): continue # Skip NaN values ragas_scores.append(score) @@ -525,43 +814,57 @@ class RAGEvaluator: ) with open(json_path, "w") as f: json.dump(summary, f, indent=2) - print(f"✅ JSON results saved to: {json_path}") + + # Display results table + self._display_results_table(results) + + logger.info("✅ JSON results saved to: %s", json_path) # Export to CSV csv_path = self._export_to_csv(results) - print(f"✅ CSV results saved to: {csv_path}") + logger.info("✅ CSV results saved to: %s", csv_path) # Print summary - print("\n" + "=" * 70) - print("📊 EVALUATION COMPLETE") - print("=" * 70) - print(f"Total Tests: {len(results)}") - print(f"Successful: {benchmark_stats['successful_tests']}") - print(f"Failed: {benchmark_stats['failed_tests']}") - print(f"Success Rate: {benchmark_stats['success_rate']:.2f}%") - print(f"Elapsed Time: {elapsed_time:.2f} seconds") - print(f"Avg Time/Test: {elapsed_time / len(results):.2f} seconds") + logger.info("") + logger.info("%s", "=" * 70) + logger.info("📊 EVALUATION COMPLETE") + logger.info("%s", "=" * 70) 
+ logger.info("Total Tests: %s", len(results)) + logger.info("Successful: %s", benchmark_stats["successful_tests"]) + logger.info("Failed: %s", benchmark_stats["failed_tests"]) + logger.info("Success Rate: %.2f%%", benchmark_stats["success_rate"]) + logger.info("Elapsed Time: %.2f seconds", elapsed_time) + logger.info("Avg Time/Test: %.2f seconds", elapsed_time / len(results)) # Print benchmark metrics - print("\n" + "=" * 70) - print("📈 BENCHMARK RESULTS (Averages)") - print("=" * 70) + logger.info("") + logger.info("%s", "=" * 70) + logger.info("📈 BENCHMARK RESULTS (Average)") + logger.info("%s", "=" * 70) avg = benchmark_stats["average_metrics"] - print(f"Average Faithfulness: {avg['faithfulness']:.4f}") - print(f"Average Answer Relevance: {avg['answer_relevance']:.4f}") - print(f"Average Context Recall: {avg['context_recall']:.4f}") - print(f"Average Context Precision: {avg['context_precision']:.4f}") - print(f"Average RAGAS Score: {avg['ragas_score']:.4f}") - print(f"\nMin RAGAS Score: {benchmark_stats['min_ragas_score']:.4f}") - print(f"Max RAGAS Score: {benchmark_stats['max_ragas_score']:.4f}") + logger.info("Average Faithfulness: %.4f", avg["faithfulness"]) + logger.info("Average Answer Relevance: %.4f", avg["answer_relevance"]) + logger.info("Average Context Recall: %.4f", avg["context_recall"]) + logger.info("Average Context Precision: %.4f", avg["context_precision"]) + logger.info("Average RAGAS Score: %.4f", avg["ragas_score"]) + logger.info("") + logger.info( + "Min RAGAS Score: %.4f", + benchmark_stats["min_ragas_score"], + ) + logger.info( + "Max RAGAS Score: %.4f", + benchmark_stats["max_ragas_score"], + ) - print("\n" + "=" * 70) - print("📁 GENERATED FILES") - print("=" * 70) - print(f"Results Dir: {self.results_dir.absolute()}") - print(f" • CSV: {csv_path.name}") - print(f" • JSON: {json_path.name}") - print("=" * 70 + "\n") + logger.info("") + logger.info("%s", "=" * 70) + logger.info("📁 GENERATED FILES") + logger.info("%s", "=" * 70) + logger.info("Results Dir: %s", self.results_dir.absolute()) + logger.info(" • CSV: %s", csv_path.name) + logger.info(" • JSON: %s", json_path.name) + logger.info("%s", "=" * 70) return summary @@ -570,30 +873,64 @@ async def main(): """ Main entry point for RAGAS evaluation + Command-line arguments: + --dataset, -d: Path to test dataset JSON file (default: sample_dataset.json) + --ragendpoint, -r: LightRAG API endpoint URL (default: http://localhost:9621 or $LIGHTRAG_API_URL) + Usage: python lightrag/evaluation/eval_rag_quality.py - python lightrag/evaluation/eval_rag_quality.py http://localhost:9621 - python lightrag/evaluation/eval_rag_quality.py http://your-server.com:9621 + python lightrag/evaluation/eval_rag_quality.py --dataset my_test.json + python lightrag/evaluation/eval_rag_quality.py -d my_test.json -r http://localhost:9621 """ try: - # Get RAG API URL from command line or environment - rag_api_url = None - if len(sys.argv) > 1: - rag_api_url = sys.argv[1] + # Parse command-line arguments + parser = argparse.ArgumentParser( + description="RAGAS Evaluation Script for LightRAG System", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + # Use defaults + python lightrag/evaluation/eval_rag_quality.py - print("\n" + "=" * 70) - print("🔍 RAGAS Evaluation - Using Real LightRAG API") - print("=" * 70) - if rag_api_url: - print(f"📡 RAG API URL: {rag_api_url}") - else: - print("📡 RAG API URL: http://localhost:9621 (default)") - print("=" * 70 + "\n") + # Specify custom dataset + python 
lightrag/evaluation/eval_rag_quality.py --dataset my_test.json - evaluator = RAGEvaluator(rag_api_url=rag_api_url) + # Specify custom RAG endpoint + python lightrag/evaluation/eval_rag_quality.py --ragendpoint http://my-server.com:9621 + + # Specify both + python lightrag/evaluation/eval_rag_quality.py -d my_test.json -r http://localhost:9621 + """, + ) + + parser.add_argument( + "--dataset", + "-d", + type=str, + default=None, + help="Path to test dataset JSON file (default: sample_dataset.json in evaluation directory)", + ) + + parser.add_argument( + "--ragendpoint", + "-r", + type=str, + default=None, + help="LightRAG API endpoint URL (default: http://localhost:9621 or $LIGHTRAG_API_URL environment variable)", + ) + + args = parser.parse_args() + + logger.info("%s", "=" * 70) + logger.info("🔍 RAGAS Evaluation - Using Real LightRAG API") + logger.info("%s", "=" * 70) + + evaluator = RAGEvaluator( + test_dataset_path=args.dataset, rag_api_url=args.ragendpoint + ) await evaluator.run() except Exception as e: - print(f"\n❌ Error: {str(e)}\n") + logger.exception("❌ Error: %s", e) sys.exit(1)
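Note on the two-stage pipeline (illustration only, not part of the patch): the sketch below isolates the
semaphore pattern this commit introduces. The fake_rag_generation / fake_ragas_evaluation coroutines and
the hard-coded MAX_ASYNC value are placeholders; in the real script, Stage 1 wraps the POST to
{rag_api_url}/query and Stage 2 wraps ragas.evaluate(), with Stage 1 sized at EVAL_MAX_CONCURRENT + 1
and Stage 2 at EVAL_MAX_CONCURRENT.

    import asyncio

    MAX_ASYNC = 2  # placeholder for EVAL_MAX_CONCURRENT

    async def fake_rag_generation(idx: int) -> str:
        # Stand-in for the LightRAG /query request (Stage 1)
        await asyncio.sleep(0.1)
        return f"answer-{idx}"

    async def fake_ragas_evaluation(answer: str) -> float:
        # Stand-in for the RAGAS evaluate() call (Stage 2)
        await asyncio.sleep(0.2)
        return 1.0

    async def evaluate_case(idx, rag_semaphore, eval_semaphore):
        # Stage 1: only the RAG request holds the generation semaphore
        async with rag_semaphore:
            answer = await fake_rag_generation(idx)
        # Stage 2: only the scoring step holds the evaluation semaphore,
        # so a slow evaluation never blocks the next RAG request
        async with eval_semaphore:
            return await fake_ragas_evaluation(answer)

    async def main():
        # Stage 1 gets one extra slot so a generated answer is usually ready
        # as soon as an evaluation slot frees up
        rag_semaphore = asyncio.Semaphore(MAX_ASYNC + 1)
        eval_semaphore = asyncio.Semaphore(MAX_ASYNC)
        scores = await asyncio.gather(
            *(evaluate_case(i, rag_semaphore, eval_semaphore) for i in range(10))
        )
        print(scores)

    if __name__ == "__main__":
        asyncio.run(main())

Because each semaphore is held only for its own stage, generation can run one case ahead of scoring,
which is also why the shared HTTP connection pool in the patch is sized to (max_async + 1) * 2.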