Raphaël MANSUY 2025-12-04 19:19:23 +08:00
parent 4af5f6c759
commit d761d8e6be
2 changed files with 575 additions and 238 deletions

lightrag/evaluation/README.md

@@ -25,7 +25,14 @@ Instead of requiring human-annotated ground truth, RAGAS uses state-of-the-art e
```
lightrag/evaluation/
├── eval_rag_quality.py # Main evaluation script
├── sample_dataset.json # 3 test questions about LightRAG
├── sample_documents/ # Matching markdown files for testing
│ ├── 01_lightrag_overview.md
│ ├── 02_rag_architecture.md
│ ├── 03_lightrag_improvements.md
│ ├── 04_supported_databases.md
│ ├── 05_evaluation_and_deployment.md
│ └── README.md
├── __init__.py # Package init
├── results/ # Output directory
│ ├── results_YYYYMMDD_HHMMSS.json # Raw metrics in JSON
@@ -33,7 +40,7 @@ lightrag/evaluation/
└── README.md # This file
```
**Quick Test:** Index files from `sample_documents/` into LightRAG, then run the evaluator to reproduce results (~89-100% RAGAS score per question).
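Before running the evaluator, you can sanity-check that the server answers queries. A minimal sketch in Python (assuming the default host/port; it POSTs to the same `/query` endpoint the evaluator uses, so adjust the URL and query mode to your deployment):

```python
# Quick connectivity check against a running LightRAG server (illustrative).
import httpx

resp = httpx.post(
    "http://localhost:9621/query",
    json={"query": "What is LightRAG?", "mode": "mix"},
    timeout=60.0,
)
resp.raise_for_status()
print(resp.json().get("response", "")[:200])  # first 200 chars of the answer
```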
---
@@ -53,15 +60,30 @@ pip install -e ".[offline-llm]"
### 2. Run Evaluation
**Basic usage (uses defaults):**
```bash
cd /path/to/LightRAG
python lightrag/evaluation/eval_rag_quality.py
```
**Specify custom dataset:**
```bash
python lightrag/evaluation/eval_rag_quality.py --dataset my_test.json
```
**Specify custom RAG endpoint:**
```bash
python lightrag/evaluation/eval_rag_quality.py --ragendpoint http://my-server.com:9621
```
**Specify both (short form):**
```bash
python lightrag/evaluation/eval_rag_quality.py -d my_test.json -r http://localhost:9621
```
**Get help:**
```bash
python lightrag/evaluation/eval_rag_quality.py --help
```
### 3. View Results
@@ -82,72 +104,142 @@ results/
---
## 📋 Command-Line Arguments
The evaluation script supports command-line arguments for easy configuration:
| Argument | Short | Default | Description |
|----------|-------|---------|-------------|
| `--dataset` | `-d` | `sample_dataset.json` | Path to test dataset JSON file |
| `--ragendpoint` | `-r` | `http://localhost:9621` or `$LIGHTRAG_API_URL` | LightRAG API endpoint URL |
### Usage Examples
**Use default dataset and endpoint:**
```bash
python lightrag/evaluation/eval_rag_quality.py
```
**Custom dataset with default endpoint:**
```bash
python lightrag/evaluation/eval_rag_quality.py --dataset path/to/my_dataset.json
```
**Default dataset with custom endpoint:**
```bash
python lightrag/evaluation/eval_rag_quality.py --ragendpoint http://my-server.com:9621
```
**Custom dataset and endpoint:**
```bash
python lightrag/evaluation/eval_rag_quality.py -d my_dataset.json -r http://localhost:9621
```
**Absolute path to dataset:**
```bash
python lightrag/evaluation/eval_rag_quality.py -d /path/to/custom_dataset.json
```
**Show help message:**
```bash
python lightrag/evaluation/eval_rag_quality.py --help
```
---
## ⚙️ Configuration
### Environment Variables
The evaluation framework supports customization through environment variables:
| Variable | Default | Description |
|----------|---------|-------------|
| `EVAL_LLM_MODEL` | `gpt-4o-mini` | LLM model used for RAGAS evaluation |
| `EVAL_EMBEDDING_MODEL` | `text-embedding-3-large` | Embedding model for evaluation |
| `EVAL_LLM_BINDING_API_KEY` | (falls back to `OPENAI_API_KEY`) | API key for evaluation models |
| `EVAL_LLM_BINDING_HOST` | (optional) | Custom endpoint URL for OpenAI-compatible services |
| `EVAL_MAX_CONCURRENT` | `1` | Number of concurrent test case evaluations (1=serial) |
| `EVAL_QUERY_TOP_K` | `10` | Number of documents to retrieve per query |
| `EVAL_LLM_MAX_RETRIES` | `5` | Maximum LLM request retries |
| `EVAL_LLM_TIMEOUT` | `180` | LLM request timeout in seconds |
### Usage Examples
**Default Configuration (OpenAI):**
```bash
export OPENAI_API_KEY=sk-xxx
python lightrag/evaluation/eval_rag_quality.py
```
**Custom Model:**
```bash
export OPENAI_API_KEY=sk-xxx
export EVAL_LLM_MODEL=gpt-4o
export EVAL_EMBEDDING_MODEL=text-embedding-3-small
python lightrag/evaluation/eval_rag_quality.py
```
**OpenAI-Compatible Endpoint:**
```bash
export EVAL_LLM_BINDING_API_KEY=your-custom-key
export EVAL_LLM_BINDING_HOST=https://your-provider.example.com/v1
export EVAL_LLM_MODEL=qwen-plus
python lightrag/evaluation/eval_rag_quality.py
```
### Concurrency Control & Rate Limiting
The evaluation framework includes built-in concurrency control to prevent API rate limiting issues:
**Why Concurrency Control Matters:**
- RAGAS internally makes many concurrent LLM calls for each test case
- Context Precision metric calls LLM once per retrieved document
- Without control, this can easily exceed API rate limits (a rough estimate is sketched below)
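A back-of-envelope estimate of the call volume (an illustration only, not the exact RAGAS internals, which vary by version):

```python
# Rough estimate of evaluation LLM calls; the "+ 3" assumes roughly one call
# each for faithfulness, answer relevancy, and context recall (an approximation).
def estimate_llm_calls(num_tests: int, top_k: int = 10, max_concurrent: int = 1) -> dict:
    per_test = top_k + 3  # context precision: ~one verdict per retrieved document
    return {
        "per_test_case": per_test,
        "total": per_test * num_tests,
        "peak_in_flight": per_test * max_concurrent,
    }

print(estimate_llm_calls(num_tests=3))  # {'per_test_case': 13, 'total': 39, 'peak_in_flight': 13}
```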
**Default Configuration (Conservative):**
```bash
EVAL_MAX_CONCURRENT=1 # Serial evaluation (one test at a time)
EVAL_QUERY_TOP_K=10 # top_k query parameter sent to LightRAG
EVAL_LLM_MAX_RETRIES=5 # Retry failed requests 5 times
EVAL_LLM_TIMEOUT=180 # 3-minute timeout per request
```
**If You Have Higher API Quotas:**
```bash
EVAL_MAX_CONCURRENT=2 # Evaluate 2 tests in parallel
EVAL_QUERY_TOP_K=20 # top_k query parameter sent to LightRAG
```
**Common Issues and Solutions:**
| Issue | Solution |
|-------|----------|
| **Warning: "LM returned 1 generations instead of 3"** | Reduce `EVAL_MAX_CONCURRENT` to 1 or decrease `EVAL_QUERY_TOP_K` |
| **Context Precision returns NaN** | Lower `EVAL_QUERY_TOP_K` to reduce LLM calls per test case |
| **Rate limit errors (429)** | Increase `EVAL_LLM_MAX_RETRIES` and decrease `EVAL_MAX_CONCURRENT` |
| **Request timeouts** | Increase `EVAL_LLM_TIMEOUT` above the 180-second default (e.g., 300) |
---
## 📝 Test Dataset
`sample_dataset.json` contains 3 generic questions about LightRAG. Replace them with questions that match YOUR indexed documents.
**Custom Test Cases:**
```json
{
  "test_cases": [
    {
      "question": "Your question here",
      "ground_truth": "Expected answer from your data",
      "context": "topic"
    }
  ]
}
```
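Before pointing the evaluator at a custom file, a quick shape check can save a failed run. A minimal sketch (the evaluator does its own loading; this only verifies the fields shown above):

```python
# Validate a test dataset file against the expected schema (illustrative).
import json
from pathlib import Path

def validate_dataset(path: str) -> None:
    data = json.loads(Path(path).read_text())
    cases = data.get("test_cases", [])
    assert cases, "dataset must contain a non-empty 'test_cases' list"
    for i, case in enumerate(cases, 1):
        for key in ("question", "ground_truth"):
            assert case.get(key), f"test case {i} is missing '{key}'"
    print(f"OK: {len(cases)} test case(s)")

validate_dataset("lightrag/evaluation/sample_dataset.json")
```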
---
## 📊 Interpreting Results
@@ -192,82 +284,10 @@ python lightrag/evaluation/eval_rag_quality.py
---
## 📚 Resources
- [RAGAS Documentation](https://docs.ragas.io/)
- [RAGAS GitHub](https://github.com/explodinggradients/ragas)
- [LangFuse + RAGAS Guide](https://langfuse.com/guides/cookbook/evaluation_of_rag_with_ragas)
---
@@ -279,6 +299,50 @@ for i in range(3):
pip install ragas datasets
```
### "Warning: LM returned 1 generations instead of requested 3" or Context Precision NaN
**Cause**: This warning indicates API rate limiting or concurrent request overload:
- RAGAS makes multiple LLM calls per test case (faithfulness, relevancy, recall, precision)
- Context Precision calls LLM once per retrieved document (with `EVAL_QUERY_TOP_K=10`, that's 10 calls)
- Concurrent evaluation multiplies these calls: `EVAL_MAX_CONCURRENT × LLM calls per test`
**Solutions** (in order of effectiveness):
1. **Serial Evaluation** (Default):
```bash
export EVAL_MAX_CONCURRENT=1
python lightrag/evaluation/eval_rag_quality.py
```
2. **Reduce Retrieved Documents**:
```bash
export EVAL_QUERY_TOP_K=5 # Halves Context Precision LLM calls
python lightrag/evaluation/eval_rag_quality.py
```
3. **Increase Retry & Timeout**:
```bash
export EVAL_LLM_MAX_RETRIES=10
export EVAL_LLM_TIMEOUT=300
python lightrag/evaluation/eval_rag_quality.py
```
4. **Use Higher Quota API** (if available):
- Upgrade to OpenAI Tier 2+ for higher RPM limits
- Use self-hosted OpenAI-compatible service with no rate limits
### "AttributeError: 'InstructorLLM' object has no attribute 'agenerate_prompt'" or NaN results
This error occurs with RAGAS 0.3.x when LLM and Embeddings are not explicitly configured. The evaluation framework now handles this automatically by:
- Using environment variables to configure evaluation models
- Creating proper LLM and Embeddings instances for RAGAS
**Solution**: Ensure you have set one of the following:
- `OPENAI_API_KEY` environment variable (default)
- `EVAL_LLM_BINDING_API_KEY` for custom API key
The framework will automatically configure the evaluation models.
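For reference, the wiring the framework performs is roughly the following (condensed from `eval_rag_quality.py`; see that file for the full version with retries, timeouts, and the `bypass_n` fallback):

```python
# Condensed sketch of the automatic RAGAS model configuration.
import os
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from ragas.llms import LangchainLLMWrapper

api_key = os.getenv("EVAL_LLM_BINDING_API_KEY") or os.environ["OPENAI_API_KEY"]
eval_llm = LangchainLLMWrapper(
    langchain_llm=ChatOpenAI(model=os.getenv("EVAL_LLM_MODEL", "gpt-4o-mini"), api_key=api_key),
    bypass_n=True,  # avoid passing the 'n' parameter to custom endpoints
)
eval_embeddings = OpenAIEmbeddings(
    model=os.getenv("EVAL_EMBEDDING_MODEL", "text-embedding-3-large"), api_key=api_key
)
# Both are then passed to ragas.evaluate(..., llm=eval_llm, embeddings=eval_embeddings)
```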
### "No sample_dataset.json found" ### "No sample_dataset.json found"
Make sure you're running from the project root: Make sure you're running from the project root:
@ -295,25 +359,22 @@ The evaluation uses your configured LLM (OpenAI by default). Ensure:
- Have sufficient API quota - Have sufficient API quota
- Network connection is stable - Network connection is stable
### Evaluation requires a running LightRAG API
The evaluator queries a running LightRAG API server at `http://localhost:9621`. Make sure:
1. LightRAG API server is running (`python lightrag/api/lightrag_server.py`)
2. Documents are indexed in your LightRAG instance
3. API is accessible at the configured URL
---
## 📝 Next Steps
1. Index documents into LightRAG (WebUI or API)
2. Start LightRAG API server
3. Run `python lightrag/evaluation/eval_rag_quality.py`
4. Review results (JSON/CSV) in `results/` folder
5. Adjust entity extraction prompts or retrieval settings based on scores
---

lightrag/evaluation/eval_rag_quality.py

@@ -1,6 +1,6 @@
#!/usr/bin/env python3
"""
RAGAS Evaluation Script for LightRAG System
Evaluates RAG response quality using RAGAS metrics:
- Faithfulness: Is the answer factually accurate based on context?
@@ -9,15 +9,35 @@ Evaluates RAG response quality using RAGAS metrics:
- Context Precision: Is retrieved context clean without noise?
Usage:
# Use defaults (sample_dataset.json, http://localhost:9621)
python lightrag/evaluation/eval_rag_quality.py
# Specify custom dataset
python lightrag/evaluation/eval_rag_quality.py --dataset my_test.json
python lightrag/evaluation/eval_rag_quality.py -d my_test.json
# Specify custom RAG endpoint
python lightrag/evaluation/eval_rag_quality.py --ragendpoint http://my-server.com:9621
python lightrag/evaluation/eval_rag_quality.py -r http://my-server.com:9621
# Specify both
python lightrag/evaluation/eval_rag_quality.py -d my_test.json -r http://localhost:9621
# Get help
python lightrag/evaluation/eval_rag_quality.py --help
Results are saved to: lightrag/evaluation/results/
- results_YYYYMMDD_HHMMSS.csv (CSV export for analysis)
- results_YYYYMMDD_HHMMSS.json (Full results with details)
Technical Notes:
- Uses stable RAGAS API (LangchainLLMWrapper) for maximum compatibility
- Supports custom OpenAI-compatible endpoints via EVAL_LLM_BINDING_HOST
- Enables bypass_n mode for endpoints that don't support 'n' parameter
- Deprecation warnings are suppressed for cleaner output
""" """
import argparse
import asyncio
import csv
import json
@@ -25,6 +45,7 @@ import math
import os
import sys
import time
import warnings
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List
@@ -33,29 +54,42 @@ import httpx
from dotenv import load_dotenv
from lightrag.utils import logger
# Suppress LangchainLLMWrapper deprecation warning
# We use LangchainLLMWrapper for stability and compatibility with all RAGAS versions
warnings.filterwarnings(
"ignore",
message=".*LangchainLLMWrapper is deprecated.*",
category=DeprecationWarning,
)
# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent.parent))

# Use the .env in the current working directory:
# this allows a different .env file per LightRAG instance.
# OS environment variables take precedence over the .env file.
load_dotenv(dotenv_path=".env", override=False)
# Conditional imports - will raise ImportError if dependencies not installed
try:
    from datasets import Dataset
    from ragas import evaluate
    from ragas.metrics import (
        AnswerRelevancy,
        ContextPrecision,
        ContextRecall,
        Faithfulness,
    )
    from ragas.llms import LangchainLLMWrapper
    from langchain_openai import ChatOpenAI, OpenAIEmbeddings
    RAGAS_AVAILABLE = True
except ImportError:
    RAGAS_AVAILABLE = False
    Dataset = None
    evaluate = None
    LangchainLLMWrapper = None
CONNECT_TIMEOUT_SECONDS = 180.0
@@ -80,10 +114,15 @@ class RAGEvaluator:
            rag_api_url: Base URL of LightRAG API (e.g., http://localhost:9621)
                If None, will try to read from environment or use default

        Environment Variables:
            EVAL_LLM_MODEL: LLM model for evaluation (default: gpt-4o-mini)
            EVAL_EMBEDDING_MODEL: Embedding model for evaluation (default: text-embedding-3-large)
            EVAL_LLM_BINDING_API_KEY: API key for evaluation models (fallback to OPENAI_API_KEY)
            EVAL_LLM_BINDING_HOST: Custom endpoint URL for evaluation models (optional)

        Raises:
            ImportError: If ragas or datasets packages are not installed
            EnvironmentError: If neither EVAL_LLM_BINDING_API_KEY nor OPENAI_API_KEY is set
        """
        # Validate RAGAS dependencies are installed
        if not RAGAS_AVAILABLE:
@@ -92,25 +131,56 @@ class RAGEvaluator:
                "Install with: pip install ragas datasets"
            )
        # Configure evaluation models (for RAGAS scoring)
        eval_api_key = os.getenv("EVAL_LLM_BINDING_API_KEY") or os.getenv(
            "OPENAI_API_KEY"
        )
        if not eval_api_key:
            raise EnvironmentError(
                "EVAL_LLM_BINDING_API_KEY or OPENAI_API_KEY is required for evaluation. "
                "Set EVAL_LLM_BINDING_API_KEY to use a custom API key, "
                "or ensure OPENAI_API_KEY is set."
            )

        eval_model = os.getenv("EVAL_LLM_MODEL", "gpt-4o-mini")
        eval_embedding_model = os.getenv(
            "EVAL_EMBEDDING_MODEL", "text-embedding-3-large"
        )
        eval_base_url = os.getenv("EVAL_LLM_BINDING_HOST")
        # Create LLM and Embeddings instances for RAGAS
        llm_kwargs = {
            "model": eval_model,
            "api_key": eval_api_key,
            "max_retries": int(os.getenv("EVAL_LLM_MAX_RETRIES", "5")),
            "request_timeout": int(os.getenv("EVAL_LLM_TIMEOUT", "180")),
        }
        embedding_kwargs = {"model": eval_embedding_model, "api_key": eval_api_key}
        if eval_base_url:
            llm_kwargs["base_url"] = eval_base_url
            embedding_kwargs["base_url"] = eval_base_url

        # Create base LangChain LLM
        base_llm = ChatOpenAI(**llm_kwargs)
        self.eval_embeddings = OpenAIEmbeddings(**embedding_kwargs)

        # Wrap LLM with LangchainLLMWrapper and enable bypass_n mode for custom endpoints.
        # This ensures compatibility with endpoints that don't support the 'n' parameter
        # by generating multiple outputs through repeated prompts instead.
        try:
            self.eval_llm = LangchainLLMWrapper(
                langchain_llm=base_llm,
                bypass_n=True,  # Enable bypass_n to avoid passing 'n' to the OpenAI API
            )
            logger.debug("Successfully configured bypass_n mode for LLM wrapper")
        except Exception as e:
            logger.warning(
                "Could not configure LangchainLLMWrapper with bypass_n: %s. "
                "Using base LLM directly, which may cause warnings with custom endpoints.",
                e,
            )
            self.eval_llm = base_llm
        if test_dataset_path is None:
            test_dataset_path = Path(__file__).parent / "sample_dataset.json"
@@ -126,6 +196,41 @@ class RAGEvaluator:
        # Load test dataset
        self.test_cases = self._load_test_dataset()
        # Store configuration values for display
        self.eval_model = eval_model
        self.eval_embedding_model = eval_embedding_model
        self.eval_base_url = eval_base_url
        self.eval_max_retries = llm_kwargs["max_retries"]
        self.eval_timeout = llm_kwargs["request_timeout"]

        # Display configuration
        self._display_configuration()
    def _display_configuration(self):
        """Display all evaluation configuration settings"""
        logger.info("Evaluation Models:")
        logger.info(" • LLM Model: %s", self.eval_model)
        logger.info(" • Embedding Model: %s", self.eval_embedding_model)
        if self.eval_base_url:
            logger.info(" • Custom Endpoint: %s", self.eval_base_url)
            logger.info(
                " • Bypass N-Parameter: Enabled (uses LangchainLLMWrapper for compatibility)"
            )
        else:
            logger.info(" • Endpoint: OpenAI Official API")
        logger.info("Concurrency & Rate Limiting:")
        query_top_k = int(os.getenv("EVAL_QUERY_TOP_K", "10"))
        logger.info(" • Query Top-K: %s Entities/Relations", query_top_k)
        logger.info(" • LLM Max Retries: %s", self.eval_max_retries)
        logger.info(" • LLM Timeout: %s seconds", self.eval_timeout)
        logger.info("Test Configuration:")
        logger.info(" • Total Test Cases: %s", len(self.test_cases))
        logger.info(" • Test Dataset: %s", self.test_dataset_path.name)
        logger.info(" • LightRAG API: %s", self.rag_api_url)
        logger.info(" • Results Directory: %s", self.results_dir.name)
    def _load_test_dataset(self) -> List[Dict[str, str]]:
        """Load test cases from JSON file"""
        if not self.test_dataset_path.exists():
@@ -162,13 +267,22 @@ class RAGEvaluator:
"include_references": True, "include_references": True,
"include_chunk_content": True, # NEW: Request chunk content in references "include_chunk_content": True, # NEW: Request chunk content in references
"response_type": "Multiple Paragraphs", "response_type": "Multiple Paragraphs",
"top_k": 10, "top_k": int(os.getenv("EVAL_QUERY_TOP_K", "10")),
} }
        # Get API key from environment for authentication
        api_key = os.getenv("LIGHTRAG_API_KEY")

        # Prepare headers with optional authentication
        headers = {}
        if api_key:
            headers["X-API-Key"] = api_key
        # Single optimized API call - gets both answer AND chunk content
        response = await client.post(
            f"{self.rag_api_url}/query",
            json=payload,
            headers=headers if headers else None,
        )
        response.raise_for_status()
        result = response.json()
@@ -234,6 +348,7 @@ class RAGEvaluator:
        test_case: Dict[str, str],
        semaphore: asyncio.Semaphore,
        client: httpx.AsyncClient,
        progress_counter: Dict[str, int],
    ) -> Dict[str, Any]:
        """
        Evaluate a single test case with concurrency control
@@ -243,34 +358,39 @@ class RAGEvaluator:
            test_case: Test case dictionary with question and ground_truth
            semaphore: Semaphore to control concurrency
            client: Shared httpx AsyncClient for connection pooling
            progress_counter: Shared dictionary for progress tracking

        Returns:
            Evaluation result dictionary
        """
        async with semaphore:
            question = test_case["question"]
            ground_truth = test_case["ground_truth"]
            # Generate RAG response by calling actual LightRAG API
            try:
                rag_response = await self.generate_rag_response(
                    question=question, client=client
                )
            except Exception as e:
                logger.error("Error generating response for test %s: %s", idx, str(e))
                progress_counter["completed"] += 1
                return {
                    "test_number": idx,
                    "question": question,
                    "error": str(e),
                    "metrics": {},
                    "ragas_score": 0,
                    "timestamp": datetime.now().isoformat(),
                }
            # *** CRITICAL FIX: Use actual retrieved contexts, NOT ground_truth ***
            retrieved_contexts = rag_response["contexts"]

            # DEBUG: Print what was actually retrieved (only in debug mode)
            logger.debug(
                "📝 Test %s: Retrieved %s contexts", idx, len(retrieved_contexts)
            )
            # Prepare dataset for RAGAS evaluation with CORRECT contexts
            eval_dataset = Dataset.from_dict(
@@ -283,15 +403,19 @@ class RAGEvaluator:
            )
            # Run RAGAS evaluation
            # IMPORTANT: Create fresh metric instances for each evaluation to avoid
            # concurrent state conflicts when multiple tasks run in parallel
            try:
                eval_results = evaluate(
                    dataset=eval_dataset,
                    metrics=[
                        Faithfulness(),
                        AnswerRelevancy(),
                        ContextRecall(),
                        ContextPrecision(),
                    ],
                    llm=self.eval_llm,
                    embeddings=self.eval_embeddings,
                )
                # Convert to DataFrame (RAGAS v0.3+ API)
@@ -302,6 +426,7 @@ class RAGEvaluator:
                # Extract scores (RAGAS v0.3+ uses .to_pandas())
                result = {
                    "test_number": idx,
                    "question": question,
                    "answer": rag_response["answer"][:200] + "..."
                    if len(rag_response["answer"]) > 200
@@ -309,7 +434,7 @@ class RAGEvaluator:
                    "ground_truth": ground_truth[:200] + "..."
                    if len(ground_truth) > 200
                    else ground_truth,
                    "project": test_case.get("project", "unknown"),
                    "metrics": {
                        "faithfulness": float(scores_row.get("faithfulness", 0)),
                        "answer_relevance": float(
@@ -323,22 +448,24 @@ class RAGEvaluator:
                    "timestamp": datetime.now().isoformat(),
                }
                # Calculate RAGAS score (average of all metrics, excluding NaN values)
                metrics = result["metrics"]
                valid_metrics = [v for v in metrics.values() if not _is_nan(v)]
                ragas_score = (
                    sum(valid_metrics) / len(valid_metrics) if valid_metrics else 0
                )
                result["ragas_score"] = round(ragas_score, 4)

                # Update progress counter
                progress_counter["completed"] += 1

                return result
            except Exception as e:
                logger.error("Error evaluating test %s: %s", idx, str(e))
                progress_counter["completed"] += 1
                return {
                    "test_number": idx,
                    "question": question,
                    "error": str(e),
                    "metrics": {},
@@ -353,18 +480,20 @@ class RAGEvaluator:
        Returns:
            List of evaluation results with metrics
        """
        # Get evaluation concurrency from environment (default to 1 for serial evaluation)
        max_async = int(os.getenv("EVAL_MAX_CONCURRENT", "1"))

        logger.info("")
        logger.info("%s", "=" * 70)
        logger.info("🚀 Starting RAGAS Evaluation of LightRAG System")
        logger.info("🔧 Concurrent evaluations: %s", max_async)
        logger.info("%s", "=" * 70)
        # Create semaphore to limit concurrent evaluations
        semaphore = asyncio.Semaphore(max_async)

        # Create progress counter (shared across all tasks)
        progress_counter = {"completed": 0}

        # Create shared HTTP client with connection pooling and proper timeouts
        # Timeout: 3 minutes for connect, 5 minutes for read (LLM can be slow)
        timeout = httpx.Timeout(
@@ -380,7 +509,9 @@ class RAGEvaluator:
        async with httpx.AsyncClient(timeout=timeout, limits=limits) as client:
            # Create tasks for all test cases
            tasks = [
                self.evaluate_single_case(
                    idx, test_case, semaphore, client, progress_counter
                )
                for idx, test_case in enumerate(self.test_cases, 1)
            ]
@@ -449,6 +580,94 @@ class RAGEvaluator:
        return csv_path
    def _format_metric(self, value: float, width: int = 6) -> str:
        """
        Format a metric value for display, handling NaN gracefully

        Args:
            value: The metric value to format
            width: The width of the formatted string

        Returns:
            Formatted string (e.g., "0.8523" or " N/A ")
        """
        if _is_nan(value):
            return "N/A".center(width)
        return f"{value:.4f}".rjust(width)

    def _display_results_table(self, results: List[Dict[str, Any]]):
        """
        Display evaluation results in a formatted table

        Args:
            results: List of evaluation results
        """
        logger.info("%s", "=" * 115)
        logger.info("📊 EVALUATION RESULTS SUMMARY")
        logger.info("%s", "=" * 115)

        # Table header
        logger.info(
            "%-4s | %-50s | %6s | %7s | %6s | %7s | %6s | %6s",
            "#",
            "Question",
            "Faith",
            "AnswRel",
            "CtxRec",
            "CtxPrec",
            "RAGAS",
            "Status",
        )
        logger.info("%s", "-" * 115)

        # Table rows
        for result in results:
            test_num = result.get("test_number", 0)
            question = result.get("question", "")
            # Truncate question to 50 chars
            question_display = (
                (question[:47] + "...") if len(question) > 50 else question
            )

            metrics = result.get("metrics", {})
            if metrics:
                # Success case - format each metric, handling NaN values
                faith = metrics.get("faithfulness", 0)
                ans_rel = metrics.get("answer_relevance", 0)
                ctx_rec = metrics.get("context_recall", 0)
                ctx_prec = metrics.get("context_precision", 0)
                ragas = result.get("ragas_score", 0)
                status = ""
                logger.info(
                    "%-4d | %-50s | %s | %s | %s | %s | %s | %6s",
                    test_num,
                    question_display,
                    self._format_metric(faith, 6),
                    self._format_metric(ans_rel, 7),
                    self._format_metric(ctx_rec, 6),
                    self._format_metric(ctx_prec, 7),
                    self._format_metric(ragas, 6),
                    status,
                )
            else:
                # Error case
                error = result.get("error", "Unknown error")
                error_display = (error[:20] + "...") if len(error) > 23 else error
                logger.info(
                    "%-4d | %-50s | %6s | %7s | %6s | %7s | %6s | ✗ %s",
                    test_num,
                    question_display,
                    "N/A",
                    "N/A",
                    "N/A",
                    "N/A",
                    "N/A",
                    error_display,
                )

        logger.info("%s", "=" * 115)
    def _calculate_benchmark_stats(
        self, results: List[Dict[str, Any]]
    ) -> Dict[str, Any]:
@@ -475,45 +694,55 @@ class RAGEvaluator:
                "success_rate": 0.0,
            }
        # Calculate averages for each metric (handling NaN values correctly)
        # Track both sum and count for each metric to handle NaN values properly
        metrics_data = {
            "faithfulness": {"sum": 0.0, "count": 0},
            "answer_relevance": {"sum": 0.0, "count": 0},
            "context_recall": {"sum": 0.0, "count": 0},
            "context_precision": {"sum": 0.0, "count": 0},
            "ragas_score": {"sum": 0.0, "count": 0},
        }

        for result in valid_results:
            metrics = result.get("metrics", {})

            # For each metric, sum non-NaN values and count them
            faithfulness = metrics.get("faithfulness", 0)
            if not _is_nan(faithfulness):
                metrics_data["faithfulness"]["sum"] += faithfulness
                metrics_data["faithfulness"]["count"] += 1

            answer_relevance = metrics.get("answer_relevance", 0)
            if not _is_nan(answer_relevance):
                metrics_data["answer_relevance"]["sum"] += answer_relevance
                metrics_data["answer_relevance"]["count"] += 1

            context_recall = metrics.get("context_recall", 0)
            if not _is_nan(context_recall):
                metrics_data["context_recall"]["sum"] += context_recall
                metrics_data["context_recall"]["count"] += 1

            context_precision = metrics.get("context_precision", 0)
            if not _is_nan(context_precision):
                metrics_data["context_precision"]["sum"] += context_precision
                metrics_data["context_precision"]["count"] += 1

            ragas_score = result.get("ragas_score", 0)
            if not _is_nan(ragas_score):
                metrics_data["ragas_score"]["sum"] += ragas_score
                metrics_data["ragas_score"]["count"] += 1

        # Calculate averages using actual counts for each metric
        avg_metrics = {}
        for metric_name, data in metrics_data.items():
            if data["count"] > 0:
                avg_val = data["sum"] / data["count"]
                avg_metrics[metric_name] = (
                    round(avg_val, 4) if not _is_nan(avg_val) else 0.0
                )
            else:
                avg_metrics[metric_name] = 0.0
        # Find min and max RAGAS scores (filter out NaN)
        ragas_scores = []
@@ -565,6 +794,20 @@ class RAGEvaluator:
        )
        with open(json_path, "w") as f:
            json.dump(summary, f, indent=2)
        # Add a small delay to ensure all buffered output is completely written
        await asyncio.sleep(0.8)

        # Flush all output buffers to ensure RAGAS progress bars are fully displayed
        sys.stdout.flush()
        sys.stderr.flush()
        sys.stdout.write("\n")
        sys.stderr.write("\n")
        sys.stdout.flush()
        sys.stderr.flush()

        # Display results table
        self._display_results_table(results)

        logger.info("✅ JSON results saved to: %s", json_path)

        # Export to CSV
@@ -620,28 +863,61 @@ async def main():
""" """
Main entry point for RAGAS evaluation Main entry point for RAGAS evaluation
Command-line arguments:
--dataset, -d: Path to test dataset JSON file (default: sample_dataset.json)
--ragendpoint, -r: LightRAG API endpoint URL (default: http://localhost:9621 or $LIGHTRAG_API_URL)
Usage: Usage:
python lightrag/evaluation/eval_rag_quality.py python lightrag/evaluation/eval_rag_quality.py
python lightrag/evaluation/eval_rag_quality.py http://localhost:9621 python lightrag/evaluation/eval_rag_quality.py --dataset my_test.json
python lightrag/evaluation/eval_rag_quality.py http://your-server.com:9621 python lightrag/evaluation/eval_rag_quality.py -d my_test.json -r http://localhost:9621
""" """
    try:
        # Parse command-line arguments
        parser = argparse.ArgumentParser(
            description="RAGAS Evaluation Script for LightRAG System",
            formatter_class=argparse.RawDescriptionHelpFormatter,
            epilog="""
Examples:
  # Use defaults
  python lightrag/evaluation/eval_rag_quality.py

  # Specify custom dataset
  python lightrag/evaluation/eval_rag_quality.py --dataset my_test.json

  # Specify custom RAG endpoint
  python lightrag/evaluation/eval_rag_quality.py --ragendpoint http://my-server.com:9621

  # Specify both
  python lightrag/evaluation/eval_rag_quality.py -d my_test.json -r http://localhost:9621
            """,
        )
        parser.add_argument(
            "--dataset",
            "-d",
            type=str,
            default=None,
            help="Path to test dataset JSON file (default: sample_dataset.json in evaluation directory)",
        )
        parser.add_argument(
            "--ragendpoint",
            "-r",
            type=str,
            default=None,
            help="LightRAG API endpoint URL (default: http://localhost:9621 or $LIGHTRAG_API_URL environment variable)",
        )
        args = parser.parse_args()

        logger.info("")
        logger.info("%s", "=" * 70)
        logger.info("🔍 RAGAS Evaluation - Using Real LightRAG API")
        logger.info("%s", "=" * 70)

        evaluator = RAGEvaluator(
            test_dataset_path=args.dataset, rag_api_url=args.ragendpoint
        )
        await evaluator.run()

    except Exception as e:
        logger.exception("❌ Error: %s", e)