From 2823f92fb6ee63125dc8ab037828883b45c149b6 Mon Sep 17 00:00:00 2001 From: yangdx Date: Wed, 5 Nov 2025 02:04:13 +0800 Subject: [PATCH] Fix tqdm progress bar conflicts in concurrent RAG evaluation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit • Add position pool for tqdm bars • Serialize tqdm creation with lock • Set leave=False to clear completed bars • Pass position/lock to eval tasks • Import tqdm.auto for better display --- lightrag/evaluation/eval_rag_quality.py | 41 +++++++++++++++++++++---- 1 file changed, 35 insertions(+), 6 deletions(-) diff --git a/lightrag/evaluation/eval_rag_quality.py b/lightrag/evaluation/eval_rag_quality.py index 0d9633b4..0338a4f0 100644 --- a/lightrag/evaluation/eval_rag_quality.py +++ b/lightrag/evaluation/eval_rag_quality.py @@ -82,7 +82,7 @@ try: ) from ragas.llms import LangchainLLMWrapper from langchain_openai import ChatOpenAI, OpenAIEmbeddings - from tqdm import tqdm + from tqdm.auto import tqdm RAGAS_AVAILABLE = True @@ -351,6 +351,8 @@ class RAGEvaluator: eval_semaphore: asyncio.Semaphore, client: httpx.AsyncClient, progress_counter: Dict[str, int], + position_pool: asyncio.Queue, + pbar_creation_lock: asyncio.Lock, ) -> Dict[str, Any]: """ Evaluate a single test case with two-stage pipeline concurrency control @@ -362,6 +364,8 @@ class RAGEvaluator: eval_semaphore: Semaphore to control RAGAS evaluation concurrency (Stage 2) client: Shared httpx AsyncClient for connection pooling progress_counter: Shared dictionary for progress tracking + position_pool: Queue of available tqdm position indices + pbar_creation_lock: Lock to serialize tqdm creation and prevent race conditions Returns: Evaluation result dictionary @@ -407,9 +411,22 @@ class RAGEvaluator: # concurrent state conflicts when multiple tasks run in parallel async with eval_semaphore: pbar = None + position = None try: - # Create standard tqdm progress bar for RAGAS evaluation - pbar = tqdm(total=4, desc=f"Eval-{idx}", leave=True) + # Acquire a position from the pool for this tqdm progress bar + position = await position_pool.get() + + # Serialize tqdm creation to prevent race conditions + # Multiple tasks creating tqdm simultaneously can cause display conflicts + async with pbar_creation_lock: + # Create tqdm progress bar with assigned position to avoid overlapping + # leave=False ensures the progress bar is cleared after completion, + # preventing accumulation of completed bars and allowing position reuse + pbar = tqdm( + total=4, desc=f"Eval-{idx}", position=position, leave=False + ) + # Give tqdm time to initialize and claim its screen position + await asyncio.sleep(0.05) eval_results = evaluate( dataset=eval_dataset, @@ -424,9 +441,6 @@ class RAGEvaluator: _pbar=pbar, ) - pbar.close() - pbar = None - # Convert to DataFrame (RAGAS v0.3+ API) df = eval_results.to_pandas() @@ -487,6 +501,9 @@ class RAGEvaluator: # Force close progress bar to ensure completion if pbar is not None: pbar.close() + # Release the position back to the pool for reuse + if position is not None: + await position_pool.put(position) async def evaluate_responses(self) -> List[Dict[str, Any]]: """ @@ -513,6 +530,16 @@ class RAGEvaluator: # Create progress counter (shared across all tasks) progress_counter = {"completed": 0} + # Create position pool for tqdm progress bars + # Positions range from 0 to max_async-1, ensuring no overlapping displays + position_pool = asyncio.Queue() + for i in range(max_async): + await position_pool.put(i) + + # Create lock to serialize tqdm creation and prevent race conditions + # This ensures progress bars are created one at a time, avoiding display conflicts + pbar_creation_lock = asyncio.Lock() + # Create shared HTTP client with connection pooling and proper timeouts # Timeout: 3 minutes for connect, 5 minutes for read (LLM can be slow) timeout = httpx.Timeout( @@ -535,6 +562,8 @@ class RAGEvaluator: eval_semaphore, client, progress_counter, + position_pool, + pbar_creation_lock, ) for idx, test_case in enumerate(self.test_cases, 1) ]