Add queue_name parameter to priority_limit_async_func_call for better logging

• Add queue_name parameter to decorator
• Update all log messages to include the queue name
• Pass specific queue names for the LLM and embedding functions
yangdx 2025-08-31 23:47:22 +08:00
parent 57fe1403c3
commit 1a015a7015
2 changed files with 25 additions and 19 deletions
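For orientation, a minimal sketch of how the decorator is applied after this change (the wrapped function name and the numeric limits are placeholders for this sketch; only priority_limit_async_func_call, llm_timeout, and queue_name come from the commit):

# Hedged sketch: wrap an async callable with a concurrency limit and a
# queue_name that identifies this queue in log messages.
# "my_llm_call" and the numbers are illustrative, not part of the commit.
my_llm_call = priority_limit_async_func_call(
    8,                       # maximum concurrent calls for this queue
    llm_timeout=180,         # per-call timeout in seconds
    queue_name="LLM func:",  # new: prefix used in this queue's log output
)(my_llm_call)

Since queue_name defaults to "limit_async", call sites that do not pass it keep the previous log prefix.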

View file

@@ -469,6 +469,7 @@ class LightRAG:
self.embedding_func = priority_limit_async_func_call(
self.embedding_func_max_async,
llm_timeout=self.default_embedding_timeout,
queue_name="Embedding func:",
)(self.embedding_func)
# Initialize all storages
@@ -565,6 +566,7 @@ class LightRAG:
self.llm_model_func = priority_limit_async_func_call(
self.llm_model_max_async,
llm_timeout=self.default_llm_timeout,
queue_name="LLM func:",
)(
partial(
self.llm_model_func, # type: ignore

View file

@@ -374,6 +374,7 @@ def priority_limit_async_func_call(
max_task_duration: float = None,
max_queue_size: int = 1000,
cleanup_timeout: float = 2.0,
queue_name: str = "limit_async",
):
"""
Enhanced priority-limited asynchronous function call decorator with robust timeout handling
@@ -391,6 +392,7 @@ def priority_limit_async_func_call(
max_execution_timeout: Maximum time for worker to execute function (defaults to llm_timeout + 30s)
max_task_duration: Maximum time before health check intervenes (defaults to llm_timeout + 60s)
cleanup_timeout: Maximum time to wait for cleanup operations (defaults to 2.0s)
queue_name: Optional queue name for logging identification (defaults to "limit_async")
Returns:
Decorator function
@@ -482,7 +484,7 @@ def priority_limit_async_func_call(
except asyncio.TimeoutError:
# Worker-level timeout (max_execution_timeout exceeded)
logger.warning(
f"limit_async: Worker timeout for task {task_id} after {max_execution_timeout}s"
f"{queue_name}: Worker timeout for task {task_id} after {max_execution_timeout}s"
)
if not task_state.future.done():
task_state.future.set_exception(
@@ -495,12 +497,12 @@ def priority_limit_async_func_call(
if not task_state.future.done():
task_state.future.cancel()
logger.debug(
f"limit_async: Task {task_id} cancelled during execution"
f"{queue_name}: Task {task_id} cancelled during execution"
)
except Exception as e:
# Function execution error
logger.error(
f"limit_async: Error in decorated function for task {task_id}: {str(e)}"
f"{queue_name}: Error in decorated function for task {task_id}: {str(e)}"
)
if not task_state.future.done():
task_state.future.set_exception(e)
@@ -512,10 +514,12 @@ def priority_limit_async_func_call(
except Exception as e:
# Critical error in worker loop
logger.error(f"limit_async: Critical error in worker: {str(e)}")
logger.error(
f"{queue_name}: Critical error in worker: {str(e)}"
)
await asyncio.sleep(0.1)
finally:
logger.debug("limit_async: Worker exiting")
logger.debug(f"{queue_name}: Worker exiting")
async def enhanced_health_check():
"""Enhanced health check with stuck task detection and recovery"""
@@ -549,7 +553,7 @@ def priority_limit_async_func_call(
# Force cleanup of stuck tasks
for task_id, execution_duration in stuck_tasks:
logger.warning(
f"limit_async: Detected stuck task {task_id} (execution time: {execution_duration:.1f}s), forcing cleanup"
f"{queue_name}: Detected stuck task {task_id} (execution time: {execution_duration:.1f}s), forcing cleanup"
)
async with task_states_lock:
if task_id in task_states:
@@ -572,7 +576,7 @@ def priority_limit_async_func_call(
if workers_needed > 0:
logger.info(
f"limit_async: Creating {workers_needed} new workers"
f"{queue_name}: Creating {workers_needed} new workers"
)
new_tasks = set()
for _ in range(workers_needed):
@@ -582,9 +586,9 @@ def priority_limit_async_func_call(
tasks.update(new_tasks)
except Exception as e:
logger.error(f"limit_async: Error in enhanced health check: {str(e)}")
logger.error(f"{queue_name}: Error in enhanced health check: {str(e)}")
finally:
logger.debug("limit_async: Enhanced health check task exiting")
logger.debug(f"{queue_name}: Enhanced health check task exiting")
initialized = False
async def ensure_workers():
@@ -601,7 +605,7 @@ def priority_limit_async_func_call(
if reinit_count > 0:
reinit_count += 1
logger.warning(
f"limit_async: Reinitializing system (count: {reinit_count})"
f"{queue_name}: Reinitializing system (count: {reinit_count})"
)
else:
reinit_count = 1
@@ -614,7 +618,7 @@ def priority_limit_async_func_call(
active_tasks_count = len(tasks)
if active_tasks_count > 0 and reinit_count > 1:
logger.warning(
f"limit_async: {active_tasks_count} tasks still running during reinitialization"
f"{queue_name}: {active_tasks_count} tasks still running during reinitialization"
)
# Create worker tasks
@@ -641,12 +645,12 @@ def priority_limit_async_func_call(
f" (Timeouts: {', '.join(timeout_info)})" if timeout_info else ""
)
logger.info(
f"limit_async: {workers_needed} new workers initialized {timeout_str}"
f"{queue_name}: {workers_needed} new workers initialized {timeout_str}"
)
async def shutdown():
"""Gracefully shut down all workers and cleanup resources"""
logger.info("limit_async: Shutting down priority queue workers")
logger.info(f"{queue_name}: Shutting down priority queue workers")
shutdown_event.set()
@@ -667,7 +671,7 @@ def priority_limit_async_func_call(
await asyncio.wait_for(queue.join(), timeout=5.0)
except asyncio.TimeoutError:
logger.warning(
"limit_async: Timeout waiting for queue to empty during shutdown"
f"{queue_name}: Timeout waiting for queue to empty during shutdown"
)
# Cancel worker tasks
@@ -687,7 +691,7 @@ def priority_limit_async_func_call(
except asyncio.CancelledError:
pass
logger.info("limit_async: Priority queue workers shutdown complete")
logger.info(f"{queue_name}: Priority queue workers shutdown complete")
@wraps(func)
async def wait_func(
@@ -750,7 +754,7 @@ def priority_limit_async_func_call(
)
except asyncio.TimeoutError:
raise QueueFullError(
f"Queue full, timeout after {_queue_timeout} seconds"
f"{queue_name}: Queue full, timeout after {_queue_timeout} seconds"
)
except Exception as e:
# Clean up on queue error
@@ -785,14 +789,14 @@ def priority_limit_async_func_call(
await asyncio.sleep(0.1)
raise TimeoutError(
f"limit_async: User timeout after {_timeout} seconds"
f"{queue_name}: User timeout after {_timeout} seconds"
)
except WorkerTimeoutError as e:
# This is Worker-level timeout, directly propagate exception information
raise TimeoutError(f"limit_async: {str(e)}")
raise TimeoutError(f"{queue_name}: {str(e)}")
except HealthCheckTimeoutError as e:
# This is Health Check-level timeout, directly propagate exception information
raise TimeoutError(f"limit_async: {str(e)}")
raise TimeoutError(f"{queue_name}: {str(e)}")
finally:
# Ensure cleanup
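Taken together, the configured queue name now prefixes every log message the decorator emits, replacing the fixed "limit_async" tag. As a rough illustration (the queue name, task id, and timeout value are invented for this example), a worker timeout would be reported roughly as:

LLM queue: Worker timeout for task 42 after 210s

Call sites that keep the default queue_name continue to produce the previous "limit_async"-prefixed messages, so their log output is unchanged.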