From af49572886d818c115c2f32ff74245c2d0bc40b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20MANSUY?= Date: Thu, 4 Dec 2025 19:19:00 +0800 Subject: [PATCH] cherry-pick 363f3051 --- lightrag/evaluation/eval_rag_quality.py | 724 ++++++------------------ 1 file changed, 168 insertions(+), 556 deletions(-) diff --git a/lightrag/evaluation/eval_rag_quality.py b/lightrag/evaluation/eval_rag_quality.py index 7b415090..9cc3d5be 100644 --- a/lightrag/evaluation/eval_rag_quality.py +++ b/lightrag/evaluation/eval_rag_quality.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 """ -RAGAS Evaluation Script for LightRAG System +RAGAS Evaluation Script for Portfolio RAG System Evaluates RAG response quality using RAGAS metrics: - Faithfulness: Is the answer factually accurate based on context? @@ -9,35 +9,15 @@ Evaluates RAG response quality using RAGAS metrics: - Context Precision: Is retrieved context clean without noise? Usage: - # Use defaults (sample_dataset.json, http://localhost:9621) python lightrag/evaluation/eval_rag_quality.py - - # Specify custom dataset - python lightrag/evaluation/eval_rag_quality.py --dataset my_test.json - python lightrag/evaluation/eval_rag_quality.py -d my_test.json - - # Specify custom RAG endpoint - python lightrag/evaluation/eval_rag_quality.py --ragendpoint http://my-server.com:9621 - python lightrag/evaluation/eval_rag_quality.py -r http://my-server.com:9621 - - # Specify both - python lightrag/evaluation/eval_rag_quality.py -d my_test.json -r http://localhost:9621 - - # Get help - python lightrag/evaluation/eval_rag_quality.py --help + python lightrag/evaluation/eval_rag_quality.py http://localhost:9621 + python lightrag/evaluation/eval_rag_quality.py http://your-rag-server.com:9621 Results are saved to: lightrag/evaluation/results/ - results_YYYYMMDD_HHMMSS.csv (CSV export for analysis) - results_YYYYMMDD_HHMMSS.json (Full results with details) - -Technical Notes: - - Uses stable RAGAS API (LangchainLLMWrapper) for maximum compatibility - - Supports custom OpenAI-compatible endpoints via EVAL_LLM_BINDING_HOST - - Enables bypass_n mode for endpoints that don't support 'n' parameter - - Deprecation warnings are suppressed for cleaner output """ -import argparse import asyncio import csv import json @@ -45,7 +25,6 @@ import math import os import sys import time -import warnings from datetime import datetime from pathlib import Path from typing import Any, Dict, List @@ -54,52 +33,49 @@ import httpx from dotenv import load_dotenv from lightrag.utils import logger -# Suppress LangchainLLMWrapper deprecation warning -# We use LangchainLLMWrapper for stability and compatibility with all RAGAS versions -warnings.filterwarnings( - "ignore", - message=".*LangchainLLMWrapper is deprecated.*", - category=DeprecationWarning, -) - -# Suppress token usage warning for custom OpenAI-compatible endpoints -# Custom endpoints (vLLM, SGLang, etc.) 
often don't return usage information -# This is non-critical as token tracking is not required for RAGAS evaluation -warnings.filterwarnings( - "ignore", - message=".*Unexpected type for token usage.*", - category=UserWarning, -) - # Add parent directory to path sys.path.insert(0, str(Path(__file__).parent.parent.parent)) -# use the .env that is inside the current folder -# allows to use different .env file for each lightrag instance -# the OS environment variables take precedence over the .env file -load_dotenv(dotenv_path=".env", override=False) +# Load .env from project root +project_root = Path(__file__).parent.parent.parent +load_dotenv(project_root / ".env") + +# Setup OpenAI API key (required for RAGAS evaluation) +# Use LLM_BINDING_API_KEY when running with the OpenAI binding + +llm_binding = os.getenv("LLM_BINDING", "").lower() +llm_binding_key = os.getenv("LLM_BINDING_API_KEY") + +# Validate LLM_BINDING is set to openai +if llm_binding != "openai": + logger.error( + "❌ LLM_BINDING must be set to 'openai'. Current value: '%s'", + llm_binding or "(not set)", + ) + sys.exit(1) + +# Validate LLM_BINDING_API_KEY exists +if not llm_binding_key: + logger.error("❌ LLM_BINDING_API_KEY is not set. Cannot run RAGAS evaluation.") + sys.exit(1) + +# Set OPENAI_API_KEY from LLM_BINDING_API_KEY +os.environ["OPENAI_API_KEY"] = llm_binding_key +logger.info("✅ LLM_BINDING: openai") -# Conditional imports - will raise ImportError if dependencies not installed try: from datasets import Dataset from ragas import evaluate from ragas.metrics import ( - AnswerRelevancy, - ContextPrecision, - ContextRecall, - Faithfulness, + answer_relevancy, + context_precision, + context_recall, + faithfulness, ) - from ragas.llms import LangchainLLMWrapper - from langchain_openai import ChatOpenAI, OpenAIEmbeddings - from tqdm.auto import tqdm - - RAGAS_AVAILABLE = True - -except ImportError: - RAGAS_AVAILABLE = False - Dataset = None - evaluate = None - LangchainLLMWrapper = None +except ImportError as e: + logger.error("❌ RAGAS import error: %s", e) + logger.error(" Install with: pip install ragas datasets") + sys.exit(1) CONNECT_TIMEOUT_SECONDS = 180.0 @@ -123,94 +99,7 @@ class RAGEvaluator: test_dataset_path: Path to test dataset JSON file rag_api_url: Base URL of LightRAG API (e.g., http://localhost:9621) If None, will try to read from environment or use default - - Environment Variables: - EVAL_LLM_MODEL: LLM model for evaluation (default: gpt-4o-mini) - EVAL_EMBEDDING_MODEL: Embedding model for evaluation (default: text-embedding-3-small) - EVAL_LLM_BINDING_API_KEY: API key for LLM (fallback to OPENAI_API_KEY) - EVAL_LLM_BINDING_HOST: Custom endpoint URL for LLM (optional) - EVAL_EMBEDDING_BINDING_API_KEY: API key for embeddings (fallback: EVAL_LLM_BINDING_API_KEY -> OPENAI_API_KEY) - EVAL_EMBEDDING_BINDING_HOST: Custom endpoint URL for embeddings (fallback: EVAL_LLM_BINDING_HOST) - - Raises: - ImportError: If ragas or datasets packages are not installed - EnvironmentError: If EVAL_LLM_BINDING_API_KEY and OPENAI_API_KEY are both not set """ - # Validate RAGAS dependencies are installed - if not RAGAS_AVAILABLE: - raise ImportError( - "RAGAS dependencies not installed. " - "Install with: pip install ragas datasets" - ) - - # Configure evaluation LLM (for RAGAS scoring) - eval_llm_api_key = os.getenv("EVAL_LLM_BINDING_API_KEY") or os.getenv( - "OPENAI_API_KEY" - ) - if not eval_llm_api_key: - raise EnvironmentError( - "EVAL_LLM_BINDING_API_KEY or OPENAI_API_KEY is required for evaluation. 
" - "Set EVAL_LLM_BINDING_API_KEY to use a custom API key, " - "or ensure OPENAI_API_KEY is set." - ) - - eval_model = os.getenv("EVAL_LLM_MODEL", "gpt-4o-mini") - eval_llm_base_url = os.getenv("EVAL_LLM_BINDING_HOST") - - # Configure evaluation embeddings (for RAGAS scoring) - # Fallback chain: EVAL_EMBEDDING_BINDING_API_KEY -> EVAL_LLM_BINDING_API_KEY -> OPENAI_API_KEY - eval_embedding_api_key = ( - os.getenv("EVAL_EMBEDDING_BINDING_API_KEY") - or os.getenv("EVAL_LLM_BINDING_API_KEY") - or os.getenv("OPENAI_API_KEY") - ) - eval_embedding_model = os.getenv( - "EVAL_EMBEDDING_MODEL", "text-embedding-3-large" - ) - # Fallback chain: EVAL_EMBEDDING_BINDING_HOST -> EVAL_LLM_BINDING_HOST -> None - eval_embedding_base_url = os.getenv("EVAL_EMBEDDING_BINDING_HOST") or os.getenv( - "EVAL_LLM_BINDING_HOST" - ) - - # Create LLM and Embeddings instances for RAGAS - llm_kwargs = { - "model": eval_model, - "api_key": eval_llm_api_key, - "max_retries": int(os.getenv("EVAL_LLM_MAX_RETRIES", "5")), - "request_timeout": int(os.getenv("EVAL_LLM_TIMEOUT", "180")), - } - embedding_kwargs = { - "model": eval_embedding_model, - "api_key": eval_embedding_api_key, - } - - if eval_llm_base_url: - llm_kwargs["base_url"] = eval_llm_base_url - - if eval_embedding_base_url: - embedding_kwargs["base_url"] = eval_embedding_base_url - - # Create base LangChain LLM - base_llm = ChatOpenAI(**llm_kwargs) - self.eval_embeddings = OpenAIEmbeddings(**embedding_kwargs) - - # Wrap LLM with LangchainLLMWrapper and enable bypass_n mode for custom endpoints - # This ensures compatibility with endpoints that don't support the 'n' parameter - # by generating multiple outputs through repeated prompts instead of using 'n' parameter - try: - self.eval_llm = LangchainLLMWrapper( - langchain_llm=base_llm, - bypass_n=True, # Enable bypass_n to avoid passing 'n' to OpenAI API - ) - logger.debug("Successfully configured bypass_n mode for LLM wrapper") - except Exception as e: - logger.warning( - "Could not configure LangchainLLMWrapper with bypass_n: %s. 
" - "Using base LLM directly, which may cause warnings with custom endpoints.", - e, - ) - self.eval_llm = base_llm - if test_dataset_path is None: test_dataset_path = Path(__file__).parent / "sample_dataset.json" @@ -225,58 +114,6 @@ class RAGEvaluator: # Load test dataset self.test_cases = self._load_test_dataset() - # Store configuration values for display - self.eval_model = eval_model - self.eval_embedding_model = eval_embedding_model - self.eval_llm_base_url = eval_llm_base_url - self.eval_embedding_base_url = eval_embedding_base_url - self.eval_max_retries = llm_kwargs["max_retries"] - self.eval_timeout = llm_kwargs["request_timeout"] - - # Display configuration - self._display_configuration() - - def _display_configuration(self): - """Display all evaluation configuration settings""" - logger.info("Evaluation Models:") - logger.info(" • LLM Model: %s", self.eval_model) - logger.info(" • Embedding Model: %s", self.eval_embedding_model) - - # Display LLM endpoint - if self.eval_llm_base_url: - logger.info(" • LLM Endpoint: %s", self.eval_llm_base_url) - logger.info( - " • Bypass N-Parameter: Enabled (use LangchainLLMWrapper for compatibility)" - ) - else: - logger.info(" • LLM Endpoint: OpenAI Official API") - - # Display Embedding endpoint (only if different from LLM) - if self.eval_embedding_base_url: - if self.eval_embedding_base_url != self.eval_llm_base_url: - logger.info( - " • Embedding Endpoint: %s", self.eval_embedding_base_url - ) - # If same as LLM endpoint, no need to display separately - elif not self.eval_llm_base_url: - # Both using OpenAI - already displayed above - pass - else: - # LLM uses custom endpoint, but embeddings use OpenAI - logger.info(" • Embedding Endpoint: OpenAI Official API") - - logger.info("Concurrency & Rate Limiting:") - query_top_k = int(os.getenv("EVAL_QUERY_TOP_K", "10")) - logger.info(" • Query Top-K: %s Entities/Relations", query_top_k) - logger.info(" • LLM Max Retries: %s", self.eval_max_retries) - logger.info(" • LLM Timeout: %s seconds", self.eval_timeout) - - logger.info("Test Configuration:") - logger.info(" • Total Test Cases: %s", len(self.test_cases)) - logger.info(" • Test Dataset: %s", self.test_dataset_path.name) - logger.info(" • LightRAG API: %s", self.rag_api_url) - logger.info(" • Results Directory: %s", self.results_dir.name) - def _load_test_dataset(self) -> List[Dict[str, str]]: """Load test cases from JSON file""" if not self.test_dataset_path.exists(): @@ -313,22 +150,13 @@ class RAGEvaluator: "include_references": True, "include_chunk_content": True, # NEW: Request chunk content in references "response_type": "Multiple Paragraphs", - "top_k": int(os.getenv("EVAL_QUERY_TOP_K", "10")), + "top_k": 10, } - # Get API key from environment for authentication - api_key = os.getenv("LIGHTRAG_API_KEY") - - # Prepare headers with optional authentication - headers = {} - if api_key: - headers["X-API-Key"] = api_key - # Single optimized API call - gets both answer AND chunk content response = await client.post( f"{self.rag_api_url}/query", json=payload, - headers=headers if headers else None, ) response.raise_for_status() result = response.json() @@ -342,26 +170,14 @@ class RAGEvaluator: first_ref = references[0] logger.debug("🔍 First Reference Keys: %s", list(first_ref.keys())) if "content" in first_ref: - content_preview = first_ref["content"] - if isinstance(content_preview, list) and content_preview: - logger.debug( - "🔍 Content Preview (first chunk): %s...", - content_preview[0][:100], - ) - elif isinstance(content_preview, str): 
- logger.debug("🔍 Content Preview: %s...", content_preview[:100]) + logger.debug( + "🔍 Content Preview: %s...", first_ref["content"][:100] + ) # Extract chunk content from enriched references - # Note: content is now a list of chunks per reference (one file may have multiple chunks) - contexts = [] - for ref in references: - content = ref.get("content", []) - if isinstance(content, list): - # Flatten the list: each chunk becomes a separate context - contexts.extend(content) - elif isinstance(content, str): - # Backward compatibility: if content is still a string (shouldn't happen) - contexts.append(content) + contexts = [ + ref.get("content", "") for ref in references if ref.get("content") + ] return { "answer": answer, @@ -392,55 +208,46 @@ class RAGEvaluator: self, idx: int, test_case: Dict[str, str], - rag_semaphore: asyncio.Semaphore, - eval_semaphore: asyncio.Semaphore, + semaphore: asyncio.Semaphore, client: httpx.AsyncClient, - progress_counter: Dict[str, int], - position_pool: asyncio.Queue, - pbar_creation_lock: asyncio.Lock, ) -> Dict[str, Any]: """ - Evaluate a single test case with two-stage pipeline concurrency control + Evaluate a single test case with concurrency control Args: idx: Test case index (1-based) test_case: Test case dictionary with question and ground_truth - rag_semaphore: Semaphore to control overall concurrency (covers entire function) - eval_semaphore: Semaphore to control RAGAS evaluation concurrency (Stage 2) + semaphore: Semaphore to control concurrency client: Shared httpx AsyncClient for connection pooling - progress_counter: Shared dictionary for progress tracking - position_pool: Queue of available tqdm position indices - pbar_creation_lock: Lock to serialize tqdm creation and prevent race conditions Returns: Evaluation result dictionary """ - # rag_semaphore controls the entire evaluation process to prevent - # all RAG responses from being generated at once when eval is slow - async with rag_semaphore: + total_cases = len(self.test_cases) + + async with semaphore: question = test_case["question"] ground_truth = test_case["ground_truth"] - # Stage 1: Generate RAG response - try: - rag_response = await self.generate_rag_response( - question=question, client=client - ) - except Exception as e: - logger.error("Error generating response for test %s: %s", idx, str(e)) - progress_counter["completed"] += 1 - return { - "test_number": idx, - "question": question, - "error": str(e), - "metrics": {}, - "ragas_score": 0, - "timestamp": datetime.now().isoformat(), - } + logger.info("[%s/%s] Evaluating: %s...", idx, total_cases, question[:60]) + + # Generate RAG response by calling actual LightRAG API + rag_response = await self.generate_rag_response( + question=question, client=client + ) # *** CRITICAL FIX: Use actual retrieved contexts, NOT ground_truth *** retrieved_contexts = rag_response["contexts"] + # DEBUG: Print what was actually retrieved + logger.debug("📝 Retrieved %s contexts", len(retrieved_contexts)) + if retrieved_contexts: + logger.debug( + "📄 First context preview: %s...", retrieved_contexts[0][:100] + ) + else: + logger.warning("⚠️ No contexts retrieved!") + # Prepare dataset for RAGAS evaluation with CORRECT contexts eval_dataset = Dataset.from_dict( { @@ -451,141 +258,88 @@ class RAGEvaluator: } ) - # Stage 2: Run RAGAS evaluation (controlled by eval_semaphore) - # IMPORTANT: Create fresh metric instances for each evaluation to avoid - # concurrent state conflicts when multiple tasks run in parallel - async with eval_semaphore: - pbar = None - 
position = None - try: - # Acquire a position from the pool for this tqdm progress bar - position = await position_pool.get() + # Run RAGAS evaluation + try: + eval_results = evaluate( + dataset=eval_dataset, + metrics=[ + faithfulness, + answer_relevancy, + context_recall, + context_precision, + ], + ) - # Serialize tqdm creation to prevent race conditions - # Multiple tasks creating tqdm simultaneously can cause display conflicts - async with pbar_creation_lock: - # Create tqdm progress bar with assigned position to avoid overlapping - # leave=False ensures the progress bar is cleared after completion, - # preventing accumulation of completed bars and allowing position reuse - pbar = tqdm( - total=4, - desc=f"Eval-{idx:02d}", - position=position, - leave=False, - ) - # Give tqdm time to initialize and claim its screen position - await asyncio.sleep(0.05) + # Convert to DataFrame (RAGAS v0.3+ API) + df = eval_results.to_pandas() - eval_results = evaluate( - dataset=eval_dataset, - metrics=[ - Faithfulness(), - AnswerRelevancy(), - ContextRecall(), - ContextPrecision(), - ], - llm=self.eval_llm, - embeddings=self.eval_embeddings, - _pbar=pbar, - ) + # Extract scores from first row + scores_row = df.iloc[0] - # Convert to DataFrame (RAGAS v0.3+ API) - df = eval_results.to_pandas() + # Extract scores (RAGAS v0.3+ uses .to_pandas()) + result = { + "question": question, + "answer": rag_response["answer"][:200] + "..." + if len(rag_response["answer"]) > 200 + else rag_response["answer"], + "ground_truth": ground_truth[:200] + "..." + if len(ground_truth) > 200 + else ground_truth, + "project": test_case.get("project_context", "unknown"), + "metrics": { + "faithfulness": float(scores_row.get("faithfulness", 0)), + "answer_relevance": float( + scores_row.get("answer_relevancy", 0) + ), + "context_recall": float(scores_row.get("context_recall", 0)), + "context_precision": float( + scores_row.get("context_precision", 0) + ), + }, + "timestamp": datetime.now().isoformat(), + } - # Extract scores from first row - scores_row = df.iloc[0] + # Calculate RAGAS score (average of all metrics) + metrics = result["metrics"] + ragas_score = sum(metrics.values()) / len(metrics) if metrics else 0 + result["ragas_score"] = round(ragas_score, 4) - # Extract scores (RAGAS v0.3+ uses .to_pandas()) - result = { - "test_number": idx, - "question": question, - "answer": rag_response["answer"][:200] + "..." - if len(rag_response["answer"]) > 200 - else rag_response["answer"], - "ground_truth": ground_truth[:200] + "..." 
- if len(ground_truth) > 200 - else ground_truth, - "project": test_case.get("project", "unknown"), - "metrics": { - "faithfulness": float(scores_row.get("faithfulness", 0)), - "answer_relevance": float( - scores_row.get("answer_relevancy", 0) - ), - "context_recall": float( - scores_row.get("context_recall", 0) - ), - "context_precision": float( - scores_row.get("context_precision", 0) - ), - }, - "timestamp": datetime.now().isoformat(), - } + logger.info("✅ Faithfulness: %.4f", metrics["faithfulness"]) + logger.info("✅ Answer Relevance: %.4f", metrics["answer_relevance"]) + logger.info("✅ Context Recall: %.4f", metrics["context_recall"]) + logger.info("✅ Context Precision: %.4f", metrics["context_precision"]) + logger.info("📊 RAGAS Score: %.4f", result["ragas_score"]) - # Calculate RAGAS score (average of all metrics, excluding NaN values) - metrics = result["metrics"] - valid_metrics = [v for v in metrics.values() if not _is_nan(v)] - ragas_score = ( - sum(valid_metrics) / len(valid_metrics) if valid_metrics else 0 - ) - result["ragas_score"] = round(ragas_score, 4) + return result - # Update progress counter - progress_counter["completed"] += 1 - - return result - - except Exception as e: - logger.error("Error evaluating test %s: %s", idx, str(e)) - progress_counter["completed"] += 1 - return { - "test_number": idx, - "question": question, - "error": str(e), - "metrics": {}, - "ragas_score": 0, - "timestamp": datetime.now().isoformat(), - } - finally: - # Force close progress bar to ensure completion - if pbar is not None: - pbar.close() - # Release the position back to the pool for reuse - if position is not None: - await position_pool.put(position) + except Exception as e: + logger.exception("❌ Error evaluating: %s", e) + return { + "question": question, + "error": str(e), + "metrics": {}, + "ragas_score": 0, + "timestamp": datetime.now().isoformat(), + } async def evaluate_responses(self) -> List[Dict[str, Any]]: """ - Evaluate all test cases in parallel with two-stage pipeline and return metrics + Evaluate all test cases in parallel and return metrics Returns: List of evaluation results with metrics """ - # Get evaluation concurrency from environment (default to 2 for parallel evaluation) - max_async = int(os.getenv("EVAL_MAX_CONCURRENT", "2")) + # Get MAX_ASYNC from environment (default to 4 if not set) + max_async = int(os.getenv("MAX_ASYNC", "4")) + logger.info("") logger.info("%s", "=" * 70) - logger.info("🚀 Starting RAGAS Evaluation of LightRAG System") - logger.info("🔧 RAGAS Evaluation (Stage 2): %s concurrent", max_async) + logger.info("🚀 Starting RAGAS Evaluation of Portfolio RAG System") + logger.info("🔧 Parallel evaluations: %s", max_async) logger.info("%s", "=" * 70) - # Create two-stage pipeline semaphores - # Stage 1: RAG generation - allow x2 concurrency to keep evaluation fed - rag_semaphore = asyncio.Semaphore(max_async * 2) - # Stage 2: RAGAS evaluation - primary bottleneck - eval_semaphore = asyncio.Semaphore(max_async) - - # Create progress counter (shared across all tasks) - progress_counter = {"completed": 0} - - # Create position pool for tqdm progress bars - # Positions range from 0 to max_async-1, ensuring no overlapping displays - position_pool = asyncio.Queue() - for i in range(max_async): - await position_pool.put(i) - - # Create lock to serialize tqdm creation and prevent race conditions - # This ensures progress bars are created one at a time, avoiding display conflicts - pbar_creation_lock = asyncio.Lock() + # Create semaphore to limit concurrent 
evaluations + semaphore = asyncio.Semaphore(max_async) # Create shared HTTP client with connection pooling and proper timeouts # Timeout: 3 minutes for connect, 5 minutes for read (LLM can be slow) @@ -595,27 +349,18 @@ class RAGEvaluator: read=READ_TIMEOUT_SECONDS, ) limits = httpx.Limits( - max_connections=(max_async + 1) * 2, # Allow buffer for RAG stage - max_keepalive_connections=max_async + 1, + max_connections=max_async * 2, # Allow some buffer + max_keepalive_connections=max_async, ) async with httpx.AsyncClient(timeout=timeout, limits=limits) as client: # Create tasks for all test cases tasks = [ - self.evaluate_single_case( - idx, - test_case, - rag_semaphore, - eval_semaphore, - client, - progress_counter, - position_pool, - pbar_creation_lock, - ) + self.evaluate_single_case(idx, test_case, semaphore, client) for idx, test_case in enumerate(self.test_cases, 1) ] - # Run all evaluations in parallel (limited by two-stage semaphores) + # Run all evaluations in parallel (limited by semaphore) results = await asyncio.gather(*tasks) return list(results) @@ -680,95 +425,6 @@ class RAGEvaluator: return csv_path - def _format_metric(self, value: float, width: int = 6) -> str: - """ - Format a metric value for display, handling NaN gracefully - - Args: - value: The metric value to format - width: The width of the formatted string - - Returns: - Formatted string (e.g., "0.8523" or " N/A ") - """ - if _is_nan(value): - return "N/A".center(width) - return f"{value:.4f}".rjust(width) - - def _display_results_table(self, results: List[Dict[str, Any]]): - """ - Display evaluation results in a formatted table - - Args: - results: List of evaluation results - """ - logger.info("") - logger.info("%s", "=" * 115) - logger.info("📊 EVALUATION RESULTS SUMMARY") - logger.info("%s", "=" * 115) - - # Table header - logger.info( - "%-4s | %-50s | %6s | %7s | %6s | %7s | %6s | %6s", - "#", - "Question", - "Faith", - "AnswRel", - "CtxRec", - "CtxPrec", - "RAGAS", - "Status", - ) - logger.info("%s", "-" * 115) - - # Table rows - for result in results: - test_num = result.get("test_number", 0) - question = result.get("question", "") - # Truncate question to 50 chars - question_display = ( - (question[:47] + "...") if len(question) > 50 else question - ) - - metrics = result.get("metrics", {}) - if metrics: - # Success case - format each metric, handling NaN values - faith = metrics.get("faithfulness", 0) - ans_rel = metrics.get("answer_relevance", 0) - ctx_rec = metrics.get("context_recall", 0) - ctx_prec = metrics.get("context_precision", 0) - ragas = result.get("ragas_score", 0) - status = "✓" - - logger.info( - "%-4d | %-50s | %s | %s | %s | %s | %s | %6s", - test_num, - question_display, - self._format_metric(faith, 6), - self._format_metric(ans_rel, 7), - self._format_metric(ctx_rec, 6), - self._format_metric(ctx_prec, 7), - self._format_metric(ragas, 6), - status, - ) - else: - # Error case - error = result.get("error", "Unknown error") - error_display = (error[:20] + "...") if len(error) > 23 else error - logger.info( - "%-4d | %-50s | %6s | %7s | %6s | %7s | %6s | ✗ %s", - test_num, - question_display, - "N/A", - "N/A", - "N/A", - "N/A", - "N/A", - error_display, - ) - - logger.info("%s", "=" * 115) - def _calculate_benchmark_stats( self, results: List[Dict[str, Any]] ) -> Dict[str, Any]: @@ -795,55 +451,45 @@ class RAGEvaluator: "success_rate": 0.0, } - # Calculate averages for each metric (handling NaN values correctly) - # Track both sum and count for each metric to handle NaN values properly - 
metrics_data = { - "faithfulness": {"sum": 0.0, "count": 0}, - "answer_relevance": {"sum": 0.0, "count": 0}, - "context_recall": {"sum": 0.0, "count": 0}, - "context_precision": {"sum": 0.0, "count": 0}, - "ragas_score": {"sum": 0.0, "count": 0}, + # Calculate averages for each metric (handling NaN values) + metrics_sum = { + "faithfulness": 0.0, + "answer_relevance": 0.0, + "context_recall": 0.0, + "context_precision": 0.0, + "ragas_score": 0.0, } for result in valid_results: metrics = result.get("metrics", {}) - - # For each metric, sum non-NaN values and count them + # Skip NaN values when summing faithfulness = metrics.get("faithfulness", 0) if not _is_nan(faithfulness): - metrics_data["faithfulness"]["sum"] += faithfulness - metrics_data["faithfulness"]["count"] += 1 + metrics_sum["faithfulness"] += faithfulness answer_relevance = metrics.get("answer_relevance", 0) if not _is_nan(answer_relevance): - metrics_data["answer_relevance"]["sum"] += answer_relevance - metrics_data["answer_relevance"]["count"] += 1 + metrics_sum["answer_relevance"] += answer_relevance context_recall = metrics.get("context_recall", 0) if not _is_nan(context_recall): - metrics_data["context_recall"]["sum"] += context_recall - metrics_data["context_recall"]["count"] += 1 + metrics_sum["context_recall"] += context_recall context_precision = metrics.get("context_precision", 0) if not _is_nan(context_precision): - metrics_data["context_precision"]["sum"] += context_precision - metrics_data["context_precision"]["count"] += 1 + metrics_sum["context_precision"] += context_precision ragas_score = result.get("ragas_score", 0) if not _is_nan(ragas_score): - metrics_data["ragas_score"]["sum"] += ragas_score - metrics_data["ragas_score"]["count"] += 1 + metrics_sum["ragas_score"] += ragas_score - # Calculate averages using actual counts for each metric + # Calculate averages + n = len(valid_results) avg_metrics = {} - for metric_name, data in metrics_data.items(): - if data["count"] > 0: - avg_val = data["sum"] / data["count"] - avg_metrics[metric_name] = ( - round(avg_val, 4) if not _is_nan(avg_val) else 0.0 - ) - else: - avg_metrics[metric_name] = 0.0 + for k, v in metrics_sum.items(): + avg_val = v / n if n > 0 else 0 + # Handle NaN in average + avg_metrics[k] = round(avg_val, 4) if not _is_nan(avg_val) else 0.0 # Find min and max RAGAS scores (filter out NaN) ragas_scores = [] @@ -888,9 +534,6 @@ class RAGEvaluator: "results": results, } - # Display results table - self._display_results_table(results) - # Save JSON results json_path = ( self.results_dir @@ -898,9 +541,11 @@ class RAGEvaluator: ) with open(json_path, "w") as f: json.dump(summary, f, indent=2) + logger.info("✅ JSON results saved to: %s", json_path) # Export to CSV csv_path = self._export_to_csv(results) + logger.info("✅ CSV results saved to: %s", csv_path) # Print summary logger.info("") @@ -925,7 +570,7 @@ class RAGEvaluator: logger.info("Average Context Recall: %.4f", avg["context_recall"]) logger.info("Average Context Precision: %.4f", avg["context_precision"]) logger.info("Average RAGAS Score: %.4f", avg["ragas_score"]) - logger.info("%s", "-" * 70) + logger.info("") logger.info( "Min RAGAS Score: %.4f", benchmark_stats["min_ragas_score"], @@ -951,61 +596,28 @@ async def main(): """ Main entry point for RAGAS evaluation - Command-line arguments: - --dataset, -d: Path to test dataset JSON file (default: sample_dataset.json) - --ragendpoint, -r: LightRAG API endpoint URL (default: http://localhost:9621 or $LIGHTRAG_API_URL) - Usage: python 
lightrag/evaluation/eval_rag_quality.py - python lightrag/evaluation/eval_rag_quality.py --dataset my_test.json - python lightrag/evaluation/eval_rag_quality.py -d my_test.json -r http://localhost:9621 + python lightrag/evaluation/eval_rag_quality.py http://localhost:9621 + python lightrag/evaluation/eval_rag_quality.py http://your-server.com:9621 """ try: - # Parse command-line arguments - parser = argparse.ArgumentParser( - description="RAGAS Evaluation Script for LightRAG System", - formatter_class=argparse.RawDescriptionHelpFormatter, - epilog=""" -Examples: - # Use defaults - python lightrag/evaluation/eval_rag_quality.py - - # Specify custom dataset - python lightrag/evaluation/eval_rag_quality.py --dataset my_test.json - - # Specify custom RAG endpoint - python lightrag/evaluation/eval_rag_quality.py --ragendpoint http://my-server.com:9621 - - # Specify both - python lightrag/evaluation/eval_rag_quality.py -d my_test.json -r http://localhost:9621 - """, - ) - - parser.add_argument( - "--dataset", - "-d", - type=str, - default=None, - help="Path to test dataset JSON file (default: sample_dataset.json in evaluation directory)", - ) - - parser.add_argument( - "--ragendpoint", - "-r", - type=str, - default=None, - help="LightRAG API endpoint URL (default: http://localhost:9621 or $LIGHTRAG_API_URL environment variable)", - ) - - args = parser.parse_args() + # Get RAG API URL from command line or environment + rag_api_url = None + if len(sys.argv) > 1: + rag_api_url = sys.argv[1] + logger.info("") logger.info("%s", "=" * 70) logger.info("🔍 RAGAS Evaluation - Using Real LightRAG API") logger.info("%s", "=" * 70) + if rag_api_url: + logger.info("📡 RAG API URL: %s", rag_api_url) + else: + logger.info("📡 RAG API URL: http://localhost:9621 (default)") + logger.info("%s", "=" * 70) - evaluator = RAGEvaluator( - test_dataset_path=args.dataset, rag_api_url=args.ragendpoint - ) + evaluator = RAGEvaluator(rag_api_url=rag_api_url) await evaluator.run() except Exception as e: logger.exception("❌ Error: %s", e)
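
The hunks above swap the class-based metric instances (Faithfulness(), AnswerRelevancy(), ContextRecall(), ContextPrecision()) and the LangchainLLMWrapper plumbing for module-level metric objects and a plain OPENAI_API_KEY path derived from LLM_BINDING_API_KEY. As a rough standalone sketch of that evaluation flow (not part of the patch), the snippet below pushes one hypothetical question/answer/context sample through the same Dataset.from_dict -> evaluate -> to_pandas sequence. It assumes a ragas release that still exports faithfulness, answer_relevancy, context_recall and context_precision as ready-made metric objects, that OPENAI_API_KEY is already exported, and that the column set (question, answer, contexts, ground_truth) matches what the patched evaluate_single_case() assembles; the sample strings themselves are placeholders, not project data.

    # Illustrative sketch only, not the patched script: one hypothetical sample
    # run through the Dataset.from_dict -> evaluate -> to_pandas flow above.
    import os

    from datasets import Dataset
    from ragas import evaluate
    from ragas.metrics import (
        answer_relevancy,
        context_precision,
        context_recall,
        faithfulness,
    )

    # Assumption: OPENAI_API_KEY is already set, mirroring the patch's
    # os.environ["OPENAI_API_KEY"] = llm_binding_key step.
    assert os.getenv("OPENAI_API_KEY"), "export OPENAI_API_KEY before running"

    # Hypothetical placeholder data; the real script pulls answer/contexts
    # from the LightRAG /query endpoint instead.
    eval_dataset = Dataset.from_dict(
        {
            "question": ["Which storage backends does the service support?"],
            "answer": ["It persists data to JSON files or PostgreSQL."],
            "contexts": [["The service stores data in JSON files or PostgreSQL."]],
            "ground_truth": ["JSON file storage and PostgreSQL are supported."],
        }
    )

    result = evaluate(
        dataset=eval_dataset,
        metrics=[faithfulness, answer_relevancy, context_recall, context_precision],
    )

    # First (only) row of the pandas frame holds the per-metric scores,
    # using the same column names the patch reads via scores_row.get(...).
    scores = result.to_pandas().iloc[0]
    for name in ("faithfulness", "answer_relevancy", "context_recall", "context_precision"):
        print(f"{name}: {float(scores[name]):.4f}")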