This commit is contained in:
Raphaël MANSUY 2025-12-04 19:19:23 +08:00
parent eef3178064
commit 7ce259fbb4
2 changed files with 338 additions and 166 deletions

View file

@ -3,13 +3,16 @@ This module contains all query-related routes for the LightRAG API.
""" """
import json import json
import logging
from typing import Any, Dict, List, Literal, Optional from typing import Any, Dict, List, Literal, Optional
from fastapi import APIRouter, Depends, HTTPException from fastapi import APIRouter, Depends, HTTPException
from lightrag.base import QueryParam from lightrag.base import QueryParam
from lightrag.api.utils_api import get_combined_auth_dependency from lightrag.api.utils_api import get_combined_auth_dependency
from lightrag.utils import logger
from pydantic import BaseModel, Field, field_validator from pydantic import BaseModel, Field, field_validator
from ascii_colors import trace_exception
router = APIRouter(tags=["query"]) router = APIRouter(tags=["query"])
@ -143,22 +146,11 @@ class QueryRequest(BaseModel):
return param return param
class ReferenceItem(BaseModel):
"""A single reference item in query responses."""
reference_id: str = Field(description="Unique reference identifier")
file_path: str = Field(description="Path to the source file")
content: Optional[List[str]] = Field(
default=None,
description="List of chunk contents from this file (only present when include_chunk_content=True)",
)
class QueryResponse(BaseModel): class QueryResponse(BaseModel):
response: str = Field( response: str = Field(
description="The generated response", description="The generated response",
) )
references: Optional[List[ReferenceItem]] = Field( references: Optional[List[Dict[str, str]]] = Field(
default=None, default=None,
description="Reference list (Disabled when include_references=False, /query/data always includes references.)", description="Reference list (Disabled when include_references=False, /query/data always includes references.)",
) )
@ -216,11 +208,6 @@ def create_query_routes(rag, api_key: Optional[str] = None, top_k: int = 60):
"properties": { "properties": {
"reference_id": {"type": "string"}, "reference_id": {"type": "string"},
"file_path": {"type": "string"}, "file_path": {"type": "string"},
"content": {
"type": "array",
"items": {"type": "string"},
"description": "List of chunk contents from this file (only included when include_chunk_content=True)",
},
}, },
}, },
"description": "Reference list (only included when include_references=True)", "description": "Reference list (only included when include_references=True)",
@ -248,24 +235,19 @@ def create_query_routes(rag, api_key: Optional[str] = None, top_k: int = 60):
}, },
"with_chunk_content": { "with_chunk_content": {
"summary": "Response with chunk content", "summary": "Response with chunk content",
"description": "Example response when include_references=True and include_chunk_content=True. Note: content is an array of chunks from the same file.", "description": "Example response when include_references=True and include_chunk_content=True",
"value": { "value": {
"response": "Artificial Intelligence (AI) is a branch of computer science that aims to create intelligent machines capable of performing tasks that typically require human intelligence, such as learning, reasoning, and problem-solving.", "response": "Artificial Intelligence (AI) is a branch of computer science that aims to create intelligent machines capable of performing tasks that typically require human intelligence, such as learning, reasoning, and problem-solving.",
"references": [ "references": [
{ {
"reference_id": "1", "reference_id": "1",
"file_path": "/documents/ai_overview.pdf", "file_path": "/documents/ai_overview.pdf",
"content": [ "content": "Artificial Intelligence (AI) represents a transformative field in computer science focused on creating systems that can perform tasks requiring human-like intelligence. These tasks include learning from experience, understanding natural language, recognizing patterns, and making decisions.",
"Artificial Intelligence (AI) represents a transformative field in computer science focused on creating systems that can perform tasks requiring human-like intelligence. These tasks include learning from experience, understanding natural language, recognizing patterns, and making decisions.",
"AI systems can be categorized into narrow AI, which is designed for specific tasks, and general AI, which aims to match human cognitive abilities across a wide range of domains.",
],
}, },
{ {
"reference_id": "2", "reference_id": "2",
"file_path": "/documents/machine_learning.txt", "file_path": "/documents/machine_learning.txt",
"content": [ "content": "Machine learning is a subset of AI that enables computers to learn and improve from experience without being explicitly programmed. It focuses on the development of algorithms that can access data and use it to learn for themselves.",
"Machine learning is a subset of AI that enables computers to learn and improve from experience without being explicitly programmed. It focuses on the development of algorithms that can access data and use it to learn for themselves."
],
}, },
], ],
}, },
@ -439,8 +421,7 @@ def create_query_routes(rag, api_key: Optional[str] = None, top_k: int = 60):
ref_copy = ref.copy() ref_copy = ref.copy()
ref_id = ref.get("reference_id", "") ref_id = ref.get("reference_id", "")
if ref_id in ref_id_to_content: if ref_id in ref_id_to_content:
# Keep content as a list of chunks (one file may have multiple chunks) ref_copy["content"] = "\n\n".join(ref_id_to_content[ref_id])
ref_copy["content"] = ref_id_to_content[ref_id]
enriched_references.append(ref_copy) enriched_references.append(ref_copy)
references = enriched_references references = enriched_references
@ -450,7 +431,7 @@ def create_query_routes(rag, api_key: Optional[str] = None, top_k: int = 60):
else: else:
return QueryResponse(response=response_content, references=None) return QueryResponse(response=response_content, references=None)
except Exception as e: except Exception as e:
logger.error(f"Error processing query: {str(e)}", exc_info=True) trace_exception(e)
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
@router.post( @router.post(
@ -473,11 +454,6 @@ def create_query_routes(rag, api_key: Optional[str] = None, top_k: int = 60):
"description": "Multiple NDJSON lines when stream=True and include_references=True. First line contains references, subsequent lines contain response chunks.", "description": "Multiple NDJSON lines when stream=True and include_references=True. First line contains references, subsequent lines contain response chunks.",
"value": '{"references": [{"reference_id": "1", "file_path": "/documents/ai_overview.pdf"}, {"reference_id": "2", "file_path": "/documents/ml_basics.txt"}]}\n{"response": "Artificial Intelligence (AI) is a branch of computer science"}\n{"response": " that aims to create intelligent machines capable of performing"}\n{"response": " tasks that typically require human intelligence, such as learning,"}\n{"response": " reasoning, and problem-solving."}', "value": '{"references": [{"reference_id": "1", "file_path": "/documents/ai_overview.pdf"}, {"reference_id": "2", "file_path": "/documents/ml_basics.txt"}]}\n{"response": "Artificial Intelligence (AI) is a branch of computer science"}\n{"response": " that aims to create intelligent machines capable of performing"}\n{"response": " tasks that typically require human intelligence, such as learning,"}\n{"response": " reasoning, and problem-solving."}',
}, },
"streaming_with_chunk_content": {
"summary": "Streaming mode with chunk content (stream=true, include_chunk_content=true)",
"description": "Multiple NDJSON lines when stream=True, include_references=True, and include_chunk_content=True. First line contains references with content arrays (one file may have multiple chunks), subsequent lines contain response chunks.",
"value": '{"references": [{"reference_id": "1", "file_path": "/documents/ai_overview.pdf", "content": ["Artificial Intelligence (AI) represents a transformative field...", "AI systems can be categorized into narrow AI and general AI..."]}, {"reference_id": "2", "file_path": "/documents/ml_basics.txt", "content": ["Machine learning is a subset of AI that enables computers to learn..."]}]}\n{"response": "Artificial Intelligence (AI) is a branch of computer science"}\n{"response": " that aims to create intelligent machines capable of performing"}\n{"response": " tasks that typically require human intelligence."}',
},
"streaming_without_references": { "streaming_without_references": {
"summary": "Streaming mode without references (stream=true)", "summary": "Streaming mode without references (stream=true)",
"description": "Multiple NDJSON lines when stream=True and include_references=False. Only response chunks are sent.", "description": "Multiple NDJSON lines when stream=True and include_references=False. Only response chunks are sent.",
@ -674,30 +650,6 @@ def create_query_routes(rag, api_key: Optional[str] = None, top_k: int = 60):
references = result.get("data", {}).get("references", []) references = result.get("data", {}).get("references", [])
llm_response = result.get("llm_response", {}) llm_response = result.get("llm_response", {})
# Enrich references with chunk content if requested
if request.include_references and request.include_chunk_content:
data = result.get("data", {})
chunks = data.get("chunks", [])
# Create a mapping from reference_id to chunk content
ref_id_to_content = {}
for chunk in chunks:
ref_id = chunk.get("reference_id", "")
content = chunk.get("content", "")
if ref_id and content:
# Collect chunk content
ref_id_to_content.setdefault(ref_id, []).append(content)
# Add content to references
enriched_references = []
for ref in references:
ref_copy = ref.copy()
ref_id = ref.get("reference_id", "")
if ref_id in ref_id_to_content:
# Keep content as a list of chunks (one file may have multiple chunks)
ref_copy["content"] = ref_id_to_content[ref_id]
enriched_references.append(ref_copy)
references = enriched_references
if llm_response.get("is_streaming"): if llm_response.get("is_streaming"):
# Streaming mode: send references first, then stream response chunks # Streaming mode: send references first, then stream response chunks
if request.include_references: if request.include_references:
@ -710,7 +662,7 @@ def create_query_routes(rag, api_key: Optional[str] = None, top_k: int = 60):
if chunk: # Only send non-empty content if chunk: # Only send non-empty content
yield f"{json.dumps({'response': chunk})}\n" yield f"{json.dumps({'response': chunk})}\n"
except Exception as e: except Exception as e:
logger.error(f"Streaming error: {str(e)}") logging.error(f"Streaming error: {str(e)}")
yield f"{json.dumps({'error': str(e)})}\n" yield f"{json.dumps({'error': str(e)})}\n"
else: else:
# Non-streaming mode: send complete response in one message # Non-streaming mode: send complete response in one message
@ -736,7 +688,7 @@ def create_query_routes(rag, api_key: Optional[str] = None, top_k: int = 60):
}, },
) )
except Exception as e: except Exception as e:
logger.error(f"Error processing streaming query: {str(e)}", exc_info=True) trace_exception(e)
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
@router.post( @router.post(
@ -1153,7 +1105,7 @@ def create_query_routes(rag, api_key: Optional[str] = None, top_k: int = 60):
data={}, data={},
) )
except Exception as e: except Exception as e:
logger.error(f"Error processing data query: {str(e)}", exc_info=True) trace_exception(e)
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
return router return router

View file

@ -10,25 +10,28 @@ Evaluates RAG response quality using RAGAS metrics:
Usage: Usage:
python lightrag/evaluation/eval_rag_quality.py python lightrag/evaluation/eval_rag_quality.py
python lightrag/evaluation/eval_rag_quality.py http://localhost:8000 python lightrag/evaluation/eval_rag_quality.py http://localhost:9621
python lightrag/evaluation/eval_rag_quality.py http://your-rag-server.com:8000 python lightrag/evaluation/eval_rag_quality.py http://your-rag-server.com:9621
Results are saved to: lightrag/evaluation/results/ Results are saved to: lightrag/evaluation/results/
- results_YYYYMMDD_HHMMSS.csv (CSV export for analysis) - results_YYYYMMDD_HHMMSS.csv (CSV export for analysis)
- results_YYYYMMDD_HHMMSS.json (Full results with details) - results_YYYYMMDD_HHMMSS.json (Full results with details)
""" """
import json
import asyncio import asyncio
import time
import csv import csv
from pathlib import Path import json
from datetime import datetime import math
from typing import Any, Dict, List
import sys
import httpx
import os import os
import sys
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List
import httpx
from dotenv import load_dotenv from dotenv import load_dotenv
from lightrag.utils import logger
# Add parent directory to path # Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent.parent)) sys.path.insert(0, str(Path(__file__).parent.parent.parent))
@ -46,20 +49,30 @@ if "OPENAI_API_KEY" not in os.environ:
os.environ["OPENAI_API_KEY"] = input("Enter your OpenAI API key: ") os.environ["OPENAI_API_KEY"] = input("Enter your OpenAI API key: ")
try: try:
from datasets import Dataset
from ragas import evaluate from ragas import evaluate
from ragas.metrics import ( from ragas.metrics import (
faithfulness,
answer_relevancy, answer_relevancy,
context_recall,
context_precision, context_precision,
context_recall,
faithfulness,
) )
from datasets import Dataset
except ImportError as e: except ImportError as e:
print(f"❌ RAGAS import error: {e}") logger.error("❌ RAGAS import error: %s", e)
print(" Install with: pip install ragas datasets") logger.error(" Install with: pip install ragas datasets")
sys.exit(1) sys.exit(1)
CONNECT_TIMEOUT_SECONDS = 180.0
READ_TIMEOUT_SECONDS = 300.0
TOTAL_TIMEOUT_SECONDS = 180.0
def _is_nan(value: Any) -> bool:
"""Return True when value is a float NaN."""
return isinstance(value, float) and math.isnan(value)
class RAGEvaluator: class RAGEvaluator:
"""Evaluate RAG system quality using RAGAS metrics""" """Evaluate RAG system quality using RAGAS metrics"""
@ -69,17 +82,17 @@ class RAGEvaluator:
Args: Args:
test_dataset_path: Path to test dataset JSON file test_dataset_path: Path to test dataset JSON file
rag_api_url: Base URL of LightRAG API (e.g., http://localhost:8000) rag_api_url: Base URL of LightRAG API (e.g., http://localhost:9621)
If None, will try to read from environment or use default If None, will try to read from environment or use default
""" """
if test_dataset_path is None: if test_dataset_path is None:
test_dataset_path = Path(__file__).parent / "test_dataset.json" test_dataset_path = Path(__file__).parent / "sample_dataset.json"
if rag_api_url is None: if rag_api_url is None:
rag_api_url = os.getenv("LIGHTRAG_API_URL", "http://localhost:8000") rag_api_url = os.getenv("LIGHTRAG_API_URL", "http://localhost:9621")
self.test_dataset_path = Path(test_dataset_path) self.test_dataset_path = Path(test_dataset_path)
self.rag_api_url = rag_api_url.rstrip("/") # Remove trailing slash self.rag_api_url = rag_api_url.rstrip("/")
self.results_dir = Path(__file__).parent / "results" self.results_dir = Path(__file__).parent / "results"
self.results_dir.mkdir(exist_ok=True) self.results_dir.mkdir(exist_ok=True)
@ -99,12 +112,14 @@ class RAGEvaluator:
async def generate_rag_response( async def generate_rag_response(
self, self,
question: str, question: str,
client: httpx.AsyncClient,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
""" """
Generate RAG response by calling LightRAG API. Generate RAG response by calling LightRAG API.
Args: Args:
question: The user query. question: The user query.
client: Shared httpx AsyncClient for connection pooling.
Returns: Returns:
Dictionary with 'answer' and 'contexts' keys. Dictionary with 'answer' and 'contexts' keys.
@ -114,71 +129,110 @@ class RAGEvaluator:
Exception: If LightRAG API is unavailable. Exception: If LightRAG API is unavailable.
""" """
try: try:
async with httpx.AsyncClient(timeout=60.0) as client: payload = {
payload = { "query": question,
"query": question, "mode": "mix",
"mode": "mix", "include_references": True,
"include_references": True, "include_chunk_content": True, # NEW: Request chunk content in references
"response_type": "Multiple Paragraphs", "response_type": "Multiple Paragraphs",
"top_k": 10, "top_k": 10,
} }
response = await client.post( # Single optimized API call - gets both answer AND chunk content
f"{self.rag_api_url}/query", response = await client.post(
json=payload, f"{self.rag_api_url}/query",
) json=payload,
response.raise_for_status() # Better error handling )
result = response.json() response.raise_for_status()
result = response.json()
# Extract text content from each reference document answer = result.get("response", "No response generated")
references = result.get("references", []) references = result.get("references", [])
contexts = [
ref.get("text", "") for ref in references if ref.get("text")
]
return { # DEBUG: Inspect the API response
"answer": result.get("response", "No response generated"), logger.debug("🔍 References Count: %s", len(references))
"contexts": contexts, # List of strings, not JSON dump if references:
} first_ref = references[0]
logger.debug("🔍 First Reference Keys: %s", list(first_ref.keys()))
if "content" in first_ref:
logger.debug(
"🔍 Content Preview: %s...", first_ref["content"][:100]
)
except httpx.ConnectError: # Extract chunk content from enriched references
contexts = [
ref.get("content", "") for ref in references if ref.get("content")
]
return {
"answer": answer,
"contexts": contexts, # List of strings from actual retrieved chunks
}
except httpx.ConnectError as e:
raise Exception( raise Exception(
f"❌ Cannot connect to LightRAG API at {self.rag_api_url}\n" f"❌ Cannot connect to LightRAG API at {self.rag_api_url}\n"
f" Make sure LightRAG server is running:\n" f" Make sure LightRAG server is running:\n"
f" python -m lightrag.api.lightrag_server" f" python -m lightrag.api.lightrag_server\n"
f" Error: {str(e)}"
) )
except httpx.HTTPStatusError as e: except httpx.HTTPStatusError as e:
raise Exception( raise Exception(
f"LightRAG API error {e.response.status_code}: {e.response.text}" f"LightRAG API error {e.response.status_code}: {e.response.text}"
) )
except httpx.ReadTimeout as e:
raise Exception(
f"Request timeout after waiting for response\n"
f" Question: {question[:100]}...\n"
f" Error: {str(e)}"
)
except Exception as e: except Exception as e:
raise Exception(f"Error calling LightRAG API: {str(e)}") raise Exception(f"Error calling LightRAG API: {type(e).__name__}: {str(e)}")
async def evaluate_responses(self) -> List[Dict[str, Any]]: async def evaluate_single_case(
self,
idx: int,
test_case: Dict[str, str],
semaphore: asyncio.Semaphore,
client: httpx.AsyncClient,
) -> Dict[str, Any]:
""" """
Evaluate all test cases and return metrics Evaluate a single test case with concurrency control
Args:
idx: Test case index (1-based)
test_case: Test case dictionary with question and ground_truth
semaphore: Semaphore to control concurrency
client: Shared httpx AsyncClient for connection pooling
Returns: Returns:
List of evaluation results with metrics Evaluation result dictionary
""" """
print("\n" + "=" * 70) total_cases = len(self.test_cases)
print("🚀 Starting RAGAS Evaluation of Portfolio RAG System")
print("=" * 70 + "\n")
results = [] async with semaphore:
for idx, test_case in enumerate(self.test_cases, 1):
question = test_case["question"] question = test_case["question"]
ground_truth = test_case["ground_truth"] ground_truth = test_case["ground_truth"]
print(f"[{idx}/{len(self.test_cases)}] Evaluating: {question[:60]}...") logger.info("[%s/%s] Evaluating: %s...", idx, total_cases, question[:60])
# Generate RAG response by calling actual LightRAG API # Generate RAG response by calling actual LightRAG API
rag_response = await self.generate_rag_response(question=question) rag_response = await self.generate_rag_response(
question=question, client=client
)
# *** CRITICAL FIX: Use actual retrieved contexts, NOT ground_truth *** # *** CRITICAL FIX: Use actual retrieved contexts, NOT ground_truth ***
retrieved_contexts = rag_response["contexts"] retrieved_contexts = rag_response["contexts"]
# DEBUG: Print what was actually retrieved
logger.debug("📝 Retrieved %s contexts", len(retrieved_contexts))
if retrieved_contexts:
logger.debug(
"📄 First context preview: %s...", retrieved_contexts[0][:100]
)
else:
logger.warning("⚠️ No contexts retrieved!")
# Prepare dataset for RAGAS evaluation with CORRECT contexts # Prepare dataset for RAGAS evaluation with CORRECT contexts
eval_dataset = Dataset.from_dict( eval_dataset = Dataset.from_dict(
{ {
@ -235,29 +289,66 @@ class RAGEvaluator:
ragas_score = sum(metrics.values()) / len(metrics) if metrics else 0 ragas_score = sum(metrics.values()) / len(metrics) if metrics else 0
result["ragas_score"] = round(ragas_score, 4) result["ragas_score"] = round(ragas_score, 4)
results.append(result) logger.info("✅ Faithfulness: %.4f", metrics["faithfulness"])
logger.info("✅ Answer Relevance: %.4f", metrics["answer_relevance"])
logger.info("✅ Context Recall: %.4f", metrics["context_recall"])
logger.info("✅ Context Precision: %.4f", metrics["context_precision"])
logger.info("📊 RAGAS Score: %.4f", result["ragas_score"])
# Print metrics return result
print(f" ✅ Faithfulness: {metrics['faithfulness']:.4f}")
print(f" ✅ Answer Relevance: {metrics['answer_relevance']:.4f}")
print(f" ✅ Context Recall: {metrics['context_recall']:.4f}")
print(f" ✅ Context Precision: {metrics['context_precision']:.4f}")
print(f" 📊 RAGAS Score: {result['ragas_score']:.4f}\n")
except Exception as e: except Exception as e:
import traceback logger.exception("❌ Error evaluating: %s", e)
print(f" ❌ Error evaluating: {str(e)}") return {
print(f" 🔍 Full traceback:\n{traceback.format_exc()}\n")
result = {
"question": question, "question": question,
"error": str(e), "error": str(e),
"metrics": {}, "metrics": {},
"ragas_score": 0, "ragas_score": 0,
"timestamp": datetime.now().isoformat() "timestamp": datetime.now().isoformat(),
} }
results.append(result)
return results async def evaluate_responses(self) -> List[Dict[str, Any]]:
"""
Evaluate all test cases in parallel and return metrics
Returns:
List of evaluation results with metrics
"""
# Get MAX_ASYNC from environment (default to 4 if not set)
max_async = int(os.getenv("MAX_ASYNC", "4"))
logger.info("")
logger.info("%s", "=" * 70)
logger.info("🚀 Starting RAGAS Evaluation of Portfolio RAG System")
logger.info("🔧 Parallel evaluations: %s", max_async)
logger.info("%s", "=" * 70)
# Create semaphore to limit concurrent evaluations
semaphore = asyncio.Semaphore(max_async)
# Create shared HTTP client with connection pooling and proper timeouts
# Timeout: 3 minutes for connect, 5 minutes for read (LLM can be slow)
timeout = httpx.Timeout(
TOTAL_TIMEOUT_SECONDS,
connect=CONNECT_TIMEOUT_SECONDS,
read=READ_TIMEOUT_SECONDS,
)
limits = httpx.Limits(
max_connections=max_async * 2, # Allow some buffer
max_keepalive_connections=max_async,
)
async with httpx.AsyncClient(timeout=timeout, limits=limits) as client:
# Create tasks for all test cases
tasks = [
self.evaluate_single_case(idx, test_case, semaphore, client)
for idx, test_case in enumerate(self.test_cases, 1)
]
# Run all evaluations in parallel (limited by semaphore)
results = await asyncio.gather(*tasks)
return list(results)
def _export_to_csv(self, results: List[Dict[str, Any]]) -> Path: def _export_to_csv(self, results: List[Dict[str, Any]]) -> Path:
""" """
@ -279,7 +370,9 @@ class RAGEvaluator:
- ragas_score: Overall RAGAS score (0-1) - ragas_score: Overall RAGAS score (0-1)
- timestamp: When evaluation was run - timestamp: When evaluation was run
""" """
csv_path = self.results_dir / f"results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv" csv_path = (
self.results_dir / f"results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
)
with open(csv_path, "w", newline="", encoding="utf-8") as f: with open(csv_path, "w", newline="", encoding="utf-8") as f:
fieldnames = [ fieldnames = [
@ -300,21 +393,110 @@ class RAGEvaluator:
for idx, result in enumerate(results, 1): for idx, result in enumerate(results, 1):
metrics = result.get("metrics", {}) metrics = result.get("metrics", {})
writer.writerow({ writer.writerow(
"test_number": idx, {
"question": result.get("question", ""), "test_number": idx,
"project": result.get("project", "unknown"), "question": result.get("question", ""),
"faithfulness": f"{metrics.get('faithfulness', 0):.4f}", "project": result.get("project", "unknown"),
"answer_relevance": f"{metrics.get('answer_relevance', 0):.4f}", "faithfulness": f"{metrics.get('faithfulness', 0):.4f}",
"context_recall": f"{metrics.get('context_recall', 0):.4f}", "answer_relevance": f"{metrics.get('answer_relevance', 0):.4f}",
"context_precision": f"{metrics.get('context_precision', 0):.4f}", "context_recall": f"{metrics.get('context_recall', 0):.4f}",
"ragas_score": f"{result.get('ragas_score', 0):.4f}", "context_precision": f"{metrics.get('context_precision', 0):.4f}",
"status": "success" if metrics else "error", "ragas_score": f"{result.get('ragas_score', 0):.4f}",
"timestamp": result.get("timestamp", ""), "status": "success" if metrics else "error",
}) "timestamp": result.get("timestamp", ""),
}
)
return csv_path return csv_path
def _calculate_benchmark_stats(
self, results: List[Dict[str, Any]]
) -> Dict[str, Any]:
"""
Calculate benchmark statistics from evaluation results
Args:
results: List of evaluation results
Returns:
Dictionary with benchmark statistics
"""
# Filter out results with errors
valid_results = [r for r in results if r.get("metrics")]
total_tests = len(results)
successful_tests = len(valid_results)
failed_tests = total_tests - successful_tests
if not valid_results:
return {
"total_tests": total_tests,
"successful_tests": 0,
"failed_tests": failed_tests,
"success_rate": 0.0,
}
# Calculate averages for each metric (handling NaN values)
metrics_sum = {
"faithfulness": 0.0,
"answer_relevance": 0.0,
"context_recall": 0.0,
"context_precision": 0.0,
"ragas_score": 0.0,
}
for result in valid_results:
metrics = result.get("metrics", {})
# Skip NaN values when summing
faithfulness = metrics.get("faithfulness", 0)
if not _is_nan(faithfulness):
metrics_sum["faithfulness"] += faithfulness
answer_relevance = metrics.get("answer_relevance", 0)
if not _is_nan(answer_relevance):
metrics_sum["answer_relevance"] += answer_relevance
context_recall = metrics.get("context_recall", 0)
if not _is_nan(context_recall):
metrics_sum["context_recall"] += context_recall
context_precision = metrics.get("context_precision", 0)
if not _is_nan(context_precision):
metrics_sum["context_precision"] += context_precision
ragas_score = result.get("ragas_score", 0)
if not _is_nan(ragas_score):
metrics_sum["ragas_score"] += ragas_score
# Calculate averages
n = len(valid_results)
avg_metrics = {}
for k, v in metrics_sum.items():
avg_val = v / n if n > 0 else 0
# Handle NaN in average
avg_metrics[k] = round(avg_val, 4) if not _is_nan(avg_val) else 0.0
# Find min and max RAGAS scores (filter out NaN)
ragas_scores = []
for r in valid_results:
score = r.get("ragas_score", 0)
if _is_nan(score):
continue # Skip NaN values
ragas_scores.append(score)
min_score = min(ragas_scores) if ragas_scores else 0
max_score = max(ragas_scores) if ragas_scores else 0
return {
"total_tests": total_tests,
"successful_tests": successful_tests,
"failed_tests": failed_tests,
"success_rate": round(successful_tests / total_tests * 100, 2),
"average_metrics": avg_metrics,
"min_ragas_score": round(min_score, 4),
"max_ragas_score": round(max_score, 4),
}
async def run(self) -> Dict[str, Any]: async def run(self) -> Dict[str, Any]:
"""Run complete evaluation pipeline""" """Run complete evaluation pipeline"""
@ -325,35 +507,72 @@ class RAGEvaluator:
elapsed_time = time.time() - start_time elapsed_time = time.time() - start_time
# Calculate benchmark statistics
benchmark_stats = self._calculate_benchmark_stats(results)
# Save results # Save results
summary = { summary = {
"timestamp": datetime.now().isoformat(), "timestamp": datetime.now().isoformat(),
"total_tests": len(results), "total_tests": len(results),
"elapsed_time_seconds": round(elapsed_time, 2), "elapsed_time_seconds": round(elapsed_time, 2),
"results": results "benchmark_stats": benchmark_stats,
"results": results,
} }
# Save JSON results # Save JSON results
json_path = self.results_dir / f"results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" json_path = (
self.results_dir
/ f"results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
)
with open(json_path, "w") as f: with open(json_path, "w") as f:
json.dump(summary, f, indent=2) json.dump(summary, f, indent=2)
print(f"✅ JSON results saved to: {json_path}") logger.info("✅ JSON results saved to: %s", json_path)
# Export to CSV # Export to CSV
csv_path = self._export_to_csv(results) csv_path = self._export_to_csv(results)
print(f"✅ CSV results saved to: {csv_path}") logger.info("✅ CSV results saved to: %s", csv_path)
# Print summary # Print summary
print("\n" + "="*70) logger.info("")
print("📊 EVALUATION COMPLETE") logger.info("%s", "=" * 70)
print("="*70) logger.info("📊 EVALUATION COMPLETE")
print(f"Total Tests: {len(results)}") logger.info("%s", "=" * 70)
print(f"Elapsed Time: {elapsed_time:.2f} seconds") logger.info("Total Tests: %s", len(results))
print(f"Results Dir: {self.results_dir.absolute()}") logger.info("Successful: %s", benchmark_stats["successful_tests"])
print("\n📁 Generated Files:") logger.info("Failed: %s", benchmark_stats["failed_tests"])
print(f" • CSV: {csv_path.name}") logger.info("Success Rate: %.2f%%", benchmark_stats["success_rate"])
print(f" • JSON: {json_path.name}") logger.info("Elapsed Time: %.2f seconds", elapsed_time)
print("="*70 + "\n") logger.info("Avg Time/Test: %.2f seconds", elapsed_time / len(results))
# Print benchmark metrics
logger.info("")
logger.info("%s", "=" * 70)
logger.info("📈 BENCHMARK RESULTS (Moyennes)")
logger.info("%s", "=" * 70)
avg = benchmark_stats["average_metrics"]
logger.info("Moyenne Faithfulness: %.4f", avg["faithfulness"])
logger.info("Moyenne Answer Relevance: %.4f", avg["answer_relevance"])
logger.info("Moyenne Context Recall: %.4f", avg["context_recall"])
logger.info("Moyenne Context Precision: %.4f", avg["context_precision"])
logger.info("Moyenne RAGAS Score: %.4f", avg["ragas_score"])
logger.info("")
logger.info(
"Min RAGAS Score: %.4f",
benchmark_stats["min_ragas_score"],
)
logger.info(
"Max RAGAS Score: %.4f",
benchmark_stats["max_ragas_score"],
)
logger.info("")
logger.info("%s", "=" * 70)
logger.info("📁 GENERATED FILES")
logger.info("%s", "=" * 70)
logger.info("Results Dir: %s", self.results_dir.absolute())
logger.info(" • CSV: %s", csv_path.name)
logger.info(" • JSON: %s", json_path.name)
logger.info("%s", "=" * 70)
return summary return summary
@ -364,8 +583,8 @@ async def main():
Usage: Usage:
python lightrag/evaluation/eval_rag_quality.py python lightrag/evaluation/eval_rag_quality.py
python lightrag/evaluation/eval_rag_quality.py http://localhost:8000 python lightrag/evaluation/eval_rag_quality.py http://localhost:9621
python lightrag/evaluation/eval_rag_quality.py http://your-server.com:8000 python lightrag/evaluation/eval_rag_quality.py http://your-server.com:9621
""" """
try: try:
# Get RAG API URL from command line or environment # Get RAG API URL from command line or environment
@ -373,19 +592,20 @@ async def main():
if len(sys.argv) > 1: if len(sys.argv) > 1:
rag_api_url = sys.argv[1] rag_api_url = sys.argv[1]
print("\n" + "="*70) logger.info("")
print("🔍 RAGAS Evaluation - Using Real LightRAG API") logger.info("%s", "=" * 70)
print("="*70) logger.info("🔍 RAGAS Evaluation - Using Real LightRAG API")
logger.info("%s", "=" * 70)
if rag_api_url: if rag_api_url:
print(f"📡 RAG API URL: {rag_api_url}") logger.info("📡 RAG API URL: %s", rag_api_url)
else: else:
print(f"📡 RAG API URL: http://localhost:8000 (default)") logger.info("📡 RAG API URL: http://localhost:9621 (default)")
print("="*70 + "\n") logger.info("%s", "=" * 70)
evaluator = RAGEvaluator(rag_api_url=rag_api_url) evaluator = RAGEvaluator(rag_api_url=rag_api_url)
await evaluator.run() await evaluator.run()
except Exception as e: except Exception as e:
print(f"\n❌ Error: {str(e)}\n") logger.exception("❌ Error: %s", e)
sys.exit(1) sys.exit(1)