Fix tqdm progress bar conflicts in concurrent RAG evaluation
• Add position pool for tqdm bars • Serialize tqdm creation with lock • Set leave=False to clear completed bars • Pass position/lock to eval tasks • Import tqdm.auto for better display
This commit is contained in:
parent
e5abe9dd3d
commit
2823f92fb6
1 changed files with 35 additions and 6 deletions
|
|
@ -82,7 +82,7 @@ try:
|
||||||
)
|
)
|
||||||
from ragas.llms import LangchainLLMWrapper
|
from ragas.llms import LangchainLLMWrapper
|
||||||
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
|
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
|
||||||
from tqdm import tqdm
|
from tqdm.auto import tqdm
|
||||||
|
|
||||||
RAGAS_AVAILABLE = True
|
RAGAS_AVAILABLE = True
|
||||||
|
|
||||||
|
|
@ -351,6 +351,8 @@ class RAGEvaluator:
|
||||||
eval_semaphore: asyncio.Semaphore,
|
eval_semaphore: asyncio.Semaphore,
|
||||||
client: httpx.AsyncClient,
|
client: httpx.AsyncClient,
|
||||||
progress_counter: Dict[str, int],
|
progress_counter: Dict[str, int],
|
||||||
|
position_pool: asyncio.Queue,
|
||||||
|
pbar_creation_lock: asyncio.Lock,
|
||||||
) -> Dict[str, Any]:
|
) -> Dict[str, Any]:
|
||||||
"""
|
"""
|
||||||
Evaluate a single test case with two-stage pipeline concurrency control
|
Evaluate a single test case with two-stage pipeline concurrency control
|
||||||
|
|
@ -362,6 +364,8 @@ class RAGEvaluator:
|
||||||
eval_semaphore: Semaphore to control RAGAS evaluation concurrency (Stage 2)
|
eval_semaphore: Semaphore to control RAGAS evaluation concurrency (Stage 2)
|
||||||
client: Shared httpx AsyncClient for connection pooling
|
client: Shared httpx AsyncClient for connection pooling
|
||||||
progress_counter: Shared dictionary for progress tracking
|
progress_counter: Shared dictionary for progress tracking
|
||||||
|
position_pool: Queue of available tqdm position indices
|
||||||
|
pbar_creation_lock: Lock to serialize tqdm creation and prevent race conditions
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Evaluation result dictionary
|
Evaluation result dictionary
|
||||||
|
|
@ -407,9 +411,22 @@ class RAGEvaluator:
|
||||||
# concurrent state conflicts when multiple tasks run in parallel
|
# concurrent state conflicts when multiple tasks run in parallel
|
||||||
async with eval_semaphore:
|
async with eval_semaphore:
|
||||||
pbar = None
|
pbar = None
|
||||||
|
position = None
|
||||||
try:
|
try:
|
||||||
# Create standard tqdm progress bar for RAGAS evaluation
|
# Acquire a position from the pool for this tqdm progress bar
|
||||||
pbar = tqdm(total=4, desc=f"Eval-{idx}", leave=True)
|
position = await position_pool.get()
|
||||||
|
|
||||||
|
# Serialize tqdm creation to prevent race conditions
|
||||||
|
# Multiple tasks creating tqdm simultaneously can cause display conflicts
|
||||||
|
async with pbar_creation_lock:
|
||||||
|
# Create tqdm progress bar with assigned position to avoid overlapping
|
||||||
|
# leave=False ensures the progress bar is cleared after completion,
|
||||||
|
# preventing accumulation of completed bars and allowing position reuse
|
||||||
|
pbar = tqdm(
|
||||||
|
total=4, desc=f"Eval-{idx}", position=position, leave=False
|
||||||
|
)
|
||||||
|
# Give tqdm time to initialize and claim its screen position
|
||||||
|
await asyncio.sleep(0.05)
|
||||||
|
|
||||||
eval_results = evaluate(
|
eval_results = evaluate(
|
||||||
dataset=eval_dataset,
|
dataset=eval_dataset,
|
||||||
|
|
@ -424,9 +441,6 @@ class RAGEvaluator:
|
||||||
_pbar=pbar,
|
_pbar=pbar,
|
||||||
)
|
)
|
||||||
|
|
||||||
pbar.close()
|
|
||||||
pbar = None
|
|
||||||
|
|
||||||
# Convert to DataFrame (RAGAS v0.3+ API)
|
# Convert to DataFrame (RAGAS v0.3+ API)
|
||||||
df = eval_results.to_pandas()
|
df = eval_results.to_pandas()
|
||||||
|
|
||||||
|
|
@ -487,6 +501,9 @@ class RAGEvaluator:
|
||||||
# Force close progress bar to ensure completion
|
# Force close progress bar to ensure completion
|
||||||
if pbar is not None:
|
if pbar is not None:
|
||||||
pbar.close()
|
pbar.close()
|
||||||
|
# Release the position back to the pool for reuse
|
||||||
|
if position is not None:
|
||||||
|
await position_pool.put(position)
|
||||||
|
|
||||||
async def evaluate_responses(self) -> List[Dict[str, Any]]:
|
async def evaluate_responses(self) -> List[Dict[str, Any]]:
|
||||||
"""
|
"""
|
||||||
|
|
@ -513,6 +530,16 @@ class RAGEvaluator:
|
||||||
# Create progress counter (shared across all tasks)
|
# Create progress counter (shared across all tasks)
|
||||||
progress_counter = {"completed": 0}
|
progress_counter = {"completed": 0}
|
||||||
|
|
||||||
|
# Create position pool for tqdm progress bars
|
||||||
|
# Positions range from 0 to max_async-1, ensuring no overlapping displays
|
||||||
|
position_pool = asyncio.Queue()
|
||||||
|
for i in range(max_async):
|
||||||
|
await position_pool.put(i)
|
||||||
|
|
||||||
|
# Create lock to serialize tqdm creation and prevent race conditions
|
||||||
|
# This ensures progress bars are created one at a time, avoiding display conflicts
|
||||||
|
pbar_creation_lock = asyncio.Lock()
|
||||||
|
|
||||||
# Create shared HTTP client with connection pooling and proper timeouts
|
# Create shared HTTP client with connection pooling and proper timeouts
|
||||||
# Timeout: 3 minutes for connect, 5 minutes for read (LLM can be slow)
|
# Timeout: 3 minutes for connect, 5 minutes for read (LLM can be slow)
|
||||||
timeout = httpx.Timeout(
|
timeout = httpx.Timeout(
|
||||||
|
|
@ -535,6 +562,8 @@ class RAGEvaluator:
|
||||||
eval_semaphore,
|
eval_semaphore,
|
||||||
client,
|
client,
|
||||||
progress_counter,
|
progress_counter,
|
||||||
|
position_pool,
|
||||||
|
pbar_creation_lock,
|
||||||
)
|
)
|
||||||
for idx, test_case in enumerate(self.test_cases, 1)
|
for idx, test_case in enumerate(self.test_cases, 1)
|
||||||
]
|
]
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue