Update lightrag/evaluation/eval_rag_quality.py for launguage

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
(cherry picked from commit 98f0464a31)
This commit is contained in:
ben moussa anouar 2025-11-02 18:03:54 +01:00 committed by Raphaël MANSUY
parent 407a2c2ecd
commit dd425e5513

View file

@ -21,7 +21,6 @@ Results are saved to: lightrag/evaluation/results/
import asyncio import asyncio
import csv import csv
import json import json
import math
import os import os
import sys import sys
import time import time
@ -31,17 +30,22 @@ from typing import Any, Dict, List
import httpx import httpx
from dotenv import load_dotenv from dotenv import load_dotenv
from lightrag.utils import logger
# Add parent directory to path # Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent.parent)) sys.path.insert(0, str(Path(__file__).parent.parent.parent))
# use the .env that is inside the current folder # Load .env from project root
# allows to use different .env file for each lightrag instance project_root = Path(__file__).parent.parent.parent
# the OS environment variables take precedence over the .env file load_dotenv(project_root / ".env")
load_dotenv(dotenv_path=".env", override=False)
# Setup OpenAI API key (required for RAGAS evaluation)
# Use LLM_BINDING_API_KEY if OPENAI_API_KEY is not set
if "OPENAI_API_KEY" not in os.environ:
if "LLM_BINDING_API_KEY" in os.environ:
os.environ["OPENAI_API_KEY"] = os.environ["LLM_BINDING_API_KEY"]
else:
os.environ["OPENAI_API_KEY"] = input("Enter your OpenAI API key: ")
# Conditional imports - will raise ImportError if dependencies not installed
try: try:
from datasets import Dataset from datasets import Dataset
from ragas import evaluate from ragas import evaluate
@ -51,22 +55,10 @@ try:
context_recall, context_recall,
faithfulness, faithfulness,
) )
except ImportError as e:
RAGAS_AVAILABLE = True print(f"❌ RAGAS import error: {e}")
except ImportError: print(" Install with: pip install ragas datasets")
RAGAS_AVAILABLE = False sys.exit(1)
Dataset = None
evaluate = None
CONNECT_TIMEOUT_SECONDS = 180.0
READ_TIMEOUT_SECONDS = 300.0
TOTAL_TIMEOUT_SECONDS = 180.0
def _is_nan(value: Any) -> bool:
"""Return True when value is a float NaN."""
return isinstance(value, float) and math.isnan(value)
class RAGEvaluator: class RAGEvaluator:
@ -80,39 +72,7 @@ class RAGEvaluator:
test_dataset_path: Path to test dataset JSON file test_dataset_path: Path to test dataset JSON file
rag_api_url: Base URL of LightRAG API (e.g., http://localhost:9621) rag_api_url: Base URL of LightRAG API (e.g., http://localhost:9621)
If None, will try to read from environment or use default If None, will try to read from environment or use default
Raises:
ImportError: If ragas or datasets packages are not installed
ValueError: If LLM_BINDING is not set to 'openai'
EnvironmentError: If LLM_BINDING_API_KEY is not set
""" """
# Validate RAGAS dependencies are installed
if not RAGAS_AVAILABLE:
raise ImportError(
"RAGAS dependencies not installed. "
"Install with: pip install ragas datasets"
)
# Validate LLM_BINDING is set to openai (required for RAGAS)
llm_binding = os.getenv("LLM_BINDING", "").lower()
if llm_binding != "openai":
raise ValueError(
f"LLM_BINDING must be set to 'openai' for RAGAS evaluation. "
f"Current value: '{llm_binding or '(not set)'}'"
)
# Validate LLM_BINDING_API_KEY exists
llm_binding_key = os.getenv("LLM_BINDING_API_KEY")
if not llm_binding_key:
raise EnvironmentError(
"LLM_BINDING_API_KEY environment variable is not set. "
"This is required for RAGAS evaluation."
)
# Set OPENAI_API_KEY from LLM_BINDING_API_KEY for RAGAS
os.environ["OPENAI_API_KEY"] = llm_binding_key
logger.info("✅ LLM_BINDING: openai")
if test_dataset_path is None: if test_dataset_path is None:
test_dataset_path = Path(__file__).parent / "sample_dataset.json" test_dataset_path = Path(__file__).parent / "sample_dataset.json"
@ -166,19 +126,10 @@ class RAGEvaluator:
"top_k": 10, "top_k": 10,
} }
# Get API key from environment for authentication
api_key = os.getenv("LIGHTRAG_API_KEY")
# Prepare headers with optional authentication
headers = {}
if api_key:
headers["X-API-Key"] = api_key
# Single optimized API call - gets both answer AND chunk content # Single optimized API call - gets both answer AND chunk content
response = await client.post( response = await client.post(
f"{self.rag_api_url}/query", f"{self.rag_api_url}/query",
json=payload, json=payload,
headers=headers if headers else None,
) )
response.raise_for_status() response.raise_for_status()
result = response.json() result = response.json()
@ -187,31 +138,17 @@ class RAGEvaluator:
references = result.get("references", []) references = result.get("references", [])
# DEBUG: Inspect the API response # DEBUG: Inspect the API response
logger.debug("🔍 References Count: %s", len(references)) print(f" 🔍 References Count: {len(references)}")
if references: if references:
first_ref = references[0] first_ref = references[0]
logger.debug("🔍 First Reference Keys: %s", list(first_ref.keys())) print(f" 🔍 First Reference Keys: {list(first_ref.keys())}")
if "content" in first_ref: if "content" in first_ref:
content_preview = first_ref["content"] print(f" 🔍 Content Preview: {first_ref['content'][:100]}...")
if isinstance(content_preview, list) and content_preview:
logger.debug(
"🔍 Content Preview (first chunk): %s...",
content_preview[0][:100],
)
elif isinstance(content_preview, str):
logger.debug("🔍 Content Preview: %s...", content_preview[:100])
# Extract chunk content from enriched references # Extract chunk content from enriched references
# Note: content is now a list of chunks per reference (one file may have multiple chunks) contexts = [
contexts = [] ref.get("content", "") for ref in references if ref.get("content")
for ref in references: ]
content = ref.get("content", [])
if isinstance(content, list):
# Flatten the list: each chunk becomes a separate context
contexts.extend(content)
elif isinstance(content, str):
# Backward compatibility: if content is still a string (shouldn't happen)
contexts.append(content)
return { return {
"answer": answer, "answer": answer,
@ -257,13 +194,11 @@ class RAGEvaluator:
Returns: Returns:
Evaluation result dictionary Evaluation result dictionary
""" """
total_cases = len(self.test_cases)
async with semaphore: async with semaphore:
question = test_case["question"] question = test_case["question"]
ground_truth = test_case["ground_truth"] ground_truth = test_case["ground_truth"]
logger.info("[%s/%s] Evaluating: %s...", idx, total_cases, question[:60]) print(f"[{idx}/{len(self.test_cases)}] Evaluating: {question[:60]}...")
# Generate RAG response by calling actual LightRAG API # Generate RAG response by calling actual LightRAG API
rag_response = await self.generate_rag_response( rag_response = await self.generate_rag_response(
@ -274,13 +209,11 @@ class RAGEvaluator:
retrieved_contexts = rag_response["contexts"] retrieved_contexts = rag_response["contexts"]
# DEBUG: Print what was actually retrieved # DEBUG: Print what was actually retrieved
logger.debug("📝 Retrieved %s contexts", len(retrieved_contexts)) print(f" 📝 Retrieved {len(retrieved_contexts)} contexts")
if retrieved_contexts: if retrieved_contexts:
logger.debug( print(f" 📄 First context preview: {retrieved_contexts[0][:100]}...")
"📄 First context preview: %s...", retrieved_contexts[0][:100]
)
else: else:
logger.warning("⚠️ No contexts retrieved!") print(" ⚠️ WARNING: No contexts retrieved!")
# Prepare dataset for RAGAS evaluation with CORRECT contexts # Prepare dataset for RAGAS evaluation with CORRECT contexts
eval_dataset = Dataset.from_dict( eval_dataset = Dataset.from_dict(
@ -338,16 +271,20 @@ class RAGEvaluator:
ragas_score = sum(metrics.values()) / len(metrics) if metrics else 0 ragas_score = sum(metrics.values()) / len(metrics) if metrics else 0
result["ragas_score"] = round(ragas_score, 4) result["ragas_score"] = round(ragas_score, 4)
logger.info("✅ Faithfulness: %.4f", metrics["faithfulness"]) # Print metrics
logger.info("✅ Answer Relevance: %.4f", metrics["answer_relevance"]) print(f" ✅ Faithfulness: {metrics['faithfulness']:.4f}")
logger.info("✅ Context Recall: %.4f", metrics["context_recall"]) print(f" ✅ Answer Relevance: {metrics['answer_relevance']:.4f}")
logger.info("✅ Context Precision: %.4f", metrics["context_precision"]) print(f" ✅ Context Recall: {metrics['context_recall']:.4f}")
logger.info("📊 RAGAS Score: %.4f", result["ragas_score"]) print(f" ✅ Context Precision: {metrics['context_precision']:.4f}")
print(f" 📊 RAGAS Score: {result['ragas_score']:.4f}\n")
return result return result
except Exception as e: except Exception as e:
logger.exception("❌ Error evaluating: %s", e) import traceback
print(f" ❌ Error evaluating: {str(e)}")
print(f" 🔍 Full traceback:\n{traceback.format_exc()}\n")
return { return {
"question": question, "question": question,
"error": str(e), "error": str(e),
@ -366,22 +303,17 @@ class RAGEvaluator:
# Get MAX_ASYNC from environment (default to 4 if not set) # Get MAX_ASYNC from environment (default to 4 if not set)
max_async = int(os.getenv("MAX_ASYNC", "4")) max_async = int(os.getenv("MAX_ASYNC", "4"))
logger.info("") print("\n" + "=" * 70)
logger.info("%s", "=" * 70) print("🚀 Starting RAGAS Evaluation of Portfolio RAG System")
logger.info("🚀 Starting RAGAS Evaluation of Portfolio RAG System") print(f"🔧 Parallel evaluations: {max_async}")
logger.info("🔧 Parallel evaluations: %s", max_async) print("=" * 70 + "\n")
logger.info("%s", "=" * 70)
# Create semaphore to limit concurrent evaluations # Create semaphore to limit concurrent evaluations
semaphore = asyncio.Semaphore(max_async) semaphore = asyncio.Semaphore(max_async)
# Create shared HTTP client with connection pooling and proper timeouts # Create shared HTTP client with connection pooling and proper timeouts
# Timeout: 3 minutes for connect, 5 minutes for read (LLM can be slow) # Timeout: 3 minutes for connect, 5 minutes for read (LLM can be slow)
timeout = httpx.Timeout( timeout = httpx.Timeout(180.0, connect=180.0, read=300.0)
TOTAL_TIMEOUT_SECONDS,
connect=CONNECT_TIMEOUT_SECONDS,
read=READ_TIMEOUT_SECONDS,
)
limits = httpx.Limits( limits = httpx.Limits(
max_connections=max_async * 2, # Allow some buffer max_connections=max_async * 2, # Allow some buffer
max_keepalive_connections=max_async, max_keepalive_connections=max_async,
@ -486,6 +418,8 @@ class RAGEvaluator:
} }
# Calculate averages for each metric (handling NaN values) # Calculate averages for each metric (handling NaN values)
import math
metrics_sum = { metrics_sum = {
"faithfulness": 0.0, "faithfulness": 0.0,
"answer_relevance": 0.0, "answer_relevance": 0.0,
@ -498,23 +432,39 @@ class RAGEvaluator:
metrics = result.get("metrics", {}) metrics = result.get("metrics", {})
# Skip NaN values when summing # Skip NaN values when summing
faithfulness = metrics.get("faithfulness", 0) faithfulness = metrics.get("faithfulness", 0)
if not _is_nan(faithfulness): if (
not math.isnan(faithfulness)
if isinstance(faithfulness, float)
else True
):
metrics_sum["faithfulness"] += faithfulness metrics_sum["faithfulness"] += faithfulness
answer_relevance = metrics.get("answer_relevance", 0) answer_relevance = metrics.get("answer_relevance", 0)
if not _is_nan(answer_relevance): if (
not math.isnan(answer_relevance)
if isinstance(answer_relevance, float)
else True
):
metrics_sum["answer_relevance"] += answer_relevance metrics_sum["answer_relevance"] += answer_relevance
context_recall = metrics.get("context_recall", 0) context_recall = metrics.get("context_recall", 0)
if not _is_nan(context_recall): if (
not math.isnan(context_recall)
if isinstance(context_recall, float)
else True
):
metrics_sum["context_recall"] += context_recall metrics_sum["context_recall"] += context_recall
context_precision = metrics.get("context_precision", 0) context_precision = metrics.get("context_precision", 0)
if not _is_nan(context_precision): if (
not math.isnan(context_precision)
if isinstance(context_precision, float)
else True
):
metrics_sum["context_precision"] += context_precision metrics_sum["context_precision"] += context_precision
ragas_score = result.get("ragas_score", 0) ragas_score = result.get("ragas_score", 0)
if not _is_nan(ragas_score): if not math.isnan(ragas_score) if isinstance(ragas_score, float) else True:
metrics_sum["ragas_score"] += ragas_score metrics_sum["ragas_score"] += ragas_score
# Calculate averages # Calculate averages
@ -523,13 +473,13 @@ class RAGEvaluator:
for k, v in metrics_sum.items(): for k, v in metrics_sum.items():
avg_val = v / n if n > 0 else 0 avg_val = v / n if n > 0 else 0
# Handle NaN in average # Handle NaN in average
avg_metrics[k] = round(avg_val, 4) if not _is_nan(avg_val) else 0.0 avg_metrics[k] = round(avg_val, 4) if not math.isnan(avg_val) else 0.0
# Find min and max RAGAS scores (filter out NaN) # Find min and max RAGAS scores (filter out NaN)
ragas_scores = [] ragas_scores = []
for r in valid_results: for r in valid_results:
score = r.get("ragas_score", 0) score = r.get("ragas_score", 0)
if _is_nan(score): if isinstance(score, float) and math.isnan(score):
continue # Skip NaN values continue # Skip NaN values
ragas_scores.append(score) ragas_scores.append(score)
@ -575,53 +525,43 @@ class RAGEvaluator:
) )
with open(json_path, "w") as f: with open(json_path, "w") as f:
json.dump(summary, f, indent=2) json.dump(summary, f, indent=2)
logger.info("✅ JSON results saved to: %s", json_path) print(f"✅ JSON results saved to: {json_path}")
# Export to CSV # Export to CSV
csv_path = self._export_to_csv(results) csv_path = self._export_to_csv(results)
logger.info("✅ CSV results saved to: %s", csv_path) print(f"✅ CSV results saved to: {csv_path}")
# Print summary # Print summary
logger.info("") print("\n" + "=" * 70)
logger.info("%s", "=" * 70) print("📊 EVALUATION COMPLETE")
logger.info("📊 EVALUATION COMPLETE") print("=" * 70)
logger.info("%s", "=" * 70) print(f"Total Tests: {len(results)}")
logger.info("Total Tests: %s", len(results)) print(f"Successful: {benchmark_stats['successful_tests']}")
logger.info("Successful: %s", benchmark_stats["successful_tests"]) print(f"Failed: {benchmark_stats['failed_tests']}")
logger.info("Failed: %s", benchmark_stats["failed_tests"]) print(f"Success Rate: {benchmark_stats['success_rate']:.2f}%")
logger.info("Success Rate: %.2f%%", benchmark_stats["success_rate"]) print(f"Elapsed Time: {elapsed_time:.2f} seconds")
logger.info("Elapsed Time: %.2f seconds", elapsed_time) print(f"Avg Time/Test: {elapsed_time / len(results):.2f} seconds")
logger.info("Avg Time/Test: %.2f seconds", elapsed_time / len(results))
# Print benchmark metrics # Print benchmark metrics
logger.info("") print("\n" + "=" * 70)
logger.info("%s", "=" * 70) print("📈 BENCHMARK RESULTS (Averages)")
logger.info("📈 BENCHMARK RESULTS (Average)") print("=" * 70)
logger.info("%s", "=" * 70)
avg = benchmark_stats["average_metrics"] avg = benchmark_stats["average_metrics"]
logger.info("Average Faithfulness: %.4f", avg["faithfulness"]) print(f"Average Faithfulness: {avg['faithfulness']:.4f}")
logger.info("Average Answer Relevance: %.4f", avg["answer_relevance"]) print(f"Average Answer Relevance: {avg['answer_relevance']:.4f}")
logger.info("Average Context Recall: %.4f", avg["context_recall"]) print(f"Average Context Recall: {avg['context_recall']:.4f}")
logger.info("Average Context Precision: %.4f", avg["context_precision"]) print(f"Average Context Precision: {avg['context_precision']:.4f}")
logger.info("Average RAGAS Score: %.4f", avg["ragas_score"]) print(f"Average RAGAS Score: {avg['ragas_score']:.4f}")
logger.info("") print(f"\nMin RAGAS Score: {benchmark_stats['min_ragas_score']:.4f}")
logger.info( print(f"Max RAGAS Score: {benchmark_stats['max_ragas_score']:.4f}")
"Min RAGAS Score: %.4f",
benchmark_stats["min_ragas_score"],
)
logger.info(
"Max RAGAS Score: %.4f",
benchmark_stats["max_ragas_score"],
)
logger.info("") print("\n" + "=" * 70)
logger.info("%s", "=" * 70) print("📁 GENERATED FILES")
logger.info("📁 GENERATED FILES") print("=" * 70)
logger.info("%s", "=" * 70) print(f"Results Dir: {self.results_dir.absolute()}")
logger.info("Results Dir: %s", self.results_dir.absolute()) print(f" • CSV: {csv_path.name}")
logger.info(" • CSV: %s", csv_path.name) print(f" • JSON: {json_path.name}")
logger.info(" • JSON: %s", json_path.name) print("=" * 70 + "\n")
logger.info("%s", "=" * 70)
return summary return summary
@ -641,20 +581,19 @@ async def main():
if len(sys.argv) > 1: if len(sys.argv) > 1:
rag_api_url = sys.argv[1] rag_api_url = sys.argv[1]
logger.info("") print("\n" + "=" * 70)
logger.info("%s", "=" * 70) print("🔍 RAGAS Evaluation - Using Real LightRAG API")
logger.info("🔍 RAGAS Evaluation - Using Real LightRAG API") print("=" * 70)
logger.info("%s", "=" * 70)
if rag_api_url: if rag_api_url:
logger.info("📡 RAG API URL: %s", rag_api_url) print(f"📡 RAG API URL: {rag_api_url}")
else: else:
logger.info("📡 RAG API URL: http://localhost:9621 (default)") print("📡 RAG API URL: http://localhost:9621 (default)")
logger.info("%s", "=" * 70) print("=" * 70 + "\n")
evaluator = RAGEvaluator(rag_api_url=rag_api_url) evaluator = RAGEvaluator(rag_api_url=rag_api_url)
await evaluator.run() await evaluator.run()
except Exception as e: except Exception as e:
logger.exception("❌ Error: %s", e) print(f"\n❌ Error: {str(e)}\n")
sys.exit(1) sys.exit(1)