diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 23e6f575..43614a93 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -469,6 +469,7 @@ class LightRAG: self.embedding_func = priority_limit_async_func_call( self.embedding_func_max_async, llm_timeout=self.default_embedding_timeout, + queue_name="Embedding func:", )(self.embedding_func) # Initialize all storages @@ -565,6 +566,7 @@ class LightRAG: self.llm_model_func = priority_limit_async_func_call( self.llm_model_max_async, llm_timeout=self.default_llm_timeout, + queue_name="LLM func:", )( partial( self.llm_model_func, # type: ignore diff --git a/lightrag/utils.py b/lightrag/utils.py index 9dc5a5b2..ab4787c4 100644 --- a/lightrag/utils.py +++ b/lightrag/utils.py @@ -374,6 +374,7 @@ def priority_limit_async_func_call( max_task_duration: float = None, max_queue_size: int = 1000, cleanup_timeout: float = 2.0, + queue_name: str = "limit_async", ): """ Enhanced priority-limited asynchronous function call decorator with robust timeout handling @@ -391,6 +392,7 @@ def priority_limit_async_func_call( max_execution_timeout: Maximum time for worker to execute function (defaults to llm_timeout + 30s) max_task_duration: Maximum time before health check intervenes (defaults to llm_timeout + 60s) cleanup_timeout: Maximum time to wait for cleanup operations (defaults to 2.0s) + queue_name: Optional queue name for logging identification (defaults to "limit_async") Returns: Decorator function @@ -482,7 +484,7 @@ def priority_limit_async_func_call( except asyncio.TimeoutError: # Worker-level timeout (max_execution_timeout exceeded) logger.warning( - f"limit_async: Worker timeout for task {task_id} after {max_execution_timeout}s" + f"{queue_name}: Worker timeout for task {task_id} after {max_execution_timeout}s" ) if not task_state.future.done(): task_state.future.set_exception( @@ -495,12 +497,12 @@ def priority_limit_async_func_call( if not task_state.future.done(): task_state.future.cancel() logger.debug( - f"limit_async: Task {task_id} cancelled during execution" + f"{queue_name}: Task {task_id} cancelled during execution" ) except Exception as e: # Function execution error logger.error( - f"limit_async: Error in decorated function for task {task_id}: {str(e)}" + f"{queue_name}: Error in decorated function for task {task_id}: {str(e)}" ) if not task_state.future.done(): task_state.future.set_exception(e) @@ -512,10 +514,12 @@ def priority_limit_async_func_call( except Exception as e: # Critical error in worker loop - logger.error(f"limit_async: Critical error in worker: {str(e)}") + logger.error( + f"{queue_name}: Critical error in worker: {str(e)}" + ) await asyncio.sleep(0.1) finally: - logger.debug("limit_async: Worker exiting") + logger.debug(f"{queue_name}: Worker exiting") async def enhanced_health_check(): """Enhanced health check with stuck task detection and recovery""" @@ -549,7 +553,7 @@ def priority_limit_async_func_call( # Force cleanup of stuck tasks for task_id, execution_duration in stuck_tasks: logger.warning( - f"limit_async: Detected stuck task {task_id} (execution time: {execution_duration:.1f}s), forcing cleanup" + f"{queue_name}: Detected stuck task {task_id} (execution time: {execution_duration:.1f}s), forcing cleanup" ) async with task_states_lock: if task_id in task_states: @@ -572,7 +576,7 @@ def priority_limit_async_func_call( if workers_needed > 0: logger.info( - f"limit_async: Creating {workers_needed} new workers" + f"{queue_name}: Creating {workers_needed} new workers" ) new_tasks = set() for _ in range(workers_needed): @@ -582,9 +586,9 @@ def priority_limit_async_func_call( tasks.update(new_tasks) except Exception as e: - logger.error(f"limit_async: Error in enhanced health check: {str(e)}") + logger.error(f"{queue_name}: Error in enhanced health check: {str(e)}") finally: - logger.debug("limit_async: Enhanced health check task exiting") + logger.debug(f"{queue_name}: Enhanced health check task exiting") initialized = False async def ensure_workers(): @@ -601,7 +605,7 @@ def priority_limit_async_func_call( if reinit_count > 0: reinit_count += 1 logger.warning( - f"limit_async: Reinitializing system (count: {reinit_count})" + f"{queue_name}: Reinitializing system (count: {reinit_count})" ) else: reinit_count = 1 @@ -614,7 +618,7 @@ def priority_limit_async_func_call( active_tasks_count = len(tasks) if active_tasks_count > 0 and reinit_count > 1: logger.warning( - f"limit_async: {active_tasks_count} tasks still running during reinitialization" + f"{queue_name}: {active_tasks_count} tasks still running during reinitialization" ) # Create worker tasks @@ -641,12 +645,12 @@ def priority_limit_async_func_call( f" (Timeouts: {', '.join(timeout_info)})" if timeout_info else "" ) logger.info( - f"limit_async: {workers_needed} new workers initialized {timeout_str}" + f"{queue_name}: {workers_needed} new workers initialized {timeout_str}" ) async def shutdown(): """Gracefully shut down all workers and cleanup resources""" - logger.info("limit_async: Shutting down priority queue workers") + logger.info(f"{queue_name}: Shutting down priority queue workers") shutdown_event.set() @@ -667,7 +671,7 @@ def priority_limit_async_func_call( await asyncio.wait_for(queue.join(), timeout=5.0) except asyncio.TimeoutError: logger.warning( - "limit_async: Timeout waiting for queue to empty during shutdown" + f"{queue_name}: Timeout waiting for queue to empty during shutdown" ) # Cancel worker tasks @@ -687,7 +691,7 @@ def priority_limit_async_func_call( except asyncio.CancelledError: pass - logger.info("limit_async: Priority queue workers shutdown complete") + logger.info(f"{queue_name}: Priority queue workers shutdown complete") @wraps(func) async def wait_func( @@ -750,7 +754,7 @@ def priority_limit_async_func_call( ) except asyncio.TimeoutError: raise QueueFullError( - f"Queue full, timeout after {_queue_timeout} seconds" + f"{queue_name}: Queue full, timeout after {_queue_timeout} seconds" ) except Exception as e: # Clean up on queue error @@ -785,14 +789,14 @@ def priority_limit_async_func_call( await asyncio.sleep(0.1) raise TimeoutError( - f"limit_async: User timeout after {_timeout} seconds" + f"{queue_name}: User timeout after {_timeout} seconds" ) except WorkerTimeoutError as e: # This is Worker-level timeout, directly propagate exception information - raise TimeoutError(f"limit_async: {str(e)}") + raise TimeoutError(f"{queue_name}: {str(e)}") except HealthCheckTimeoutError as e: # This is Health Check-level timeout, directly propagate exception information - raise TimeoutError(f"limit_async: {str(e)}") + raise TimeoutError(f"{queue_name}: {str(e)}") finally: # Ensure cleanup