Raphaël MANSUY 2025-12-04 19:19:00 +08:00
parent 1afb70f563
commit 2debc9288d


@ -16,6 +16,12 @@ Usage:
Results are saved to: lightrag/evaluation/results/
- results_YYYYMMDD_HHMMSS.csv (CSV export for analysis)
- results_YYYYMMDD_HHMMSS.json (Full results with details)
Note on Custom OpenAI-Compatible Endpoints:
This script enables bypass_n=True for the answer_relevancy metric to ensure
compatibility with custom endpoints that may not support OpenAI's 'n' parameter
for multiple completions. Multiple outputs are generated through repeated prompts
instead, preserving evaluation quality while supporting a broader range of endpoints.
"""
import asyncio
@ -36,46 +42,31 @@ from lightrag.utils import logger
# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
# Load .env from project root
project_root = Path(__file__).parent.parent.parent
load_dotenv(project_root / ".env")
# Setup OpenAI API key (required for RAGAS evaluation)
# Use LLM_BINDING_API_KEY when running with the OpenAI binding
llm_binding = os.getenv("LLM_BINDING", "").lower()
llm_binding_key = os.getenv("LLM_BINDING_API_KEY")
# Validate LLM_BINDING is set to openai
if llm_binding != "openai":
logger.error(
"❌ LLM_BINDING must be set to 'openai'. Current value: '%s'",
llm_binding or "(not set)",
)
sys.exit(1)
# Validate LLM_BINDING_API_KEY exists
if not llm_binding_key:
logger.error("❌ LLM_BINDING_API_KEY is not set. Cannot run RAGAS evaluation.")
sys.exit(1)
# Set OPENAI_API_KEY from LLM_BINDING_API_KEY
os.environ["OPENAI_API_KEY"] = llm_binding_key
logger.info("✅ LLM_BINDING: openai")
# Use the .env file located in the current folder
# This allows each LightRAG instance to use its own .env file
# OS environment variables take precedence over values in the .env file
load_dotenv(dotenv_path=".env", override=False)
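# For example, if LLM_BINDING is already exported in the shell, the value in .env is
# ignored, because override=False never overwrites variables that are already set.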
# Conditional imports - RAGAS_AVAILABLE is set to False if dependencies are not installed
try:
from datasets import Dataset
from ragas import evaluate
from ragas.metrics import (
answer_relevancy,
context_precision,
context_recall,
faithfulness,
AnswerRelevancy,
ContextPrecision,
ContextRecall,
Faithfulness,
)
except ImportError as e:
logger.error("❌ RAGAS import error: %s", e)
logger.error(" Install with: pip install ragas datasets")
sys.exit(1)
from ragas.llms import LangchainLLMWrapper
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
RAGAS_AVAILABLE = True
except ImportError:
RAGAS_AVAILABLE = False
Dataset = None
evaluate = None
LangchainLLMWrapper = None
CONNECT_TIMEOUT_SECONDS = 180.0
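# Note: the NaN-aware averaging code further down relies on a module-level _is_nan()
# helper that is defined outside the hunks shown in this diff. A minimal sketch of such
# a helper, assuming it simply wraps math.isnan (an illustration, not the committed code):
import math


def _is_nan(value) -> bool:
    """Return True if value is a float NaN; non-numeric values are treated as not NaN."""
    try:
        return math.isnan(float(value))
    except (TypeError, ValueError):
        return False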
@ -99,7 +90,75 @@ class RAGEvaluator:
test_dataset_path: Path to test dataset JSON file
rag_api_url: Base URL of LightRAG API (e.g., http://localhost:9621)
If None, will try to read from environment or use default
Environment Variables:
EVAL_LLM_MODEL: LLM model for evaluation (default: gpt-4.1)
EVAL_EMBEDDING_MODEL: Embedding model for evaluation (default: text-embedding-3-large)
EVAL_LLM_BINDING_API_KEY: API key for evaluation models (falls back to OPENAI_API_KEY)
EVAL_LLM_BINDING_HOST: Custom endpoint URL for evaluation models (optional)
Raises:
ImportError: If ragas or datasets packages are not installed
EnvironmentError: If EVAL_LLM_BINDING_API_KEY and OPENAI_API_KEY are both not set
"""
# Validate RAGAS dependencies are installed
if not RAGAS_AVAILABLE:
raise ImportError(
"RAGAS dependencies not installed. "
"Install with: pip install ragas datasets"
)
# Configure evaluation models (for RAGAS scoring)
eval_api_key = os.getenv("EVAL_LLM_BINDING_API_KEY") or os.getenv(
"OPENAI_API_KEY"
)
if not eval_api_key:
raise EnvironmentError(
"EVAL_LLM_BINDING_API_KEY or OPENAI_API_KEY is required for evaluation. "
"Set EVAL_LLM_BINDING_API_KEY to use a custom API key, "
"or ensure OPENAI_API_KEY is set."
)
eval_model = os.getenv("EVAL_LLM_MODEL", "gpt-4.1")
eval_embedding_model = os.getenv(
"EVAL_EMBEDDING_MODEL", "text-embedding-3-large"
)
eval_base_url = os.getenv("EVAL_LLM_BINDING_HOST")
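# Example .env entries for these evaluation settings (illustrative values):
#   EVAL_LLM_BINDING_API_KEY=sk-...
#   EVAL_LLM_MODEL=gpt-4.1
#   EVAL_EMBEDDING_MODEL=text-embedding-3-large
#   EVAL_LLM_BINDING_HOST=https://my-gateway.example.com/v1  # custom OpenAI-compatible endpoint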
# Create LLM and Embeddings instances for RAGAS
llm_kwargs = {
"model": eval_model,
"api_key": eval_api_key,
"max_retries": int(os.getenv("EVAL_LLM_MAX_RETRIES", "5")),
"request_timeout": int(os.getenv("EVAL_LLM_TIMEOUT", "180")),
}
embedding_kwargs = {"model": eval_embedding_model, "api_key": eval_api_key}
if eval_base_url:
llm_kwargs["base_url"] = eval_base_url
embedding_kwargs["base_url"] = eval_base_url
# Create base LangChain LLM
base_llm = ChatOpenAI(**llm_kwargs)
self.eval_embeddings = OpenAIEmbeddings(**embedding_kwargs)
# Wrap the LLM with LangchainLLMWrapper and enable bypass_n mode for custom endpoints
# This ensures compatibility with endpoints that don't support the 'n' parameter:
# multiple outputs are generated through repeated prompts instead of a single call with 'n'
try:
self.eval_llm = LangchainLLMWrapper(
langchain_llm=base_llm,
bypass_n=True, # Enable bypass_n to avoid passing 'n' to OpenAI API
)
logger.debug("Successfully configured bypass_n mode for LLM wrapper")
except Exception as e:
logger.warning(
"Could not configure LangchainLLMWrapper with bypass_n: %s. "
"Using base LLM directly, which may cause warnings with custom endpoints.",
e,
)
self.eval_llm = base_llm
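# For reference, bypass_n only changes how multiple samples are obtained
# (a conceptual sketch of the idea, not RAGAS internals):
#   without bypass_n: one request with n=k         -> k completions
#   with bypass_n:    k single-completion requests -> k completions
# so endpoints that reject the 'n' parameter can still be used.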
if test_dataset_path is None:
test_dataset_path = Path(__file__).parent / "sample_dataset.json"
@ -114,6 +173,41 @@ class RAGEvaluator:
# Load test dataset
self.test_cases = self._load_test_dataset()
# Store configuration values for display
self.eval_model = eval_model
self.eval_embedding_model = eval_embedding_model
self.eval_base_url = eval_base_url
self.eval_max_retries = llm_kwargs["max_retries"]
self.eval_timeout = llm_kwargs["request_timeout"]
# Display configuration
self._display_configuration()
def _display_configuration(self):
"""Display all evaluation configuration settings"""
logger.info("EVALUATION CONFIGURATION")
logger.info(" Evaluation Models:")
logger.info(" • LLM Model: %s", self.eval_model)
logger.info(" • Embedding Model: %s", self.eval_embedding_model)
if self.eval_base_url:
logger.info(" • Custom Endpoint: %s", self.eval_base_url)
logger.info(" • Bypass N-Parameter: Enabled (for compatibility)")
else:
logger.info(" • Endpoint: OpenAI Official API")
logger.info(" Concurrency & Rate Limiting:")
query_top_k = int(os.getenv("EVAL_QUERY_TOP_K", "10"))
logger.info(" • Query Top-K: %s Entities/Relations", query_top_k)
logger.info(" • LLM Max Retries: %s", self.eval_max_retries)
logger.info(" • LLM Timeout: %s seconds", self.eval_timeout)
logger.info(" Test Configuration:")
logger.info(" • Total Test Cases: %s", len(self.test_cases))
logger.info(" • Test Dataset: %s", self.test_dataset_path.name)
logger.info(" • LightRAG API: %s", self.rag_api_url)
logger.info(" • Results Directory: %s", self.results_dir.name)
def _load_test_dataset(self) -> List[Dict[str, str]]:
"""Load test cases from JSON file"""
if not self.test_dataset_path.exists():
@ -150,13 +244,22 @@ class RAGEvaluator:
"include_references": True,
"include_chunk_content": True, # NEW: Request chunk content in references
"response_type": "Multiple Paragraphs",
"top_k": 10,
"top_k": int(os.getenv("EVAL_QUERY_TOP_K", "10")),
}
# Get API key from environment for authentication
api_key = os.getenv("LIGHTRAG_API_KEY")
# Prepare headers with optional authentication
headers = {}
if api_key:
headers["X-API-Key"] = api_key
# Single optimized API call - gets both answer AND chunk content
response = await client.post(
f"{self.rag_api_url}/query",
json=payload,
headers=headers if headers else None,
)
response.raise_for_status()
result = response.json()
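# For manual testing, an equivalent request can be sent from the command line
# (illustrative sketch; adjust the host, API key, and query text as needed):
#   curl -X POST "http://localhost:9621/query" \
#     -H "Content-Type: application/json" \
#     -H "X-API-Key: $LIGHTRAG_API_KEY" \
#     -d '{"query": "What does the project do?", "top_k": 10, "include_references": true}'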
@ -222,6 +325,7 @@ class RAGEvaluator:
test_case: Dict[str, str],
semaphore: asyncio.Semaphore,
client: httpx.AsyncClient,
progress_counter: Dict[str, int],
) -> Dict[str, Any]:
"""
Evaluate a single test case with concurrency control
@ -231,34 +335,39 @@ class RAGEvaluator:
test_case: Test case dictionary with question and ground_truth
semaphore: Semaphore to control concurrency
client: Shared httpx AsyncClient for connection pooling
progress_counter: Shared dictionary for progress tracking
Returns:
Evaluation result dictionary
"""
total_cases = len(self.test_cases)
async with semaphore:
question = test_case["question"]
ground_truth = test_case["ground_truth"]
logger.info("[%s/%s] Evaluating: %s...", idx, total_cases, question[:60])
# Generate RAG response by calling actual LightRAG API
rag_response = await self.generate_rag_response(
question=question, client=client
)
try:
rag_response = await self.generate_rag_response(
question=question, client=client
)
except Exception as e:
logger.error("Error generating response for test %s: %s", idx, str(e))
progress_counter["completed"] += 1
return {
"test_number": idx,
"question": question,
"error": str(e),
"metrics": {},
"ragas_score": 0,
"timestamp": datetime.now().isoformat(),
}
# *** CRITICAL FIX: Use actual retrieved contexts, NOT ground_truth ***
retrieved_contexts = rag_response["contexts"]
# DEBUG: Print what was actually retrieved
logger.debug("📝 Retrieved %s contexts", len(retrieved_contexts))
if retrieved_contexts:
logger.debug(
"📄 First context preview: %s...", retrieved_contexts[0][:100]
)
else:
logger.warning("⚠️ No contexts retrieved!")
# DEBUG: Print what was actually retrieved (only in debug mode)
logger.debug(
"📝 Test %s: Retrieved %s contexts", idx, len(retrieved_contexts)
)
# Prepare dataset for RAGAS evaluation with CORRECT contexts
eval_dataset = Dataset.from_dict(
@ -271,15 +380,19 @@ class RAGEvaluator:
)
# Run RAGAS evaluation
# IMPORTANT: Create fresh metric instances for each evaluation to avoid
# concurrent state conflicts when multiple tasks run in parallel
try:
eval_results = evaluate(
dataset=eval_dataset,
metrics=[
faithfulness,
answer_relevancy,
context_recall,
context_precision,
Faithfulness(),
AnswerRelevancy(),
ContextRecall(),
ContextPrecision(),
],
llm=self.eval_llm,
embeddings=self.eval_embeddings,
)
# Convert to DataFrame (RAGAS v0.3+ API)
@ -290,6 +403,7 @@ class RAGEvaluator:
# Extract scores (RAGAS v0.3+ uses .to_pandas())
result = {
"test_number": idx,
"question": question,
"answer": rag_response["answer"][:200] + "..."
if len(rag_response["answer"]) > 200
@ -297,7 +411,7 @@ class RAGEvaluator:
"ground_truth": ground_truth[:200] + "..."
if len(ground_truth) > 200
else ground_truth,
"project": test_case.get("project_context", "unknown"),
"project": test_case.get("project", "unknown"),
"metrics": {
"faithfulness": float(scores_row.get("faithfulness", 0)),
"answer_relevance": float(
@ -311,22 +425,24 @@ class RAGEvaluator:
"timestamp": datetime.now().isoformat(),
}
# Calculate RAGAS score (average of all metrics)
# Calculate RAGAS score (average of all metrics, excluding NaN values)
metrics = result["metrics"]
ragas_score = sum(metrics.values()) / len(metrics) if metrics else 0
valid_metrics = [v for v in metrics.values() if not _is_nan(v)]
ragas_score = (
sum(valid_metrics) / len(valid_metrics) if valid_metrics else 0
)
result["ragas_score"] = round(ragas_score, 4)
logger.info("✅ Faithfulness: %.4f", metrics["faithfulness"])
logger.info("✅ Answer Relevance: %.4f", metrics["answer_relevance"])
logger.info("✅ Context Recall: %.4f", metrics["context_recall"])
logger.info("✅ Context Precision: %.4f", metrics["context_precision"])
logger.info("📊 RAGAS Score: %.4f", result["ragas_score"])
# Update progress counter
progress_counter["completed"] += 1
return result
except Exception as e:
logger.exception("❌ Error evaluating: %s", e)
logger.error("Error evaluating test %s: %s", idx, str(e))
progress_counter["completed"] += 1
return {
"test_number": idx,
"question": question,
"error": str(e),
"metrics": {},
@ -341,18 +457,21 @@ class RAGEvaluator:
Returns:
List of evaluation results with metrics
"""
# Get MAX_ASYNC from environment (default to 4 if not set)
max_async = int(os.getenv("MAX_ASYNC", "4"))
# Get evaluation concurrency from environment (default: 3 concurrent evaluations)
max_async = int(os.getenv("EVAL_MAX_CONCURRENT", "3"))
logger.info("")
logger.info("%s", "=" * 70)
logger.info("🚀 Starting RAGAS Evaluation of Portfolio RAG System")
logger.info("🔧 Parallel evaluations: %s", max_async)
logger.info("🔧 Concurrent evaluations: %s", max_async)
logger.info("%s", "=" * 70)
# Create semaphore to limit concurrent evaluations
semaphore = asyncio.Semaphore(max_async)
# Create progress counter (shared across all tasks)
progress_counter = {"completed": 0}
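# A plain dict is sufficient as a shared counter: all tasks run on one event-loop
# thread and the increment is not split across an await, so no lock is needed.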
# Create shared HTTP client with connection pooling and proper timeouts
# Timeout: 3 minutes for connect, 5 minutes for read (LLM can be slow)
timeout = httpx.Timeout(
@ -368,7 +487,9 @@ class RAGEvaluator:
async with httpx.AsyncClient(timeout=timeout, limits=limits) as client:
# Create tasks for all test cases
tasks = [
self.evaluate_single_case(idx, test_case, semaphore, client)
self.evaluate_single_case(
idx, test_case, semaphore, client, progress_counter
)
for idx, test_case in enumerate(self.test_cases, 1)
]
@ -437,6 +558,95 @@ class RAGEvaluator:
return csv_path
def _format_metric(self, value: float, width: int = 6) -> str:
"""
Format a metric value for display, handling NaN gracefully
Args:
value: The metric value to format
width: The width of the formatted string
Returns:
Formatted string (e.g., "0.8523" or " N/A ")
"""
if _is_nan(value):
return "N/A".center(width)
return f"{value:.4f}".rjust(width)
def _display_results_table(self, results: List[Dict[str, Any]]):
"""
Display evaluation results in a formatted table
Args:
results: List of evaluation results
"""
logger.info("")
logger.info("%s", "=" * 115)
logger.info("📊 EVALUATION RESULTS SUMMARY")
logger.info("%s", "=" * 115)
# Table header
logger.info(
"%-4s | %-50s | %6s | %7s | %6s | %7s | %6s | %6s",
"#",
"Question",
"Faith",
"AnswRel",
"CtxRec",
"CtxPrec",
"RAGAS",
"Status",
)
logger.info("%s", "-" * 115)
# Table rows
for result in results:
test_num = result.get("test_number", 0)
question = result.get("question", "")
# Truncate question to 50 chars
question_display = (
(question[:47] + "...") if len(question) > 50 else question
)
metrics = result.get("metrics", {})
if metrics:
# Success case - format each metric, handling NaN values
faith = metrics.get("faithfulness", 0)
ans_rel = metrics.get("answer_relevance", 0)
ctx_rec = metrics.get("context_recall", 0)
ctx_prec = metrics.get("context_precision", 0)
ragas = result.get("ragas_score", 0)
status = ""
logger.info(
"%-4d | %-50s | %s | %s | %s | %s | %s | %6s",
test_num,
question_display,
self._format_metric(faith, 6),
self._format_metric(ans_rel, 7),
self._format_metric(ctx_rec, 6),
self._format_metric(ctx_prec, 7),
self._format_metric(ragas, 6),
status,
)
else:
# Error case
error = result.get("error", "Unknown error")
error_display = (error[:20] + "...") if len(error) > 23 else error
logger.info(
"%-4d | %-50s | %6s | %7s | %6s | %7s | %6s | ✗ %s",
test_num,
question_display,
"N/A",
"N/A",
"N/A",
"N/A",
"N/A",
error_display,
)
logger.info("%s", "=" * 115)
def _calculate_benchmark_stats(
self, results: List[Dict[str, Any]]
) -> Dict[str, Any]:
@ -463,45 +673,55 @@ class RAGEvaluator:
"success_rate": 0.0,
}
# Calculate averages for each metric (handling NaN values)
metrics_sum = {
"faithfulness": 0.0,
"answer_relevance": 0.0,
"context_recall": 0.0,
"context_precision": 0.0,
"ragas_score": 0.0,
# Calculate averages for each metric (handling NaN values correctly)
# Track both sum and count for each metric to handle NaN values properly
metrics_data = {
"faithfulness": {"sum": 0.0, "count": 0},
"answer_relevance": {"sum": 0.0, "count": 0},
"context_recall": {"sum": 0.0, "count": 0},
"context_precision": {"sum": 0.0, "count": 0},
"ragas_score": {"sum": 0.0, "count": 0},
}
for result in valid_results:
metrics = result.get("metrics", {})
# Skip NaN values when summing
# For each metric, sum non-NaN values and count them
faithfulness = metrics.get("faithfulness", 0)
if not _is_nan(faithfulness):
metrics_sum["faithfulness"] += faithfulness
metrics_data["faithfulness"]["sum"] += faithfulness
metrics_data["faithfulness"]["count"] += 1
answer_relevance = metrics.get("answer_relevance", 0)
if not _is_nan(answer_relevance):
metrics_sum["answer_relevance"] += answer_relevance
metrics_data["answer_relevance"]["sum"] += answer_relevance
metrics_data["answer_relevance"]["count"] += 1
context_recall = metrics.get("context_recall", 0)
if not _is_nan(context_recall):
metrics_sum["context_recall"] += context_recall
metrics_data["context_recall"]["sum"] += context_recall
metrics_data["context_recall"]["count"] += 1
context_precision = metrics.get("context_precision", 0)
if not _is_nan(context_precision):
metrics_sum["context_precision"] += context_precision
metrics_data["context_precision"]["sum"] += context_precision
metrics_data["context_precision"]["count"] += 1
ragas_score = result.get("ragas_score", 0)
if not _is_nan(ragas_score):
metrics_sum["ragas_score"] += ragas_score
metrics_data["ragas_score"]["sum"] += ragas_score
metrics_data["ragas_score"]["count"] += 1
# Calculate averages
n = len(valid_results)
# Calculate averages using actual counts for each metric
avg_metrics = {}
for k, v in metrics_sum.items():
avg_val = v / n if n > 0 else 0
# Handle NaN in average
avg_metrics[k] = round(avg_val, 4) if not _is_nan(avg_val) else 0.0
for metric_name, data in metrics_data.items():
if data["count"] > 0:
avg_val = data["sum"] / data["count"]
avg_metrics[metric_name] = (
round(avg_val, 4) if not _is_nan(avg_val) else 0.0
)
else:
avg_metrics[metric_name] = 0.0
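# Example: if context_recall came back as NaN for 2 of 10 valid results, its average
# is computed over the 8 numeric values rather than over all 10 results.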
# Find min and max RAGAS scores (filter out NaN)
ragas_scores = []
@ -534,6 +754,19 @@ class RAGEvaluator:
elapsed_time = time.time() - start_time
# Add a small delay to ensure all buffered output is completely written
await asyncio.sleep(0.2)
# Flush all output buffers to ensure RAGAS progress bars are fully displayed
sys.stdout.flush()
sys.stderr.flush()
await asyncio.sleep(0.2)
sys.stderr.write("\n")
sys.stderr.flush()
# Display results table
self._display_results_table(results)
# Calculate benchmark statistics
benchmark_stats = self._calculate_benchmark_stats(results)
@ -619,15 +852,9 @@ async def main():
if len(sys.argv) > 1:
rag_api_url = sys.argv[1]
logger.info("")
logger.info("%s", "=" * 70)
logger.info("🔍 RAGAS Evaluation - Using Real LightRAG API")
logger.info("%s", "=" * 70)
if rag_api_url:
logger.info("📡 RAG API URL: %s", rag_api_url)
else:
logger.info("📡 RAG API URL: http://localhost:9621 (default)")
logger.info("%s", "=" * 70)
evaluator = RAGEvaluator(rag_api_url=rag_api_url)
await evaluator.run()