Merge pull request #2311 from danielaskdd/evalueate-cli

Feature: Enhanced RAG Evaluation CLI with Two-Stage Pipeline and Improved UX
Daniel.y 2025-11-05 02:14:54 +08:00 committed by GitHub
commit eb80771f51
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 286 additions and 115 deletions


@@ -407,10 +407,9 @@ MEMGRAPH_DATABASE=memgraph
### Custom endpoint for evaluation models (optional, for OpenAI-compatible services)
# EVAL_LLM_BINDING_HOST=https://api.openai.com/v1
### Evaluation concurrency and rate limiting
### Number of concurrent test case evaluations (default: 1 for serial evaluation)
### Number of concurrent test case evaluations
### Lower values reduce API rate limit issues but increase evaluation time
# EVAL_MAX_CONCURRENT=3
# EVAL_MAX_CONCURRENT=2
### TOP_K query parameter of LightRAG (default: 10)
### Number of entities or relations retrieved from KG
# EVAL_QUERY_TOP_K=10
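
For orientation, the evaluation script picks these values up with `os.getenv` at runtime. A minimal sketch of the lookup; the `EVAL_MAX_CONCURRENT` line mirrors the code later in this diff, while the `EVAL_QUERY_TOP_K` line is an assumption based on the comment above:
```python
import os

# Stage 2 (RAGAS evaluation) concurrency; the script defaults this to 2
max_concurrent = int(os.getenv("EVAL_MAX_CONCURRENT", "2"))

# TOP_K forwarded to LightRAG queries; assumed lookup with the documented default of 10
query_top_k = int(os.getenv("EVAL_QUERY_TOP_K", "10"))
```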


@@ -60,15 +60,30 @@ pip install -e ".[offline-llm]"
### 2. Run Evaluation
**Basic usage (uses defaults):**
```bash
cd /path/to/LightRAG
python -m lightrag.evaluation.eval_rag_quality
python lightrag/evaluation/eval_rag_quality.py
```
Or directly:
**Specify custom dataset:**
```bash
python lightrag/evaluation/eval_rag_quality.py
python lightrag/evaluation/eval_rag_quality.py --dataset my_test.json
```
**Specify custom RAG endpoint:**
```bash
python lightrag/evaluation/eval_rag_quality.py --ragendpoint http://my-server.com:9621
```
**Specify both (short form):**
```bash
python lightrag/evaluation/eval_rag_quality.py -d my_test.json -r http://localhost:9621
```
**Get help:**
```bash
python lightrag/evaluation/eval_rag_quality.py --help
```
### 3. View Results
@@ -89,6 +104,49 @@ results/
---
## 📋 Command-Line Arguments
The evaluation script supports command-line arguments for easy configuration:
| Argument | Short | Default | Description |
|----------|-------|---------|-------------|
| `--dataset` | `-d` | `sample_dataset.json` | Path to test dataset JSON file |
| `--ragendpoint` | `-r` | `http://localhost:9621` or `$LIGHTRAG_API_URL` | LightRAG API endpoint URL |
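Both arguments are optional. The `--dataset` file follows the same shape as `sample_dataset.json` (see the last file in this diff): each test case carries a `question`, a `ground_truth`, and an optional `project` label. One entry, shown here with the surrounding top-level structure omitted:
```json
{
  "question": "What are the three main components required in a RAG system?",
  "ground_truth": "A RAG system requires three main components: a retrieval system (vector database or search engine) to find relevant documents, an embedding model to convert text into vector representations for similarity search, and a large language model (LLM) to generate responses based on retrieved context.",
  "project": "lightrag_evaluation_sample"
}
```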
### Usage Examples
**Use default dataset and endpoint:**
```bash
python lightrag/evaluation/eval_rag_quality.py
```
**Custom dataset with default endpoint:**
```bash
python lightrag/evaluation/eval_rag_quality.py --dataset path/to/my_dataset.json
```
**Default dataset with custom endpoint:**
```bash
python lightrag/evaluation/eval_rag_quality.py --ragendpoint http://my-server.com:9621
```
**Custom dataset and endpoint:**
```bash
python lightrag/evaluation/eval_rag_quality.py -d my_dataset.json -r http://localhost:9621
```
**Absolute path to dataset:**
```bash
python lightrag/evaluation/eval_rag_quality.py -d /path/to/custom_dataset.json
```
**Show help message:**
```bash
python lightrag/evaluation/eval_rag_quality.py --help
```
---
## ⚙️ Configuration
### Environment Variables


@@ -9,9 +9,22 @@ Evaluates RAG response quality using RAGAS metrics:
- Context Precision: Is retrieved context clean without noise?
Usage:
# Use defaults (sample_dataset.json, http://localhost:9621)
python lightrag/evaluation/eval_rag_quality.py
python lightrag/evaluation/eval_rag_quality.py http://localhost:9621
python lightrag/evaluation/eval_rag_quality.py http://your-rag-server.com:9621
# Specify custom dataset
python lightrag/evaluation/eval_rag_quality.py --dataset my_test.json
python lightrag/evaluation/eval_rag_quality.py -d my_test.json
# Specify custom RAG endpoint
python lightrag/evaluation/eval_rag_quality.py --ragendpoint http://my-server.com:9621
python lightrag/evaluation/eval_rag_quality.py -r http://my-server.com:9621
# Specify both
python lightrag/evaluation/eval_rag_quality.py -d my_test.json -r http://localhost:9621
# Get help
python lightrag/evaluation/eval_rag_quality.py --help
Results are saved to: lightrag/evaluation/results/
- results_YYYYMMDD_HHMMSS.csv (CSV export for analysis)
@@ -24,6 +37,7 @@ Technical Notes:
- Deprecation warnings are suppressed for cleaner output
"""
import argparse
import asyncio
import csv
import json
@@ -68,6 +82,7 @@ try:
)
from ragas.llms import LangchainLLMWrapper
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from tqdm.auto import tqdm
RAGAS_AVAILABLE = True
@@ -199,7 +214,9 @@ class RAGEvaluator:
logger.info(" • Embedding Model: %s", self.eval_embedding_model)
if self.eval_base_url:
logger.info(" • Custom Endpoint: %s", self.eval_base_url)
logger.info(" • Bypass N-Parameter: Enabled (use LangchainLLMWrapperfor compatibility)")
logger.info(
" • Bypass N-Parameter: Enabled (use LangchainLLMWrapperfor compatibility)"
)
else:
logger.info(" • Endpoint: OpenAI Official API")
@@ -330,28 +347,36 @@ class RAGEvaluator:
self,
idx: int,
test_case: Dict[str, str],
semaphore: asyncio.Semaphore,
rag_semaphore: asyncio.Semaphore,
eval_semaphore: asyncio.Semaphore,
client: httpx.AsyncClient,
progress_counter: Dict[str, int],
position_pool: asyncio.Queue,
pbar_creation_lock: asyncio.Lock,
) -> Dict[str, Any]:
"""
Evaluate a single test case with concurrency control
Evaluate a single test case with two-stage pipeline concurrency control
Args:
idx: Test case index (1-based)
test_case: Test case dictionary with question and ground_truth
semaphore: Semaphore to control concurrency
rag_semaphore: Semaphore to control overall concurrency (covers entire function)
eval_semaphore: Semaphore to control RAGAS evaluation concurrency (Stage 2)
client: Shared httpx AsyncClient for connection pooling
progress_counter: Shared dictionary for progress tracking
position_pool: Queue of available tqdm position indices
pbar_creation_lock: Lock to serialize tqdm creation and prevent race conditions
Returns:
Evaluation result dictionary
"""
async with semaphore:
# rag_semaphore controls the entire evaluation process to prevent
# all RAG responses from being generated at once when eval is slow
async with rag_semaphore:
question = test_case["question"]
ground_truth = test_case["ground_truth"]
# Generate RAG response by calling actual LightRAG API
# Stage 1: Generate RAG response
try:
rag_response = await self.generate_rag_response(
question=question, client=client
@@ -371,11 +396,6 @@ class RAGEvaluator:
# *** CRITICAL FIX: Use actual retrieved contexts, NOT ground_truth ***
retrieved_contexts = rag_response["contexts"]
# DEBUG: Print what was actually retrieved (only in debug mode)
logger.debug(
"📝 Test %s: Retrieved %s contexts", idx, len(retrieved_contexts)
)
# Prepare dataset for RAGAS evaluation with CORRECT contexts
eval_dataset = Dataset.from_dict(
{
@@ -386,98 +406,140 @@ class RAGEvaluator:
}
)
# Run RAGAS evaluation
# Stage 2: Run RAGAS evaluation (controlled by eval_semaphore)
# IMPORTANT: Create fresh metric instances for each evaluation to avoid
# concurrent state conflicts when multiple tasks run in parallel
try:
eval_results = evaluate(
dataset=eval_dataset,
metrics=[
Faithfulness(),
AnswerRelevancy(),
ContextRecall(),
ContextPrecision(),
],
llm=self.eval_llm,
embeddings=self.eval_embeddings,
)
async with eval_semaphore:
pbar = None
position = None
try:
# Acquire a position from the pool for this tqdm progress bar
position = await position_pool.get()
# Convert to DataFrame (RAGAS v0.3+ API)
df = eval_results.to_pandas()
# Serialize tqdm creation to prevent race conditions
# Multiple tasks creating tqdm simultaneously can cause display conflicts
async with pbar_creation_lock:
# Create tqdm progress bar with assigned position to avoid overlapping
# leave=False ensures the progress bar is cleared after completion,
# preventing accumulation of completed bars and allowing position reuse
pbar = tqdm(
total=4, desc=f"Eval-{idx}", position=position, leave=False
)
# Give tqdm time to initialize and claim its screen position
await asyncio.sleep(0.05)
# Extract scores from first row
scores_row = df.iloc[0]
eval_results = evaluate(
dataset=eval_dataset,
metrics=[
Faithfulness(),
AnswerRelevancy(),
ContextRecall(),
ContextPrecision(),
],
llm=self.eval_llm,
embeddings=self.eval_embeddings,
_pbar=pbar,
)
# Extract scores (RAGAS v0.3+ uses .to_pandas())
result = {
"test_number": idx,
"question": question,
"answer": rag_response["answer"][:200] + "..."
if len(rag_response["answer"]) > 200
else rag_response["answer"],
"ground_truth": ground_truth[:200] + "..."
if len(ground_truth) > 200
else ground_truth,
"project": test_case.get("project", "unknown"),
"metrics": {
"faithfulness": float(scores_row.get("faithfulness", 0)),
"answer_relevance": float(
scores_row.get("answer_relevancy", 0)
),
"context_recall": float(scores_row.get("context_recall", 0)),
"context_precision": float(
scores_row.get("context_precision", 0)
),
},
"timestamp": datetime.now().isoformat(),
}
# Convert to DataFrame (RAGAS v0.3+ API)
df = eval_results.to_pandas()
# Calculate RAGAS score (average of all metrics, excluding NaN values)
metrics = result["metrics"]
valid_metrics = [v for v in metrics.values() if not _is_nan(v)]
ragas_score = (
sum(valid_metrics) / len(valid_metrics) if valid_metrics else 0
)
result["ragas_score"] = round(ragas_score, 4)
# Extract scores from first row
scores_row = df.iloc[0]
# Update progress counter
progress_counter["completed"] += 1
# Extract scores (RAGAS v0.3+ uses .to_pandas())
result = {
"test_number": idx,
"question": question,
"answer": rag_response["answer"][:200] + "..."
if len(rag_response["answer"]) > 200
else rag_response["answer"],
"ground_truth": ground_truth[:200] + "..."
if len(ground_truth) > 200
else ground_truth,
"project": test_case.get("project", "unknown"),
"metrics": {
"faithfulness": float(scores_row.get("faithfulness", 0)),
"answer_relevance": float(
scores_row.get("answer_relevancy", 0)
),
"context_recall": float(
scores_row.get("context_recall", 0)
),
"context_precision": float(
scores_row.get("context_precision", 0)
),
},
"timestamp": datetime.now().isoformat(),
}
return result
# Calculate RAGAS score (average of all metrics, excluding NaN values)
metrics = result["metrics"]
valid_metrics = [v for v in metrics.values() if not _is_nan(v)]
ragas_score = (
sum(valid_metrics) / len(valid_metrics) if valid_metrics else 0
)
result["ragas_score"] = round(ragas_score, 4)
except Exception as e:
logger.error("Error evaluating test %s: %s", idx, str(e))
progress_counter["completed"] += 1
return {
"test_number": idx,
"question": question,
"error": str(e),
"metrics": {},
"ragas_score": 0,
"timestamp": datetime.now().isoformat(),
}
# Update progress counter
progress_counter["completed"] += 1
return result
except Exception as e:
logger.error("Error evaluating test %s: %s", idx, str(e))
progress_counter["completed"] += 1
return {
"test_number": idx,
"question": question,
"error": str(e),
"metrics": {},
"ragas_score": 0,
"timestamp": datetime.now().isoformat(),
}
finally:
# Force close progress bar to ensure completion
if pbar is not None:
pbar.close()
# Release the position back to the pool for reuse
if position is not None:
await position_pool.put(position)
async def evaluate_responses(self) -> List[Dict[str, Any]]:
"""
Evaluate all test cases in parallel and return metrics
Evaluate all test cases in parallel with two-stage pipeline and return metrics
Returns:
List of evaluation results with metrics
"""
# Get evaluation concurrency from environment (default to 1 for serial evaluation)
max_async = int(os.getenv("EVAL_MAX_CONCURRENT", "3"))
# Get evaluation concurrency from environment (default to 2 for parallel evaluation)
max_async = int(os.getenv("EVAL_MAX_CONCURRENT", "2"))
logger.info("%s", "=" * 70)
logger.info("🚀 Starting RAGAS Evaluation of LightRAG System")
logger.info("🔧 Concurrent evaluations: %s", max_async)
logger.info("🔧 Two-Stage Pipeline Configuration:")
logger.info(" • RAGAS Evaluation (Stage 2): %s concurrent", max_async)
logger.info("%s", "=" * 70)
# Create semaphore to limit concurrent evaluations
semaphore = asyncio.Semaphore(max_async)
# Create two-stage pipeline semaphores
# Stage 1: RAG generation - allow 2x concurrency to keep the evaluation stage fed
rag_semaphore = asyncio.Semaphore(max_async * 2)
# Stage 2: RAGAS evaluation - primary bottleneck
eval_semaphore = asyncio.Semaphore(max_async)
# Create progress counter (shared across all tasks)
progress_counter = {"completed": 0}
# Create position pool for tqdm progress bars
# Positions range from 0 to max_async-1, ensuring no overlapping displays
position_pool = asyncio.Queue()
for i in range(max_async):
await position_pool.put(i)
# Create lock to serialize tqdm creation and prevent race conditions
# This ensures progress bars are created one at a time, avoiding display conflicts
pbar_creation_lock = asyncio.Lock()
# Create shared HTTP client with connection pooling and proper timeouts
# Timeout: 3 minutes for connect, 5 minutes for read (LLM can be slow)
timeout = httpx.Timeout(
@@ -486,20 +548,27 @@ class RAGEvaluator:
read=READ_TIMEOUT_SECONDS,
)
limits = httpx.Limits(
max_connections=max_async * 2, # Allow some buffer
max_keepalive_connections=max_async,
max_connections=(max_async + 1) * 2, # Allow buffer for RAG stage
max_keepalive_connections=max_async + 1,
)
async with httpx.AsyncClient(timeout=timeout, limits=limits) as client:
# Create tasks for all test cases
tasks = [
self.evaluate_single_case(
idx, test_case, semaphore, client, progress_counter
idx,
test_case,
rag_semaphore,
eval_semaphore,
client,
progress_counter,
position_pool,
pbar_creation_lock,
)
for idx, test_case in enumerate(self.test_cases, 1)
]
# Run all evaluations in parallel (limited by semaphore)
# Run all evaluations in parallel (limited by two-stage semaphores)
results = await asyncio.gather(*tasks)
return list(results)
@@ -759,19 +828,6 @@ class RAGEvaluator:
elapsed_time = time.time() - start_time
# Add a small delay to ensure all buffered output is completely written
await asyncio.sleep(0.5)
# Flush all output buffers to ensure RAGAS progress bars are fully displayed
sys.stdout.flush()
sys.stderr.flush()
sys.stdout.write("\n")
sys.stderr.write("\n")
sys.stdout.flush()
sys.stderr.flush()
# Display results table
self._display_results_table(results)
# Calculate benchmark statistics
benchmark_stats = self._calculate_benchmark_stats(results)
@@ -791,6 +847,10 @@ class RAGEvaluator:
)
with open(json_path, "w") as f:
json.dump(summary, f, indent=2)
# Display results table
self._display_results_table(results)
logger.info("✅ JSON results saved to: %s", json_path)
# Export to CSV
@@ -846,22 +906,61 @@ async def main():
"""
Main entry point for RAGAS evaluation
Command-line arguments:
--dataset, -d: Path to test dataset JSON file (default: sample_dataset.json)
--ragendpoint, -r: LightRAG API endpoint URL (default: http://localhost:9621 or $LIGHTRAG_API_URL)
Usage:
python lightrag/evaluation/eval_rag_quality.py
python lightrag/evaluation/eval_rag_quality.py http://localhost:9621
python lightrag/evaluation/eval_rag_quality.py http://your-server.com:9621
python lightrag/evaluation/eval_rag_quality.py --dataset my_test.json
python lightrag/evaluation/eval_rag_quality.py -d my_test.json -r http://localhost:9621
"""
try:
# Get RAG API URL from command line or environment
rag_api_url = None
if len(sys.argv) > 1:
rag_api_url = sys.argv[1]
# Parse command-line arguments
parser = argparse.ArgumentParser(
description="RAGAS Evaluation Script for LightRAG System",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Use defaults
python lightrag/evaluation/eval_rag_quality.py
# Specify custom dataset
python lightrag/evaluation/eval_rag_quality.py --dataset my_test.json
# Specify custom RAG endpoint
python lightrag/evaluation/eval_rag_quality.py --ragendpoint http://my-server.com:9621
# Specify both
python lightrag/evaluation/eval_rag_quality.py -d my_test.json -r http://localhost:9621
""",
)
parser.add_argument(
"--dataset",
"-d",
type=str,
default=None,
help="Path to test dataset JSON file (default: sample_dataset.json in evaluation directory)",
)
parser.add_argument(
"--ragendpoint",
"-r",
type=str,
default=None,
help="LightRAG API endpoint URL (default: http://localhost:9621 or $LIGHTRAG_API_URL environment variable)",
)
args = parser.parse_args()
logger.info("%s", "=" * 70)
logger.info("🔍 RAGAS Evaluation - Using Real LightRAG API")
logger.info("%s", "=" * 70)
evaluator = RAGEvaluator(rag_api_url=rag_api_url)
evaluator = RAGEvaluator(
test_dataset_path=args.dataset, rag_api_url=args.ragendpoint
)
await evaluator.run()
except Exception as e:
logger.exception("❌ Error: %s", e)
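
For readers who want the shape of the change without walking through the whole diff: the two-stage pipeline boils down to an outer semaphore that caps how many test cases are in flight end to end (Stage 1, RAG generation), an inner semaphore that caps the RAGAS scoring stage (Stage 2, the bottleneck), and an `asyncio.Queue` of tqdm positions so the per-case progress bars never overlap. A simplified, self-contained sketch; `fetch_rag_response` and `run_ragas` are placeholders standing in for the real HTTP call and RAGAS `evaluate()`:
```python
import asyncio
from tqdm.auto import tqdm

MAX_CONCURRENT = 2  # mirrors EVAL_MAX_CONCURRENT


async def fetch_rag_response(idx: int) -> str:
    """Placeholder for the LightRAG /query call (Stage 1)."""
    await asyncio.sleep(0.2)
    return f"answer-{idx}"


async def run_ragas(idx: int, answer: str, pbar: tqdm) -> float:
    """Placeholder for RAGAS scoring of four metrics (Stage 2)."""
    for _ in range(4):
        await asyncio.sleep(0.1)
        pbar.update(1)
    return 1.0


async def evaluate_case(idx, rag_sem, eval_sem, positions, pbar_lock):
    async with rag_sem:                       # outer gate: whole test case in flight
        answer = await fetch_rag_response(idx)
        async with eval_sem:                  # inner gate: RAGAS evaluation bottleneck
            position = await positions.get()  # claim a tqdm display slot
            async with pbar_lock:             # serialize tqdm creation
                pbar = tqdm(total=4, desc=f"Eval-{idx}", position=position, leave=False)
            try:
                return await run_ragas(idx, answer, pbar)
            finally:
                pbar.close()
                await positions.put(position)  # recycle the display slot


async def main():
    rag_sem = asyncio.Semaphore(MAX_CONCURRENT * 2)  # keep Stage 2 fed
    eval_sem = asyncio.Semaphore(MAX_CONCURRENT)
    positions = asyncio.Queue()
    for i in range(MAX_CONCURRENT):
        await positions.put(i)
    pbar_lock = asyncio.Lock()
    scores = await asyncio.gather(
        *(evaluate_case(i, rag_sem, eval_sem, positions, pbar_lock) for i in range(1, 6))
    )
    print(scores)


if __name__ == "__main__":
    asyncio.run(main())
```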


@@ -3,17 +3,32 @@
{
"question": "How does LightRAG solve the hallucination problem in large language models?",
"ground_truth": "LightRAG solves the hallucination problem by combining large language models with external knowledge retrieval. The framework ensures accurate responses by grounding LLM outputs in actual documents. LightRAG provides contextual responses that reduce hallucinations significantly.",
"project": "lightrag_overview"
"project": "lightrag_evaluation_sample"
},
{
"question": "What are the three main components required in a RAG system?",
"ground_truth": "A RAG system requires three main components: a retrieval system (vector database or search engine) to find relevant documents, an embedding model to convert text into vector representations for similarity search, and a large language model (LLM) to generate responses based on retrieved context.",
"project": "rag_architecture"
"project": "lightrag_evaluation_sample"
},
{
"question": "How does LightRAG's retrieval performance compare to traditional RAG approaches?",
"ground_truth": "LightRAG delivers faster retrieval performance than traditional RAG approaches. The framework optimizes document retrieval operations for speed, while traditional RAG systems often suffer from slow query response times. LightRAG achieves high quality results with improved performance.",
"project": "lightrag_improvements"
"ground_truth": "LightRAG delivers faster retrieval performance than traditional RAG approaches. The framework optimizes document retrieval operations for speed. Traditional RAG systems often suffer from slow query response times. LightRAG achieves high quality results with improved performance. The framework combines speed with accuracy in retrieval operations, prioritizing ease of use without sacrificing quality.",
"project": "lightrag_evaluation_sample"
},
{
"question": "What vector databases does LightRAG support and what are their key characteristics?",
"ground_truth": "LightRAG supports multiple vector databases including ChromaDB for simple deployment and efficient similarity search, Neo4j for graph-based knowledge representation with vector capabilities, Milvus for high-performance vector search at scale, Qdrant for fast similarity search with filtering and production-ready infrastructure, MongoDB Atlas for combined document storage and vector search, Redis for in-memory low-latency vector search, and a built-in nano-vectordb that eliminates external dependencies for small projects. This multi-database support enables developers to choose appropriate backends based on scale, performance, and infrastructure requirements.",
"project": "lightrag_evaluation_sample"
},
{
"question": "What are the four key metrics for evaluating RAG system quality and what does each metric measure?",
"ground_truth": "RAG system quality is measured through four key metrics: Faithfulness measures whether answers are factually grounded in retrieved context and detects hallucinations. Answer Relevance measures how well answers address the user question and evaluates response appropriateness. Context Recall measures completeness of retrieval and whether all relevant information was retrieved from documents. Context Precision measures quality and relevance of retrieved documents without noise or irrelevant content.",
"project": "lightrag_evaluation_sample"
},
{
"question": "What are the core benefits of LightRAG and how does it improve upon traditional RAG systems?",
"ground_truth": "LightRAG offers five core benefits: accuracy through document-grounded responses, up-to-date information without model retraining, domain expertise through specialized document collections, cost-effectiveness by avoiding expensive fine-tuning, and transparency by showing source documents. Compared to traditional RAG systems, LightRAG provides a simpler API with intuitive interfaces, faster retrieval performance with optimized operations, better integration with multiple vector database backends for flexible selection, and optimized prompting strategies with refined templates. LightRAG prioritizes ease of use while maintaining quality and combines speed with accuracy.",
"project": "lightrag_evaluation_sample"
}
]
}