From 5ee509e67101509330537009da034bfe02af4cc9 Mon Sep 17 00:00:00 2001 From: yangdx Date: Sat, 12 Jul 2025 05:17:44 +0800 Subject: [PATCH] Fix linting --- lightrag/kg/shared_storage.py | 129 ++++++++++++++++++++++------------ 1 file changed, 85 insertions(+), 44 deletions(-) diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py index dd40c700..b83e058c 100644 --- a/lightrag/kg/shared_storage.py +++ b/lightrag/kg/shared_storage.py @@ -338,42 +338,52 @@ def _release_shared_raw_mp_lock(factory_name: str, key: str): current_time = time.time() if count == 0: _lock_cleanup_data[combined_key] = current_time - + # Update earliest multiprocess cleanup time (only when earlier) - if _earliest_mp_cleanup_time is None or current_time < _earliest_mp_cleanup_time: + if ( + _earliest_mp_cleanup_time is None + or current_time < _earliest_mp_cleanup_time + ): _earliest_mp_cleanup_time = current_time # Efficient cleanup triggering with minimum interval control total_cleanup_len = len(_lock_cleanup_data) if total_cleanup_len >= CLEANUP_THRESHOLD: # Time rollback detection - if _last_mp_cleanup_time is not None and current_time < _last_mp_cleanup_time: + if ( + _last_mp_cleanup_time is not None + and current_time < _last_mp_cleanup_time + ): direct_log( "== mp Lock == Time rollback detected, resetting cleanup time", level="WARNING", enable_output=False, ) _last_mp_cleanup_time = None - + # Check cleanup conditions has_expired_locks = ( - _earliest_mp_cleanup_time is not None and - current_time - _earliest_mp_cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS + _earliest_mp_cleanup_time is not None + and current_time - _earliest_mp_cleanup_time + > CLEANUP_KEYED_LOCKS_AFTER_SECONDS ) - + interval_satisfied = ( - _last_mp_cleanup_time is None or - current_time - _last_mp_cleanup_time > MIN_CLEANUP_INTERVAL_SECONDS + _last_mp_cleanup_time is None + or current_time - _last_mp_cleanup_time > MIN_CLEANUP_INTERVAL_SECONDS ) - + if has_expired_locks and interval_satisfied: try: cleaned_count = 0 new_earliest_time = None - + # Perform cleanup while maintaining the new earliest time for cleanup_key, cleanup_time in list(_lock_cleanup_data.items()): - if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS: + if ( + current_time - cleanup_time + > CLEANUP_KEYED_LOCKS_AFTER_SECONDS + ): # Clean expired locks _lock_registry.pop(cleanup_key, None) _lock_registry_count.pop(cleanup_key, None) @@ -381,19 +391,26 @@ def _release_shared_raw_mp_lock(factory_name: str, key: str): cleaned_count += 1 else: # Track the earliest time among remaining locks - if new_earliest_time is None or cleanup_time < new_earliest_time: + if ( + new_earliest_time is None + or cleanup_time < new_earliest_time + ): new_earliest_time = cleanup_time - + # Update state only after successful cleanup _earliest_mp_cleanup_time = new_earliest_time _last_mp_cleanup_time = current_time - + if cleaned_count > 0: - next_cleanup_in = ( - max( - (new_earliest_time + CLEANUP_KEYED_LOCKS_AFTER_SECONDS - current_time) if new_earliest_time else float('inf'), - MIN_CLEANUP_INTERVAL_SECONDS + next_cleanup_in = max( + ( + new_earliest_time + + CLEANUP_KEYED_LOCKS_AFTER_SECONDS + - current_time ) + if new_earliest_time + else float("inf"), + MIN_CLEANUP_INTERVAL_SECONDS, ) direct_log( f"== mp Lock == Cleaned up {cleaned_count}/{total_cleanup_len} expired locks, " @@ -401,7 +418,7 @@ def _release_shared_raw_mp_lock(factory_name: str, key: str): enable_output=False, level="INFO", ) - + except Exception as e: direct_log( f"== mp Lock == Cleanup failed: {e}", @@ -436,8 +453,12 @@ class KeyedUnifiedLock: self._mp_locks: Dict[ str, mp.synchronize.Lock ] = {} # multi-process lock proxies - self._earliest_async_cleanup_time: Optional[float] = None # track earliest async cleanup time - self._last_async_cleanup_time: Optional[float] = None # track last async cleanup time for minimum interval + self._earliest_async_cleanup_time: Optional[float] = ( + None # track earliest async cleanup time + ) + self._last_async_cleanup_time: Optional[float] = ( + None # track last async cleanup time for minimum interval + ) def __call__(self, keys: list[str], *, enable_logging: Optional[bool] = None): """ @@ -474,9 +495,12 @@ class KeyedUnifiedLock: current_time = time.time() if count == 0: self._async_lock_cleanup_data[key] = current_time - + # Update earliest async cleanup time (only when earlier) - if self._earliest_async_cleanup_time is None or current_time < self._earliest_async_cleanup_time: + if ( + self._earliest_async_cleanup_time is None + or current_time < self._earliest_async_cleanup_time + ): self._earliest_async_cleanup_time = current_time self._async_lock_count[key] = count @@ -484,33 +508,43 @@ class KeyedUnifiedLock: total_cleanup_len = len(self._async_lock_cleanup_data) if total_cleanup_len >= CLEANUP_THRESHOLD: # Time rollback detection - if self._last_async_cleanup_time is not None and current_time < self._last_async_cleanup_time: + if ( + self._last_async_cleanup_time is not None + and current_time < self._last_async_cleanup_time + ): direct_log( "== async Lock == Time rollback detected, resetting cleanup time", level="WARNING", enable_output=False, ) self._last_async_cleanup_time = None - + # Check cleanup conditions has_expired_locks = ( - self._earliest_async_cleanup_time is not None and - current_time - self._earliest_async_cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS + self._earliest_async_cleanup_time is not None + and current_time - self._earliest_async_cleanup_time + > CLEANUP_KEYED_LOCKS_AFTER_SECONDS ) - + interval_satisfied = ( - self._last_async_cleanup_time is None or - current_time - self._last_async_cleanup_time > MIN_CLEANUP_INTERVAL_SECONDS + self._last_async_cleanup_time is None + or current_time - self._last_async_cleanup_time + > MIN_CLEANUP_INTERVAL_SECONDS ) - + if has_expired_locks and interval_satisfied: try: cleaned_count = 0 new_earliest_time = None - + # Perform cleanup while maintaining the new earliest time - for cleanup_key, cleanup_time in list(self._async_lock_cleanup_data.items()): - if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS: + for cleanup_key, cleanup_time in list( + self._async_lock_cleanup_data.items() + ): + if ( + current_time - cleanup_time + > CLEANUP_KEYED_LOCKS_AFTER_SECONDS + ): # Clean expired async locks self._async_lock.pop(cleanup_key) self._async_lock_count.pop(cleanup_key) @@ -518,19 +552,26 @@ class KeyedUnifiedLock: cleaned_count += 1 else: # Track the earliest time among remaining locks - if new_earliest_time is None or cleanup_time < new_earliest_time: + if ( + new_earliest_time is None + or cleanup_time < new_earliest_time + ): new_earliest_time = cleanup_time - + # Update state only after successful cleanup self._earliest_async_cleanup_time = new_earliest_time self._last_async_cleanup_time = current_time - + if cleaned_count > 0: - next_cleanup_in = ( - max( - (new_earliest_time + CLEANUP_KEYED_LOCKS_AFTER_SECONDS - current_time) if new_earliest_time else float('inf'), - MIN_CLEANUP_INTERVAL_SECONDS + next_cleanup_in = max( + ( + new_earliest_time + + CLEANUP_KEYED_LOCKS_AFTER_SECONDS + - current_time ) + if new_earliest_time + else float("inf"), + MIN_CLEANUP_INTERVAL_SECONDS, ) direct_log( f"== async Lock == Cleaned up {cleaned_count}/{total_cleanup_len} expired async locks, " @@ -538,7 +579,7 @@ class KeyedUnifiedLock: enable_output=False, level="INFO", ) - + except Exception as e: direct_log( f"== async Lock == Cleanup failed: {e}", @@ -801,7 +842,7 @@ def initialize_share_data(workers: int = 1): # Initialize multiprocess cleanup times _earliest_mp_cleanup_time = None _last_mp_cleanup_time = None - + # Mark as initialized _initialized = True