This commit is contained in:
Raphaël MANSUY 2025-12-04 19:19:23 +08:00
parent 7aa4af900a
commit 6b0fe03dcf

View file

@ -9,9 +9,22 @@ Evaluates RAG response quality using RAGAS metrics:
- Context Precision: Is retrieved context clean without noise? - Context Precision: Is retrieved context clean without noise?
Usage: Usage:
# Use defaults (sample_dataset.json, http://localhost:9621)
python lightrag/evaluation/eval_rag_quality.py python lightrag/evaluation/eval_rag_quality.py
python lightrag/evaluation/eval_rag_quality.py http://localhost:9621
python lightrag/evaluation/eval_rag_quality.py http://your-rag-server.com:9621 # Specify custom dataset
python lightrag/evaluation/eval_rag_quality.py --dataset my_test.json
python lightrag/evaluation/eval_rag_quality.py -d my_test.json
# Specify custom RAG endpoint
python lightrag/evaluation/eval_rag_quality.py --ragendpoint http://my-server.com:9621
python lightrag/evaluation/eval_rag_quality.py -r http://my-server.com:9621
# Specify both
python lightrag/evaluation/eval_rag_quality.py -d my_test.json -r http://localhost:9621
# Get help
python lightrag/evaluation/eval_rag_quality.py --help
Results are saved to: lightrag/evaluation/results/ Results are saved to: lightrag/evaluation/results/
- results_YYYYMMDD_HHMMSS.csv (CSV export for analysis) - results_YYYYMMDD_HHMMSS.csv (CSV export for analysis)
@ -24,6 +37,7 @@ Technical Notes:
- Deprecation warnings are suppressed for cleaner output - Deprecation warnings are suppressed for cleaner output
""" """
import argparse
import asyncio import asyncio
import csv import csv
import json import json
@ -68,6 +82,7 @@ try:
) )
from ragas.llms import LangchainLLMWrapper from ragas.llms import LangchainLLMWrapper
from langchain_openai import ChatOpenAI, OpenAIEmbeddings from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from tqdm.auto import tqdm
RAGAS_AVAILABLE = True RAGAS_AVAILABLE = True
@ -199,7 +214,9 @@ class RAGEvaluator:
logger.info(" • Embedding Model: %s", self.eval_embedding_model) logger.info(" • Embedding Model: %s", self.eval_embedding_model)
if self.eval_base_url: if self.eval_base_url:
logger.info(" • Custom Endpoint: %s", self.eval_base_url) logger.info(" • Custom Endpoint: %s", self.eval_base_url)
logger.info(" • Bypass N-Parameter: Enabled (use LangchainLLMWrapperfor compatibility)") logger.info(
" • Bypass N-Parameter: Enabled (use LangchainLLMWrapperfor compatibility)"
)
else: else:
logger.info(" • Endpoint: OpenAI Official API") logger.info(" • Endpoint: OpenAI Official API")
@ -330,28 +347,36 @@ class RAGEvaluator:
self, self,
idx: int, idx: int,
test_case: Dict[str, str], test_case: Dict[str, str],
semaphore: asyncio.Semaphore, rag_semaphore: asyncio.Semaphore,
eval_semaphore: asyncio.Semaphore,
client: httpx.AsyncClient, client: httpx.AsyncClient,
progress_counter: Dict[str, int], progress_counter: Dict[str, int],
position_pool: asyncio.Queue,
pbar_creation_lock: asyncio.Lock,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
""" """
Evaluate a single test case with concurrency control Evaluate a single test case with two-stage pipeline concurrency control
Args: Args:
idx: Test case index (1-based) idx: Test case index (1-based)
test_case: Test case dictionary with question and ground_truth test_case: Test case dictionary with question and ground_truth
semaphore: Semaphore to control concurrency rag_semaphore: Semaphore to control overall concurrency (covers entire function)
eval_semaphore: Semaphore to control RAGAS evaluation concurrency (Stage 2)
client: Shared httpx AsyncClient for connection pooling client: Shared httpx AsyncClient for connection pooling
progress_counter: Shared dictionary for progress tracking progress_counter: Shared dictionary for progress tracking
position_pool: Queue of available tqdm position indices
pbar_creation_lock: Lock to serialize tqdm creation and prevent race conditions
Returns: Returns:
Evaluation result dictionary Evaluation result dictionary
""" """
async with semaphore: # rag_semaphore controls the entire evaluation process to prevent
# all RAG responses from being generated at once when eval is slow
async with rag_semaphore:
question = test_case["question"] question = test_case["question"]
ground_truth = test_case["ground_truth"] ground_truth = test_case["ground_truth"]
# Generate RAG response by calling actual LightRAG API # Stage 1: Generate RAG response
try: try:
rag_response = await self.generate_rag_response( rag_response = await self.generate_rag_response(
question=question, client=client question=question, client=client
@ -371,11 +396,6 @@ class RAGEvaluator:
# *** CRITICAL FIX: Use actual retrieved contexts, NOT ground_truth *** # *** CRITICAL FIX: Use actual retrieved contexts, NOT ground_truth ***
retrieved_contexts = rag_response["contexts"] retrieved_contexts = rag_response["contexts"]
# DEBUG: Print what was actually retrieved (only in debug mode)
logger.debug(
"📝 Test %s: Retrieved %s contexts", idx, len(retrieved_contexts)
)
# Prepare dataset for RAGAS evaluation with CORRECT contexts # Prepare dataset for RAGAS evaluation with CORRECT contexts
eval_dataset = Dataset.from_dict( eval_dataset = Dataset.from_dict(
{ {
@ -386,98 +406,142 @@ class RAGEvaluator:
} }
) )
# Run RAGAS evaluation # Stage 2: Run RAGAS evaluation (controlled by eval_semaphore)
# IMPORTANT: Create fresh metric instances for each evaluation to avoid # IMPORTANT: Create fresh metric instances for each evaluation to avoid
# concurrent state conflicts when multiple tasks run in parallel # concurrent state conflicts when multiple tasks run in parallel
try: async with eval_semaphore:
eval_results = evaluate( pbar = None
dataset=eval_dataset, position = None
metrics=[ try:
Faithfulness(), # Acquire a position from the pool for this tqdm progress bar
AnswerRelevancy(), position = await position_pool.get()
ContextRecall(),
ContextPrecision(),
],
llm=self.eval_llm,
embeddings=self.eval_embeddings,
)
# Convert to DataFrame (RAGAS v0.3+ API) # Serialize tqdm creation to prevent race conditions
df = eval_results.to_pandas() # Multiple tasks creating tqdm simultaneously can cause display conflicts
async with pbar_creation_lock:
# Create tqdm progress bar with assigned position to avoid overlapping
# leave=False ensures the progress bar is cleared after completion,
# preventing accumulation of completed bars and allowing position reuse
pbar = tqdm(
total=4,
desc=f"Eval-{idx:02d}",
position=position,
leave=False,
)
# Give tqdm time to initialize and claim its screen position
await asyncio.sleep(0.05)
# Extract scores from first row eval_results = evaluate(
scores_row = df.iloc[0] dataset=eval_dataset,
metrics=[
Faithfulness(),
AnswerRelevancy(),
ContextRecall(),
ContextPrecision(),
],
llm=self.eval_llm,
embeddings=self.eval_embeddings,
_pbar=pbar,
)
# Extract scores (RAGAS v0.3+ uses .to_pandas()) # Convert to DataFrame (RAGAS v0.3+ API)
result = { df = eval_results.to_pandas()
"test_number": idx,
"question": question,
"answer": rag_response["answer"][:200] + "..."
if len(rag_response["answer"]) > 200
else rag_response["answer"],
"ground_truth": ground_truth[:200] + "..."
if len(ground_truth) > 200
else ground_truth,
"project": test_case.get("project", "unknown"),
"metrics": {
"faithfulness": float(scores_row.get("faithfulness", 0)),
"answer_relevance": float(
scores_row.get("answer_relevancy", 0)
),
"context_recall": float(scores_row.get("context_recall", 0)),
"context_precision": float(
scores_row.get("context_precision", 0)
),
},
"timestamp": datetime.now().isoformat(),
}
# Calculate RAGAS score (average of all metrics, excluding NaN values) # Extract scores from first row
metrics = result["metrics"] scores_row = df.iloc[0]
valid_metrics = [v for v in metrics.values() if not _is_nan(v)]
ragas_score = (
sum(valid_metrics) / len(valid_metrics) if valid_metrics else 0
)
result["ragas_score"] = round(ragas_score, 4)
# Update progress counter # Extract scores (RAGAS v0.3+ uses .to_pandas())
progress_counter["completed"] += 1 result = {
"test_number": idx,
"question": question,
"answer": rag_response["answer"][:200] + "..."
if len(rag_response["answer"]) > 200
else rag_response["answer"],
"ground_truth": ground_truth[:200] + "..."
if len(ground_truth) > 200
else ground_truth,
"project": test_case.get("project", "unknown"),
"metrics": {
"faithfulness": float(scores_row.get("faithfulness", 0)),
"answer_relevance": float(
scores_row.get("answer_relevancy", 0)
),
"context_recall": float(
scores_row.get("context_recall", 0)
),
"context_precision": float(
scores_row.get("context_precision", 0)
),
},
"timestamp": datetime.now().isoformat(),
}
return result # Calculate RAGAS score (average of all metrics, excluding NaN values)
metrics = result["metrics"]
valid_metrics = [v for v in metrics.values() if not _is_nan(v)]
ragas_score = (
sum(valid_metrics) / len(valid_metrics) if valid_metrics else 0
)
result["ragas_score"] = round(ragas_score, 4)
except Exception as e: # Update progress counter
logger.error("Error evaluating test %s: %s", idx, str(e)) progress_counter["completed"] += 1
progress_counter["completed"] += 1
return { return result
"test_number": idx,
"question": question, except Exception as e:
"error": str(e), logger.error("Error evaluating test %s: %s", idx, str(e))
"metrics": {}, progress_counter["completed"] += 1
"ragas_score": 0, return {
"timestamp": datetime.now().isoformat(), "test_number": idx,
} "question": question,
"error": str(e),
"metrics": {},
"ragas_score": 0,
"timestamp": datetime.now().isoformat(),
}
finally:
# Force close progress bar to ensure completion
if pbar is not None:
pbar.close()
# Release the position back to the pool for reuse
if position is not None:
await position_pool.put(position)
async def evaluate_responses(self) -> List[Dict[str, Any]]: async def evaluate_responses(self) -> List[Dict[str, Any]]:
""" """
Evaluate all test cases in parallel and return metrics Evaluate all test cases in parallel with two-stage pipeline and return metrics
Returns: Returns:
List of evaluation results with metrics List of evaluation results with metrics
""" """
# Get evaluation concurrency from environment (default to 1 for serial evaluation) # Get evaluation concurrency from environment (default to 2 for parallel evaluation)
max_async = int(os.getenv("EVAL_MAX_CONCURRENT", "3")) max_async = int(os.getenv("EVAL_MAX_CONCURRENT", "2"))
logger.info("%s", "=" * 70) logger.info("%s", "=" * 70)
logger.info("🚀 Starting RAGAS Evaluation of LightRAG System") logger.info("🚀 Starting RAGAS Evaluation of LightRAG System")
logger.info("🔧 Concurrent evaluations: %s", max_async) logger.info("🔧 RAGAS Evaluation (Stage 2): %s concurrent", max_async)
logger.info("%s", "=" * 70) logger.info("%s", "=" * 70)
# Create semaphore to limit concurrent evaluations # Create two-stage pipeline semaphores
semaphore = asyncio.Semaphore(max_async) # Stage 1: RAG generation - allow x2 concurrency to keep evaluation fed
rag_semaphore = asyncio.Semaphore(max_async * 2)
# Stage 2: RAGAS evaluation - primary bottleneck
eval_semaphore = asyncio.Semaphore(max_async)
# Create progress counter (shared across all tasks) # Create progress counter (shared across all tasks)
progress_counter = {"completed": 0} progress_counter = {"completed": 0}
# Create position pool for tqdm progress bars
# Positions range from 0 to max_async-1, ensuring no overlapping displays
position_pool = asyncio.Queue()
for i in range(max_async):
await position_pool.put(i)
# Create lock to serialize tqdm creation and prevent race conditions
# This ensures progress bars are created one at a time, avoiding display conflicts
pbar_creation_lock = asyncio.Lock()
# Create shared HTTP client with connection pooling and proper timeouts # Create shared HTTP client with connection pooling and proper timeouts
# Timeout: 3 minutes for connect, 5 minutes for read (LLM can be slow) # Timeout: 3 minutes for connect, 5 minutes for read (LLM can be slow)
timeout = httpx.Timeout( timeout = httpx.Timeout(
@ -486,20 +550,27 @@ class RAGEvaluator:
read=READ_TIMEOUT_SECONDS, read=READ_TIMEOUT_SECONDS,
) )
limits = httpx.Limits( limits = httpx.Limits(
max_connections=max_async * 2, # Allow some buffer max_connections=(max_async + 1) * 2, # Allow buffer for RAG stage
max_keepalive_connections=max_async, max_keepalive_connections=max_async + 1,
) )
async with httpx.AsyncClient(timeout=timeout, limits=limits) as client: async with httpx.AsyncClient(timeout=timeout, limits=limits) as client:
# Create tasks for all test cases # Create tasks for all test cases
tasks = [ tasks = [
self.evaluate_single_case( self.evaluate_single_case(
idx, test_case, semaphore, client, progress_counter idx,
test_case,
rag_semaphore,
eval_semaphore,
client,
progress_counter,
position_pool,
pbar_creation_lock,
) )
for idx, test_case in enumerate(self.test_cases, 1) for idx, test_case in enumerate(self.test_cases, 1)
] ]
# Run all evaluations in parallel (limited by semaphore) # Run all evaluations in parallel (limited by two-stage semaphores)
results = await asyncio.gather(*tasks) results = await asyncio.gather(*tasks)
return list(results) return list(results)
@ -759,19 +830,6 @@ class RAGEvaluator:
elapsed_time = time.time() - start_time elapsed_time = time.time() - start_time
# Add a small delay to ensure all buffered output is completely written
await asyncio.sleep(0.5)
# Flush all output buffers to ensure RAGAS progress bars are fully displayed
sys.stdout.flush()
sys.stderr.flush()
sys.stdout.write("\n")
sys.stderr.write("\n")
sys.stdout.flush()
sys.stderr.flush()
# Display results table
self._display_results_table(results)
# Calculate benchmark statistics # Calculate benchmark statistics
benchmark_stats = self._calculate_benchmark_stats(results) benchmark_stats = self._calculate_benchmark_stats(results)
@ -791,6 +849,10 @@ class RAGEvaluator:
) )
with open(json_path, "w") as f: with open(json_path, "w") as f:
json.dump(summary, f, indent=2) json.dump(summary, f, indent=2)
# Display results table
self._display_results_table(results)
logger.info("✅ JSON results saved to: %s", json_path) logger.info("✅ JSON results saved to: %s", json_path)
# Export to CSV # Export to CSV
@ -846,22 +908,61 @@ async def main():
""" """
Main entry point for RAGAS evaluation Main entry point for RAGAS evaluation
Command-line arguments:
--dataset, -d: Path to test dataset JSON file (default: sample_dataset.json)
--ragendpoint, -r: LightRAG API endpoint URL (default: http://localhost:9621 or $LIGHTRAG_API_URL)
Usage: Usage:
python lightrag/evaluation/eval_rag_quality.py python lightrag/evaluation/eval_rag_quality.py
python lightrag/evaluation/eval_rag_quality.py http://localhost:9621 python lightrag/evaluation/eval_rag_quality.py --dataset my_test.json
python lightrag/evaluation/eval_rag_quality.py http://your-server.com:9621 python lightrag/evaluation/eval_rag_quality.py -d my_test.json -r http://localhost:9621
""" """
try: try:
# Get RAG API URL from command line or environment # Parse command-line arguments
rag_api_url = None parser = argparse.ArgumentParser(
if len(sys.argv) > 1: description="RAGAS Evaluation Script for LightRAG System",
rag_api_url = sys.argv[1] formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Use defaults
python lightrag/evaluation/eval_rag_quality.py
# Specify custom dataset
python lightrag/evaluation/eval_rag_quality.py --dataset my_test.json
# Specify custom RAG endpoint
python lightrag/evaluation/eval_rag_quality.py --ragendpoint http://my-server.com:9621
# Specify both
python lightrag/evaluation/eval_rag_quality.py -d my_test.json -r http://localhost:9621
""",
)
parser.add_argument(
"--dataset",
"-d",
type=str,
default=None,
help="Path to test dataset JSON file (default: sample_dataset.json in evaluation directory)",
)
parser.add_argument(
"--ragendpoint",
"-r",
type=str,
default=None,
help="LightRAG API endpoint URL (default: http://localhost:9621 or $LIGHTRAG_API_URL environment variable)",
)
args = parser.parse_args()
logger.info("%s", "=" * 70) logger.info("%s", "=" * 70)
logger.info("🔍 RAGAS Evaluation - Using Real LightRAG API") logger.info("🔍 RAGAS Evaluation - Using Real LightRAG API")
logger.info("%s", "=" * 70) logger.info("%s", "=" * 70)
evaluator = RAGEvaluator(rag_api_url=rag_api_url) evaluator = RAGEvaluator(
test_dataset_path=args.dataset, rag_api_url=args.ragendpoint
)
await evaluator.run() await evaluator.run()
except Exception as e: except Exception as e:
logger.exception("❌ Error: %s", e) logger.exception("❌ Error: %s", e)