diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py
index 9871d4aa..dd40c700 100644
--- a/lightrag/kg/shared_storage.py
+++ b/lightrag/kg/shared_storage.py
@@ -60,6 +60,12 @@ _registry_guard = None
 CLEANUP_KEYED_LOCKS_AFTER_SECONDS = 300
 # Threshold for triggering cleanup - only clean when pending list exceeds this size
 CLEANUP_THRESHOLD = 500
+# Minimum interval between cleanup operations in seconds
+MIN_CLEANUP_INTERVAL_SECONDS = 30
+# Track the earliest cleanup time for efficient cleanup triggering (multiprocess locks only)
+_earliest_mp_cleanup_time: Optional[float] = None
+# Track the last cleanup time to enforce minimum interval (multiprocess locks only)
+_last_mp_cleanup_time: Optional[float] = None
 
 _initialized = None
 
@@ -308,6 +314,8 @@ def _release_shared_raw_mp_lock(factory_name: str, key: str):
     if not _is_multiprocess:
         return
 
+    global _earliest_mp_cleanup_time, _last_mp_cleanup_time
+
     with _registry_guard:
         combined_key = _get_combined_key(factory_name, key)
         raw = _lock_registry.get(combined_key)
@@ -330,23 +338,77 @@ def _release_shared_raw_mp_lock(factory_name: str, key: str):
         current_time = time.time()
         if count == 0:
             _lock_cleanup_data[combined_key] = current_time
+
+            # Update earliest multiprocess cleanup time (only when earlier)
+            if _earliest_mp_cleanup_time is None or current_time < _earliest_mp_cleanup_time:
+                _earliest_mp_cleanup_time = current_time
 
-        # Only perform cleanup when the pending cleanup list exceeds threshold
+        # Efficient cleanup triggering with minimum interval control
         total_cleanup_len = len(_lock_cleanup_data)
         if total_cleanup_len >= CLEANUP_THRESHOLD:
-            cleaned_count = 0
-            for cleanup_key, cleanup_time in list(_lock_cleanup_data.items()):
-                if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS:
-                    _lock_registry.pop(cleanup_key, None)
-                    _lock_registry_count.pop(cleanup_key, None)
-                    _lock_cleanup_data.pop(cleanup_key, None)
-                    cleaned_count += 1
-            if cleaned_count > 0:
+            # Time rollback detection
+            if _last_mp_cleanup_time is not None and current_time < _last_mp_cleanup_time:
                 direct_log(
-                    f"== mp Lock == Cleaned up {cleaned_count}/{total_cleanup_len} expired locks",
+                    "== mp Lock == Time rollback detected, resetting cleanup time",
+                    level="WARNING",
                     enable_output=False,
-                    level="INFO",
                 )
+                _last_mp_cleanup_time = None
+
+            # Check cleanup conditions
+            has_expired_locks = (
+                _earliest_mp_cleanup_time is not None and
+                current_time - _earliest_mp_cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS
+            )
+
+            interval_satisfied = (
+                _last_mp_cleanup_time is None or
+                current_time - _last_mp_cleanup_time > MIN_CLEANUP_INTERVAL_SECONDS
+            )
+
+            if has_expired_locks and interval_satisfied:
+                try:
+                    cleaned_count = 0
+                    new_earliest_time = None
+
+                    # Perform cleanup while maintaining the new earliest time
+                    for cleanup_key, cleanup_time in list(_lock_cleanup_data.items()):
+                        if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS:
+                            # Clean expired locks
+                            _lock_registry.pop(cleanup_key, None)
+                            _lock_registry_count.pop(cleanup_key, None)
+                            _lock_cleanup_data.pop(cleanup_key, None)
+                            cleaned_count += 1
+                        else:
+                            # Track the earliest time among remaining locks
+                            if new_earliest_time is None or cleanup_time < new_earliest_time:
+                                new_earliest_time = cleanup_time
+
+                    # Update state only after successful cleanup
+                    _earliest_mp_cleanup_time = new_earliest_time
+                    _last_mp_cleanup_time = current_time
+
+                    if cleaned_count > 0:
+                        next_cleanup_in = (
+                            max(
+                                (new_earliest_time + CLEANUP_KEYED_LOCKS_AFTER_SECONDS - current_time) if new_earliest_time else float('inf'),
+                                MIN_CLEANUP_INTERVAL_SECONDS
+                            )
+                        )
+                        direct_log(
+                            f"== mp Lock == Cleaned up {cleaned_count}/{total_cleanup_len} expired locks, "
+                            f"next cleanup in {next_cleanup_in:.1f}s",
+                            enable_output=False,
+                            level="INFO",
+                        )
+
+                except Exception as e:
+                    direct_log(
+                        f"== mp Lock == Cleanup failed: {e}",
+                        level="ERROR",
+                        enable_output=False,
+                    )
+                    # Don't update _last_mp_cleanup_time to allow retry
 
 
 class KeyedUnifiedLock:
@@ -374,6 +436,8 @@ class KeyedUnifiedLock:
         self._mp_locks: Dict[
             str, mp.synchronize.Lock
         ] = {}  # multi-process lock proxies
+        self._earliest_async_cleanup_time: Optional[float] = None  # track earliest async cleanup time
+        self._last_async_cleanup_time: Optional[float] = None  # track last async cleanup time for minimum interval
 
     def __call__(self, keys: list[str], *, enable_logging: Optional[bool] = None):
         """
@@ -410,26 +474,78 @@
             current_time = time.time()
             if count == 0:
                 self._async_lock_cleanup_data[key] = current_time
+
+                # Update earliest async cleanup time (only when earlier)
+                if self._earliest_async_cleanup_time is None or current_time < self._earliest_async_cleanup_time:
+                    self._earliest_async_cleanup_time = current_time
 
             self._async_lock_count[key] = count
 
-            # Only perform cleanup when the pending cleanup list exceeds threshold
+            # Efficient cleanup triggering with minimum interval control
             total_cleanup_len = len(self._async_lock_cleanup_data)
             if total_cleanup_len >= CLEANUP_THRESHOLD:
-                cleaned_count = 0
-                for cleanup_key, cleanup_time in list(
-                    self._async_lock_cleanup_data.items()
-                ):
-                    if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS:
-                        self._async_lock.pop(cleanup_key)
-                        self._async_lock_count.pop(cleanup_key)
-                        self._async_lock_cleanup_data.pop(cleanup_key)
-                        cleaned_count += 1
-                if cleaned_count > 0:
+                # Time rollback detection
+                if self._last_async_cleanup_time is not None and current_time < self._last_async_cleanup_time:
                     direct_log(
-                        f"== async Lock == Cleaned up {cleaned_count}/{total_cleanup_len} expired async locks",
+                        "== async Lock == Time rollback detected, resetting cleanup time",
+                        level="WARNING",
                         enable_output=False,
-                        level="INFO",
                     )
+                    self._last_async_cleanup_time = None
+
+                # Check cleanup conditions
+                has_expired_locks = (
+                    self._earliest_async_cleanup_time is not None and
+                    current_time - self._earliest_async_cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS
+                )
+
+                interval_satisfied = (
+                    self._last_async_cleanup_time is None or
+                    current_time - self._last_async_cleanup_time > MIN_CLEANUP_INTERVAL_SECONDS
+                )
+
+                if has_expired_locks and interval_satisfied:
+                    try:
+                        cleaned_count = 0
+                        new_earliest_time = None
+
+                        # Perform cleanup while maintaining the new earliest time
+                        for cleanup_key, cleanup_time in list(self._async_lock_cleanup_data.items()):
+                            if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS:
+                                # Clean expired async locks (pop with default, as in the mp path)
+                                self._async_lock.pop(cleanup_key, None)
+                                self._async_lock_count.pop(cleanup_key, None)
+                                self._async_lock_cleanup_data.pop(cleanup_key, None)
+                                cleaned_count += 1
+                            else:
+                                # Track the earliest time among remaining locks
+                                if new_earliest_time is None or cleanup_time < new_earliest_time:
+                                    new_earliest_time = cleanup_time
+
+                        # Update state only after successful cleanup
+                        self._earliest_async_cleanup_time = new_earliest_time
+                        self._last_async_cleanup_time = current_time
+
+                        if cleaned_count > 0:
+                            next_cleanup_in = (
+                                max(
+                                    (new_earliest_time + CLEANUP_KEYED_LOCKS_AFTER_SECONDS - current_time) if new_earliest_time else float('inf'),
+                                    MIN_CLEANUP_INTERVAL_SECONDS
+                                )
+                            )
+                            direct_log(
+                                f"== async Lock == Cleaned up {cleaned_count}/{total_cleanup_len} expired async locks, "
+                                f"next cleanup in {next_cleanup_in:.1f}s",
+                                enable_output=False,
+                                level="INFO",
+                            )
+
+                    except Exception as e:
+                        direct_log(
+                            f"== async Lock == Cleanup failed: {e}",
+                            level="ERROR",
+                            enable_output=False,
+                        )
+                        # Don't update _last_async_cleanup_time to allow retry
 
     def _get_lock_for_key(self, key: str, enable_logging: bool = False) -> UnifiedLock:
         # 1. get (or create) the per‑process async gate for this key
@@ -620,7 +736,9 @@ def initialize_share_data(workers: int = 1):
         _initialized, \
         _update_flags, \
         _async_locks, \
-        _graph_db_lock_keyed
+        _graph_db_lock_keyed, \
+        _earliest_mp_cleanup_time, \
+        _last_mp_cleanup_time
 
     # Check if already initialized
     if _initialized:
@@ -680,6 +798,10 @@ def initialize_share_data(workers: int = 1):
         )
         direct_log(f"Process {os.getpid()} Shared-Data created for Single Process")
 
+    # Initialize multiprocess cleanup times
+    _earliest_mp_cleanup_time = None
+    _last_mp_cleanup_time = None
+
     # Mark as initialized
     _initialized = True
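
Note: the cleanup policy above is implemented twice, once for the multiprocess lock registry and once for the per-instance async registry. The following is a minimal standalone sketch of that policy for reference only; it is not part of the patch, and the names (LazyCleanupPolicy, mark_released, maybe_cleanup) are hypothetical. It composes the three trigger conditions in one place: pending-list threshold, earliest-expiry check, and minimum interval, plus the clock-rollback reset.

import time
from typing import Dict, Optional

EXPIRE_AFTER_SECONDS = 300.0  # plays the role of CLEANUP_KEYED_LOCKS_AFTER_SECONDS
PENDING_THRESHOLD = 500       # plays the role of CLEANUP_THRESHOLD
MIN_INTERVAL_SECONDS = 30.0   # plays the role of MIN_CLEANUP_INTERVAL_SECONDS


class LazyCleanupPolicy:
    """Hypothetical sketch of the amortized cleanup policy in this patch."""

    def __init__(self) -> None:
        self.pending: Dict[str, float] = {}    # key -> time its refcount hit zero
        self.earliest: Optional[float] = None  # earliest pending timestamp
        self.last_run: Optional[float] = None  # time of last successful cleanup

    def mark_released(self, key: str, now: float) -> None:
        """Record a key whose reference count just dropped to zero."""
        self.pending[key] = now
        if self.earliest is None or now < self.earliest:
            self.earliest = now

    def maybe_cleanup(self, now: float) -> int:
        """Scan only when all trigger conditions hold; return removed count."""
        if len(self.pending) < PENDING_THRESHOLD:
            return 0  # pending list still small; stay on the O(1) path
        if self.last_run is not None and now < self.last_run:
            self.last_run = None  # clock went backwards; reset the interval
        has_expired = (
            self.earliest is not None
            and now - self.earliest > EXPIRE_AFTER_SECONDS
        )
        interval_ok = (
            self.last_run is None or now - self.last_run > MIN_INTERVAL_SECONDS
        )
        if not (has_expired and interval_ok):
            return 0  # nothing can have expired yet, or we ran too recently
        cleaned = 0
        new_earliest: Optional[float] = None
        for key, ts in list(self.pending.items()):
            if now - ts > EXPIRE_AFTER_SECONDS:
                del self.pending[key]  # expired; drop the bookkeeping entry
                cleaned += 1
            elif new_earliest is None or ts < new_earliest:
                new_earliest = ts  # minimum timestamp among survivors
        self.earliest = new_earliest  # None when everything was cleaned
        self.last_run = now
        return cleaned


if __name__ == "__main__":
    policy = LazyCleanupPolicy()
    t0 = time.time()
    for i in range(600):                          # exceed the threshold
        policy.mark_released(f"key-{i}", t0)
    assert policy.maybe_cleanup(t0) == 0          # nothing expired yet
    assert policy.maybe_cleanup(t0 + 301) == 600  # all entries expired

Tracking the earliest pending timestamp is what keeps the fast path cheap: until that single timestamp is old enough to expire, the O(n) scan is skipped entirely, and the minimum interval bounds how often the scan can run even when the pending list stays above the threshold.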