cherry-pick d36be1f4
This commit is contained in:
parent
248c389fda
commit
7eda6d0a4e
1 changed files with 97 additions and 138 deletions
|
|
@ -82,7 +82,7 @@ try:
|
|||
)
|
||||
from ragas.llms import LangchainLLMWrapper
|
||||
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
|
||||
from tqdm.auto import tqdm
|
||||
from tqdm import tqdm
|
||||
|
||||
RAGAS_AVAILABLE = True
|
||||
|
||||
|
|
@ -347,36 +347,28 @@ class RAGEvaluator:
|
|||
self,
|
||||
idx: int,
|
||||
test_case: Dict[str, str],
|
||||
rag_semaphore: asyncio.Semaphore,
|
||||
eval_semaphore: asyncio.Semaphore,
|
||||
semaphore: asyncio.Semaphore,
|
||||
client: httpx.AsyncClient,
|
||||
progress_counter: Dict[str, int],
|
||||
position_pool: asyncio.Queue,
|
||||
pbar_creation_lock: asyncio.Lock,
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Evaluate a single test case with two-stage pipeline concurrency control
|
||||
Evaluate a single test case with concurrency control
|
||||
|
||||
Args:
|
||||
idx: Test case index (1-based)
|
||||
test_case: Test case dictionary with question and ground_truth
|
||||
rag_semaphore: Semaphore to control overall concurrency (covers entire function)
|
||||
eval_semaphore: Semaphore to control RAGAS evaluation concurrency (Stage 2)
|
||||
semaphore: Semaphore to control concurrency
|
||||
client: Shared httpx AsyncClient for connection pooling
|
||||
progress_counter: Shared dictionary for progress tracking
|
||||
position_pool: Queue of available tqdm position indices
|
||||
pbar_creation_lock: Lock to serialize tqdm creation and prevent race conditions
|
||||
|
||||
Returns:
|
||||
Evaluation result dictionary
|
||||
"""
|
||||
# rag_semaphore controls the entire evaluation process to prevent
|
||||
# all RAG responses from being generated at once when eval is slow
|
||||
async with rag_semaphore:
|
||||
async with semaphore:
|
||||
question = test_case["question"]
|
||||
ground_truth = test_case["ground_truth"]
|
||||
|
||||
# Stage 1: Generate RAG response
|
||||
# Generate RAG response by calling actual LightRAG API
|
||||
try:
|
||||
rag_response = await self.generate_rag_response(
|
||||
question=question, client=client
|
||||
|
|
@ -396,6 +388,11 @@ class RAGEvaluator:
|
|||
# *** CRITICAL FIX: Use actual retrieved contexts, NOT ground_truth ***
|
||||
retrieved_contexts = rag_response["contexts"]
|
||||
|
||||
# DEBUG: Print what was actually retrieved (only in debug mode)
|
||||
logger.debug(
|
||||
"📝 Test %s: Retrieved %s contexts", idx, len(retrieved_contexts)
|
||||
)
|
||||
|
||||
# Prepare dataset for RAGAS evaluation with CORRECT contexts
|
||||
eval_dataset = Dataset.from_dict(
|
||||
{
|
||||
|
|
@ -406,140 +403,107 @@ class RAGEvaluator:
|
|||
}
|
||||
)
|
||||
|
||||
# Stage 2: Run RAGAS evaluation (controlled by eval_semaphore)
|
||||
# Run RAGAS evaluation
|
||||
# IMPORTANT: Create fresh metric instances for each evaluation to avoid
|
||||
# concurrent state conflicts when multiple tasks run in parallel
|
||||
async with eval_semaphore:
|
||||
pbar = None
|
||||
position = None
|
||||
try:
|
||||
# Acquire a position from the pool for this tqdm progress bar
|
||||
position = await position_pool.get()
|
||||
pbar = None
|
||||
try:
|
||||
# Create standard tqdm progress bar for RAGAS evaluation
|
||||
pbar = tqdm(total=4, desc=f"Eval-{idx}", leave=True)
|
||||
|
||||
eval_results = evaluate(
|
||||
dataset=eval_dataset,
|
||||
metrics=[
|
||||
Faithfulness(),
|
||||
AnswerRelevancy(),
|
||||
ContextRecall(),
|
||||
ContextPrecision(),
|
||||
],
|
||||
llm=self.eval_llm,
|
||||
embeddings=self.eval_embeddings,
|
||||
_pbar=pbar,
|
||||
)
|
||||
|
||||
# Serialize tqdm creation to prevent race conditions
|
||||
# Multiple tasks creating tqdm simultaneously can cause display conflicts
|
||||
async with pbar_creation_lock:
|
||||
# Create tqdm progress bar with assigned position to avoid overlapping
|
||||
# leave=False ensures the progress bar is cleared after completion,
|
||||
# preventing accumulation of completed bars and allowing position reuse
|
||||
pbar = tqdm(
|
||||
total=4, desc=f"Eval-{idx}", position=position, leave=False
|
||||
)
|
||||
# Give tqdm time to initialize and claim its screen position
|
||||
await asyncio.sleep(0.05)
|
||||
# Convert to DataFrame (RAGAS v0.3+ API)
|
||||
df = eval_results.to_pandas()
|
||||
|
||||
eval_results = evaluate(
|
||||
dataset=eval_dataset,
|
||||
metrics=[
|
||||
Faithfulness(),
|
||||
AnswerRelevancy(),
|
||||
ContextRecall(),
|
||||
ContextPrecision(),
|
||||
],
|
||||
llm=self.eval_llm,
|
||||
embeddings=self.eval_embeddings,
|
||||
_pbar=pbar,
|
||||
)
|
||||
# Extract scores from first row
|
||||
scores_row = df.iloc[0]
|
||||
|
||||
# Convert to DataFrame (RAGAS v0.3+ API)
|
||||
df = eval_results.to_pandas()
|
||||
# Extract scores (RAGAS v0.3+ uses .to_pandas())
|
||||
result = {
|
||||
"test_number": idx,
|
||||
"question": question,
|
||||
"answer": rag_response["answer"][:200] + "..."
|
||||
if len(rag_response["answer"]) > 200
|
||||
else rag_response["answer"],
|
||||
"ground_truth": ground_truth[:200] + "..."
|
||||
if len(ground_truth) > 200
|
||||
else ground_truth,
|
||||
"project": test_case.get("project", "unknown"),
|
||||
"metrics": {
|
||||
"faithfulness": float(scores_row.get("faithfulness", 0)),
|
||||
"answer_relevance": float(
|
||||
scores_row.get("answer_relevancy", 0)
|
||||
),
|
||||
"context_recall": float(scores_row.get("context_recall", 0)),
|
||||
"context_precision": float(
|
||||
scores_row.get("context_precision", 0)
|
||||
),
|
||||
},
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
}
|
||||
|
||||
# Extract scores from first row
|
||||
scores_row = df.iloc[0]
|
||||
# Calculate RAGAS score (average of all metrics, excluding NaN values)
|
||||
metrics = result["metrics"]
|
||||
valid_metrics = [v for v in metrics.values() if not _is_nan(v)]
|
||||
ragas_score = (
|
||||
sum(valid_metrics) / len(valid_metrics) if valid_metrics else 0
|
||||
)
|
||||
result["ragas_score"] = round(ragas_score, 4)
|
||||
|
||||
# Extract scores (RAGAS v0.3+ uses .to_pandas())
|
||||
result = {
|
||||
"test_number": idx,
|
||||
"question": question,
|
||||
"answer": rag_response["answer"][:200] + "..."
|
||||
if len(rag_response["answer"]) > 200
|
||||
else rag_response["answer"],
|
||||
"ground_truth": ground_truth[:200] + "..."
|
||||
if len(ground_truth) > 200
|
||||
else ground_truth,
|
||||
"project": test_case.get("project", "unknown"),
|
||||
"metrics": {
|
||||
"faithfulness": float(scores_row.get("faithfulness", 0)),
|
||||
"answer_relevance": float(
|
||||
scores_row.get("answer_relevancy", 0)
|
||||
),
|
||||
"context_recall": float(
|
||||
scores_row.get("context_recall", 0)
|
||||
),
|
||||
"context_precision": float(
|
||||
scores_row.get("context_precision", 0)
|
||||
),
|
||||
},
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
}
|
||||
# Update progress counter
|
||||
progress_counter["completed"] += 1
|
||||
|
||||
# Calculate RAGAS score (average of all metrics, excluding NaN values)
|
||||
metrics = result["metrics"]
|
||||
valid_metrics = [v for v in metrics.values() if not _is_nan(v)]
|
||||
ragas_score = (
|
||||
sum(valid_metrics) / len(valid_metrics) if valid_metrics else 0
|
||||
)
|
||||
result["ragas_score"] = round(ragas_score, 4)
|
||||
return result
|
||||
|
||||
# Update progress counter
|
||||
progress_counter["completed"] += 1
|
||||
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Error evaluating test %s: %s", idx, str(e))
|
||||
progress_counter["completed"] += 1
|
||||
return {
|
||||
"test_number": idx,
|
||||
"question": question,
|
||||
"error": str(e),
|
||||
"metrics": {},
|
||||
"ragas_score": 0,
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
}
|
||||
finally:
|
||||
# Force close progress bar to ensure completion
|
||||
if pbar is not None:
|
||||
pbar.close()
|
||||
# Release the position back to the pool for reuse
|
||||
if position is not None:
|
||||
await position_pool.put(position)
|
||||
except Exception as e:
|
||||
logger.error("Error evaluating test %s: %s", idx, str(e))
|
||||
progress_counter["completed"] += 1
|
||||
return {
|
||||
"test_number": idx,
|
||||
"question": question,
|
||||
"error": str(e),
|
||||
"metrics": {},
|
||||
"ragas_score": 0,
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
}
|
||||
finally:
|
||||
# Force close progress bar to ensure completion
|
||||
if pbar is not None:
|
||||
pbar.close()
|
||||
|
||||
async def evaluate_responses(self) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
Evaluate all test cases in parallel with two-stage pipeline and return metrics
|
||||
Evaluate all test cases in parallel and return metrics
|
||||
|
||||
Returns:
|
||||
List of evaluation results with metrics
|
||||
"""
|
||||
# Get evaluation concurrency from environment (default to 2 for parallel evaluation)
|
||||
# Get evaluation concurrency from environment (default to 1 for serial evaluation)
|
||||
max_async = int(os.getenv("EVAL_MAX_CONCURRENT", "2"))
|
||||
|
||||
logger.info("%s", "=" * 70)
|
||||
logger.info("🚀 Starting RAGAS Evaluation of LightRAG System")
|
||||
logger.info("🔧 Two-Stage Pipeline Configuration:")
|
||||
logger.info(" • RAGAS Evaluation (Stage 2): %s concurrent", max_async)
|
||||
logger.info("🔧 Concurrent evaluations: %s", max_async)
|
||||
logger.info("%s", "=" * 70)
|
||||
|
||||
# Create two-stage pipeline semaphores
|
||||
# Stage 1: RAG generation - allow x2 concurrency to keep evaluation fed
|
||||
rag_semaphore = asyncio.Semaphore(max_async * 2)
|
||||
# Stage 2: RAGAS evaluation - primary bottleneck
|
||||
eval_semaphore = asyncio.Semaphore(max_async)
|
||||
# Create semaphore to limit concurrent evaluations
|
||||
semaphore = asyncio.Semaphore(max_async)
|
||||
|
||||
# Create progress counter (shared across all tasks)
|
||||
progress_counter = {"completed": 0}
|
||||
|
||||
# Create position pool for tqdm progress bars
|
||||
# Positions range from 0 to max_async-1, ensuring no overlapping displays
|
||||
position_pool = asyncio.Queue()
|
||||
for i in range(max_async):
|
||||
await position_pool.put(i)
|
||||
|
||||
# Create lock to serialize tqdm creation and prevent race conditions
|
||||
# This ensures progress bars are created one at a time, avoiding display conflicts
|
||||
pbar_creation_lock = asyncio.Lock()
|
||||
|
||||
# Create shared HTTP client with connection pooling and proper timeouts
|
||||
# Timeout: 3 minutes for connect, 5 minutes for read (LLM can be slow)
|
||||
timeout = httpx.Timeout(
|
||||
|
|
@ -548,27 +512,20 @@ class RAGEvaluator:
|
|||
read=READ_TIMEOUT_SECONDS,
|
||||
)
|
||||
limits = httpx.Limits(
|
||||
max_connections=(max_async + 1) * 2, # Allow buffer for RAG stage
|
||||
max_keepalive_connections=max_async + 1,
|
||||
max_connections=max_async * 2, # Allow some buffer
|
||||
max_keepalive_connections=max_async,
|
||||
)
|
||||
|
||||
async with httpx.AsyncClient(timeout=timeout, limits=limits) as client:
|
||||
# Create tasks for all test cases
|
||||
tasks = [
|
||||
self.evaluate_single_case(
|
||||
idx,
|
||||
test_case,
|
||||
rag_semaphore,
|
||||
eval_semaphore,
|
||||
client,
|
||||
progress_counter,
|
||||
position_pool,
|
||||
pbar_creation_lock,
|
||||
idx, test_case, semaphore, client, progress_counter
|
||||
)
|
||||
for idx, test_case in enumerate(self.test_cases, 1)
|
||||
]
|
||||
|
||||
# Run all evaluations in parallel (limited by two-stage semaphores)
|
||||
# Run all evaluations in parallel (limited by semaphore)
|
||||
results = await asyncio.gather(*tasks)
|
||||
|
||||
return list(results)
|
||||
|
|
@ -655,7 +612,6 @@ class RAGEvaluator:
|
|||
Args:
|
||||
results: List of evaluation results
|
||||
"""
|
||||
logger.info("")
|
||||
logger.info("%s", "=" * 115)
|
||||
logger.info("📊 EVALUATION RESULTS SUMMARY")
|
||||
logger.info("%s", "=" * 115)
|
||||
|
|
@ -841,9 +797,6 @@ class RAGEvaluator:
|
|||
"results": results,
|
||||
}
|
||||
|
||||
# Display results table
|
||||
self._display_results_table(results)
|
||||
|
||||
# Save JSON results
|
||||
json_path = (
|
||||
self.results_dir
|
||||
|
|
@ -852,8 +805,14 @@ class RAGEvaluator:
|
|||
with open(json_path, "w") as f:
|
||||
json.dump(summary, f, indent=2)
|
||||
|
||||
# Display results table
|
||||
self._display_results_table(results)
|
||||
|
||||
logger.info("✅ JSON results saved to: %s", json_path)
|
||||
|
||||
# Export to CSV
|
||||
csv_path = self._export_to_csv(results)
|
||||
logger.info("✅ CSV results saved to: %s", csv_path)
|
||||
|
||||
# Print summary
|
||||
logger.info("")
|
||||
|
|
@ -878,7 +837,7 @@ class RAGEvaluator:
|
|||
logger.info("Average Context Recall: %.4f", avg["context_recall"])
|
||||
logger.info("Average Context Precision: %.4f", avg["context_precision"])
|
||||
logger.info("Average RAGAS Score: %.4f", avg["ragas_score"])
|
||||
logger.info("%s", "-" * 70)
|
||||
logger.info("")
|
||||
logger.info(
|
||||
"Min RAGAS Score: %.4f",
|
||||
benchmark_stats["min_ragas_score"],
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue