This commit is contained in:
Raphaël MANSUY 2025-12-04 19:19:22 +08:00
parent 5350756a1f
commit 4b18e6b3e4
5 changed files with 306 additions and 786 deletions

3
.gitignore vendored
View file

@ -50,6 +50,9 @@ output/
rag_storage/
data/
# Evaluation results
lightrag/evaluation/results/
# Miscellaneous
.DS_Store
TODO.md

View file

@ -1,6 +1,6 @@
# 📊 LightRAG Evaluation Framework
# 📊 Portfolio RAG Evaluation Framework
RAGAS-based offline evaluation of your LightRAG system.
RAGAS-based offline evaluation of your LightRAG portfolio system.
## What is RAGAS?
@ -25,23 +25,14 @@ Instead of requiring human-annotated ground truth, RAGAS uses state-of-the-art e
```
lightrag/evaluation/
├── eval_rag_quality.py # Main evaluation script
├── sample_dataset.json # 3 test questions about LightRAG
├── sample_documents/ # Matching markdown files for testing
│ ├── 01_lightrag_overview.md
│ ├── 02_rag_architecture.md
│ ├── 03_lightrag_improvements.md
│ ├── 04_supported_databases.md
│ ├── 05_evaluation_and_deployment.md
│ └── README.md
├── test_dataset.json # Test cases with ground truth
├── __init__.py # Package init
├── results/ # Output directory
│ ├── results_YYYYMMDD_HHMMSS.json # Raw metrics in JSON
│ └── results_YYYYMMDD_HHMMSS.csv # Metrics in CSV format
│ ├── results_YYYYMMDD_HHMMSS.json # Raw metrics
│ └── report_YYYYMMDD_HHMMSS.html # Beautiful HTML report
└── README.md # This file
```
**Quick Test:** Index files from `sample_documents/` into LightRAG, then run the evaluator to reproduce results (~89-100% RAGAS score per question).
---
## 🚀 Quick Start
@ -77,111 +68,78 @@ Results are saved automatically in `lightrag/evaluation/results/`:
```
results/
├── results_20241023_143022.json ← Raw metrics in JSON format
└── results_20241023_143022.csv ← Metrics in CSV format (for spreadsheets)
├── results_20241023_143022.json ← Raw metrics (for analysis)
└── report_20241023_143022.html ← Beautiful HTML report 🌟
```
**Results include:**
**Open the HTML report in your browser to see:**
- ✅ Overall RAGAS score
- 📊 Per-metric averages (Faithfulness, Answer Relevance, Context Recall, Context Precision)
- 📊 Per-metric averages
- 📋 Individual test case results
- 📈 Performance breakdown by question
---
## ⚙️ Configuration
### Environment Variables
The evaluation framework supports customization through environment variables:
| Variable | Default | Description |
|----------|---------|-------------|
| `EVAL_LLM_MODEL` | `gpt-4o-mini` | LLM model used for RAGAS evaluation |
| `EVAL_EMBEDDING_MODEL` | `text-embedding-3-small` | Embedding model for evaluation |
| `EVAL_LLM_BINDING_API_KEY` | (falls back to `OPENAI_API_KEY`) | API key for evaluation models |
| `EVAL_LLM_BINDING_HOST` | (optional) | Custom endpoint URL for OpenAI-compatible services |
| `EVAL_MAX_CONCURRENT` | `1` | Number of concurrent test case evaluations (1=serial) |
| `EVAL_QUERY_TOP_K` | `10` | Number of documents to retrieve per query |
| `EVAL_LLM_MAX_RETRIES` | `5` | Maximum LLM request retries |
| `EVAL_LLM_TIMEOUT` | `120` | LLM request timeout in seconds |
### Usage Examples
**Default Configuration (OpenAI):**
```bash
export OPENAI_API_KEY=sk-xxx
python lightrag/evaluation/eval_rag_quality.py
```
**Custom Model:**
```bash
export OPENAI_API_KEY=sk-xxx
export EVAL_LLM_MODEL=gpt-4.1
export EVAL_EMBEDDING_MODEL=text-embedding-3-large
python lightrag/evaluation/eval_rag_quality.py
```
**OpenAI-Compatible Endpoint:**
```bash
export EVAL_LLM_BINDING_API_KEY=your-custom-key
export EVAL_LLM_BINDING_HOST=https://api.openai.com/v1
export EVAL_LLM_MODEL=qwen-plus
python lightrag/evaluation/eval_rag_quality.py
```
### Concurrency Control & Rate Limiting
The evaluation framework includes built-in concurrency control to prevent API rate limiting issues:
**Why Concurrency Control Matters:**
- RAGAS internally makes many concurrent LLM calls for each test case
- Context Precision metric calls LLM once per retrieved document
- Without control, this can easily exceed API rate limits
**Default Configuration (Conservative):**
```bash
EVAL_MAX_CONCURRENT=1 # Serial evaluation (one test at a time)
EVAL_QUERY_TOP_K=10 # OP_K query parameter of LightRAG
EVAL_LLM_MAX_RETRIES=5 # Retry failed requests 5 times
EVAL_LLM_TIMEOUT=180 # 2-minute timeout per request
```
**If You Have Higher API Quotas:**
```bash
EVAL_MAX_CONCURRENT=2 # Evaluate 2 tests in parallel
EVAL_QUERY_TOP_K=20 # OP_K query parameter of LightRAG
```
**Common Issues and Solutions:**
| Issue | Solution |
|-------|----------|
| **Warning: "LM returned 1 generations instead of 3"** | Reduce `EVAL_MAX_CONCURRENT` to 1 or decrease `EVAL_QUERY_TOP_K` |
| **Context Precision returns NaN** | Lower `EVAL_QUERY_TOP_K` to reduce LLM calls per test case |
| **Rate limit errors (429)** | Increase `EVAL_LLM_MAX_RETRIES` and decrease `EVAL_MAX_CONCURRENT` |
| **Request timeouts** | Increase `EVAL_LLM_TIMEOUT` to 180 or higher |
- 📈 Performance breakdown
---
## 📝 Test Dataset
`sample_dataset.json` contains 3 generic questions about LightRAG. Replace with questions matching YOUR indexed documents.
**Custom Test Cases:**
Edit `test_dataset.json` to add your own test cases:
```json
{
"test_cases": [
{
"question": "Your question here",
"ground_truth": "Expected answer from your data",
"context": "topic"
"question": "Your test question here",
"ground_truth": "Expected answer with key information",
"project_context": "project_name"
}
]
}
```
**Example:**
```json
{
"question": "Which projects use PyTorch?",
"ground_truth": "The Neural ODE Project uses PyTorch with TorchODE library for continuous-time neural networks.",
"project_context": "neural_ode_project"
}
```
---
## 🔧 Integration with Your RAG System
Currently, the evaluation script uses **ground truth as mock responses**. To evaluate your actual LightRAG:
### Step 1: Update `generate_rag_response()`
In `eval_rag_quality.py`, replace the mock implementation:
```python
async def generate_rag_response(self, question: str, context: str = None) -> Dict[str, str]:
"""Generate RAG response using your LightRAG system"""
from lightrag import LightRAG
rag = LightRAG(
working_dir="./rag_storage",
llm_model_func=your_llm_function
)
response = await rag.aquery(question)
return {
"answer": response,
"context": "context_from_kg" # If available
}
```
### Step 2: Run Evaluation
```bash
python lightrag/evaluation/eval_rag_quality.py
```
---
## 📊 Interpreting Results
@ -226,10 +184,79 @@ EVAL_QUERY_TOP_K=20 # OP_K query parameter of LightRAG
---
## 📈 Usage Examples
### Python API
```python
import asyncio
from lightrag.evaluation import RAGEvaluator
async def main():
evaluator = RAGEvaluator()
results = await evaluator.run()
# Access results
for result in results:
print(f"Question: {result['question']}")
print(f"RAGAS Score: {result['ragas_score']:.2%}")
print(f"Metrics: {result['metrics']}")
asyncio.run(main())
```
### Custom Dataset
```python
evaluator = RAGEvaluator(test_dataset_path="custom_tests.json")
results = await evaluator.run()
```
### Batch Evaluation
```python
from pathlib import Path
import json
results_dir = Path("lightrag/evaluation/results")
results_dir.mkdir(exist_ok=True)
# Run multiple evaluations
for i in range(3):
evaluator = RAGEvaluator()
results = await evaluator.run()
```
---
## 🎯 For Portfolio/Interview
**What to Highlight:**
1. ✅ **Quality Metrics**: "RAG system achieves 85% RAGAS score"
2. ✅ **Evaluation Framework**: "Automated quality assessment with RAGAS"
3. ✅ **Best Practices**: "Offline evaluation pipeline for continuous improvement"
4. ✅ **Production-Ready**: "Metrics-driven system optimization"
**Example Statement:**
> "I built an evaluation framework using RAGAS that measures RAG quality across faithfulness, relevance, and context coverage. The system achieves 85% average RAGAS score, with automated HTML reports for quality tracking."
---
## 🔗 Related Features
- **LangFuse Integration**: Real-time observability of production RAG calls
- **LightRAG**: Core RAG system with entity extraction and knowledge graphs
- **Metrics**: See `results/` for detailed evaluation metrics
---
## 📚 Resources
- [RAGAS Documentation](https://docs.ragas.io/)
- [RAGAS GitHub](https://github.com/explodinggradients/ragas)
- [LangFuse + RAGAS Guide](https://langfuse.com/guides/cookbook/evaluation_of_rag_with_ragas)
---
@ -241,51 +268,7 @@ EVAL_QUERY_TOP_K=20 # OP_K query parameter of LightRAG
pip install ragas datasets
```
### "Warning: LM returned 1 generations instead of requested 3" or Context Precision NaN
**Cause**: This warning indicates API rate limiting or concurrent request overload:
- RAGAS makes multiple LLM calls per test case (faithfulness, relevancy, recall, precision)
- Context Precision calls LLM once per retrieved document (with `EVAL_QUERY_TOP_K=10`, that's 10 calls)
- Concurrent evaluation multiplies these calls: `EVAL_MAX_CONCURRENT × LLM calls per test`
**Solutions** (in order of effectiveness):
1. **Serial Evaluation** (Default):
```bash
export EVAL_MAX_CONCURRENT=1
python lightrag/evaluation/eval_rag_quality.py
```
2. **Reduce Retrieved Documents**:
```bash
export EVAL_QUERY_TOP_K=5 # Halves Context Precision LLM calls
python lightrag/evaluation/eval_rag_quality.py
```
3. **Increase Retry & Timeout**:
```bash
export EVAL_LLM_MAX_RETRIES=10
export EVAL_LLM_TIMEOUT=180
python lightrag/evaluation/eval_rag_quality.py
```
4. **Use Higher Quota API** (if available):
- Upgrade to OpenAI Tier 2+ for higher RPM limits
- Use self-hosted OpenAI-compatible service with no rate limits
### "AttributeError: 'InstructorLLM' object has no attribute 'agenerate_prompt'" or NaN results
This error occurs with RAGAS 0.3.x when LLM and Embeddings are not explicitly configured. The evaluation framework now handles this automatically by:
- Using environment variables to configure evaluation models
- Creating proper LLM and Embeddings instances for RAGAS
**Solution**: Ensure you have set one of the following:
- `OPENAI_API_KEY` environment variable (default)
- `EVAL_LLM_BINDING_API_KEY` for custom API key
The framework will automatically configure the evaluation models.
### "No sample_dataset.json found"
### "No test_dataset.json found"
Make sure you're running from the project root:
@ -301,22 +284,25 @@ The evaluation uses your configured LLM (OpenAI by default). Ensure:
- Have sufficient API quota
- Network connection is stable
### Evaluation requires running LightRAG API
### Results showing 0 scores
The evaluator queries a running LightRAG API server at `http://localhost:9621`. Make sure:
1. LightRAG API server is running (`python lightrag/api/lightrag_server.py`)
2. Documents are indexed in your LightRAG instance
3. API is accessible at the configured URL
Current implementation uses ground truth as mock responses. Results will show perfect scores because the "generated answer" equals the ground truth.
**To use actual RAG results:**
1. Implement the `generate_rag_response()` method
2. Connect to your LightRAG instance
3. Run evaluation again
---
## 📝 Next Steps
1. Index documents into LightRAG (WebUI or API)
2. Start LightRAG API server
3. Run `python lightrag/evaluation/eval_rag_quality.py`
4. Review results (JSON/CSV) in `results/` folder
5. Adjust entity extraction prompts or retrieval settings based on scores
1. ✅ Review test dataset in `test_dataset.json`
2. ✅ Run `python lightrag/evaluation/eval_rag_quality.py`
3. ✅ Open the HTML report in browser
4. 🔄 Integrate with actual LightRAG system
5. 📊 Monitor metrics over time
6. 🎯 Use insights for optimization
---

View file

@ -0,0 +1,16 @@
"""
LightRAG Evaluation Module
RAGAS-based evaluation framework for assessing RAG system quality.
Usage:
from lightrag.evaluation.eval_rag_quality import RAGEvaluator
evaluator = RAGEvaluator()
results = await evaluator.run()
Note: RAGEvaluator is imported dynamically to avoid import errors
when ragas/datasets are not installed.
"""
__all__ = ["RAGEvaluator"]

View file

@ -10,73 +10,54 @@ Evaluates RAG response quality using RAGAS metrics:
Usage:
python lightrag/evaluation/eval_rag_quality.py
python lightrag/evaluation/eval_rag_quality.py http://localhost:9621
python lightrag/evaluation/eval_rag_quality.py http://your-rag-server.com:9621
python lightrag/evaluation/eval_rag_quality.py http://localhost:8000
python lightrag/evaluation/eval_rag_quality.py http://your-rag-server.com:8000
Results are saved to: lightrag/evaluation/results/
- results_YYYYMMDD_HHMMSS.csv (CSV export for analysis)
- results_YYYYMMDD_HHMMSS.json (Full results with details)
Note on Custom OpenAI-Compatible Endpoints:
This script uses bypass_n=True mode for answer_relevancy metric to ensure
compatibility with custom endpoints that may not support OpenAI's 'n' parameter
for multiple completions. This generates multiple outputs through repeated prompts
instead, maintaining evaluation quality while supporting broader endpoint compatibility.
"""
import asyncio
import csv
import json
import math
import os
import sys
import asyncio
import time
from datetime import datetime
import csv
from pathlib import Path
from datetime import datetime
from typing import Any, Dict, List
import sys
import httpx
import os
from dotenv import load_dotenv
from lightrag.utils import logger
# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
# use the .env that is inside the current folder
# allows to use different .env file for each lightrag instance
# the OS environment variables take precedence over the .env file
load_dotenv(dotenv_path=".env", override=False)
# Load .env from project root
project_root = Path(__file__).parent.parent.parent
load_dotenv(project_root / ".env")
# Setup OpenAI API key (required for RAGAS evaluation)
# Use LLM_BINDING_API_KEY if OPENAI_API_KEY is not set
if "OPENAI_API_KEY" not in os.environ:
if "LLM_BINDING_API_KEY" in os.environ:
os.environ["OPENAI_API_KEY"] = os.environ["LLM_BINDING_API_KEY"]
else:
os.environ["OPENAI_API_KEY"] = input("Enter your OpenAI API key: ")
# Conditional imports - will raise ImportError if dependencies not installed
try:
from datasets import Dataset
from ragas import evaluate
from ragas.metrics import (
answer_relevancy,
context_precision,
context_recall,
faithfulness,
answer_relevancy,
context_recall,
context_precision,
)
from ragas.llms import LangchainLLMWrapper
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
RAGAS_AVAILABLE = True
except ImportError:
RAGAS_AVAILABLE = False
Dataset = None
evaluate = None
LangchainLLMWrapper = None
CONNECT_TIMEOUT_SECONDS = 180.0
READ_TIMEOUT_SECONDS = 300.0
TOTAL_TIMEOUT_SECONDS = 180.0
def _is_nan(value: Any) -> bool:
"""Return True when value is a float NaN."""
return isinstance(value, float) and math.isnan(value)
from datasets import Dataset
except ImportError as e:
print(f"❌ RAGAS import error: {e}")
print(" Install with: pip install ragas datasets")
sys.exit(1)
class RAGEvaluator:
@ -88,141 +69,23 @@ class RAGEvaluator:
Args:
test_dataset_path: Path to test dataset JSON file
rag_api_url: Base URL of LightRAG API (e.g., http://localhost:9621)
rag_api_url: Base URL of LightRAG API (e.g., http://localhost:8000)
If None, will try to read from environment or use default
Environment Variables:
EVAL_LLM_MODEL: LLM model for evaluation (default: gpt-4o-mini)
EVAL_EMBEDDING_MODEL: Embedding model for evaluation (default: text-embedding-3-small)
EVAL_LLM_BINDING_API_KEY: API key for evaluation models (fallback to OPENAI_API_KEY)
EVAL_LLM_BINDING_HOST: Custom endpoint URL for evaluation models (optional)
Raises:
ImportError: If ragas or datasets packages are not installed
EnvironmentError: If EVAL_LLM_BINDING_API_KEY and OPENAI_API_KEY are both not set
"""
# Validate RAGAS dependencies are installed
if not RAGAS_AVAILABLE:
raise ImportError(
"RAGAS dependencies not installed. "
"Install with: pip install ragas datasets"
)
# Configure evaluation models (for RAGAS scoring)
eval_api_key = os.getenv("EVAL_LLM_BINDING_API_KEY") or os.getenv(
"OPENAI_API_KEY"
)
if not eval_api_key:
raise EnvironmentError(
"EVAL_LLM_BINDING_API_KEY or OPENAI_API_KEY is required for evaluation. "
"Set EVAL_LLM_BINDING_API_KEY to use a custom API key, "
"or ensure OPENAI_API_KEY is set."
)
eval_model = os.getenv("EVAL_LLM_MODEL", "gpt-4.1")
eval_embedding_model = os.getenv(
"EVAL_EMBEDDING_MODEL", "text-embedding-3-large"
)
eval_base_url = os.getenv("EVAL_LLM_BINDING_HOST")
# Create LLM and Embeddings instances for RAGAS
llm_kwargs = {
"model": eval_model,
"api_key": eval_api_key,
"max_retries": int(os.getenv("EVAL_LLM_MAX_RETRIES", "5")),
"request_timeout": int(os.getenv("EVAL_LLM_TIMEOUT", "180")),
}
embedding_kwargs = {"model": eval_embedding_model, "api_key": eval_api_key}
if eval_base_url:
llm_kwargs["base_url"] = eval_base_url
embedding_kwargs["base_url"] = eval_base_url
# Create base LangChain LLM
base_llm = ChatOpenAI(**llm_kwargs)
self.eval_embeddings = OpenAIEmbeddings(**embedding_kwargs)
# Wrap LLM with LangchainLLMWrapper and enable bypass_n mode for custom endpoints
# This ensures compatibility with endpoints that don't support the 'n' parameter
# by generating multiple outputs through repeated prompts instead of using 'n' parameter
try:
self.eval_llm = LangchainLLMWrapper(
langchain_llm=base_llm,
bypass_n=True, # Enable bypass_n to avoid passing 'n' to OpenAI API
)
logger.debug("Successfully configured bypass_n mode for LLM wrapper")
except Exception as e:
logger.warning(
"Could not configure LangchainLLMWrapper with bypass_n: %s. "
"Using base LLM directly, which may cause warnings with custom endpoints.",
e,
)
self.eval_llm = base_llm
if test_dataset_path is None:
test_dataset_path = Path(__file__).parent / "sample_dataset.json"
test_dataset_path = Path(__file__).parent / "test_dataset.json"
if rag_api_url is None:
rag_api_url = os.getenv("LIGHTRAG_API_URL", "http://localhost:9621")
rag_api_url = os.getenv("LIGHTRAG_API_URL", "http://localhost:8000")
self.test_dataset_path = Path(test_dataset_path)
self.rag_api_url = rag_api_url.rstrip("/")
self.rag_api_url = rag_api_url.rstrip("/") # Remove trailing slash
self.results_dir = Path(__file__).parent / "results"
self.results_dir.mkdir(exist_ok=True)
# Load test dataset
self.test_cases = self._load_test_dataset()
# Store configuration values for display
self.eval_model = eval_model
self.eval_embedding_model = eval_embedding_model
self.eval_base_url = eval_base_url
self.eval_max_retries = llm_kwargs["max_retries"]
self.eval_timeout = llm_kwargs["request_timeout"]
# Display configuration
self._display_configuration()
def _display_configuration(self):
"""Display all evaluation configuration settings"""
logger.info("")
logger.info("%s", "=" * 70)
logger.info("🔧 EVALUATION CONFIGURATION")
logger.info("%s", "=" * 70)
logger.info("")
logger.info("Evaluation Models:")
logger.info(" • LLM Model: %s", self.eval_model)
logger.info(" • Embedding Model: %s", self.eval_embedding_model)
if self.eval_base_url:
logger.info(" • Custom Endpoint: %s", self.eval_base_url)
logger.info(" • Bypass N-Parameter: Enabled (for compatibility)")
else:
logger.info(" • Endpoint: OpenAI Official API")
logger.info("")
logger.info("Concurrency & Rate Limiting:")
max_concurrent = int(os.getenv("EVAL_MAX_CONCURRENT", "1"))
query_top_k = int(os.getenv("EVAL_QUERY_TOP_K", "10"))
logger.info(
" • Max Concurrent: %s %s",
max_concurrent,
"(serial evaluation)" if max_concurrent == 1 else "parallel evaluations",
)
logger.info(" • Query Top-K: %s Entities/Relations", query_top_k)
logger.info(" • LLM Max Retries: %s", self.eval_max_retries)
logger.info(" • LLM Timeout: %s seconds", self.eval_timeout)
logger.info("")
logger.info("Test Configuration:")
logger.info(" • Total Test Cases: %s", len(self.test_cases))
logger.info(" • Test Dataset: %s", self.test_dataset_path.name)
logger.info(" • LightRAG API: %s", self.rag_api_url)
logger.info(" • Results Directory: %s", self.results_dir.name)
logger.info("%s", "=" * 70)
logger.info("")
def _load_test_dataset(self) -> List[Dict[str, str]]:
"""Load test cases from JSON file"""
if not self.test_dataset_path.exists():
@ -236,160 +99,93 @@ class RAGEvaluator:
async def generate_rag_response(
self,
question: str,
client: httpx.AsyncClient,
) -> Dict[str, Any]:
context: str = None, # Not used - actual context comes from LightRAG
) -> Dict[str, str]:
"""
Generate RAG response by calling LightRAG API.
Generate RAG response by calling LightRAG API
Calls the actual LightRAG /query endpoint instead of using mock data.
Args:
question: The user query.
client: Shared httpx AsyncClient for connection pooling.
question: The user query
context: Ignored (for compatibility), actual context from LightRAG
Returns:
Dictionary with 'answer' and 'contexts' keys.
'contexts' is a list of strings (one per retrieved document).
Dict with 'answer' and 'context' keys
Raises:
Exception: If LightRAG API is unavailable.
Exception: If LightRAG API is unavailable
"""
try:
payload = {
"query": question,
"mode": "mix",
"include_references": True,
"include_chunk_content": True, # NEW: Request chunk content in references
"response_type": "Multiple Paragraphs",
"top_k": int(os.getenv("EVAL_QUERY_TOP_K", "10")),
}
async with httpx.AsyncClient(timeout=60.0) as client:
# Prepare request to LightRAG API
payload = {
"query": question,
"mode": "mix", # Recommended: combines local & global
"include_references": True,
"response_type": "Multiple Paragraphs",
"top_k": 10,
}
# Get API key from environment for authentication
api_key = os.getenv("LIGHTRAG_API_KEY")
# Call LightRAG /query endpoint
response = await client.post(
f"{self.rag_api_url}/query",
json=payload,
)
# Prepare headers with optional authentication
headers = {}
if api_key:
headers["X-API-Key"] = api_key
if response.status_code != 200:
raise Exception(
f"LightRAG API error {response.status_code}: {response.text}"
)
# Single optimized API call - gets both answer AND chunk content
response = await client.post(
f"{self.rag_api_url}/query",
json=payload,
headers=headers if headers else None,
)
response.raise_for_status()
result = response.json()
result = response.json()
answer = result.get("response", "No response generated")
references = result.get("references", [])
return {
"answer": result.get("response", "No response generated"),
"context": json.dumps(result.get("references", []))
if result.get("references")
else "",
}
# DEBUG: Inspect the API response
logger.debug("🔍 References Count: %s", len(references))
if references:
first_ref = references[0]
logger.debug("🔍 First Reference Keys: %s", list(first_ref.keys()))
if "content" in first_ref:
content_preview = first_ref["content"]
if isinstance(content_preview, list) and content_preview:
logger.debug(
"🔍 Content Preview (first chunk): %s...",
content_preview[0][:100],
)
elif isinstance(content_preview, str):
logger.debug("🔍 Content Preview: %s...", content_preview[:100])
# Extract chunk content from enriched references
# Note: content is now a list of chunks per reference (one file may have multiple chunks)
contexts = []
for ref in references:
content = ref.get("content", [])
if isinstance(content, list):
# Flatten the list: each chunk becomes a separate context
contexts.extend(content)
elif isinstance(content, str):
# Backward compatibility: if content is still a string (shouldn't happen)
contexts.append(content)
return {
"answer": answer,
"contexts": contexts, # List of strings from actual retrieved chunks
}
except httpx.ConnectError as e:
except httpx.ConnectError:
raise Exception(
f"❌ Cannot connect to LightRAG API at {self.rag_api_url}\n"
f" Make sure LightRAG server is running:\n"
f" python -m lightrag.api.lightrag_server\n"
f" Error: {str(e)}"
)
except httpx.HTTPStatusError as e:
raise Exception(
f"LightRAG API error {e.response.status_code}: {e.response.text}"
)
except httpx.ReadTimeout as e:
raise Exception(
f"Request timeout after waiting for response\n"
f" Question: {question[:100]}...\n"
f" Error: {str(e)}"
f" python -m lightrag.api.lightrag_server"
)
except Exception as e:
raise Exception(f"Error calling LightRAG API: {type(e).__name__}: {str(e)}")
raise Exception(f"Error calling LightRAG API: {str(e)}")
async def evaluate_single_case(
self,
idx: int,
test_case: Dict[str, str],
semaphore: asyncio.Semaphore,
client: httpx.AsyncClient,
progress_counter: Dict[str, int],
) -> Dict[str, Any]:
async def evaluate_responses(self) -> List[Dict[str, Any]]:
"""
Evaluate a single test case with concurrency control
Args:
idx: Test case index (1-based)
test_case: Test case dictionary with question and ground_truth
semaphore: Semaphore to control concurrency
client: Shared httpx AsyncClient for connection pooling
progress_counter: Shared dictionary for progress tracking
Evaluate all test cases and return metrics
Returns:
Evaluation result dictionary
List of evaluation results with metrics
"""
async with semaphore:
print("\n" + "=" * 70)
print("🚀 Starting RAGAS Evaluation of Portfolio RAG System")
print("=" * 70 + "\n")
results = []
for idx, test_case in enumerate(self.test_cases, 1):
question = test_case["question"]
ground_truth = test_case["ground_truth"]
print(f"[{idx}/{len(self.test_cases)}] Evaluating: {question[:60]}...")
# Generate RAG response by calling actual LightRAG API
try:
rag_response = await self.generate_rag_response(
question=question, client=client
)
except Exception as e:
logger.error("Error generating response for test %s: %s", idx, str(e))
progress_counter["completed"] += 1
return {
"test_number": idx,
"question": question,
"error": str(e),
"metrics": {},
"ragas_score": 0,
"timestamp": datetime.now().isoformat(),
}
rag_response = await self.generate_rag_response(question=question)
# *** CRITICAL FIX: Use actual retrieved contexts, NOT ground_truth ***
retrieved_contexts = rag_response["contexts"]
# DEBUG: Print what was actually retrieved (only in debug mode)
logger.debug(
"📝 Test %s: Retrieved %s contexts", idx, len(retrieved_contexts)
)
# Prepare dataset for RAGAS evaluation with CORRECT contexts
# Prepare dataset for RAGAS evaluation
eval_dataset = Dataset.from_dict(
{
"question": [question],
"answer": [rag_response["answer"]],
"contexts": [retrieved_contexts],
"contexts": [
[ground_truth]
], # RAGAS expects list of context strings
"ground_truth": [ground_truth],
}
)
@ -404,8 +200,6 @@ class RAGEvaluator:
context_recall,
context_precision,
],
llm=self.eval_llm,
embeddings=self.eval_embeddings,
)
# Convert to DataFrame (RAGAS v0.3+ API)
@ -416,7 +210,6 @@ class RAGEvaluator:
# Extract scores (RAGAS v0.3+ uses .to_pandas())
result = {
"test_number": idx,
"question": question,
"answer": rag_response["answer"][:200] + "..."
if len(rag_response["answer"]) > 200
@ -424,7 +217,7 @@ class RAGEvaluator:
"ground_truth": ground_truth[:200] + "..."
if len(ground_truth) > 200
else ground_truth,
"project": test_case.get("project", "unknown"),
"project": test_case.get("project_context", "unknown"),
"metrics": {
"faithfulness": float(scores_row.get("faithfulness", 0)),
"answer_relevance": float(
@ -438,79 +231,34 @@ class RAGEvaluator:
"timestamp": datetime.now().isoformat(),
}
# Calculate RAGAS score (average of all metrics, excluding NaN values)
# Calculate RAGAS score (average of all metrics)
metrics = result["metrics"]
valid_metrics = [v for v in metrics.values() if not _is_nan(v)]
ragas_score = (
sum(valid_metrics) / len(valid_metrics) if valid_metrics else 0
)
ragas_score = sum(metrics.values()) / len(metrics) if metrics else 0
result["ragas_score"] = round(ragas_score, 4)
# Update progress counter
progress_counter["completed"] += 1
results.append(result)
return result
# Print metrics
print(f" ✅ Faithfulness: {metrics['faithfulness']:.4f}")
print(f" ✅ Answer Relevance: {metrics['answer_relevance']:.4f}")
print(f" ✅ Context Recall: {metrics['context_recall']:.4f}")
print(f" ✅ Context Precision: {metrics['context_precision']:.4f}")
print(f" 📊 RAGAS Score: {result['ragas_score']:.4f}\n")
except Exception as e:
logger.error("Error evaluating test %s: %s", idx, str(e))
progress_counter["completed"] += 1
return {
"test_number": idx,
import traceback
print(f" ❌ Error evaluating: {str(e)}")
print(f" 🔍 Full traceback:\n{traceback.format_exc()}\n")
result = {
"question": question,
"error": str(e),
"metrics": {},
"ragas_score": 0,
"timestamp": datetime.now().isoformat(),
"timestamp": datetime.now().isoformat()
}
results.append(result)
async def evaluate_responses(self) -> List[Dict[str, Any]]:
"""
Evaluate all test cases in parallel and return metrics
Returns:
List of evaluation results with metrics
"""
# Get evaluation concurrency from environment (default to 1 for serial evaluation)
max_async = int(os.getenv("EVAL_MAX_CONCURRENT", "3"))
logger.info("")
logger.info("%s", "=" * 70)
logger.info("🚀 Starting RAGAS Evaluation of Portfolio RAG System")
logger.info("🔧 Concurrent evaluations: %s", max_async)
logger.info("%s", "=" * 70)
logger.info("")
# Create semaphore to limit concurrent evaluations
semaphore = asyncio.Semaphore(max_async)
# Create progress counter (shared across all tasks)
progress_counter = {"completed": 0}
# Create shared HTTP client with connection pooling and proper timeouts
# Timeout: 3 minutes for connect, 5 minutes for read (LLM can be slow)
timeout = httpx.Timeout(
TOTAL_TIMEOUT_SECONDS,
connect=CONNECT_TIMEOUT_SECONDS,
read=READ_TIMEOUT_SECONDS,
)
limits = httpx.Limits(
max_connections=max_async * 2, # Allow some buffer
max_keepalive_connections=max_async,
)
async with httpx.AsyncClient(timeout=timeout, limits=limits) as client:
# Create tasks for all test cases
tasks = [
self.evaluate_single_case(
idx, test_case, semaphore, client, progress_counter
)
for idx, test_case in enumerate(self.test_cases, 1)
]
# Run all evaluations in parallel (limited by semaphore)
results = await asyncio.gather(*tasks)
return list(results)
return results
def _export_to_csv(self, results: List[Dict[str, Any]]) -> Path:
"""
@ -532,9 +280,7 @@ class RAGEvaluator:
- ragas_score: Overall RAGAS score (0-1)
- timestamp: When evaluation was run
"""
csv_path = (
self.results_dir / f"results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
)
csv_path = self.results_dir / f"results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
with open(csv_path, "w", newline="", encoding="utf-8") as f:
fieldnames = [
@ -555,209 +301,21 @@ class RAGEvaluator:
for idx, result in enumerate(results, 1):
metrics = result.get("metrics", {})
writer.writerow(
{
"test_number": idx,
"question": result.get("question", ""),
"project": result.get("project", "unknown"),
"faithfulness": f"{metrics.get('faithfulness', 0):.4f}",
"answer_relevance": f"{metrics.get('answer_relevance', 0):.4f}",
"context_recall": f"{metrics.get('context_recall', 0):.4f}",
"context_precision": f"{metrics.get('context_precision', 0):.4f}",
"ragas_score": f"{result.get('ragas_score', 0):.4f}",
"status": "success" if metrics else "error",
"timestamp": result.get("timestamp", ""),
}
)
writer.writerow({
"test_number": idx,
"question": result.get("question", ""),
"project": result.get("project", "unknown"),
"faithfulness": f"{metrics.get('faithfulness', 0):.4f}",
"answer_relevance": f"{metrics.get('answer_relevance', 0):.4f}",
"context_recall": f"{metrics.get('context_recall', 0):.4f}",
"context_precision": f"{metrics.get('context_precision', 0):.4f}",
"ragas_score": f"{result.get('ragas_score', 0):.4f}",
"status": "success" if metrics else "error",
"timestamp": result.get("timestamp", ""),
})
return csv_path
def _format_metric(self, value: float, width: int = 6) -> str:
"""
Format a metric value for display, handling NaN gracefully
Args:
value: The metric value to format
width: The width of the formatted string
Returns:
Formatted string (e.g., "0.8523" or " N/A ")
"""
if _is_nan(value):
return "N/A".center(width)
return f"{value:.4f}".rjust(width)
def _display_results_table(self, results: List[Dict[str, Any]]):
"""
Display evaluation results in a formatted table
Args:
results: List of evaluation results
"""
logger.info("")
logger.info("%s", "=" * 115)
logger.info("📊 EVALUATION RESULTS SUMMARY")
logger.info("%s", "=" * 115)
# Table header
logger.info(
"%-4s | %-50s | %6s | %7s | %6s | %7s | %6s | %6s",
"#",
"Question",
"Faith",
"AnswRel",
"CtxRec",
"CtxPrec",
"RAGAS",
"Status",
)
logger.info("%s", "-" * 115)
# Table rows
for result in results:
test_num = result.get("test_number", 0)
question = result.get("question", "")
# Truncate question to 50 chars
question_display = (
(question[:47] + "...") if len(question) > 50 else question
)
metrics = result.get("metrics", {})
if metrics:
# Success case - format each metric, handling NaN values
faith = metrics.get("faithfulness", 0)
ans_rel = metrics.get("answer_relevance", 0)
ctx_rec = metrics.get("context_recall", 0)
ctx_prec = metrics.get("context_precision", 0)
ragas = result.get("ragas_score", 0)
status = ""
logger.info(
"%-4d | %-50s | %s | %s | %s | %s | %s | %6s",
test_num,
question_display,
self._format_metric(faith, 6),
self._format_metric(ans_rel, 7),
self._format_metric(ctx_rec, 6),
self._format_metric(ctx_prec, 7),
self._format_metric(ragas, 6),
status,
)
else:
# Error case
error = result.get("error", "Unknown error")
error_display = (error[:20] + "...") if len(error) > 23 else error
logger.info(
"%-4d | %-50s | %6s | %7s | %6s | %7s | %6s | ✗ %s",
test_num,
question_display,
"N/A",
"N/A",
"N/A",
"N/A",
"N/A",
error_display,
)
logger.info("%s", "=" * 115)
def _calculate_benchmark_stats(
self, results: List[Dict[str, Any]]
) -> Dict[str, Any]:
"""
Calculate benchmark statistics from evaluation results
Args:
results: List of evaluation results
Returns:
Dictionary with benchmark statistics
"""
# Filter out results with errors
valid_results = [r for r in results if r.get("metrics")]
total_tests = len(results)
successful_tests = len(valid_results)
failed_tests = total_tests - successful_tests
if not valid_results:
return {
"total_tests": total_tests,
"successful_tests": 0,
"failed_tests": failed_tests,
"success_rate": 0.0,
}
# Calculate averages for each metric (handling NaN values correctly)
# Track both sum and count for each metric to handle NaN values properly
metrics_data = {
"faithfulness": {"sum": 0.0, "count": 0},
"answer_relevance": {"sum": 0.0, "count": 0},
"context_recall": {"sum": 0.0, "count": 0},
"context_precision": {"sum": 0.0, "count": 0},
"ragas_score": {"sum": 0.0, "count": 0},
}
for result in valid_results:
metrics = result.get("metrics", {})
# For each metric, sum non-NaN values and count them
faithfulness = metrics.get("faithfulness", 0)
if not _is_nan(faithfulness):
metrics_data["faithfulness"]["sum"] += faithfulness
metrics_data["faithfulness"]["count"] += 1
answer_relevance = metrics.get("answer_relevance", 0)
if not _is_nan(answer_relevance):
metrics_data["answer_relevance"]["sum"] += answer_relevance
metrics_data["answer_relevance"]["count"] += 1
context_recall = metrics.get("context_recall", 0)
if not _is_nan(context_recall):
metrics_data["context_recall"]["sum"] += context_recall
metrics_data["context_recall"]["count"] += 1
context_precision = metrics.get("context_precision", 0)
if not _is_nan(context_precision):
metrics_data["context_precision"]["sum"] += context_precision
metrics_data["context_precision"]["count"] += 1
ragas_score = result.get("ragas_score", 0)
if not _is_nan(ragas_score):
metrics_data["ragas_score"]["sum"] += ragas_score
metrics_data["ragas_score"]["count"] += 1
# Calculate averages using actual counts for each metric
avg_metrics = {}
for metric_name, data in metrics_data.items():
if data["count"] > 0:
avg_val = data["sum"] / data["count"]
avg_metrics[metric_name] = (
round(avg_val, 4) if not _is_nan(avg_val) else 0.0
)
else:
avg_metrics[metric_name] = 0.0
# Find min and max RAGAS scores (filter out NaN)
ragas_scores = []
for r in valid_results:
score = r.get("ragas_score", 0)
if _is_nan(score):
continue # Skip NaN values
ragas_scores.append(score)
min_score = min(ragas_scores) if ragas_scores else 0
max_score = max(ragas_scores) if ragas_scores else 0
return {
"total_tests": total_tests,
"successful_tests": successful_tests,
"failed_tests": failed_tests,
"success_rate": round(successful_tests / total_tests * 100, 2),
"average_metrics": avg_metrics,
"min_ragas_score": round(min_score, 4),
"max_ragas_score": round(max_score, 4),
}
async def run(self) -> Dict[str, Any]:
"""Run complete evaluation pipeline"""
@ -768,86 +326,35 @@ class RAGEvaluator:
elapsed_time = time.time() - start_time
# Add a small delay to ensure all buffered output is completely written
await asyncio.sleep(0.2)
# Flush all output buffers to ensure RAGAS progress bars are fully displayed
# before showing our results table
sys.stdout.flush()
sys.stderr.flush()
# Make sure the progress bar line ends before logging summary output
sys.stderr.write("\n")
sys.stderr.flush()
# Display results table
self._display_results_table(results)
# Calculate benchmark statistics
benchmark_stats = self._calculate_benchmark_stats(results)
# Save results
summary = {
"timestamp": datetime.now().isoformat(),
"total_tests": len(results),
"elapsed_time_seconds": round(elapsed_time, 2),
"benchmark_stats": benchmark_stats,
"results": results,
"results": results
}
# Save JSON results
json_path = (
self.results_dir
/ f"results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
)
json_path = self.results_dir / f"results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
with open(json_path, "w") as f:
json.dump(summary, f, indent=2)
logger.info("✅ JSON results saved to: %s", json_path)
print(f"✅ JSON results saved to: {json_path}")
# Export to CSV
csv_path = self._export_to_csv(results)
logger.info("✅ CSV results saved to: %s", csv_path)
print(f"✅ CSV results saved to: {csv_path}")
# Print summary
logger.info("")
logger.info("%s", "=" * 70)
logger.info("📊 EVALUATION COMPLETE")
logger.info("%s", "=" * 70)
logger.info("Total Tests: %s", len(results))
logger.info("Successful: %s", benchmark_stats["successful_tests"])
logger.info("Failed: %s", benchmark_stats["failed_tests"])
logger.info("Success Rate: %.2f%%", benchmark_stats["success_rate"])
logger.info("Elapsed Time: %.2f seconds", elapsed_time)
logger.info("Avg Time/Test: %.2f seconds", elapsed_time / len(results))
# Print benchmark metrics
logger.info("")
logger.info("%s", "=" * 70)
logger.info("📈 BENCHMARK RESULTS (Average)")
logger.info("%s", "=" * 70)
avg = benchmark_stats["average_metrics"]
logger.info("Average Faithfulness: %.4f", avg["faithfulness"])
logger.info("Average Answer Relevance: %.4f", avg["answer_relevance"])
logger.info("Average Context Recall: %.4f", avg["context_recall"])
logger.info("Average Context Precision: %.4f", avg["context_precision"])
logger.info("Average RAGAS Score: %.4f", avg["ragas_score"])
logger.info("")
logger.info(
"Min RAGAS Score: %.4f",
benchmark_stats["min_ragas_score"],
)
logger.info(
"Max RAGAS Score: %.4f",
benchmark_stats["max_ragas_score"],
)
logger.info("")
logger.info("%s", "=" * 70)
logger.info("📁 GENERATED FILES")
logger.info("%s", "=" * 70)
logger.info("Results Dir: %s", self.results_dir.absolute())
logger.info(" • CSV: %s", csv_path.name)
logger.info(" • JSON: %s", json_path.name)
logger.info("%s", "=" * 70)
print("\n" + "="*70)
print("📊 EVALUATION COMPLETE")
print("="*70)
print(f"Total Tests: {len(results)}")
print(f"Elapsed Time: {elapsed_time:.2f} seconds")
print(f"Results Dir: {self.results_dir.absolute()}")
print("\n📁 Generated Files:")
print(f" • CSV: {csv_path.name}")
print(f" • JSON: {json_path.name}")
print("="*70 + "\n")
return summary
@ -858,8 +365,8 @@ async def main():
Usage:
python lightrag/evaluation/eval_rag_quality.py
python lightrag/evaluation/eval_rag_quality.py http://localhost:9621
python lightrag/evaluation/eval_rag_quality.py http://your-server.com:9621
python lightrag/evaluation/eval_rag_quality.py http://localhost:8000
python lightrag/evaluation/eval_rag_quality.py http://your-server.com:8000
"""
try:
# Get RAG API URL from command line or environment
@ -867,20 +374,19 @@ async def main():
if len(sys.argv) > 1:
rag_api_url = sys.argv[1]
logger.info("")
logger.info("%s", "=" * 70)
logger.info("🔍 RAGAS Evaluation - Using Real LightRAG API")
logger.info("%s", "=" * 70)
print("\n" + "="*70)
print("🔍 RAGAS Evaluation - Using Real LightRAG API")
print("="*70)
if rag_api_url:
logger.info("📡 RAG API URL: %s", rag_api_url)
print(f"📡 RAG API URL: {rag_api_url}")
else:
logger.info("📡 RAG API URL: http://localhost:9621 (default)")
logger.info("%s", "=" * 70)
print(f"📡 RAG API URL: http://localhost:8000 (default)")
print("="*70 + "\n")
evaluator = RAGEvaluator(rag_api_url=rag_api_url)
await evaluator.run()
except Exception as e:
logger.exception("❌ Error: %s", e)
print(f"\n❌ Error: {str(e)}\n")
sys.exit(1)

View file

@ -150,6 +150,15 @@ observability = [
>>>>>>> 69a0b74c (refactor: move document deps to api group, remove dynamic imports)
]
evaluation = [
# RAG evaluation dependencies (RAGAS framework)
"ragas>=0.3.7",
"datasets>=4.3.0",
"httpx>=0.28.1",
"pytest>=8.4.2",
"pytest-asyncio>=1.2.0",
]
[project.scripts]
lightrag-server = "lightrag.api.lightrag_server:main"
lightrag-gunicorn = "lightrag.api.run_with_gunicorn:main"