Implement two-stage pipeline for RAG evaluation with separate semaphores

• Split RAG response generation (Stage 1) and RAGAS evaluation (Stage 2) into separate stages
• Add rag_semaphore for stage 1
• Add eval_semaphore for stage 2
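• Improve concurrency control: Stage 1 allows EVAL_MAX_CONCURRENT + 1 tasks so that Stage 2 (limited to EVAL_MAX_CONCURRENT) stays fed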
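• Update httpx connection pool limits to match the RAG stage: max_connections = (max_async + 1) * 2, max_keepalive_connections = max_async + 1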
This commit is contained in:
yangdx 2025-11-05 00:36:09 +08:00
parent d36be1f499
commit 83715a3ac1

View file

@ -347,28 +347,30 @@ class RAGEvaluator:
self,
idx: int,
test_case: Dict[str, str],
semaphore: asyncio.Semaphore,
rag_semaphore: asyncio.Semaphore,
eval_semaphore: asyncio.Semaphore,
client: httpx.AsyncClient,
progress_counter: Dict[str, int],
) -> Dict[str, Any]:
"""
Evaluate a single test case with concurrency control
Evaluate a single test case with two-stage pipeline concurrency control
Args:
idx: Test case index (1-based)
test_case: Test case dictionary with question and ground_truth
semaphore: Semaphore to control concurrency
rag_semaphore: Semaphore to control RAG generation concurrency (Stage 1)
eval_semaphore: Semaphore to control RAGAS evaluation concurrency (Stage 2)
client: Shared httpx AsyncClient for connection pooling
progress_counter: Shared dictionary for progress tracking
Returns:
Evaluation result dictionary
"""
async with semaphore:
question = test_case["question"]
ground_truth = test_case["ground_truth"]
question = test_case["question"]
ground_truth = test_case["ground_truth"]
# Generate RAG response by calling actual LightRAG API
# Stage 1: Generate RAG response (controlled by rag_semaphore)
async with rag_semaphore:
try:
rag_response = await self.generate_rag_response(
question=question, client=client
@ -385,32 +387,31 @@ class RAGEvaluator:
"timestamp": datetime.now().isoformat(),
}
# *** CRITICAL FIX: Use actual retrieved contexts, NOT ground_truth ***
retrieved_contexts = rag_response["contexts"]
# *** CRITICAL FIX: Use actual retrieved contexts, NOT ground_truth ***
retrieved_contexts = rag_response["contexts"]
# DEBUG: Print what was actually retrieved (only in debug mode)
logger.debug(
"📝 Test %s: Retrieved %s contexts", idx, len(retrieved_contexts)
)
# DEBUG: Print what was actually retrieved (only in debug mode)
logger.debug("📝 Test %s: Retrieved %s contexts", idx, len(retrieved_contexts))
# Prepare dataset for RAGAS evaluation with CORRECT contexts
eval_dataset = Dataset.from_dict(
{
"question": [question],
"answer": [rag_response["answer"]],
"contexts": [retrieved_contexts],
"ground_truth": [ground_truth],
}
)
# Prepare dataset for RAGAS evaluation with CORRECT contexts
eval_dataset = Dataset.from_dict(
{
"question": [question],
"answer": [rag_response["answer"]],
"contexts": [retrieved_contexts],
"ground_truth": [ground_truth],
}
)
# Run RAGAS evaluation
# IMPORTANT: Create fresh metric instances for each evaluation to avoid
# concurrent state conflicts when multiple tasks run in parallel
# Stage 2: Run RAGAS evaluation (controlled by eval_semaphore)
# IMPORTANT: Create fresh metric instances for each evaluation to avoid
# concurrent state conflicts when multiple tasks run in parallel
async with eval_semaphore:
pbar = None
try:
# Create standard tqdm progress bar for RAGAS evaluation
pbar = tqdm(total=4, desc=f"Eval-{idx}", leave=True)
eval_results = evaluate(
dataset=eval_dataset,
metrics=[
@ -485,21 +486,25 @@ class RAGEvaluator:
async def evaluate_responses(self) -> List[Dict[str, Any]]:
"""
Evaluate all test cases in parallel and return metrics
Evaluate all test cases in parallel with two-stage pipeline and return metrics
Returns:
List of evaluation results with metrics
"""
# Get evaluation concurrency from environment (default to 1 for serial evaluation)
# Get evaluation concurrency from environment (default to 2 for parallel evaluation)
max_async = int(os.getenv("EVAL_MAX_CONCURRENT", "2"))
logger.info("%s", "=" * 70)
logger.info("🚀 Starting RAGAS Evaluation of LightRAG System")
logger.info("🔧 Concurrent evaluations: %s", max_async)
logger.info("🔧 Two-Stage Pipeline Configuration:")
logger.info(" • RAGAS Evaluation (Stage 2): %s concurrent", max_async)
logger.info("%s", "=" * 70)
# Create semaphore to limit concurrent evaluations
semaphore = asyncio.Semaphore(max_async)
# Create two-stage pipeline semaphores
# Stage 1: RAG generation - allow +1 concurrency to keep evaluation fed
rag_semaphore = asyncio.Semaphore(max_async + 1)
# Stage 2: RAGAS evaluation - primary bottleneck
eval_semaphore = asyncio.Semaphore(max_async)
# Create progress counter (shared across all tasks)
progress_counter = {"completed": 0}
@ -512,20 +517,25 @@ class RAGEvaluator:
read=READ_TIMEOUT_SECONDS,
)
limits = httpx.Limits(
max_connections=max_async * 2, # Allow some buffer
max_keepalive_connections=max_async,
max_connections=(max_async + 1) * 2, # Allow buffer for RAG stage
max_keepalive_connections=max_async + 1,
)
async with httpx.AsyncClient(timeout=timeout, limits=limits) as client:
# Create tasks for all test cases
tasks = [
self.evaluate_single_case(
idx, test_case, semaphore, client, progress_counter
idx,
test_case,
rag_semaphore,
eval_semaphore,
client,
progress_counter,
)
for idx, test_case in enumerate(self.test_cases, 1)
]
# Run all evaluations in parallel (limited by semaphore)
# Run all evaluations in parallel (limited by two-stage semaphores)
results = await asyncio.gather(*tasks)
return list(results)