Merge branch 'new/ragas-evaluation'

This commit is contained in:
yangdx 2025-11-04 19:35:15 +08:00
commit a618f837a6
15 changed files with 1595 additions and 3 deletions

.gitignore
View file

@@ -50,6 +50,9 @@ output/
rag_storage/
data/
# Evaluation results
lightrag/evaluation/results/
# Miscellaneous
.DS_Store
TODO.md

View file

@@ -50,6 +50,8 @@ OLLAMA_EMULATING_MODEL_TAG=latest
# JWT_ALGORITHM=HS256
### API-Key to access LightRAG Server API
### Use this key in HTTP requests with the 'X-API-Key' header
### Example: curl -H "X-API-Key: your-secure-api-key-here" http://localhost:9621/query
# LIGHTRAG_API_KEY=your-secure-api-key-here
# WHITELIST_PATHS=/health,/api/*
@@ -392,3 +394,26 @@ MEMGRAPH_USERNAME=
MEMGRAPH_PASSWORD=
MEMGRAPH_DATABASE=memgraph
# MEMGRAPH_WORKSPACE=forced_workspace_name
############################
### Evaluation Configuration
############################
### RAGAS evaluation models (used for RAG quality assessment)
### Default uses OpenAI models for evaluation
# EVAL_LLM_MODEL=gpt-4o-mini
# EVAL_EMBEDDING_MODEL=text-embedding-3-large
### API key for evaluation (fallback to OPENAI_API_KEY if not set)
# EVAL_LLM_BINDING_API_KEY=your_api_key
### Custom endpoint for evaluation models (optional, for OpenAI-compatible services)
# EVAL_LLM_BINDING_HOST=https://api.openai.com/v1
### Evaluation concurrency and rate limiting
### Number of concurrent test case evaluations (default: 1 for serial evaluation)
### Lower values reduce API rate limit issues but increase evaluation time
# EVAL_MAX_CONCURRENT=3
### TOP_K query parameter of LightRAG (default: 10)
### Number of entities or relations retrieved from KG
# EVAL_QUERY_TOP_K=10
### LLM request retry and timeout settings for evaluation
# EVAL_LLM_MAX_RETRIES=5
# EVAL_LLM_TIMEOUT=120

View file

@@ -463,6 +463,59 @@ The `/query` and `/query/stream` API endpoints include an `enable_rerank` parame
RERANK_BY_DEFAULT=False
```
### Include Chunk Content in References
By default, the `/query` and `/query/stream` endpoints return references with only `reference_id` and `file_path`. For evaluation, debugging, or citation purposes, you can request the actual retrieved chunk content to be included in references.
The `include_chunk_content` parameter (default: `false`) controls whether the actual text content of retrieved chunks is included in the response references. This is particularly useful for:
- **RAG Evaluation**: Testing systems like RAGAS that need access to retrieved contexts
- **Debugging**: Verifying what content was actually used to generate the answer
- **Citation Display**: Showing users the exact text passages that support the response
- **Transparency**: Providing full visibility into the RAG retrieval process
**Important**: The `content` field is an **array of strings**, where each string represents a chunk from the same file. A single file may correspond to multiple chunks, so the content is returned as a list to preserve chunk boundaries.
**Example API Request:**
```json
{
"query": "What is LightRAG?",
"mode": "mix",
"include_references": true,
"include_chunk_content": true
}
```
**Example Response (with chunk content):**
```json
{
"response": "LightRAG is a graph-based RAG system...",
"references": [
{
"reference_id": "1",
"file_path": "/documents/intro.md",
"content": [
"LightRAG is a retrieval-augmented generation system that combines knowledge graphs with vector similarity search...",
"The system uses a dual-indexing approach with both vector embeddings and graph structures for enhanced retrieval..."
]
},
{
"reference_id": "2",
"file_path": "/documents/features.md",
"content": [
"The system provides multiple query modes including local, global, hybrid, and mix modes..."
]
}
]
}
```
**Notes**:
- This parameter only works when `include_references=true`. Setting `include_chunk_content=true` without including references has no effect.
- **Breaking Change**: Prior versions returned `content` as a single concatenated string. Now it returns an array of strings to preserve individual chunk boundaries. If you need a single string, join the array elements with your preferred separator (e.g., `"\n\n".join(content)`).
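For programmatic use, here is a minimal client sketch, assuming the default server address `http://localhost:9621` and using `httpx` purely as an example HTTP client:
```python
import httpx

payload = {
    "query": "What is LightRAG?",
    "mode": "mix",
    "include_references": True,
    "include_chunk_content": True,
}

resp = httpx.post("http://localhost:9621/query", json=payload, timeout=120)
resp.raise_for_status()
result = resp.json()

print(result["response"])
for ref in result.get("references") or []:
    # content is a list of chunks from the same file; join it if you need one string
    chunks = ref.get("content") or []
    print(ref["reference_id"], ref["file_path"], "-", len(chunks), "chunk(s)")
    print("\n\n".join(chunks))
```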
### .env Examples
```bash

View file

@@ -103,6 +103,11 @@ class QueryRequest(BaseModel):
description="If True, includes reference list in responses. Affects /query and /query/stream endpoints. /query/data always includes references.",
)
include_chunk_content: Optional[bool] = Field(
default=False,
description="If True, includes actual chunk text content in references. Only applies when include_references=True. Useful for evaluation and debugging.",
)
stream: Optional[bool] = Field(
default=True,
description="If True, enables streaming output for real-time responses. Only affects /query/stream endpoint.",
@@ -130,7 +135,10 @@ class QueryRequest(BaseModel):
def to_query_params(self, is_stream: bool) -> "QueryParam":
"""Converts a QueryRequest instance into a QueryParam instance."""
# Use Pydantic's `.model_dump(exclude_none=True)` to remove None values automatically
request_data = self.model_dump(exclude_none=True, exclude={"query"})
# Exclude API-level parameters that don't belong in QueryParam
request_data = self.model_dump(
exclude_none=True, exclude={"query", "include_chunk_content"}
)
# Ensure `mode` and `stream` are set explicitly
param = QueryParam(**request_data)
@@ -138,11 +146,22 @@ class QueryRequest(BaseModel):
return param
class ReferenceItem(BaseModel):
"""A single reference item in query responses."""
reference_id: str = Field(description="Unique reference identifier")
file_path: str = Field(description="Path to the source file")
content: Optional[List[str]] = Field(
default=None,
description="List of chunk contents from this file (only present when include_chunk_content=True)",
)
class QueryResponse(BaseModel):
response: str = Field(
description="The generated response",
)
references: Optional[List[Dict[str, str]]] = Field(
references: Optional[List[ReferenceItem]] = Field(
default=None,
description="Reference list (Disabled when include_references=False, /query/data always includes references.)",
)
@@ -200,6 +219,11 @@ def create_query_routes(rag, api_key: Optional[str] = None, top_k: int = 60):
"properties": {
"reference_id": {"type": "string"},
"file_path": {"type": "string"},
"content": {
"type": "array",
"items": {"type": "string"},
"description": "List of chunk contents from this file (only included when include_chunk_content=True)",
},
},
},
"description": "Reference list (only included when include_references=True)",
@@ -225,6 +249,30 @@ def create_query_routes(rag, api_key: Optional[str] = None, top_k: int = 60):
],
},
},
"with_chunk_content": {
"summary": "Response with chunk content",
"description": "Example response when include_references=True and include_chunk_content=True. Note: content is an array of chunks from the same file.",
"value": {
"response": "Artificial Intelligence (AI) is a branch of computer science that aims to create intelligent machines capable of performing tasks that typically require human intelligence, such as learning, reasoning, and problem-solving.",
"references": [
{
"reference_id": "1",
"file_path": "/documents/ai_overview.pdf",
"content": [
"Artificial Intelligence (AI) represents a transformative field in computer science focused on creating systems that can perform tasks requiring human-like intelligence. These tasks include learning from experience, understanding natural language, recognizing patterns, and making decisions.",
"AI systems can be categorized into narrow AI, which is designed for specific tasks, and general AI, which aims to match human cognitive abilities across a wide range of domains.",
],
},
{
"reference_id": "2",
"file_path": "/documents/machine_learning.txt",
"content": [
"Machine learning is a subset of AI that enables computers to learn and improve from experience without being explicitly programmed. It focuses on the development of algorithms that can access data and use it to learn for themselves."
],
},
],
},
},
"without_references": {
"summary": "Response without references",
"description": "Example response when include_references=False",
@@ -368,13 +416,37 @@ def create_query_routes(rag, api_key: Optional[str] = None, top_k: int = 60):
# Extract LLM response and references from unified result
llm_response = result.get("llm_response", {})
references = result.get("data", {}).get("references", [])
data = result.get("data", {})
references = data.get("references", [])
# Get the non-streaming response content
response_content = llm_response.get("content", "")
if not response_content:
response_content = "No relevant context found for the query."
# Enrich references with chunk content if requested
if request.include_references and request.include_chunk_content:
chunks = data.get("chunks", [])
# Create a mapping from reference_id to chunk content
ref_id_to_content = {}
for chunk in chunks:
ref_id = chunk.get("reference_id", "")
content = chunk.get("content", "")
if ref_id and content:
# Collect chunk content; join later to avoid quadratic string concatenation
ref_id_to_content.setdefault(ref_id, []).append(content)
# Add content to references
enriched_references = []
for ref in references:
ref_copy = ref.copy()
ref_id = ref.get("reference_id", "")
if ref_id in ref_id_to_content:
# Keep content as a list of chunks (one file may have multiple chunks)
ref_copy["content"] = ref_id_to_content[ref_id]
enriched_references.append(ref_copy)
references = enriched_references
# Return response with or without references based on request
if request.include_references:
return QueryResponse(response=response_content, references=references)
@@ -404,6 +476,11 @@ def create_query_routes(rag, api_key: Optional[str] = None, top_k: int = 60):
"description": "Multiple NDJSON lines when stream=True and include_references=True. First line contains references, subsequent lines contain response chunks.",
"value": '{"references": [{"reference_id": "1", "file_path": "/documents/ai_overview.pdf"}, {"reference_id": "2", "file_path": "/documents/ml_basics.txt"}]}\n{"response": "Artificial Intelligence (AI) is a branch of computer science"}\n{"response": " that aims to create intelligent machines capable of performing"}\n{"response": " tasks that typically require human intelligence, such as learning,"}\n{"response": " reasoning, and problem-solving."}',
},
"streaming_with_chunk_content": {
"summary": "Streaming mode with chunk content (stream=true, include_chunk_content=true)",
"description": "Multiple NDJSON lines when stream=True, include_references=True, and include_chunk_content=True. First line contains references with content arrays (one file may have multiple chunks), subsequent lines contain response chunks.",
"value": '{"references": [{"reference_id": "1", "file_path": "/documents/ai_overview.pdf", "content": ["Artificial Intelligence (AI) represents a transformative field...", "AI systems can be categorized into narrow AI and general AI..."]}, {"reference_id": "2", "file_path": "/documents/ml_basics.txt", "content": ["Machine learning is a subset of AI that enables computers to learn..."]}]}\n{"response": "Artificial Intelligence (AI) is a branch of computer science"}\n{"response": " that aims to create intelligent machines capable of performing"}\n{"response": " tasks that typically require human intelligence."}',
},
"streaming_without_references": {
"summary": "Streaming mode without references (stream=true)",
"description": "Multiple NDJSON lines when stream=True and include_references=False. Only response chunks are sent.",
@@ -600,6 +677,30 @@ def create_query_routes(rag, api_key: Optional[str] = None, top_k: int = 60):
references = result.get("data", {}).get("references", [])
llm_response = result.get("llm_response", {})
# Enrich references with chunk content if requested
if request.include_references and request.include_chunk_content:
data = result.get("data", {})
chunks = data.get("chunks", [])
# Create a mapping from reference_id to chunk content
ref_id_to_content = {}
for chunk in chunks:
ref_id = chunk.get("reference_id", "")
content = chunk.get("content", "")
if ref_id and content:
# Collect chunk content
ref_id_to_content.setdefault(ref_id, []).append(content)
# Add content to references
enriched_references = []
for ref in references:
ref_copy = ref.copy()
ref_id = ref.get("reference_id", "")
if ref_id in ref_id_to_content:
# Keep content as a list of chunks (one file may have multiple chunks)
ref_copy["content"] = ref_id_to_content[ref_id]
enriched_references.append(ref_copy)
references = enriched_references
if llm_response.get("is_streaming"):
# Streaming mode: send references first, then stream response chunks
if request.include_references:

View file

@@ -0,0 +1,323 @@
# 📊 LightRAG Evaluation Framework
RAGAS-based offline evaluation of your LightRAG system.
## What is RAGAS?
**RAGAS** (Retrieval Augmented Generation Assessment) is a framework for reference-free evaluation of RAG systems using LLMs.
Instead of requiring human-annotated ground truth, RAGAS uses state-of-the-art evaluation metrics:
### Core Metrics
| Metric | What It Measures | Good Score |
|--------|-----------------|-----------|
| **Faithfulness** | Is the answer factually accurate based on retrieved context? | > 0.80 |
| **Answer Relevance** | Is the answer relevant to the user's question? | > 0.80 |
| **Context Recall** | Was all relevant information retrieved from documents? | > 0.80 |
| **Context Precision** | Is retrieved context clean without irrelevant noise? | > 0.80 |
| **RAGAS Score** | Overall quality metric (average of above) | > 0.80 |
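The overall RAGAS score reported by this framework is the mean of the four metrics, skipping any that come back as NaN; a short sketch of the aggregation used in `eval_rag_quality.py`:
```python
import math

def ragas_score(metrics: dict) -> float:
    """Average of the four RAGAS metrics, ignoring NaN values."""
    valid = [v for v in metrics.values() if not (isinstance(v, float) and math.isnan(v))]
    return round(sum(valid) / len(valid), 4) if valid else 0.0

print(ragas_score({
    "faithfulness": 0.92,
    "answer_relevance": 0.88,
    "context_recall": 0.95,
    "context_precision": 0.90,
}))  # 0.9125
```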
---
## 📁 Structure
```
lightrag/evaluation/
├── eval_rag_quality.py # Main evaluation script
├── sample_dataset.json # 3 test questions about LightRAG
├── sample_documents/ # Matching markdown files for testing
│ ├── 01_lightrag_overview.md
│ ├── 02_rag_architecture.md
│ ├── 03_lightrag_improvements.md
│ ├── 04_supported_databases.md
│ ├── 05_evaluation_and_deployment.md
│ └── README.md
├── __init__.py # Package init
├── results/ # Output directory
│ ├── results_YYYYMMDD_HHMMSS.json # Raw metrics in JSON
│ └── results_YYYYMMDD_HHMMSS.csv # Metrics in CSV format
└── README.md # This file
```
**Quick Test:** Index files from `sample_documents/` into LightRAG, then run the evaluator to reproduce results (~89-100% RAGAS score per question).
---
## 🚀 Quick Start
### 1. Install Dependencies
```bash
pip install ragas datasets langfuse
```
Or install the `evaluation` extra defined in `pyproject.toml`:
```bash
pip install -e ".[evaluation]"
```
### 2. Run Evaluation
```bash
cd /path/to/LightRAG
python -m lightrag.evaluation.eval_rag_quality
```
Or directly:
```bash
python lightrag/evaluation/eval_rag_quality.py
```
### 3. View Results
Results are saved automatically in `lightrag/evaluation/results/`:
```
results/
├── results_20241023_143022.json ← Raw metrics in JSON format
└── results_20241023_143022.csv ← Metrics in CSV format (for spreadsheets)
```
**Results include:**
- ✅ Overall RAGAS score
- 📊 Per-metric averages (Faithfulness, Answer Relevance, Context Recall, Context Precision)
- 📋 Individual test case results
- 📈 Performance breakdown by question
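The JSON file is easy to post-process; for example, a small sketch that loads the latest run and prints the aggregate metrics (paths assume the default output location shown above):
```python
import json
from pathlib import Path

results_dir = Path("lightrag/evaluation/results")
latest = max(results_dir.glob("results_*.json"))  # timestamped names sort chronologically
summary = json.loads(latest.read_text())

print("RAGAS average:", summary["benchmark_stats"]["average_metrics"]["ragas_score"])
for case in summary["results"]:
    print(case["test_number"], case.get("ragas_score"), case["question"][:60])
```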
---
## ⚙️ Configuration
### Environment Variables
The evaluation framework supports customization through environment variables:
| Variable | Default | Description |
|----------|---------|-------------|
| `EVAL_LLM_MODEL` | `gpt-4o-mini` | LLM model used for RAGAS evaluation |
| `EVAL_EMBEDDING_MODEL` | `text-embedding-3-large` | Embedding model for evaluation |
| `EVAL_LLM_BINDING_API_KEY` | (falls back to `OPENAI_API_KEY`) | API key for evaluation models |
| `EVAL_LLM_BINDING_HOST` | (optional) | Custom endpoint URL for OpenAI-compatible services |
| `EVAL_MAX_CONCURRENT` | `1` | Number of concurrent test case evaluations (1=serial) |
| `EVAL_QUERY_TOP_K` | `10` | `top_k` passed to LightRAG queries (entities/relations retrieved from the KG) |
| `EVAL_LLM_MAX_RETRIES` | `5` | Maximum LLM request retries |
| `EVAL_LLM_TIMEOUT` | `180` | LLM request timeout in seconds |
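The API key fallback works as sketched below (mirroring the logic in `eval_rag_quality.py`): `EVAL_LLM_BINDING_API_KEY` wins if set, otherwise `OPENAI_API_KEY` is used, and the evaluator refuses to start if neither is present:
```python
import os

eval_api_key = os.getenv("EVAL_LLM_BINDING_API_KEY") or os.getenv("OPENAI_API_KEY")
if not eval_api_key:
    raise EnvironmentError(
        "EVAL_LLM_BINDING_API_KEY or OPENAI_API_KEY is required for evaluation."
    )

eval_model = os.getenv("EVAL_LLM_MODEL", "gpt-4o-mini")
eval_embedding_model = os.getenv("EVAL_EMBEDDING_MODEL", "text-embedding-3-large")
eval_base_url = os.getenv("EVAL_LLM_BINDING_HOST")  # optional OpenAI-compatible endpoint
```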
### Usage Examples
**Default Configuration (OpenAI):**
```bash
export OPENAI_API_KEY=sk-xxx
python lightrag/evaluation/eval_rag_quality.py
```
**Custom Model:**
```bash
export OPENAI_API_KEY=sk-xxx
export EVAL_LLM_MODEL=gpt-4o-mini
export EVAL_EMBEDDING_MODEL=text-embedding-3-large
python lightrag/evaluation/eval_rag_quality.py
```
**OpenAI-Compatible Endpoint:**
```bash
export EVAL_LLM_BINDING_API_KEY=your-custom-key
export EVAL_LLM_BINDING_HOST=https://api.openai.com/v1
export EVAL_LLM_MODEL=qwen-plus
python lightrag/evaluation/eval_rag_quality.py
```
### Concurrency Control & Rate Limiting
The evaluation framework includes built-in concurrency control to prevent API rate limiting issues:
**Why Concurrency Control Matters:**
- RAGAS internally makes many concurrent LLM calls for each test case
- Context Precision metric calls LLM once per retrieved document
- Without control, this can easily exceed API rate limits
**Default Configuration (Conservative):**
```bash
EVAL_MAX_CONCURRENT=1 # Serial evaluation (one test at a time)
EVAL_QUERY_TOP_K=10 # TOP_K query parameter of LightRAG
EVAL_LLM_MAX_RETRIES=5 # Retry failed requests 5 times
EVAL_LLM_TIMEOUT=180 # 3-minute timeout per request
```
**If You Have Higher API Quotas:**
```bash
EVAL_MAX_CONCURRENT=2 # Evaluate 2 tests in parallel
EVAL_QUERY_TOP_K=20 # TOP_K query parameter of LightRAG
```
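As a rough sizing exercise (an estimate, not an exact accounting): Context Precision alone issues one LLM call per retrieved chunk, so concurrent load scales at least with `EVAL_MAX_CONCURRENT × EVAL_QUERY_TOP_K`, before counting the calls made by the other three metrics:
```python
# Lower-bound estimate of concurrent Context Precision calls
eval_max_concurrent = 2   # EVAL_MAX_CONCURRENT
eval_query_top_k = 20     # EVAL_QUERY_TOP_K

min_calls_in_flight = eval_max_concurrent * eval_query_top_k
print(f"At least {min_calls_in_flight} Context Precision calls may be in flight")  # 40
```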
**Common Issues and Solutions:**
| Issue | Solution |
|-------|----------|
| **Warning: "LM returned 1 generations instead of 3"** | Reduce `EVAL_MAX_CONCURRENT` to 1 or decrease `EVAL_QUERY_TOP_K` |
| **Context Precision returns NaN** | Lower `EVAL_QUERY_TOP_K` to reduce LLM calls per test case |
| **Rate limit errors (429)** | Increase `EVAL_LLM_MAX_RETRIES` and decrease `EVAL_MAX_CONCURRENT` |
| **Request timeouts** | Increase `EVAL_LLM_TIMEOUT` to 180 or higher |
---
## 📝 Test Dataset
`sample_dataset.json` contains 3 generic questions about LightRAG. Replace with questions matching YOUR indexed documents.
**Custom Test Cases:**
```json
{
"test_cases": [
{
"question": "Your question here",
"ground_truth": "Expected answer from your data",
"context": "topic"
}
]
}
```
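Before running the evaluator against a custom file, a quick sanity check like the sketch below (a hypothetical helper, not part of the framework) can catch missing fields; the evaluator itself only requires `question` and `ground_truth`, and reads `project` if present:
```python
import json
from pathlib import Path

dataset = json.loads(Path("lightrag/evaluation/sample_dataset.json").read_text())
cases = dataset.get("test_cases", [])
for i, case in enumerate(cases, 1):
    missing = [key for key in ("question", "ground_truth") if not case.get(key)]
    if missing:
        raise ValueError(f"Test case {i} is missing fields: {missing}")
print(f"{len(cases)} test cases look valid")
```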
---
## 📊 Interpreting Results
### Score Ranges
- **0.80-1.00**: ✅ Excellent (Production-ready)
- **0.60-0.80**: ⚠️ Good (Room for improvement)
- **0.40-0.60**: ❌ Poor (Needs optimization)
- **0.00-0.40**: 🔴 Critical (Major issues)
### What Low Scores Mean
| Metric | Low Score Indicates |
|--------|-------------------|
| **Faithfulness** | Responses contain hallucinations or incorrect information |
| **Answer Relevance** | Answers don't match what users asked |
| **Context Recall** | Missing important information in retrieval |
| **Context Precision** | Retrieved documents contain irrelevant noise |
### Optimization Tips
1. **Low Faithfulness**:
- Improve entity extraction quality
- Better document chunking
- Tune retrieval temperature
2. **Low Answer Relevance**:
- Improve prompt engineering
- Better query understanding
- Check semantic similarity threshold
3. **Low Context Recall**:
- Increase retrieval `top_k` results
- Improve embedding model
- Better document preprocessing
4. **Low Context Precision**:
- Smaller, focused chunks
- Better filtering
- Improve chunking strategy
---
## 📚 Resources
- [RAGAS Documentation](https://docs.ragas.io/)
- [RAGAS GitHub](https://github.com/explodinggradients/ragas)
---
## 🐛 Troubleshooting
### "ModuleNotFoundError: No module named 'ragas'"
```bash
pip install ragas datasets
```
### "Warning: LM returned 1 generations instead of requested 3" or Context Precision NaN
**Cause**: This warning indicates API rate limiting or concurrent request overload:
- RAGAS makes multiple LLM calls per test case (faithfulness, relevancy, recall, precision)
- Context Precision calls LLM once per retrieved document (with `EVAL_QUERY_TOP_K=10`, that's 10 calls)
- Concurrent evaluation multiplies these calls: `EVAL_MAX_CONCURRENT × LLM calls per test`
**Solutions** (in order of effectiveness):
1. **Serial Evaluation** (Default):
```bash
export EVAL_MAX_CONCURRENT=1
python lightrag/evaluation/eval_rag_quality.py
```
2. **Reduce Retrieved Documents**:
```bash
export EVAL_QUERY_TOP_K=5 # Halves Context Precision LLM calls
python lightrag/evaluation/eval_rag_quality.py
```
3. **Increase Retry & Timeout**:
```bash
export EVAL_LLM_MAX_RETRIES=10
export EVAL_LLM_TIMEOUT=180
python lightrag/evaluation/eval_rag_quality.py
```
4. **Use Higher Quota API** (if available):
- Upgrade to OpenAI Tier 2+ for higher RPM limits
- Use self-hosted OpenAI-compatible service with no rate limits
### "AttributeError: 'InstructorLLM' object has no attribute 'agenerate_prompt'" or NaN results
This error occurs with RAGAS 0.3.x when LLM and Embeddings are not explicitly configured. The evaluation framework now handles this automatically by:
- Using environment variables to configure evaluation models
- Creating proper LLM and Embeddings instances for RAGAS
**Solution**: Ensure you have set one of the following:
- `OPENAI_API_KEY` environment variable (default)
- `EVAL_LLM_BINDING_API_KEY` for custom API key
The framework will automatically configure the evaluation models.
### "No sample_dataset.json found"
Make sure you're running from the project root:
```bash
cd /path/to/LightRAG
python lightrag/evaluation/eval_rag_quality.py
```
### "LLM API errors during evaluation"
The evaluation uses your configured LLM (OpenAI by default). Ensure:
- API keys are set in `.env`
- You have sufficient API quota
- Your network connection is stable
### Evaluation requires running LightRAG API
The evaluator queries a running LightRAG API server at `http://localhost:9621`. Make sure:
1. LightRAG API server is running (`python lightrag/api/lightrag_server.py`)
2. Documents are indexed in your LightRAG instance
3. API is accessible at the configured URL
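A quick reachability check before a long evaluation run can save time; a minimal sketch, assuming the `/health` endpoint referenced in `env.example` is exposed:
```python
import httpx

base_url = "http://localhost:9621"
try:
    r = httpx.get(f"{base_url}/health", timeout=10)
    r.raise_for_status()
    print("LightRAG API reachable at", base_url)
except httpx.HTTPError as exc:
    raise SystemExit(f"LightRAG API not reachable at {base_url}: {exc}")
```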
---
## 📝 Next Steps
1. Index documents into LightRAG (WebUI or API)
2. Start LightRAG API server
3. Run `python lightrag/evaluation/eval_rag_quality.py`
4. Review results (JSON/CSV) in `results/` folder
5. Adjust entity extraction prompts or retrieval settings based on scores
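The evaluator can also be driven from Python instead of the CLI, as described in `lightrag/evaluation/__init__.py`:
```python
import asyncio

from lightrag.evaluation import RAGEvaluator

async def main():
    evaluator = RAGEvaluator(rag_api_url="http://localhost:9621")
    summary = await evaluator.run()
    print(summary["benchmark_stats"].get("average_metrics", {}))

asyncio.run(main())
```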
---
**Happy Evaluating! 🚀**

View file

@@ -0,0 +1,25 @@
"""
LightRAG Evaluation Module
RAGAS-based evaluation framework for assessing RAG system quality.
Usage:
from lightrag.evaluation import RAGEvaluator
evaluator = RAGEvaluator()
results = await evaluator.run()
Note: RAGEvaluator is imported lazily to avoid import errors
when ragas/datasets are not installed.
"""
__all__ = ["RAGEvaluator"]
def __getattr__(name):
"""Lazy import to avoid dependency errors when ragas is not installed."""
if name == "RAGEvaluator":
from .eval_rag_quality import RAGEvaluator
return RAGEvaluator
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")

View file

@@ -0,0 +1,872 @@
#!/usr/bin/env python3
"""
RAGAS Evaluation Script for LightRAG System
Evaluates RAG response quality using RAGAS metrics:
- Faithfulness: Is the answer factually accurate based on context?
- Answer Relevance: Is the answer relevant to the question?
- Context Recall: Is all relevant information retrieved?
- Context Precision: Is retrieved context clean without noise?
Usage:
python lightrag/evaluation/eval_rag_quality.py
python lightrag/evaluation/eval_rag_quality.py http://localhost:9621
python lightrag/evaluation/eval_rag_quality.py http://your-rag-server.com:9621
Results are saved to: lightrag/evaluation/results/
- results_YYYYMMDD_HHMMSS.csv (CSV export for analysis)
- results_YYYYMMDD_HHMMSS.json (Full results with details)
Technical Notes:
- Uses stable RAGAS API (LangchainLLMWrapper) for maximum compatibility
- Supports custom OpenAI-compatible endpoints via EVAL_LLM_BINDING_HOST
- Enables bypass_n mode for endpoints that don't support 'n' parameter
- Deprecation warnings are suppressed for cleaner output
"""
import asyncio
import csv
import json
import math
import os
import sys
import time
import warnings
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List
import httpx
from dotenv import load_dotenv
from lightrag.utils import logger
# Suppress LangchainLLMWrapper deprecation warning
# We use LangchainLLMWrapper for stability and compatibility with all RAGAS versions
warnings.filterwarnings(
"ignore",
message=".*LangchainLLMWrapper is deprecated.*",
category=DeprecationWarning,
)
# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
# Use the .env file in the current folder
# This allows a different .env file for each LightRAG instance
# OS environment variables take precedence over the .env file
load_dotenv(dotenv_path=".env", override=False)
# Conditional imports - will raise ImportError if dependencies not installed
try:
from datasets import Dataset
from ragas import evaluate
from ragas.metrics import (
AnswerRelevancy,
ContextPrecision,
ContextRecall,
Faithfulness,
)
from ragas.llms import LangchainLLMWrapper
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
RAGAS_AVAILABLE = True
except ImportError:
RAGAS_AVAILABLE = False
Dataset = None
evaluate = None
LangchainLLMWrapper = None
CONNECT_TIMEOUT_SECONDS = 180.0
READ_TIMEOUT_SECONDS = 300.0
TOTAL_TIMEOUT_SECONDS = 180.0
def _is_nan(value: Any) -> bool:
"""Return True when value is a float NaN."""
return isinstance(value, float) and math.isnan(value)
class RAGEvaluator:
"""Evaluate RAG system quality using RAGAS metrics"""
def __init__(self, test_dataset_path: str = None, rag_api_url: str = None):
"""
Initialize evaluator with test dataset
Args:
test_dataset_path: Path to test dataset JSON file
rag_api_url: Base URL of LightRAG API (e.g., http://localhost:9621)
If None, will try to read from environment or use default
Environment Variables:
EVAL_LLM_MODEL: LLM model for evaluation (default: gpt-4o-mini)
EVAL_EMBEDDING_MODEL: Embedding model for evaluation (default: text-embedding-3-large)
EVAL_LLM_BINDING_API_KEY: API key for evaluation models (fallback to OPENAI_API_KEY)
EVAL_LLM_BINDING_HOST: Custom endpoint URL for evaluation models (optional)
Raises:
ImportError: If ragas or datasets packages are not installed
EnvironmentError: If EVAL_LLM_BINDING_API_KEY and OPENAI_API_KEY are both not set
"""
# Validate RAGAS dependencies are installed
if not RAGAS_AVAILABLE:
raise ImportError(
"RAGAS dependencies not installed. "
"Install with: pip install ragas datasets"
)
# Configure evaluation models (for RAGAS scoring)
eval_api_key = os.getenv("EVAL_LLM_BINDING_API_KEY") or os.getenv(
"OPENAI_API_KEY"
)
if not eval_api_key:
raise EnvironmentError(
"EVAL_LLM_BINDING_API_KEY or OPENAI_API_KEY is required for evaluation. "
"Set EVAL_LLM_BINDING_API_KEY to use a custom API key, "
"or ensure OPENAI_API_KEY is set."
)
eval_model = os.getenv("EVAL_LLM_MODEL", "gpt-4o-mini")
eval_embedding_model = os.getenv(
"EVAL_EMBEDDING_MODEL", "text-embedding-3-large"
)
eval_base_url = os.getenv("EVAL_LLM_BINDING_HOST")
# Create LLM and Embeddings instances for RAGAS
llm_kwargs = {
"model": eval_model,
"api_key": eval_api_key,
"max_retries": int(os.getenv("EVAL_LLM_MAX_RETRIES", "5")),
"request_timeout": int(os.getenv("EVAL_LLM_TIMEOUT", "180")),
}
embedding_kwargs = {"model": eval_embedding_model, "api_key": eval_api_key}
if eval_base_url:
llm_kwargs["base_url"] = eval_base_url
embedding_kwargs["base_url"] = eval_base_url
# Create base LangChain LLM
base_llm = ChatOpenAI(**llm_kwargs)
self.eval_embeddings = OpenAIEmbeddings(**embedding_kwargs)
# Wrap LLM with LangchainLLMWrapper and enable bypass_n mode for custom endpoints
# This ensures compatibility with endpoints that don't support the 'n' parameter
# by generating multiple outputs through repeated prompts instead of using 'n' parameter
try:
self.eval_llm = LangchainLLMWrapper(
langchain_llm=base_llm,
bypass_n=True, # Enable bypass_n to avoid passing 'n' to OpenAI API
)
logger.debug("Successfully configured bypass_n mode for LLM wrapper")
except Exception as e:
logger.warning(
"Could not configure LangchainLLMWrapper with bypass_n: %s. "
"Using base LLM directly, which may cause warnings with custom endpoints.",
e,
)
self.eval_llm = base_llm
if test_dataset_path is None:
test_dataset_path = Path(__file__).parent / "sample_dataset.json"
if rag_api_url is None:
rag_api_url = os.getenv("LIGHTRAG_API_URL", "http://localhost:9621")
self.test_dataset_path = Path(test_dataset_path)
self.rag_api_url = rag_api_url.rstrip("/")
self.results_dir = Path(__file__).parent / "results"
self.results_dir.mkdir(exist_ok=True)
# Load test dataset
self.test_cases = self._load_test_dataset()
# Store configuration values for display
self.eval_model = eval_model
self.eval_embedding_model = eval_embedding_model
self.eval_base_url = eval_base_url
self.eval_max_retries = llm_kwargs["max_retries"]
self.eval_timeout = llm_kwargs["request_timeout"]
# Display configuration
self._display_configuration()
def _display_configuration(self):
"""Display all evaluation configuration settings"""
logger.info("Evaluation Models:")
logger.info(" • LLM Model: %s", self.eval_model)
logger.info(" • Embedding Model: %s", self.eval_embedding_model)
if self.eval_base_url:
logger.info(" • Custom Endpoint: %s", self.eval_base_url)
logger.info(" • Bypass N-Parameter: Enabled (use LangchainLLMWrapperfor compatibility)")
else:
logger.info(" • Endpoint: OpenAI Official API")
logger.info("Concurrency & Rate Limiting:")
query_top_k = int(os.getenv("EVAL_QUERY_TOP_K", "10"))
logger.info(" • Query Top-K: %s Entities/Relations", query_top_k)
logger.info(" • LLM Max Retries: %s", self.eval_max_retries)
logger.info(" • LLM Timeout: %s seconds", self.eval_timeout)
logger.info("Test Configuration:")
logger.info(" • Total Test Cases: %s", len(self.test_cases))
logger.info(" • Test Dataset: %s", self.test_dataset_path.name)
logger.info(" • LightRAG API: %s", self.rag_api_url)
logger.info(" • Results Directory: %s", self.results_dir.name)
def _load_test_dataset(self) -> List[Dict[str, str]]:
"""Load test cases from JSON file"""
if not self.test_dataset_path.exists():
raise FileNotFoundError(f"Test dataset not found: {self.test_dataset_path}")
with open(self.test_dataset_path) as f:
data = json.load(f)
return data.get("test_cases", [])
async def generate_rag_response(
self,
question: str,
client: httpx.AsyncClient,
) -> Dict[str, Any]:
"""
Generate RAG response by calling LightRAG API.
Args:
question: The user query.
client: Shared httpx AsyncClient for connection pooling.
Returns:
Dictionary with 'answer' and 'contexts' keys.
'contexts' is a list of strings (one per retrieved document).
Raises:
Exception: If LightRAG API is unavailable.
"""
try:
payload = {
"query": question,
"mode": "mix",
"include_references": True,
"include_chunk_content": True, # NEW: Request chunk content in references
"response_type": "Multiple Paragraphs",
"top_k": int(os.getenv("EVAL_QUERY_TOP_K", "10")),
}
# Get API key from environment for authentication
api_key = os.getenv("LIGHTRAG_API_KEY")
# Prepare headers with optional authentication
headers = {}
if api_key:
headers["X-API-Key"] = api_key
# Single optimized API call - gets both answer AND chunk content
response = await client.post(
f"{self.rag_api_url}/query",
json=payload,
headers=headers if headers else None,
)
response.raise_for_status()
result = response.json()
answer = result.get("response", "No response generated")
references = result.get("references", [])
# DEBUG: Inspect the API response
logger.debug("🔍 References Count: %s", len(references))
if references:
first_ref = references[0]
logger.debug("🔍 First Reference Keys: %s", list(first_ref.keys()))
if "content" in first_ref:
content_preview = first_ref["content"]
if isinstance(content_preview, list) and content_preview:
logger.debug(
"🔍 Content Preview (first chunk): %s...",
content_preview[0][:100],
)
elif isinstance(content_preview, str):
logger.debug("🔍 Content Preview: %s...", content_preview[:100])
# Extract chunk content from enriched references
# Note: content is now a list of chunks per reference (one file may have multiple chunks)
contexts = []
for ref in references:
content = ref.get("content", [])
if isinstance(content, list):
# Flatten the list: each chunk becomes a separate context
contexts.extend(content)
elif isinstance(content, str):
# Backward compatibility: if content is still a string (shouldn't happen)
contexts.append(content)
return {
"answer": answer,
"contexts": contexts, # List of strings from actual retrieved chunks
}
except httpx.ConnectError as e:
raise Exception(
f"❌ Cannot connect to LightRAG API at {self.rag_api_url}\n"
f" Make sure LightRAG server is running:\n"
f" python -m lightrag.api.lightrag_server\n"
f" Error: {str(e)}"
)
except httpx.HTTPStatusError as e:
raise Exception(
f"LightRAG API error {e.response.status_code}: {e.response.text}"
)
except httpx.ReadTimeout as e:
raise Exception(
f"Request timeout after waiting for response\n"
f" Question: {question[:100]}...\n"
f" Error: {str(e)}"
)
except Exception as e:
raise Exception(f"Error calling LightRAG API: {type(e).__name__}: {str(e)}")
async def evaluate_single_case(
self,
idx: int,
test_case: Dict[str, str],
semaphore: asyncio.Semaphore,
client: httpx.AsyncClient,
progress_counter: Dict[str, int],
) -> Dict[str, Any]:
"""
Evaluate a single test case with concurrency control
Args:
idx: Test case index (1-based)
test_case: Test case dictionary with question and ground_truth
semaphore: Semaphore to control concurrency
client: Shared httpx AsyncClient for connection pooling
progress_counter: Shared dictionary for progress tracking
Returns:
Evaluation result dictionary
"""
async with semaphore:
question = test_case["question"]
ground_truth = test_case["ground_truth"]
# Generate RAG response by calling actual LightRAG API
try:
rag_response = await self.generate_rag_response(
question=question, client=client
)
except Exception as e:
logger.error("Error generating response for test %s: %s", idx, str(e))
progress_counter["completed"] += 1
return {
"test_number": idx,
"question": question,
"error": str(e),
"metrics": {},
"ragas_score": 0,
"timestamp": datetime.now().isoformat(),
}
# *** CRITICAL FIX: Use actual retrieved contexts, NOT ground_truth ***
retrieved_contexts = rag_response["contexts"]
# DEBUG: Print what was actually retrieved (only in debug mode)
logger.debug(
"📝 Test %s: Retrieved %s contexts", idx, len(retrieved_contexts)
)
# Prepare dataset for RAGAS evaluation with CORRECT contexts
eval_dataset = Dataset.from_dict(
{
"question": [question],
"answer": [rag_response["answer"]],
"contexts": [retrieved_contexts],
"ground_truth": [ground_truth],
}
)
# Run RAGAS evaluation
# IMPORTANT: Create fresh metric instances for each evaluation to avoid
# concurrent state conflicts when multiple tasks run in parallel
try:
eval_results = evaluate(
dataset=eval_dataset,
metrics=[
Faithfulness(),
AnswerRelevancy(),
ContextRecall(),
ContextPrecision(),
],
llm=self.eval_llm,
embeddings=self.eval_embeddings,
)
# Convert to DataFrame (RAGAS v0.3+ API)
df = eval_results.to_pandas()
# Extract scores from first row
scores_row = df.iloc[0]
# Extract scores (RAGAS v0.3+ uses .to_pandas())
result = {
"test_number": idx,
"question": question,
"answer": rag_response["answer"][:200] + "..."
if len(rag_response["answer"]) > 200
else rag_response["answer"],
"ground_truth": ground_truth[:200] + "..."
if len(ground_truth) > 200
else ground_truth,
"project": test_case.get("project", "unknown"),
"metrics": {
"faithfulness": float(scores_row.get("faithfulness", 0)),
"answer_relevance": float(
scores_row.get("answer_relevancy", 0)
),
"context_recall": float(scores_row.get("context_recall", 0)),
"context_precision": float(
scores_row.get("context_precision", 0)
),
},
"timestamp": datetime.now().isoformat(),
}
# Calculate RAGAS score (average of all metrics, excluding NaN values)
metrics = result["metrics"]
valid_metrics = [v for v in metrics.values() if not _is_nan(v)]
ragas_score = (
sum(valid_metrics) / len(valid_metrics) if valid_metrics else 0
)
result["ragas_score"] = round(ragas_score, 4)
# Update progress counter
progress_counter["completed"] += 1
return result
except Exception as e:
logger.error("Error evaluating test %s: %s", idx, str(e))
progress_counter["completed"] += 1
return {
"test_number": idx,
"question": question,
"error": str(e),
"metrics": {},
"ragas_score": 0,
"timestamp": datetime.now().isoformat(),
}
async def evaluate_responses(self) -> List[Dict[str, Any]]:
"""
Evaluate all test cases in parallel and return metrics
Returns:
List of evaluation results with metrics
"""
# Get evaluation concurrency from environment (default to 1 for serial evaluation)
max_async = int(os.getenv("EVAL_MAX_CONCURRENT", "1"))
logger.info("%s", "=" * 70)
logger.info("🚀 Starting RAGAS Evaluation of LightRAG System")
logger.info("🔧 Concurrent evaluations: %s", max_async)
logger.info("%s", "=" * 70)
# Create semaphore to limit concurrent evaluations
semaphore = asyncio.Semaphore(max_async)
# Create progress counter (shared across all tasks)
progress_counter = {"completed": 0}
# Create shared HTTP client with connection pooling and proper timeouts
# Timeout: 3 minutes for connect, 5 minutes for read (LLM can be slow)
timeout = httpx.Timeout(
TOTAL_TIMEOUT_SECONDS,
connect=CONNECT_TIMEOUT_SECONDS,
read=READ_TIMEOUT_SECONDS,
)
limits = httpx.Limits(
max_connections=max_async * 2, # Allow some buffer
max_keepalive_connections=max_async,
)
async with httpx.AsyncClient(timeout=timeout, limits=limits) as client:
# Create tasks for all test cases
tasks = [
self.evaluate_single_case(
idx, test_case, semaphore, client, progress_counter
)
for idx, test_case in enumerate(self.test_cases, 1)
]
# Run all evaluations in parallel (limited by semaphore)
results = await asyncio.gather(*tasks)
return list(results)
def _export_to_csv(self, results: List[Dict[str, Any]]) -> Path:
"""
Export evaluation results to CSV file
Args:
results: List of evaluation results
Returns:
Path to the CSV file
CSV Format:
- question: The test question
- project: Project context
- faithfulness: Faithfulness score (0-1)
- answer_relevance: Answer relevance score (0-1)
- context_recall: Context recall score (0-1)
- context_precision: Context precision score (0-1)
- ragas_score: Overall RAGAS score (0-1)
- timestamp: When evaluation was run
"""
csv_path = (
self.results_dir / f"results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
)
with open(csv_path, "w", newline="", encoding="utf-8") as f:
fieldnames = [
"test_number",
"question",
"project",
"faithfulness",
"answer_relevance",
"context_recall",
"context_precision",
"ragas_score",
"status",
"timestamp",
]
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
for idx, result in enumerate(results, 1):
metrics = result.get("metrics", {})
writer.writerow(
{
"test_number": idx,
"question": result.get("question", ""),
"project": result.get("project", "unknown"),
"faithfulness": f"{metrics.get('faithfulness', 0):.4f}",
"answer_relevance": f"{metrics.get('answer_relevance', 0):.4f}",
"context_recall": f"{metrics.get('context_recall', 0):.4f}",
"context_precision": f"{metrics.get('context_precision', 0):.4f}",
"ragas_score": f"{result.get('ragas_score', 0):.4f}",
"status": "success" if metrics else "error",
"timestamp": result.get("timestamp", ""),
}
)
return csv_path
def _format_metric(self, value: float, width: int = 6) -> str:
"""
Format a metric value for display, handling NaN gracefully
Args:
value: The metric value to format
width: The width of the formatted string
Returns:
Formatted string (e.g., "0.8523" or " N/A ")
"""
if _is_nan(value):
return "N/A".center(width)
return f"{value:.4f}".rjust(width)
def _display_results_table(self, results: List[Dict[str, Any]]):
"""
Display evaluation results in a formatted table
Args:
results: List of evaluation results
"""
logger.info("%s", "=" * 115)
logger.info("📊 EVALUATION RESULTS SUMMARY")
logger.info("%s", "=" * 115)
# Table header
logger.info(
"%-4s | %-50s | %6s | %7s | %6s | %7s | %6s | %6s",
"#",
"Question",
"Faith",
"AnswRel",
"CtxRec",
"CtxPrec",
"RAGAS",
"Status",
)
logger.info("%s", "-" * 115)
# Table rows
for result in results:
test_num = result.get("test_number", 0)
question = result.get("question", "")
# Truncate question to 50 chars
question_display = (
(question[:47] + "...") if len(question) > 50 else question
)
metrics = result.get("metrics", {})
if metrics:
# Success case - format each metric, handling NaN values
faith = metrics.get("faithfulness", 0)
ans_rel = metrics.get("answer_relevance", 0)
ctx_rec = metrics.get("context_recall", 0)
ctx_prec = metrics.get("context_precision", 0)
ragas = result.get("ragas_score", 0)
status = ""
logger.info(
"%-4d | %-50s | %s | %s | %s | %s | %s | %6s",
test_num,
question_display,
self._format_metric(faith, 6),
self._format_metric(ans_rel, 7),
self._format_metric(ctx_rec, 6),
self._format_metric(ctx_prec, 7),
self._format_metric(ragas, 6),
status,
)
else:
# Error case
error = result.get("error", "Unknown error")
error_display = (error[:20] + "...") if len(error) > 23 else error
logger.info(
"%-4d | %-50s | %6s | %7s | %6s | %7s | %6s | ✗ %s",
test_num,
question_display,
"N/A",
"N/A",
"N/A",
"N/A",
"N/A",
error_display,
)
logger.info("%s", "=" * 115)
def _calculate_benchmark_stats(
self, results: List[Dict[str, Any]]
) -> Dict[str, Any]:
"""
Calculate benchmark statistics from evaluation results
Args:
results: List of evaluation results
Returns:
Dictionary with benchmark statistics
"""
# Filter out results with errors
valid_results = [r for r in results if r.get("metrics")]
total_tests = len(results)
successful_tests = len(valid_results)
failed_tests = total_tests - successful_tests
if not valid_results:
return {
"total_tests": total_tests,
"successful_tests": 0,
"failed_tests": failed_tests,
"success_rate": 0.0,
}
# Calculate averages for each metric (handling NaN values correctly)
# Track both sum and count for each metric to handle NaN values properly
metrics_data = {
"faithfulness": {"sum": 0.0, "count": 0},
"answer_relevance": {"sum": 0.0, "count": 0},
"context_recall": {"sum": 0.0, "count": 0},
"context_precision": {"sum": 0.0, "count": 0},
"ragas_score": {"sum": 0.0, "count": 0},
}
for result in valid_results:
metrics = result.get("metrics", {})
# For each metric, sum non-NaN values and count them
faithfulness = metrics.get("faithfulness", 0)
if not _is_nan(faithfulness):
metrics_data["faithfulness"]["sum"] += faithfulness
metrics_data["faithfulness"]["count"] += 1
answer_relevance = metrics.get("answer_relevance", 0)
if not _is_nan(answer_relevance):
metrics_data["answer_relevance"]["sum"] += answer_relevance
metrics_data["answer_relevance"]["count"] += 1
context_recall = metrics.get("context_recall", 0)
if not _is_nan(context_recall):
metrics_data["context_recall"]["sum"] += context_recall
metrics_data["context_recall"]["count"] += 1
context_precision = metrics.get("context_precision", 0)
if not _is_nan(context_precision):
metrics_data["context_precision"]["sum"] += context_precision
metrics_data["context_precision"]["count"] += 1
ragas_score = result.get("ragas_score", 0)
if not _is_nan(ragas_score):
metrics_data["ragas_score"]["sum"] += ragas_score
metrics_data["ragas_score"]["count"] += 1
# Calculate averages using actual counts for each metric
avg_metrics = {}
for metric_name, data in metrics_data.items():
if data["count"] > 0:
avg_val = data["sum"] / data["count"]
avg_metrics[metric_name] = (
round(avg_val, 4) if not _is_nan(avg_val) else 0.0
)
else:
avg_metrics[metric_name] = 0.0
# Find min and max RAGAS scores (filter out NaN)
ragas_scores = []
for r in valid_results:
score = r.get("ragas_score", 0)
if _is_nan(score):
continue # Skip NaN values
ragas_scores.append(score)
min_score = min(ragas_scores) if ragas_scores else 0
max_score = max(ragas_scores) if ragas_scores else 0
return {
"total_tests": total_tests,
"successful_tests": successful_tests,
"failed_tests": failed_tests,
"success_rate": round(successful_tests / total_tests * 100, 2),
"average_metrics": avg_metrics,
"min_ragas_score": round(min_score, 4),
"max_ragas_score": round(max_score, 4),
}
async def run(self) -> Dict[str, Any]:
"""Run complete evaluation pipeline"""
start_time = time.time()
# Evaluate responses
results = await self.evaluate_responses()
elapsed_time = time.time() - start_time
# Add a small delay to ensure all buffered output is completely written
await asyncio.sleep(0.5)
# Flush all output buffers to ensure RAGAS progress bars are fully displayed
sys.stdout.flush()
sys.stderr.flush()
sys.stdout.write("\n")
sys.stderr.write("\n")
sys.stdout.flush()
sys.stderr.flush()
# Display results table
self._display_results_table(results)
# Calculate benchmark statistics
benchmark_stats = self._calculate_benchmark_stats(results)
# Save results
summary = {
"timestamp": datetime.now().isoformat(),
"total_tests": len(results),
"elapsed_time_seconds": round(elapsed_time, 2),
"benchmark_stats": benchmark_stats,
"results": results,
}
# Save JSON results
json_path = (
self.results_dir
/ f"results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
)
with open(json_path, "w") as f:
json.dump(summary, f, indent=2)
logger.info("✅ JSON results saved to: %s", json_path)
# Export to CSV
csv_path = self._export_to_csv(results)
logger.info("✅ CSV results saved to: %s", csv_path)
# Print summary
logger.info("")
logger.info("%s", "=" * 70)
logger.info("📊 EVALUATION COMPLETE")
logger.info("%s", "=" * 70)
logger.info("Total Tests: %s", len(results))
logger.info("Successful: %s", benchmark_stats["successful_tests"])
logger.info("Failed: %s", benchmark_stats["failed_tests"])
logger.info("Success Rate: %.2f%%", benchmark_stats["success_rate"])
logger.info("Elapsed Time: %.2f seconds", elapsed_time)
logger.info("Avg Time/Test: %.2f seconds", elapsed_time / len(results))
# Print benchmark metrics
logger.info("")
logger.info("%s", "=" * 70)
logger.info("📈 BENCHMARK RESULTS (Average)")
logger.info("%s", "=" * 70)
avg = benchmark_stats["average_metrics"]
logger.info("Average Faithfulness: %.4f", avg["faithfulness"])
logger.info("Average Answer Relevance: %.4f", avg["answer_relevance"])
logger.info("Average Context Recall: %.4f", avg["context_recall"])
logger.info("Average Context Precision: %.4f", avg["context_precision"])
logger.info("Average RAGAS Score: %.4f", avg["ragas_score"])
logger.info("")
logger.info(
"Min RAGAS Score: %.4f",
benchmark_stats["min_ragas_score"],
)
logger.info(
"Max RAGAS Score: %.4f",
benchmark_stats["max_ragas_score"],
)
logger.info("")
logger.info("%s", "=" * 70)
logger.info("📁 GENERATED FILES")
logger.info("%s", "=" * 70)
logger.info("Results Dir: %s", self.results_dir.absolute())
logger.info(" • CSV: %s", csv_path.name)
logger.info(" • JSON: %s", json_path.name)
logger.info("%s", "=" * 70)
return summary
async def main():
"""
Main entry point for RAGAS evaluation
Usage:
python lightrag/evaluation/eval_rag_quality.py
python lightrag/evaluation/eval_rag_quality.py http://localhost:9621
python lightrag/evaluation/eval_rag_quality.py http://your-server.com:9621
"""
try:
# Get RAG API URL from command line or environment
rag_api_url = None
if len(sys.argv) > 1:
rag_api_url = sys.argv[1]
logger.info("%s", "=" * 70)
logger.info("🔍 RAGAS Evaluation - Using Real LightRAG API")
logger.info("%s", "=" * 70)
evaluator = RAGEvaluator(rag_api_url=rag_api_url)
await evaluator.run()
except Exception as e:
logger.exception("❌ Error: %s", e)
sys.exit(1)
if __name__ == "__main__":
asyncio.run(main())

View file

@@ -0,0 +1,19 @@
{
"test_cases": [
{
"question": "How does LightRAG solve the hallucination problem in large language models?",
"ground_truth": "LightRAG solves the hallucination problem by combining large language models with external knowledge retrieval. The framework ensures accurate responses by grounding LLM outputs in actual documents. LightRAG provides contextual responses that reduce hallucinations significantly.",
"project": "lightrag_overview"
},
{
"question": "What are the three main components required in a RAG system?",
"ground_truth": "A RAG system requires three main components: a retrieval system (vector database or search engine) to find relevant documents, an embedding model to convert text into vector representations for similarity search, and a large language model (LLM) to generate responses based on retrieved context.",
"project": "rag_architecture"
},
{
"question": "How does LightRAG's retrieval performance compare to traditional RAG approaches?",
"ground_truth": "LightRAG delivers faster retrieval performance than traditional RAG approaches. The framework optimizes document retrieval operations for speed, while traditional RAG systems often suffer from slow query response times. LightRAG achieves high quality results with improved performance.",
"project": "lightrag_improvements"
}
]
}

View file

@@ -0,0 +1,17 @@
# LightRAG Framework Overview
## What is LightRAG?
**LightRAG** is a Simple and Fast Retrieval-Augmented Generation framework. LightRAG was developed by HKUDS (Hong Kong University Data Science Lab). The framework provides developers with tools to build RAG applications efficiently.
## Problem Statement
Large language models face several limitations. LLMs have a knowledge cutoff date that prevents them from accessing recent information. Large language models generate hallucinations when providing responses without factual grounding. LLMs lack domain-specific expertise in specialized fields.
## How LightRAG Solves These Problems
LightRAG solves the hallucination problem by combining large language models with external knowledge retrieval. The framework ensures accurate responses by grounding LLM outputs in actual documents. LightRAG provides contextual responses that reduce hallucinations significantly. The system enables efficient retrieval from external knowledge bases to supplement LLM capabilities.
## Core Benefits
LightRAG offers accuracy through document-grounded responses. The framework provides up-to-date information without model retraining. LightRAG enables domain expertise through specialized document collections. The system delivers cost-effectiveness by avoiding expensive model fine-tuning. LightRAG ensures transparency by showing source documents for each response.

View file

@@ -0,0 +1,21 @@
# RAG System Architecture
## Main Components of RAG Systems
A RAG system consists of three main components that work together to provide intelligent responses.
### Component 1: Retrieval System
The retrieval system is the first component of a RAG system. A retrieval system finds relevant documents from large document collections. Vector databases serve as the primary storage for the retrieval system. Search engines can also function as retrieval systems in RAG architectures.
### Component 2: Embedding Model
The embedding model is the second component of a RAG system. An embedding model converts text into vector representations for similarity search. The embedding model transforms documents and queries into numerical vectors. These vector representations enable semantic similarity matching between queries and documents.
### Component 3: Large Language Model
The large language model is the third component of a RAG system. An LLM generates responses based on retrieved context from documents. The large language model synthesizes information from multiple sources into coherent answers. LLMs provide natural language generation capabilities for the RAG system.
## How Components Work Together
The retrieval system fetches relevant documents for a user query. The embedding model enables similarity matching between query and documents. The LLM generates the final response using retrieved context. These three components collaborate to provide accurate, contextual responses.

View file

@@ -0,0 +1,25 @@
# LightRAG Improvements Over Traditional RAG
## Key Improvements
LightRAG improves upon traditional RAG approaches in several significant ways.
### Simpler API Design
LightRAG offers a simpler API compared to traditional RAG frameworks. The framework provides intuitive interfaces for developers. Traditional RAG systems often require complex configuration and setup. LightRAG focuses on ease of use while maintaining functionality.
### Faster Retrieval Performance
LightRAG delivers faster retrieval performance than traditional RAG approaches. The framework optimizes document retrieval operations for speed. Traditional RAG systems often suffer from slow query response times. LightRAG achieves high quality results with improved performance.
### Better Vector Database Integration
LightRAG provides better integration with various vector databases. The framework supports multiple vector database backends seamlessly. Traditional RAG approaches typically lock developers into specific database choices. LightRAG enables flexible storage backend selection.
### Optimized Prompting Strategies
LightRAG implements optimized prompting strategies for better results. The framework uses refined prompt templates for accurate responses. Traditional RAG systems often use generic prompting approaches. LightRAG balances simplicity with high quality output.
## Design Philosophy
LightRAG prioritizes ease of use without sacrificing quality. The framework combines speed with accuracy in retrieval operations. LightRAG maintains flexibility in database and model selection.

View file

@@ -0,0 +1,37 @@
# LightRAG Vector Database Support
## Supported Vector Databases
LightRAG supports multiple vector databases for flexible deployment options.
### ChromaDB
ChromaDB is a vector database supported by LightRAG. ChromaDB provides simple deployment for development environments. The database offers efficient vector similarity search capabilities.
### Neo4j
Neo4j is a graph database supported by LightRAG. Neo4j enables graph-based knowledge representation alongside vector search. The database combines relationship modeling with vector capabilities.
### Milvus
Milvus is a vector database supported by LightRAG. Milvus provides high-performance vector search at scale. The database handles large-scale vector collections efficiently.
### Qdrant
Qdrant is a vector database supported by LightRAG. Qdrant offers fast similarity search with filtering capabilities. The database provides production-ready vector search infrastructure.
### MongoDB Atlas Vector Search
MongoDB Atlas Vector Search is supported by LightRAG. MongoDB Atlas combines document storage with vector search capabilities. The database enables unified data management for RAG applications.
### Redis
Redis is supported by LightRAG for vector search operations. Redis provides in-memory vector search with low latency. The database offers fast retrieval for real-time applications.
### Built-in Nano-VectorDB
LightRAG includes a built-in nano-vectordb for simple deployments. Nano-vectordb eliminates external database dependencies for small projects. The built-in database provides basic vector search functionality without additional setup.
## Database Selection Benefits
The multiple database support enables developers to choose appropriate storage backends. LightRAG adapts to different deployment scenarios from development to production. Users can select databases based on scale, performance, and infrastructure requirements.

View file

@@ -0,0 +1,41 @@
# RAG Evaluation Metrics and Deployment
## Key RAG Evaluation Metrics
RAG system quality is measured through four key metrics.
### Faithfulness Metric
Faithfulness measures whether answers are factually grounded in retrieved context. The faithfulness metric detects hallucinations in LLM responses. High faithfulness scores indicate answers based on actual document content. The metric evaluates factual accuracy of generated responses.
### Answer Relevance Metric
Answer Relevance measures how well answers address the user question. The answer relevance metric evaluates response quality and appropriateness. High answer relevance scores show responses that directly answer user queries. The metric assesses the connection between questions and generated answers.
### Context Recall Metric
Context Recall measures completeness of retrieval from documents. The context recall metric evaluates whether all relevant information was retrieved. High context recall scores indicate comprehensive document retrieval. The metric assesses retrieval system effectiveness.
### Context Precision Metric
Context Precision measures quality and relevance of retrieved documents. The context precision metric evaluates retrieval accuracy without noise. High context precision scores show clean retrieval without irrelevant content. The metric measures retrieval system selectivity.
## LightRAG Deployment Options
LightRAG can be deployed in production through multiple approaches.
### Docker Container Deployment
Docker containers enable consistent LightRAG deployment across environments. Docker provides isolated runtime environments for the framework. Container deployment simplifies dependency management and scaling.
### REST API Server with FastAPI
FastAPI serves as the REST API framework for LightRAG deployment. The FastAPI server exposes LightRAG functionality through HTTP endpoints. REST API deployment enables client-server architecture for RAG applications.
### Direct Python Integration
Direct Python integration embeds LightRAG into Python applications. Python integration provides programmatic access to RAG capabilities. Direct integration supports custom application workflows and pipelines.
### Deployment Features
LightRAG supports environment-based configuration for different deployment scenarios. The framework integrates with multiple LLM providers for flexibility. LightRAG enables horizontal scaling for production workloads.

View file

@@ -0,0 +1,21 @@
# Sample Documents for Evaluation
These markdown files correspond to test questions in `../sample_dataset.json`.
## Usage
1. **Index documents** into LightRAG (via WebUI, API, or Python)
2. **Run evaluation**: `python lightrag/evaluation/eval_rag_quality.py`
3. **Expected results**: ~91-100% RAGAS score per question
## Files
- `01_lightrag_overview.md` - LightRAG framework and hallucination problem
- `02_rag_architecture.md` - RAG system components
- `03_lightrag_improvements.md` - LightRAG vs traditional RAG
- `04_supported_databases.md` - Vector database support
- `05_evaluation_and_deployment.md` - Metrics and deployment
## Note
Documents use clear entity-relationship patterns for LightRAG's default entity extraction prompts. For better results with your data, customize `lightrag/prompt.py`.

View file

@@ -113,6 +113,15 @@ offline = [
"lightrag-hku[offline-docs,offline-storage,offline-llm]",
]
evaluation = [
# RAG evaluation dependencies (RAGAS framework)
"ragas>=0.3.7",
"datasets>=4.3.0",
"httpx>=0.28.1",
"pytest>=8.4.2",
"pytest-asyncio>=1.2.0",
]
observability = [
# LLM observability and tracing dependencies
"langfuse>=3.8.1",