Restructure semaphore control to manage entire evaluation pipeline

• Move rag_semaphore to wrap the entire function body (see the sketch below)
• Increase RAG generation concurrency to 2x the eval limit
• Prevent memory buildup caused by slow evaluations
• Keep eval_semaphore for RAGAS evaluation control
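The heart of the change is where rag_semaphore is released relative to the slow RAGAS stage. A minimal before/after sketch of the pattern (a simplified illustration with stand-in helpers, not the project's actual code):

import asyncio

async def generate_rag_response(question):
    await asyncio.sleep(0.1)  # stand-in for Stage 1: query the RAG endpoint
    return f"answer to {question}"

async def run_ragas(answer):
    await asyncio.sleep(0.5)  # stand-in for Stage 2: slow RAGAS scoring
    return 1.0

async def case_before(question, rag_sem, eval_sem):
    # Old shape: rag_sem is released as soon as the answer exists, so finished
    # answers accumulate in memory whenever evaluation falls behind.
    async with rag_sem:
        answer = await generate_rag_response(question)
    async with eval_sem:
        return await run_ragas(answer)

async def case_after(question, rag_sem, eval_sem):
    # New shape: rag_sem is held across both stages, so at most
    # 2 * max_async cases are in flight end to end.
    async with rag_sem:
        answer = await generate_rag_response(question)
        async with eval_sem:
            return await run_ragas(answer)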
Author: yangdx
Date:   2025-11-05 01:07:53 +08:00
Parent: 83715a3ac1
Commit: e5abe9dd3d

@@ -358,7 +358,7 @@ class RAGEvaluator:
         Args:
             idx: Test case index (1-based)
             test_case: Test case dictionary with question and ground_truth
-            rag_semaphore: Semaphore to control RAG generation concurrency (Stage 1)
+            rag_semaphore: Semaphore to control overall concurrency (covers entire function)
             eval_semaphore: Semaphore to control RAGAS evaluation concurrency (Stage 2)
             client: Shared httpx AsyncClient for connection pooling
             progress_counter: Shared dictionary for progress tracking
@@ -366,11 +366,13 @@ class RAGEvaluator:
         Returns:
             Evaluation result dictionary
         """
-        question = test_case["question"]
-        ground_truth = test_case["ground_truth"]
-
-        # Stage 1: Generate RAG response (controlled by rag_semaphore)
+        # rag_semaphore controls the entire evaluation process to prevent
+        # all RAG responses from being generated at once when eval is slow
         async with rag_semaphore:
+            question = test_case["question"]
+            ground_truth = test_case["ground_truth"]
+
+            # Stage 1: Generate RAG response
             try:
                 rag_response = await self.generate_rag_response(
                     question=question, client=client
@@ -387,102 +389,104 @@ class RAGEvaluator:
                     "timestamp": datetime.now().isoformat(),
                 }

-        # *** CRITICAL FIX: Use actual retrieved contexts, NOT ground_truth ***
-        retrieved_contexts = rag_response["contexts"]
-
-        # DEBUG: Print what was actually retrieved (only in debug mode)
-        logger.debug("📝 Test %s: Retrieved %s contexts", idx, len(retrieved_contexts))
-
-        # Prepare dataset for RAGAS evaluation with CORRECT contexts
-        eval_dataset = Dataset.from_dict(
-            {
-                "question": [question],
-                "answer": [rag_response["answer"]],
-                "contexts": [retrieved_contexts],
-                "ground_truth": [ground_truth],
-            }
-        )
-
-        # Stage 2: Run RAGAS evaluation (controlled by eval_semaphore)
-        # IMPORTANT: Create fresh metric instances for each evaluation to avoid
-        # concurrent state conflicts when multiple tasks run in parallel
-        async with eval_semaphore:
-            pbar = None
-            try:
-                # Create standard tqdm progress bar for RAGAS evaluation
-                pbar = tqdm(total=4, desc=f"Eval-{idx}", leave=True)
-
-                eval_results = evaluate(
-                    dataset=eval_dataset,
-                    metrics=[
-                        Faithfulness(),
-                        AnswerRelevancy(),
-                        ContextRecall(),
-                        ContextPrecision(),
-                    ],
-                    llm=self.eval_llm,
-                    embeddings=self.eval_embeddings,
-                    _pbar=pbar,
-                )
-
-                # Convert to DataFrame (RAGAS v0.3+ API)
-                df = eval_results.to_pandas()
-
-                # Extract scores from first row
-                scores_row = df.iloc[0]
-
-                # Extract scores (RAGAS v0.3+ uses .to_pandas())
-                result = {
-                    "test_number": idx,
-                    "question": question,
-                    "answer": rag_response["answer"][:200] + "..."
-                    if len(rag_response["answer"]) > 200
-                    else rag_response["answer"],
-                    "ground_truth": ground_truth[:200] + "..."
-                    if len(ground_truth) > 200
-                    else ground_truth,
-                    "project": test_case.get("project", "unknown"),
-                    "metrics": {
-                        "faithfulness": float(scores_row.get("faithfulness", 0)),
-                        "answer_relevance": float(
-                            scores_row.get("answer_relevancy", 0)
-                        ),
-                        "context_recall": float(scores_row.get("context_recall", 0)),
-                        "context_precision": float(
-                            scores_row.get("context_precision", 0)
-                        ),
-                    },
-                    "timestamp": datetime.now().isoformat(),
-                }
-
-                # Calculate RAGAS score (average of all metrics, excluding NaN values)
-                metrics = result["metrics"]
-                valid_metrics = [v for v in metrics.values() if not _is_nan(v)]
-                ragas_score = (
-                    sum(valid_metrics) / len(valid_metrics) if valid_metrics else 0
-                )
-                result["ragas_score"] = round(ragas_score, 4)
-
-                # Update progress counter
-                progress_counter["completed"] += 1
-
-                return result
-            except Exception as e:
-                logger.error("Error evaluating test %s: %s", idx, str(e))
-                progress_counter["completed"] += 1
-                return {
-                    "test_number": idx,
-                    "question": question,
-                    "error": str(e),
-                    "metrics": {},
-                    "ragas_score": 0,
-                    "timestamp": datetime.now().isoformat(),
-                }
-            finally:
-                # Force close progress bar to ensure completion
-                if pbar is not None:
-                    pbar.close()
-                    pbar = None
+            # *** CRITICAL FIX: Use actual retrieved contexts, NOT ground_truth ***
+            retrieved_contexts = rag_response["contexts"]
+
+            # DEBUG: Print what was actually retrieved (only in debug mode)
+            logger.debug("📝 Test %s: Retrieved %s contexts", idx, len(retrieved_contexts))
+
+            # Prepare dataset for RAGAS evaluation with CORRECT contexts
+            eval_dataset = Dataset.from_dict(
+                {
+                    "question": [question],
+                    "answer": [rag_response["answer"]],
+                    "contexts": [retrieved_contexts],
+                    "ground_truth": [ground_truth],
+                }
+            )
+
+            # Stage 2: Run RAGAS evaluation (controlled by eval_semaphore)
+            # IMPORTANT: Create fresh metric instances for each evaluation to avoid
+            # concurrent state conflicts when multiple tasks run in parallel
+            async with eval_semaphore:
+                pbar = None
+                try:
+                    # Create standard tqdm progress bar for RAGAS evaluation
+                    pbar = tqdm(total=4, desc=f"Eval-{idx}", leave=True)
+
+                    eval_results = evaluate(
+                        dataset=eval_dataset,
+                        metrics=[
+                            Faithfulness(),
+                            AnswerRelevancy(),
+                            ContextRecall(),
+                            ContextPrecision(),
+                        ],
+                        llm=self.eval_llm,
+                        embeddings=self.eval_embeddings,
+                        _pbar=pbar,
+                    )
+
+                    # Convert to DataFrame (RAGAS v0.3+ API)
+                    df = eval_results.to_pandas()
+
+                    # Extract scores from first row
+                    scores_row = df.iloc[0]
+
+                    # Extract scores (RAGAS v0.3+ uses .to_pandas())
+                    result = {
+                        "test_number": idx,
+                        "question": question,
+                        "answer": rag_response["answer"][:200] + "..."
+                        if len(rag_response["answer"]) > 200
+                        else rag_response["answer"],
+                        "ground_truth": ground_truth[:200] + "..."
+                        if len(ground_truth) > 200
+                        else ground_truth,
+                        "project": test_case.get("project", "unknown"),
+                        "metrics": {
+                            "faithfulness": float(scores_row.get("faithfulness", 0)),
+                            "answer_relevance": float(
+                                scores_row.get("answer_relevancy", 0)
+                            ),
+                            "context_recall": float(
+                                scores_row.get("context_recall", 0)
+                            ),
+                            "context_precision": float(
+                                scores_row.get("context_precision", 0)
+                            ),
+                        },
+                        "timestamp": datetime.now().isoformat(),
+                    }
+
+                    # Calculate RAGAS score (average of all metrics, excluding NaN values)
+                    metrics = result["metrics"]
+                    valid_metrics = [v for v in metrics.values() if not _is_nan(v)]
+                    ragas_score = (
+                        sum(valid_metrics) / len(valid_metrics) if valid_metrics else 0
+                    )
+                    result["ragas_score"] = round(ragas_score, 4)
+
+                    # Update progress counter
+                    progress_counter["completed"] += 1
+
+                    return result
+                except Exception as e:
+                    logger.error("Error evaluating test %s: %s", idx, str(e))
+                    progress_counter["completed"] += 1
+                    return {
+                        "test_number": idx,
+                        "question": question,
+                        "error": str(e),
+                        "metrics": {},
+                        "ragas_score": 0,
+                        "timestamp": datetime.now().isoformat(),
+                    }
+                finally:
+                    # Force close progress bar to ensure completion
+                    if pbar is not None:
+                        pbar.close()

     async def evaluate_responses(self) -> List[Dict[str, Any]]:
         """
@@ -501,8 +505,8 @@ class RAGEvaluator:
         logger.info("%s", "=" * 70)

         # Create two-stage pipeline semaphores
-        # Stage 1: RAG generation - allow +1 concurrency to keep evaluation fed
-        rag_semaphore = asyncio.Semaphore(max_async + 1)
+        # Stage 1: RAG generation - allow x2 concurrency to keep evaluation fed
+        rag_semaphore = asyncio.Semaphore(max_async * 2)
         # Stage 2: RAGAS evaluation - primary bottleneck
         eval_semaphore = asyncio.Semaphore(max_async)
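For orientation, the two semaphores created above are presumably handed to each per-case coroutine and the cases fanned out with asyncio.gather. A hedged sketch of that driver shape (the evaluate_case callable and its argument list are illustrative, not taken verbatim from the file):

import asyncio

async def run_pipeline(evaluate_case, test_cases, max_async):
    # Stage 1 gets 2x headroom so generation keeps the evaluator fed;
    # Stage 2 (RAGAS) remains the primary bottleneck.
    rag_semaphore = asyncio.Semaphore(max_async * 2)
    eval_semaphore = asyncio.Semaphore(max_async)
    tasks = [
        evaluate_case(idx, case, rag_semaphore, eval_semaphore)
        for idx, case in enumerate(test_cases, start=1)
    ]
    return await asyncio.gather(*tasks)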