diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py
index d5780f2e..c9e26614 100644
--- a/lightrag/kg/shared_storage.py
+++ b/lightrag/kg/shared_storage.py
@@ -265,13 +265,13 @@ def _get_or_create_shared_raw_mp_lock(factory_name: str, key: str) -> Optional[m
     if raw is None:
         raw = _manager.Lock()
         _lock_registry[combined_key] = raw
-        _lock_registry_count[combined_key] = 0
+        _lock_registry_count[combined_key] = 1  # Fix: initialize a new lock's count to 1, consistent with the release logic
     else:
         if count is None:
             raise RuntimeError(f"Shared-Data lock registry for {factory_name} is corrupted for key {key}")
         count += 1
         _lock_registry_count[combined_key] = count
-        if count == 1 and combined_key in _lock_cleanup_data:
+        if count == 1 and combined_key in _lock_cleanup_data:  # Remove a lock that is back in use from the pending-cleanup dict
             _lock_cleanup_data.pop(combined_key)
     return raw
@@ -292,18 +292,20 @@ def _release_shared_raw_mp_lock(factory_name: str, key: str):
     count -= 1
     if count < 0:
-        raise RuntimeError(f"Attempting to remove lock for {key} but it is not in the registry")
-    else:
-        _lock_registry_count[combined_key] = count
+        raise RuntimeError(f"Attempting to release lock for {key} more times than it was acquired")
+    _lock_registry_count[combined_key] = count
+
+    current_time = time.time()
     if count == 0:
-        _lock_cleanup_data[combined_key] = time.time()
+        _lock_cleanup_data[combined_key] = current_time
 
-    for combined_key, value in list(_lock_cleanup_data.items()):
-        if time.time() - value > CLEANUP_KEYED_LOCKS_AFTER_SECONDS:
-            _lock_registry.pop(combined_key)
-            _lock_registry_count.pop(combined_key)
-            _lock_cleanup_data.pop(combined_key)
+    # Clean up expired locks
+    for cleanup_key, cleanup_time in list(_lock_cleanup_data.items()):
+        if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS:
+            _lock_registry.pop(cleanup_key, None)
+            _lock_registry_count.pop(cleanup_key, None)
+            _lock_cleanup_data.pop(cleanup_key, None)
 
 
 # ─────────────────────────────────────────────────────────────────────────────
@@ -355,15 +357,18 @@ class KeyedUnifiedLock:
     def _release_async_lock(self, key: str):
         count = self._async_lock_count.get(key, 0)
         count -= 1
+
+        current_time = time.time()  # Optimization: call time.time() only once
         if count == 0:
-            self._async_lock_cleanup_data[key] = time.time()
+            self._async_lock_cleanup_data[key] = current_time
         self._async_lock_count[key] = count
 
-        for key, value in list(self._async_lock_cleanup_data.items()):
-            if time.time() - value > CLEANUP_KEYED_LOCKS_AFTER_SECONDS:
-                self._async_lock.pop(key)
-                self._async_lock_count.pop(key)
-                self._async_lock_cleanup_data.pop(key)
+        # Use the cached timestamp for cleanup to avoid repeated time.time() calls in the loop
+        for cleanup_key, cleanup_time in list(self._async_lock_cleanup_data.items()):
+            if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS:
+                self._async_lock.pop(cleanup_key)
+                self._async_lock_count.pop(cleanup_key)
+                self._async_lock_cleanup_data.pop(cleanup_key)
 
     def _get_lock_for_key(self, key: str, enable_logging: bool = False) -> UnifiedLock:
diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index f7b1dcff..c07c6a53 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -1094,9 +1094,8 @@ class LightRAG:
                 }
             )
-                # Semphore is released here
-                # Concurrency is controlled by graph db lock for individual entities and relationships
-
+            # Semphore is released here
+            # Concurrency is controlled by graph db lock for individual entities and relationships
             if file_extraction_stage_ok:
                 try:
                     # Get chunk_results from entity_relation_task
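
For reference, below is a minimal standalone sketch of the reference-counting and delayed-cleanup pattern the patch converges on: a newly created per-key lock starts with a count of 1, a release that drops the count to 0 only marks the lock for cleanup, idle locks are purged after a grace period, and a key that is acquired again is taken back off the cleanup list. This is not the actual `shared_storage.py` API: the names `KeyedLockRegistry`, `CLEANUP_AFTER_SECONDS`, `acquire`, and `release` are invented for illustration, and it uses `threading.Lock` rather than the manager/asyncio locks used in the module.

```python
import threading
import time

# Hypothetical grace period; the real module uses CLEANUP_KEYED_LOCKS_AFTER_SECONDS.
CLEANUP_AFTER_SECONDS = 60.0


class KeyedLockRegistry:
    """Illustrative reference-counted registry of per-key locks with delayed cleanup."""

    def __init__(self) -> None:
        self._locks: dict[str, threading.Lock] = {}
        self._counts: dict[str, int] = {}
        self._cleanup_at: dict[str, float] = {}  # key -> time its count hit zero
        self._guard = threading.Lock()  # protects the three dicts above

    def acquire(self, key: str) -> threading.Lock:
        with self._guard:
            lock = self._locks.get(key)
            if lock is None:
                lock = threading.Lock()
                self._locks[key] = lock
                # A brand-new lock already has one user, so its count starts at 1,
                # matching the decrement performed in release().
                self._counts[key] = 1
            else:
                self._counts[key] += 1
                # The key is in use again, so take it off the pending-cleanup dict.
                self._cleanup_at.pop(key, None)
            return lock

    def release(self, key: str) -> None:
        with self._guard:
            count = self._counts.get(key, 0) - 1
            if count < 0:
                raise RuntimeError(f"lock for {key!r} released more times than acquired")
            self._counts[key] = count

            now = time.time()  # read the clock once and reuse it below
            if count == 0:
                # Mark the lock as idle; it is only dropped after the grace period.
                self._cleanup_at[key] = now

            # Purge locks that have stayed idle longer than the grace period.
            for idle_key, idle_since in list(self._cleanup_at.items()):
                if now - idle_since > CLEANUP_AFTER_SECONDS:
                    self._locks.pop(idle_key, None)
                    self._counts.pop(idle_key, None)
                    self._cleanup_at.pop(idle_key, None)
```

Typical use of the sketch would be `lock = registry.acquire("doc-123")`, a `with lock:` critical section, then `registry.release("doc-123")`. Starting the count at 1 on creation is what keeps the very first release from underflowing to -1 and raising, which is the inconsistency the first hunk addresses.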