Restructure semaphore control to manage entire evaluation pipeline
• Move rag_semaphore to wrap full function
• Increase RAG concurrency to 2x eval limit
• Prevent memory buildup from slow evals
• Keep eval_semaphore for RAGAS control
(cherry picked from commit e5abe9dd3d)
This commit is contained in:
parent
c459caed26
commit
7896c42fba
1 changed files with 101 additions and 97 deletions
|
|
@ -358,7 +358,7 @@ class RAGEvaluator:
|
|||
Args:
|
||||
idx: Test case index (1-based)
|
||||
test_case: Test case dictionary with question and ground_truth
|
||||
rag_semaphore: Semaphore to control RAG generation concurrency (Stage 1)
|
||||
rag_semaphore: Semaphore to control overall concurrency (covers entire function)
|
||||
eval_semaphore: Semaphore to control RAGAS evaluation concurrency (Stage 2)
|
||||
client: Shared httpx AsyncClient for connection pooling
|
||||
progress_counter: Shared dictionary for progress tracking
|
||||
|
|
@ -366,11 +366,13 @@ class RAGEvaluator:
|
|||
Returns:
|
||||
Evaluation result dictionary
|
||||
"""
|
||||
# rag_semaphore controls the entire evaluation process to prevent
|
||||
# all RAG responses from being generated at once when eval is slow
|
||||
async with rag_semaphore:
|
||||
question = test_case["question"]
|
||||
ground_truth = test_case["ground_truth"]
|
||||
|
||||
# Stage 1: Generate RAG response (controlled by rag_semaphore)
|
||||
async with rag_semaphore:
|
||||
# Stage 1: Generate RAG response
|
||||
try:
|
||||
rag_response = await self.generate_rag_response(
|
||||
question=question, client=client
|
||||
|
|
@ -390,9 +392,6 @@ class RAGEvaluator:
|
|||
# *** CRITICAL FIX: Use actual retrieved contexts, NOT ground_truth ***
|
||||
retrieved_contexts = rag_response["contexts"]
|
||||
|
||||
# DEBUG: Print what was actually retrieved (only in debug mode)
|
||||
logger.debug("📝 Test %s: Retrieved %s contexts", idx, len(retrieved_contexts))
|
||||
|
||||
# Prepare dataset for RAGAS evaluation with CORRECT contexts
|
||||
eval_dataset = Dataset.from_dict(
|
||||
{
|
||||
|
|
@ -425,6 +424,9 @@ class RAGEvaluator:
|
|||
_pbar=pbar,
|
||||
)
|
||||
|
||||
pbar.close()
|
||||
pbar = None
|
||||
|
||||
# Convert to DataFrame (RAGAS v0.3+ API)
|
||||
df = eval_results.to_pandas()
|
||||
|
||||
|
|
@ -447,7 +449,9 @@ class RAGEvaluator:
|
|||
"answer_relevance": float(
|
||||
scores_row.get("answer_relevancy", 0)
|
||||
),
|
||||
"context_recall": float(scores_row.get("context_recall", 0)),
|
||||
"context_recall": float(
|
||||
scores_row.get("context_recall", 0)
|
||||
),
|
||||
"context_precision": float(
|
||||
scores_row.get("context_precision", 0)
|
||||
),
|
||||
|
|
@ -501,8 +505,8 @@ class RAGEvaluator:
|
|||
logger.info("%s", "=" * 70)
|
||||
|
||||
# Create two-stage pipeline semaphores
|
||||
# Stage 1: RAG generation - allow +1 concurrency to keep evaluation fed
|
||||
rag_semaphore = asyncio.Semaphore(max_async + 1)
|
||||
# Stage 1: RAG generation - allow x2 concurrency to keep evaluation fed
|
||||
rag_semaphore = asyncio.Semaphore(max_async * 2)
|
||||
# Stage 2: RAGAS evaluation - primary bottleneck
|
||||
eval_semaphore = asyncio.Semaphore(max_async)
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue