Implement two-stage pipeline for RAG evaluation with separate semaphores

• Split RAG generation and RAGAS evaluation into separate stages (see the sketch below)
• Add rag_semaphore for Stage 1 (RAG generation)
• Add eval_semaphore for Stage 2 (RAGAS evaluation)
• Improve concurrency control
• Update connection pool limits
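As a minimal standalone sketch of the pattern applied here (the semaphore names match this diff; the helper coroutines are illustrative stubs, not the real calls):

import asyncio

async def generate_rag_response(idx: int) -> str:
    # stand-in for the HTTP call to the LightRAG /query endpoint
    await asyncio.sleep(0.1)
    return f"answer-{idx}"

async def score_with_ragas(answer: str) -> float:
    # stand-in for the (slower) RAGAS scoring step
    await asyncio.sleep(0.3)
    return 1.0

async def evaluate_case(idx, rag_semaphore, eval_semaphore):
    # Stage 1: RAG generation, gated by its own semaphore
    async with rag_semaphore:
        answer = await generate_rag_response(idx)
    # Stage 2: RAGAS evaluation, the primary bottleneck
    async with eval_semaphore:
        return await score_with_ragas(answer)

async def run_all(n_cases: int = 10, max_async: int = 2):
    rag_semaphore = asyncio.Semaphore(max_async + 1)  # +1 keeps the eval stage fed
    eval_semaphore = asyncio.Semaphore(max_async)
    return await asyncio.gather(
        *(evaluate_case(i, rag_semaphore, eval_semaphore) for i in range(n_cases))
    )

asyncio.run(run_all())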

(cherry picked from commit 83715a3ac1)
yangdx 2025-11-05 00:36:09 +08:00 committed by Raphaël MANSUY
parent dd425e5513
commit c459caed26


@@ -1,6 +1,6 @@
#!/usr/bin/env python3
"""
RAGAS Evaluation Script for Portfolio RAG System
RAGAS Evaluation Script for LightRAG System
Evaluates RAG response quality using RAGAS metrics:
- Faithfulness: Is the answer factually accurate based on context?
@@ -9,56 +9,98 @@ Evaluates RAG response quality using RAGAS metrics:
- Context Precision: Is retrieved context clean without noise?
Usage:
# Use defaults (sample_dataset.json, http://localhost:9621)
python lightrag/evaluation/eval_rag_quality.py
python lightrag/evaluation/eval_rag_quality.py http://localhost:9621
python lightrag/evaluation/eval_rag_quality.py http://your-rag-server.com:9621
# Specify custom dataset
python lightrag/evaluation/eval_rag_quality.py --dataset my_test.json
python lightrag/evaluation/eval_rag_quality.py -d my_test.json
# Specify custom RAG endpoint
python lightrag/evaluation/eval_rag_quality.py --ragendpoint http://my-server.com:9621
python lightrag/evaluation/eval_rag_quality.py -r http://my-server.com:9621
# Specify both
python lightrag/evaluation/eval_rag_quality.py -d my_test.json -r http://localhost:9621
# Get help
python lightrag/evaluation/eval_rag_quality.py --help
Results are saved to: lightrag/evaluation/results/
- results_YYYYMMDD_HHMMSS.csv (CSV export for analysis)
- results_YYYYMMDD_HHMMSS.json (Full results with details)
Technical Notes:
- Uses stable RAGAS API (LangchainLLMWrapper) for maximum compatibility
- Supports custom OpenAI-compatible endpoints via EVAL_LLM_BINDING_HOST (see the illustrative settings after this docstring)
- Enables bypass_n mode for endpoints that don't support the 'n' parameter
- Deprecation warnings are suppressed for cleaner output
"""
import argparse
import asyncio
import csv
import json
import math
import os
import sys
import time
import warnings
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List
import httpx
from dotenv import load_dotenv
from lightrag.utils import logger
# Suppress LangchainLLMWrapper deprecation warning
# We use LangchainLLMWrapper for stability and compatibility with all RAGAS versions
warnings.filterwarnings(
"ignore",
message=".*LangchainLLMWrapper is deprecated.*",
category=DeprecationWarning,
)
# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
# Load .env from project root
project_root = Path(__file__).parent.parent.parent
load_dotenv(project_root / ".env")
# Setup OpenAI API key (required for RAGAS evaluation)
# Use LLM_BINDING_API_KEY if OPENAI_API_KEY is not set
if "OPENAI_API_KEY" not in os.environ:
if "LLM_BINDING_API_KEY" in os.environ:
os.environ["OPENAI_API_KEY"] = os.environ["LLM_BINDING_API_KEY"]
else:
os.environ["OPENAI_API_KEY"] = input("Enter your OpenAI API key: ")
# use the .env file in the current folder
# allows using a different .env file for each LightRAG instance
# OS environment variables take precedence over the .env file
load_dotenv(dotenv_path=".env", override=False)
# Conditional imports - will raise ImportError if dependencies not installed
try:
from datasets import Dataset
from ragas import evaluate
from ragas.metrics import (
answer_relevancy,
context_precision,
context_recall,
faithfulness,
AnswerRelevancy,
ContextPrecision,
ContextRecall,
Faithfulness,
)
except ImportError as e:
print(f"❌ RAGAS import error: {e}")
print(" Install with: pip install ragas datasets")
sys.exit(1)
from ragas.llms import LangchainLLMWrapper
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from tqdm import tqdm
RAGAS_AVAILABLE = True
except ImportError:
RAGAS_AVAILABLE = False
Dataset = None
evaluate = None
LangchainLLMWrapper = None
CONNECT_TIMEOUT_SECONDS = 180.0
READ_TIMEOUT_SECONDS = 300.0
TOTAL_TIMEOUT_SECONDS = 180.0
def _is_nan(value: Any) -> bool:
"""Return True when value is a float NaN."""
return isinstance(value, float) and math.isnan(value)
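A quick worked example of the NaN-aware averaging this helper supports (illustrative only, not part of the diff):

import math

metrics = {
    "faithfulness": 0.90,
    "answer_relevance": float("nan"),  # e.g. one judgement the evaluator could not parse
    "context_recall": 0.70,
    "context_precision": 0.80,
}
valid = [v for v in metrics.values() if not (isinstance(v, float) and math.isnan(v))]
ragas_score = round(sum(valid) / len(valid), 4) if valid else 0
print(ragas_score)  # 0.8 -> the NaN metric is excluded rather than counted as zero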
class RAGEvaluator:
@@ -72,7 +114,75 @@ class RAGEvaluator:
test_dataset_path: Path to test dataset JSON file
rag_api_url: Base URL of LightRAG API (e.g., http://localhost:9621)
If None, will try to read from environment or use default
Environment Variables:
EVAL_LLM_MODEL: LLM model for evaluation (default: gpt-4o-mini)
EVAL_EMBEDDING_MODEL: Embedding model for evaluation (default: text-embedding-3-large)
EVAL_LLM_BINDING_API_KEY: API key for evaluation models (fallback to OPENAI_API_KEY)
EVAL_LLM_BINDING_HOST: Custom endpoint URL for evaluation models (optional)
Raises:
ImportError: If ragas or datasets packages are not installed
EnvironmentError: If EVAL_LLM_BINDING_API_KEY and OPENAI_API_KEY are both not set
"""
# Validate RAGAS dependencies are installed
if not RAGAS_AVAILABLE:
raise ImportError(
"RAGAS dependencies not installed. "
"Install with: pip install ragas datasets"
)
# Configure evaluation models (for RAGAS scoring)
eval_api_key = os.getenv("EVAL_LLM_BINDING_API_KEY") or os.getenv(
"OPENAI_API_KEY"
)
if not eval_api_key:
raise EnvironmentError(
"EVAL_LLM_BINDING_API_KEY or OPENAI_API_KEY is required for evaluation. "
"Set EVAL_LLM_BINDING_API_KEY to use a custom API key, "
"or ensure OPENAI_API_KEY is set."
)
eval_model = os.getenv("EVAL_LLM_MODEL", "gpt-4o-mini")
eval_embedding_model = os.getenv(
"EVAL_EMBEDDING_MODEL", "text-embedding-3-large"
)
eval_base_url = os.getenv("EVAL_LLM_BINDING_HOST")
# Create LLM and Embeddings instances for RAGAS
llm_kwargs = {
"model": eval_model,
"api_key": eval_api_key,
"max_retries": int(os.getenv("EVAL_LLM_MAX_RETRIES", "5")),
"request_timeout": int(os.getenv("EVAL_LLM_TIMEOUT", "180")),
}
embedding_kwargs = {"model": eval_embedding_model, "api_key": eval_api_key}
if eval_base_url:
llm_kwargs["base_url"] = eval_base_url
embedding_kwargs["base_url"] = eval_base_url
# Create base LangChain LLM
base_llm = ChatOpenAI(**llm_kwargs)
self.eval_embeddings = OpenAIEmbeddings(**embedding_kwargs)
# Wrap LLM with LangchainLLMWrapper and enable bypass_n mode for custom endpoints
# This ensures compatibility with endpoints that don't support the 'n' parameter
# by generating multiple outputs through repeated prompts instead of using 'n' parameter
try:
self.eval_llm = LangchainLLMWrapper(
langchain_llm=base_llm,
bypass_n=True, # Enable bypass_n to avoid passing 'n' to OpenAI API
)
logger.debug("Successfully configured bypass_n mode for LLM wrapper")
except Exception as e:
logger.warning(
"Could not configure LangchainLLMWrapper with bypass_n: %s. "
"Using base LLM directly, which may cause warnings with custom endpoints.",
e,
)
self.eval_llm = base_llm
if test_dataset_path is None:
test_dataset_path = Path(__file__).parent / "sample_dataset.json"
@@ -87,6 +197,41 @@ class RAGEvaluator:
# Load test dataset
self.test_cases = self._load_test_dataset()
# Store configuration values for display
self.eval_model = eval_model
self.eval_embedding_model = eval_embedding_model
self.eval_base_url = eval_base_url
self.eval_max_retries = llm_kwargs["max_retries"]
self.eval_timeout = llm_kwargs["request_timeout"]
# Display configuration
self._display_configuration()
def _display_configuration(self):
"""Display all evaluation configuration settings"""
logger.info("Evaluation Models:")
logger.info(" • LLM Model: %s", self.eval_model)
logger.info(" • Embedding Model: %s", self.eval_embedding_model)
if self.eval_base_url:
logger.info(" • Custom Endpoint: %s", self.eval_base_url)
logger.info(
" • Bypass N-Parameter: Enabled (use LangchainLLMWrapperfor compatibility)"
)
else:
logger.info(" • Endpoint: OpenAI Official API")
logger.info("Concurrency & Rate Limiting:")
query_top_k = int(os.getenv("EVAL_QUERY_TOP_K", "10"))
logger.info(" • Query Top-K: %s Entities/Relations", query_top_k)
logger.info(" • LLM Max Retries: %s", self.eval_max_retries)
logger.info(" • LLM Timeout: %s seconds", self.eval_timeout)
logger.info("Test Configuration:")
logger.info(" • Total Test Cases: %s", len(self.test_cases))
logger.info(" • Test Dataset: %s", self.test_dataset_path.name)
logger.info(" • LightRAG API: %s", self.rag_api_url)
logger.info(" • Results Directory: %s", self.results_dir.name)
def _load_test_dataset(self) -> List[Dict[str, str]]:
"""Load test cases from JSON file"""
if not self.test_dataset_path.exists():
@@ -123,13 +268,22 @@ class RAGEvaluator:
"include_references": True,
"include_chunk_content": True, # NEW: Request chunk content in references
"response_type": "Multiple Paragraphs",
"top_k": 10,
"top_k": int(os.getenv("EVAL_QUERY_TOP_K", "10")),
}
# Get API key from environment for authentication
api_key = os.getenv("LIGHTRAG_API_KEY")
# Prepare headers with optional authentication
headers = {}
if api_key:
headers["X-API-Key"] = api_key
# Single optimized API call - gets both answer AND chunk content
response = await client.post(
f"{self.rag_api_url}/query",
json=payload,
headers=headers if headers else None,
)
response.raise_for_status()
result = response.json()
@@ -138,17 +292,31 @@ class RAGEvaluator:
references = result.get("references", [])
# DEBUG: Inspect the API response
print(f" 🔍 References Count: {len(references)}")
logger.debug("🔍 References Count: %s", len(references))
if references:
first_ref = references[0]
print(f" 🔍 First Reference Keys: {list(first_ref.keys())}")
logger.debug("🔍 First Reference Keys: %s", list(first_ref.keys()))
if "content" in first_ref:
print(f" 🔍 Content Preview: {first_ref['content'][:100]}...")
content_preview = first_ref["content"]
if isinstance(content_preview, list) and content_preview:
logger.debug(
"🔍 Content Preview (first chunk): %s...",
content_preview[0][:100],
)
elif isinstance(content_preview, str):
logger.debug("🔍 Content Preview: %s...", content_preview[:100])
# Extract chunk content from enriched references
contexts = [
ref.get("content", "") for ref in references if ref.get("content")
]
# Note: content is now a list of chunks per reference (one file may have multiple chunks)
contexts = []
for ref in references:
content = ref.get("content", [])
if isinstance(content, list):
# Flatten the list: each chunk becomes a separate context
contexts.extend(content)
elif isinstance(content, str):
# Backward compatibility: if content is still a string (shouldn't happen)
contexts.append(content)
return {
"answer": answer,
@@ -179,62 +347,82 @@ class RAGEvaluator:
self,
idx: int,
test_case: Dict[str, str],
semaphore: asyncio.Semaphore,
rag_semaphore: asyncio.Semaphore,
eval_semaphore: asyncio.Semaphore,
client: httpx.AsyncClient,
progress_counter: Dict[str, int],
) -> Dict[str, Any]:
"""
Evaluate a single test case with concurrency control
Evaluate a single test case with two-stage pipeline concurrency control
Args:
idx: Test case index (1-based)
test_case: Test case dictionary with question and ground_truth
semaphore: Semaphore to control concurrency
rag_semaphore: Semaphore to control RAG generation concurrency (Stage 1)
eval_semaphore: Semaphore to control RAGAS evaluation concurrency (Stage 2)
client: Shared httpx AsyncClient for connection pooling
progress_counter: Shared dictionary for progress tracking
Returns:
Evaluation result dictionary
"""
async with semaphore:
question = test_case["question"]
ground_truth = test_case["ground_truth"]
question = test_case["question"]
ground_truth = test_case["ground_truth"]
print(f"[{idx}/{len(self.test_cases)}] Evaluating: {question[:60]}...")
# Generate RAG response by calling actual LightRAG API
rag_response = await self.generate_rag_response(
question=question, client=client
)
# *** CRITICAL FIX: Use actual retrieved contexts, NOT ground_truth ***
retrieved_contexts = rag_response["contexts"]
# DEBUG: Print what was actually retrieved
print(f" 📝 Retrieved {len(retrieved_contexts)} contexts")
if retrieved_contexts:
print(f" 📄 First context preview: {retrieved_contexts[0][:100]}...")
else:
print(" ⚠️ WARNING: No contexts retrieved!")
# Prepare dataset for RAGAS evaluation with CORRECT contexts
eval_dataset = Dataset.from_dict(
{
"question": [question],
"answer": [rag_response["answer"]],
"contexts": [retrieved_contexts],
"ground_truth": [ground_truth],
}
)
# Run RAGAS evaluation
# Stage 1: Generate RAG response (controlled by rag_semaphore)
async with rag_semaphore:
try:
rag_response = await self.generate_rag_response(
question=question, client=client
)
except Exception as e:
logger.error("Error generating response for test %s: %s", idx, str(e))
progress_counter["completed"] += 1
return {
"test_number": idx,
"question": question,
"error": str(e),
"metrics": {},
"ragas_score": 0,
"timestamp": datetime.now().isoformat(),
}
# *** CRITICAL FIX: Use actual retrieved contexts, NOT ground_truth ***
retrieved_contexts = rag_response["contexts"]
# DEBUG: Print what was actually retrieved (only in debug mode)
logger.debug("📝 Test %s: Retrieved %s contexts", idx, len(retrieved_contexts))
# Prepare dataset for RAGAS evaluation with CORRECT contexts
eval_dataset = Dataset.from_dict(
{
"question": [question],
"answer": [rag_response["answer"]],
"contexts": [retrieved_contexts],
"ground_truth": [ground_truth],
}
)
# Stage 2: Run RAGAS evaluation (controlled by eval_semaphore)
# IMPORTANT: Create fresh metric instances for each evaluation to avoid
# concurrent state conflicts when multiple tasks run in parallel
async with eval_semaphore:
pbar = None
try:
# Create standard tqdm progress bar for RAGAS evaluation
pbar = tqdm(total=4, desc=f"Eval-{idx}", leave=True)
eval_results = evaluate(
dataset=eval_dataset,
metrics=[
faithfulness,
answer_relevancy,
context_recall,
context_precision,
Faithfulness(),
AnswerRelevancy(),
ContextRecall(),
ContextPrecision(),
],
llm=self.eval_llm,
embeddings=self.eval_embeddings,
_pbar=pbar,
)
# Convert to DataFrame (RAGAS v0.3+ API)
@@ -245,6 +433,7 @@ class RAGEvaluator:
# Extract scores (RAGAS v0.3+ uses .to_pandas())
result = {
"test_number": idx,
"question": question,
"answer": rag_response["answer"][:200] + "..."
if len(rag_response["answer"]) > 200
@@ -252,7 +441,7 @@ class RAGEvaluator:
"ground_truth": ground_truth[:200] + "..."
if len(ground_truth) > 200
else ground_truth,
"project": test_case.get("project_context", "unknown"),
"project": test_case.get("project", "unknown"),
"metrics": {
"faithfulness": float(scores_row.get("faithfulness", 0)),
"answer_relevance": float(
@@ -266,67 +455,87 @@ class RAGEvaluator:
"timestamp": datetime.now().isoformat(),
}
# Calculate RAGAS score (average of all metrics)
# Calculate RAGAS score (average of all metrics, excluding NaN values)
metrics = result["metrics"]
ragas_score = sum(metrics.values()) / len(metrics) if metrics else 0
valid_metrics = [v for v in metrics.values() if not _is_nan(v)]
ragas_score = (
sum(valid_metrics) / len(valid_metrics) if valid_metrics else 0
)
result["ragas_score"] = round(ragas_score, 4)
# Print metrics
print(f" ✅ Faithfulness: {metrics['faithfulness']:.4f}")
print(f" ✅ Answer Relevance: {metrics['answer_relevance']:.4f}")
print(f" ✅ Context Recall: {metrics['context_recall']:.4f}")
print(f" ✅ Context Precision: {metrics['context_precision']:.4f}")
print(f" 📊 RAGAS Score: {result['ragas_score']:.4f}\n")
# Update progress counter
progress_counter["completed"] += 1
return result
except Exception as e:
import traceback
print(f" ❌ Error evaluating: {str(e)}")
print(f" 🔍 Full traceback:\n{traceback.format_exc()}\n")
logger.error("Error evaluating test %s: %s", idx, str(e))
progress_counter["completed"] += 1
return {
"test_number": idx,
"question": question,
"error": str(e),
"metrics": {},
"ragas_score": 0,
"timestamp": datetime.now().isoformat(),
}
finally:
# Force close progress bar to ensure completion
if pbar is not None:
pbar.close()
async def evaluate_responses(self) -> List[Dict[str, Any]]:
"""
Evaluate all test cases in parallel and return metrics
Evaluate all test cases in parallel with two-stage pipeline and return metrics
Returns:
List of evaluation results with metrics
"""
# Get MAX_ASYNC from environment (default to 4 if not set)
max_async = int(os.getenv("MAX_ASYNC", "4"))
# Get evaluation concurrency from environment (default to 2 for parallel evaluation)
max_async = int(os.getenv("EVAL_MAX_CONCURRENT", "2"))
print("\n" + "=" * 70)
print("🚀 Starting RAGAS Evaluation of Portfolio RAG System")
print(f"🔧 Parallel evaluations: {max_async}")
print("=" * 70 + "\n")
logger.info("%s", "=" * 70)
logger.info("🚀 Starting RAGAS Evaluation of LightRAG System")
logger.info("🔧 Two-Stage Pipeline Configuration:")
logger.info(" • RAGAS Evaluation (Stage 2): %s concurrent", max_async)
logger.info("%s", "=" * 70)
# Create semaphore to limit concurrent evaluations
semaphore = asyncio.Semaphore(max_async)
# Create two-stage pipeline semaphores
# Stage 1: RAG generation - allow +1 concurrency to keep evaluation fed
rag_semaphore = asyncio.Semaphore(max_async + 1)
# Stage 2: RAGAS evaluation - primary bottleneck
eval_semaphore = asyncio.Semaphore(max_async)
# Create progress counter (shared across all tasks)
progress_counter = {"completed": 0}
# Create shared HTTP client with connection pooling and proper timeouts
# Timeout: 3 minutes for connect, 5 minutes for read (LLM can be slow)
timeout = httpx.Timeout(180.0, connect=180.0, read=300.0)
timeout = httpx.Timeout(
TOTAL_TIMEOUT_SECONDS,
connect=CONNECT_TIMEOUT_SECONDS,
read=READ_TIMEOUT_SECONDS,
)
limits = httpx.Limits(
max_connections=max_async * 2, # Allow some buffer
max_keepalive_connections=max_async,
max_connections=(max_async + 1) * 2, # Allow buffer for RAG stage
max_keepalive_connections=max_async + 1,
)
async with httpx.AsyncClient(timeout=timeout, limits=limits) as client:
# Create tasks for all test cases
tasks = [
self.evaluate_single_case(idx, test_case, semaphore, client)
self.evaluate_single_case(
idx,
test_case,
rag_semaphore,
eval_semaphore,
client,
progress_counter,
)
for idx, test_case in enumerate(self.test_cases, 1)
]
# Run all evaluations in parallel (limited by semaphore)
# Run all evaluations in parallel (limited by two-stage semaphores)
results = await asyncio.gather(*tasks)
return list(results)
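With the default EVAL_MAX_CONCURRENT of 2, the limits above work out as follows (a sketch of the arithmetic only, not part of the diff):

max_async = 2                              # EVAL_MAX_CONCURRENT default
rag_slots = max_async + 1                  # Stage 1 semaphore -> 3
eval_slots = max_async                     # Stage 2 semaphore -> 2
max_connections = (max_async + 1) * 2      # httpx pool ceiling -> 6
max_keepalive_connections = max_async + 1  # keep-alive connections -> 3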
@@ -391,6 +600,94 @@ class RAGEvaluator:
return csv_path
def _format_metric(self, value: float, width: int = 6) -> str:
"""
Format a metric value for display, handling NaN gracefully
Args:
value: The metric value to format
width: The width of the formatted string
Returns:
Formatted string (e.g., "0.8523" or " N/A ")
"""
if _is_nan(value):
return "N/A".center(width)
return f"{value:.4f}".rjust(width)
def _display_results_table(self, results: List[Dict[str, Any]]):
"""
Display evaluation results in a formatted table
Args:
results: List of evaluation results
"""
logger.info("%s", "=" * 115)
logger.info("📊 EVALUATION RESULTS SUMMARY")
logger.info("%s", "=" * 115)
# Table header
logger.info(
"%-4s | %-50s | %6s | %7s | %6s | %7s | %6s | %6s",
"#",
"Question",
"Faith",
"AnswRel",
"CtxRec",
"CtxPrec",
"RAGAS",
"Status",
)
logger.info("%s", "-" * 115)
# Table rows
for result in results:
test_num = result.get("test_number", 0)
question = result.get("question", "")
# Truncate question to 50 chars
question_display = (
(question[:47] + "...") if len(question) > 50 else question
)
metrics = result.get("metrics", {})
if metrics:
# Success case - format each metric, handling NaN values
faith = metrics.get("faithfulness", 0)
ans_rel = metrics.get("answer_relevance", 0)
ctx_rec = metrics.get("context_recall", 0)
ctx_prec = metrics.get("context_precision", 0)
ragas = result.get("ragas_score", 0)
status = ""
logger.info(
"%-4d | %-50s | %s | %s | %s | %s | %s | %6s",
test_num,
question_display,
self._format_metric(faith, 6),
self._format_metric(ans_rel, 7),
self._format_metric(ctx_rec, 6),
self._format_metric(ctx_prec, 7),
self._format_metric(ragas, 6),
status,
)
else:
# Error case
error = result.get("error", "Unknown error")
error_display = (error[:20] + "...") if len(error) > 23 else error
logger.info(
"%-4d | %-50s | %6s | %7s | %6s | %7s | %6s | ✗ %s",
test_num,
question_display,
"N/A",
"N/A",
"N/A",
"N/A",
"N/A",
error_display,
)
logger.info("%s", "=" * 115)
def _calculate_benchmark_stats(
self, results: List[Dict[str, Any]]
) -> Dict[str, Any]:
@@ -417,69 +714,61 @@ class RAGEvaluator:
"success_rate": 0.0,
}
# Calculate averages for each metric (handling NaN values)
import math
metrics_sum = {
"faithfulness": 0.0,
"answer_relevance": 0.0,
"context_recall": 0.0,
"context_precision": 0.0,
"ragas_score": 0.0,
# Calculate averages for each metric (handling NaN values correctly)
# Track both sum and count for each metric to handle NaN values properly
metrics_data = {
"faithfulness": {"sum": 0.0, "count": 0},
"answer_relevance": {"sum": 0.0, "count": 0},
"context_recall": {"sum": 0.0, "count": 0},
"context_precision": {"sum": 0.0, "count": 0},
"ragas_score": {"sum": 0.0, "count": 0},
}
for result in valid_results:
metrics = result.get("metrics", {})
# Skip NaN values when summing
# For each metric, sum non-NaN values and count them
faithfulness = metrics.get("faithfulness", 0)
if (
not math.isnan(faithfulness)
if isinstance(faithfulness, float)
else True
):
metrics_sum["faithfulness"] += faithfulness
if not _is_nan(faithfulness):
metrics_data["faithfulness"]["sum"] += faithfulness
metrics_data["faithfulness"]["count"] += 1
answer_relevance = metrics.get("answer_relevance", 0)
if (
not math.isnan(answer_relevance)
if isinstance(answer_relevance, float)
else True
):
metrics_sum["answer_relevance"] += answer_relevance
if not _is_nan(answer_relevance):
metrics_data["answer_relevance"]["sum"] += answer_relevance
metrics_data["answer_relevance"]["count"] += 1
context_recall = metrics.get("context_recall", 0)
if (
not math.isnan(context_recall)
if isinstance(context_recall, float)
else True
):
metrics_sum["context_recall"] += context_recall
if not _is_nan(context_recall):
metrics_data["context_recall"]["sum"] += context_recall
metrics_data["context_recall"]["count"] += 1
context_precision = metrics.get("context_precision", 0)
if (
not math.isnan(context_precision)
if isinstance(context_precision, float)
else True
):
metrics_sum["context_precision"] += context_precision
if not _is_nan(context_precision):
metrics_data["context_precision"]["sum"] += context_precision
metrics_data["context_precision"]["count"] += 1
ragas_score = result.get("ragas_score", 0)
if not math.isnan(ragas_score) if isinstance(ragas_score, float) else True:
metrics_sum["ragas_score"] += ragas_score
if not _is_nan(ragas_score):
metrics_data["ragas_score"]["sum"] += ragas_score
metrics_data["ragas_score"]["count"] += 1
# Calculate averages
n = len(valid_results)
# Calculate averages using actual counts for each metric
avg_metrics = {}
for k, v in metrics_sum.items():
avg_val = v / n if n > 0 else 0
# Handle NaN in average
avg_metrics[k] = round(avg_val, 4) if not math.isnan(avg_val) else 0.0
for metric_name, data in metrics_data.items():
if data["count"] > 0:
avg_val = data["sum"] / data["count"]
avg_metrics[metric_name] = (
round(avg_val, 4) if not _is_nan(avg_val) else 0.0
)
else:
avg_metrics[metric_name] = 0.0
# Find min and max RAGAS scores (filter out NaN)
ragas_scores = []
for r in valid_results:
score = r.get("ragas_score", 0)
if isinstance(score, float) and math.isnan(score):
if _is_nan(score):
continue # Skip NaN values
ragas_scores.append(score)
@@ -525,43 +814,57 @@ class RAGEvaluator:
)
with open(json_path, "w") as f:
json.dump(summary, f, indent=2)
print(f"✅ JSON results saved to: {json_path}")
# Display results table
self._display_results_table(results)
logger.info("✅ JSON results saved to: %s", json_path)
# Export to CSV
csv_path = self._export_to_csv(results)
print(f"✅ CSV results saved to: {csv_path}")
logger.info("✅ CSV results saved to: %s", csv_path)
# Print summary
print("\n" + "=" * 70)
print("📊 EVALUATION COMPLETE")
print("=" * 70)
print(f"Total Tests: {len(results)}")
print(f"Successful: {benchmark_stats['successful_tests']}")
print(f"Failed: {benchmark_stats['failed_tests']}")
print(f"Success Rate: {benchmark_stats['success_rate']:.2f}%")
print(f"Elapsed Time: {elapsed_time:.2f} seconds")
print(f"Avg Time/Test: {elapsed_time / len(results):.2f} seconds")
logger.info("")
logger.info("%s", "=" * 70)
logger.info("📊 EVALUATION COMPLETE")
logger.info("%s", "=" * 70)
logger.info("Total Tests: %s", len(results))
logger.info("Successful: %s", benchmark_stats["successful_tests"])
logger.info("Failed: %s", benchmark_stats["failed_tests"])
logger.info("Success Rate: %.2f%%", benchmark_stats["success_rate"])
logger.info("Elapsed Time: %.2f seconds", elapsed_time)
logger.info("Avg Time/Test: %.2f seconds", elapsed_time / len(results))
# Print benchmark metrics
print("\n" + "=" * 70)
print("📈 BENCHMARK RESULTS (Averages)")
print("=" * 70)
logger.info("")
logger.info("%s", "=" * 70)
logger.info("📈 BENCHMARK RESULTS (Average)")
logger.info("%s", "=" * 70)
avg = benchmark_stats["average_metrics"]
print(f"Average Faithfulness: {avg['faithfulness']:.4f}")
print(f"Average Answer Relevance: {avg['answer_relevance']:.4f}")
print(f"Average Context Recall: {avg['context_recall']:.4f}")
print(f"Average Context Precision: {avg['context_precision']:.4f}")
print(f"Average RAGAS Score: {avg['ragas_score']:.4f}")
print(f"\nMin RAGAS Score: {benchmark_stats['min_ragas_score']:.4f}")
print(f"Max RAGAS Score: {benchmark_stats['max_ragas_score']:.4f}")
logger.info("Average Faithfulness: %.4f", avg["faithfulness"])
logger.info("Average Answer Relevance: %.4f", avg["answer_relevance"])
logger.info("Average Context Recall: %.4f", avg["context_recall"])
logger.info("Average Context Precision: %.4f", avg["context_precision"])
logger.info("Average RAGAS Score: %.4f", avg["ragas_score"])
logger.info("")
logger.info(
"Min RAGAS Score: %.4f",
benchmark_stats["min_ragas_score"],
)
logger.info(
"Max RAGAS Score: %.4f",
benchmark_stats["max_ragas_score"],
)
print("\n" + "=" * 70)
print("📁 GENERATED FILES")
print("=" * 70)
print(f"Results Dir: {self.results_dir.absolute()}")
print(f" • CSV: {csv_path.name}")
print(f" • JSON: {json_path.name}")
print("=" * 70 + "\n")
logger.info("")
logger.info("%s", "=" * 70)
logger.info("📁 GENERATED FILES")
logger.info("%s", "=" * 70)
logger.info("Results Dir: %s", self.results_dir.absolute())
logger.info(" • CSV: %s", csv_path.name)
logger.info(" • JSON: %s", json_path.name)
logger.info("%s", "=" * 70)
return summary
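For orientation, a single per-test record in the saved results takes roughly this shape (field names come from the code above; the values here are made up, and the exact top-level layout of the saved summary is not shown in this hunk):

example_result = {
    "test_number": 1,
    "question": "What storage backends does LightRAG support?",
    "answer": "LightRAG supports ...",        # truncated to 200 chars in the real output
    "ground_truth": "LightRAG supports ...",  # truncated to 200 chars in the real output
    "project": "unknown",
    "metrics": {
        "faithfulness": 0.91,
        "answer_relevance": 0.88,
        "context_recall": 0.84,
        "context_precision": 0.86,
    },
    "ragas_score": 0.8725,  # mean of the four metrics above, NaN values excluded
    "timestamp": "2025-11-05T00:36:09",
}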
@@ -570,30 +873,64 @@ async def main():
"""
Main entry point for RAGAS evaluation
Command-line arguments:
--dataset, -d: Path to test dataset JSON file (default: sample_dataset.json)
--ragendpoint, -r: LightRAG API endpoint URL (default: http://localhost:9621 or $LIGHTRAG_API_URL)
Usage:
python lightrag/evaluation/eval_rag_quality.py
python lightrag/evaluation/eval_rag_quality.py http://localhost:9621
python lightrag/evaluation/eval_rag_quality.py http://your-server.com:9621
python lightrag/evaluation/eval_rag_quality.py --dataset my_test.json
python lightrag/evaluation/eval_rag_quality.py -d my_test.json -r http://localhost:9621
"""
try:
# Get RAG API URL from command line or environment
rag_api_url = None
if len(sys.argv) > 1:
rag_api_url = sys.argv[1]
# Parse command-line arguments
parser = argparse.ArgumentParser(
description="RAGAS Evaluation Script for LightRAG System",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Use defaults
python lightrag/evaluation/eval_rag_quality.py
print("\n" + "=" * 70)
print("🔍 RAGAS Evaluation - Using Real LightRAG API")
print("=" * 70)
if rag_api_url:
print(f"📡 RAG API URL: {rag_api_url}")
else:
print("📡 RAG API URL: http://localhost:9621 (default)")
print("=" * 70 + "\n")
# Specify custom dataset
python lightrag/evaluation/eval_rag_quality.py --dataset my_test.json
evaluator = RAGEvaluator(rag_api_url=rag_api_url)
# Specify custom RAG endpoint
python lightrag/evaluation/eval_rag_quality.py --ragendpoint http://my-server.com:9621
# Specify both
python lightrag/evaluation/eval_rag_quality.py -d my_test.json -r http://localhost:9621
""",
)
parser.add_argument(
"--dataset",
"-d",
type=str,
default=None,
help="Path to test dataset JSON file (default: sample_dataset.json in evaluation directory)",
)
parser.add_argument(
"--ragendpoint",
"-r",
type=str,
default=None,
help="LightRAG API endpoint URL (default: http://localhost:9621 or $LIGHTRAG_API_URL environment variable)",
)
args = parser.parse_args()
logger.info("%s", "=" * 70)
logger.info("🔍 RAGAS Evaluation - Using Real LightRAG API")
logger.info("%s", "=" * 70)
evaluator = RAGEvaluator(
test_dataset_path=args.dataset, rag_api_url=args.ragendpoint
)
await evaluator.run()
except Exception as e:
print(f"\n❌ Error: {str(e)}\n")
logger.exception("❌ Error: %s", e)
sys.exit(1)