diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py index 0f961687..fe7e7e00 100644 --- a/lightrag/kg/shared_storage.py +++ b/lightrag/kg/shared_storage.py @@ -5,21 +5,42 @@ import multiprocessing as mp from multiprocessing.synchronize import Lock as ProcessLock from multiprocessing import Manager import time +import logging from typing import Any, Dict, List, Optional, Union, TypeVar, Generic # Define a direct print function for critical logs that must be visible in all processes -def direct_log(message, level="INFO", enable_output: bool = True): +def direct_log(message, enable_output: bool = True, level: str = "DEBUG"): """ Log a message directly to stderr to ensure visibility in all processes, including the Gunicorn master process. Args: message: The message to log - level: Log level (default: "INFO") + level: Log level (default: "DEBUG") enable_output: Whether to actually output the log (default: True) """ - if enable_output: + # Get the current logger level from the lightrag logger + try: + from lightrag.utils import logger + + current_level = logger.getEffectiveLevel() + except ImportError: + # Fallback if lightrag.utils is not available + current_level = logging.INFO + + # Convert string level to numeric level for comparison + level_mapping = { + "DEBUG": logging.DEBUG, # 10 + "INFO": logging.INFO, # 20 + "WARNING": logging.WARNING, # 30 + "ERROR": logging.ERROR, # 40 + "CRITICAL": logging.CRITICAL, # 50 + } + message_level = level_mapping.get(level.upper(), logging.DEBUG) + + # print(f"Diret_log: {level.upper()} {message_level} ? {current_level}", file=sys.stderr, flush=True) + if enable_output or (message_level >= current_level): print(f"{level}: {message}", file=sys.stderr, flush=True) @@ -67,11 +88,7 @@ def inc_debug_n_locks_acquired(): global _debug_n_locks_acquired if DEBUG_LOCKS: _debug_n_locks_acquired += 1 - print( - f"DEBUG: Keyed Lock acquired, total: {_debug_n_locks_acquired:>5}", - end="\r", - flush=True, - ) + print(f"DEBUG: Keyed Lock acquired, total: {_debug_n_locks_acquired:>5}") def dec_debug_n_locks_acquired(): @@ -79,11 +96,7 @@ def dec_debug_n_locks_acquired(): if DEBUG_LOCKS: if _debug_n_locks_acquired > 0: _debug_n_locks_acquired -= 1 - print( - f"DEBUG: Keyed Lock released, total: {_debug_n_locks_acquired:>5}", - end="\r", - flush=True, - ) + print(f"DEBUG: Keyed Lock released, total: {_debug_n_locks_acquired:>5}") else: raise RuntimeError("Attempting to release lock when no locks are acquired") @@ -113,17 +126,8 @@ class UnifiedLock(Generic[T]): async def __aenter__(self) -> "UnifiedLock[T]": try: - # direct_log( - # f"== Lock == Process {self._pid}: Acquiring lock '{self._name}' (async={self._is_async})", - # enable_output=self._enable_logging, - # ) - # If in multiprocess mode and async lock exists, acquire it first if not self._is_async and self._async_lock is not None: - # direct_log( - # f"== Lock == Process {self._pid}: Acquiring async lock for '{self._name}'", - # enable_output=self._enable_logging, - # ) await self._async_lock.acquire() direct_log( f"== Lock == Process {self._pid}: Async lock for '{self._name}' acquired", @@ -328,12 +332,20 @@ def _release_shared_raw_mp_lock(factory_name: str, key: str): _lock_cleanup_data[combined_key] = current_time # Only perform cleanup when the pending cleanup list exceeds threshold - if len(_lock_cleanup_data) > CLEANUP_THRESHOLD: + total_cleanup_len = len(_lock_cleanup_data) + if total_cleanup_len >= CLEANUP_THRESHOLD: + cleaned_count = 0 for cleanup_key, cleanup_time in list(_lock_cleanup_data.items()): if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS: _lock_registry.pop(cleanup_key, None) _lock_registry_count.pop(cleanup_key, None) _lock_cleanup_data.pop(cleanup_key, None) + cleaned_count += 1 + direct_log( + f"== mp Lock == Cleaned up {cleaned_count}/{total_cleanup_len} expired locks", + enable_output=False, + level="DEBUG", + ) class KeyedUnifiedLock: @@ -400,7 +412,9 @@ class KeyedUnifiedLock: self._async_lock_count[key] = count # Only perform cleanup when the pending cleanup list exceeds threshold - if len(self._async_lock_cleanup_data) > CLEANUP_THRESHOLD: + total_cleanup_len = len(self._async_lock_cleanup_data) + if total_cleanup_len >= CLEANUP_THRESHOLD: + cleaned_count = 0 for cleanup_key, cleanup_time in list( self._async_lock_cleanup_data.items() ): @@ -408,6 +422,12 @@ class KeyedUnifiedLock: self._async_lock.pop(cleanup_key) self._async_lock_count.pop(cleanup_key) self._async_lock_cleanup_data.pop(cleanup_key) + cleaned_count += 1 + direct_log( + f"== async Lock == Cleaned up {cleaned_count}/{total_cleanup_len} expired async locks", + enable_output=False, + level="DEBUG", + ) def _get_lock_for_key(self, key: str, enable_logging: bool = False) -> UnifiedLock: # 1. get (or create) the per‑process async gate for this key @@ -626,7 +646,7 @@ def initialize_share_data(workers: int = 1): _update_flags = _manager.dict() _graph_db_lock_keyed = KeyedUnifiedLock( - factory_name="graph_db_lock", + factory_name="GraphDB", ) # Initialize async locks for multiprocess mode @@ -654,7 +674,7 @@ def initialize_share_data(workers: int = 1): _async_locks = None # No need for async locks in single process mode _graph_db_lock_keyed = KeyedUnifiedLock( - factory_name="graph_db_lock", + factory_name="GraphDB", ) direct_log(f"Process {os.getpid()} Shared-Data created for Single Process") diff --git a/lightrag/operate.py b/lightrag/operate.py index 84e99c14..2008cb51 100644 --- a/lightrag/operate.py +++ b/lightrag/operate.py @@ -1143,19 +1143,18 @@ async def merge_nodes_and_edges( total_relations_count = len(all_edges) # Merge nodes and edges + log_message = f"Merging stage {current_file_number}/{total_files}: {file_path}" + logger.info(log_message) async with pipeline_status_lock: - log_message = f"Merging stage {current_file_number}/{total_files}: {file_path}" - logger.info(log_message) pipeline_status["latest_message"] = log_message pipeline_status["history_messages"].append(log_message) # Process and update all entities and relationships in parallel - log_message = f"Updating {total_entities_count} entities and {total_relations_count} relations {current_file_number}/{total_files}: {file_path}" + log_message = f"Processing: {total_entities_count} entities and {total_relations_count} relations" logger.info(log_message) - if pipeline_status is not None: - async with pipeline_status_lock: - pipeline_status["latest_message"] = log_message - pipeline_status["history_messages"].append(log_message) + async with pipeline_status_lock: + pipeline_status["latest_message"] = log_message + pipeline_status["history_messages"].append(log_message) # Get max async tasks limit from global_config for semaphore control llm_model_max_async = global_config.get("llm_model_max_async", 4)