Fix initial count error for multi-process lock with key
This commit is contained in:
parent
c47747da9e
commit
3afdd1b67c
2 changed files with 24 additions and 20 deletions
|
|
@ -265,13 +265,13 @@ def _get_or_create_shared_raw_mp_lock(factory_name: str, key: str) -> Optional[m
|
||||||
if raw is None:
|
if raw is None:
|
||||||
raw = _manager.Lock()
|
raw = _manager.Lock()
|
||||||
_lock_registry[combined_key] = raw
|
_lock_registry[combined_key] = raw
|
||||||
_lock_registry_count[combined_key] = 0
|
_lock_registry_count[combined_key] = 1 # 修复:新锁初始化为1,与释放逻辑保持一致
|
||||||
else:
|
else:
|
||||||
if count is None:
|
if count is None:
|
||||||
raise RuntimeError(f"Shared-Data lock registry for {factory_name} is corrupted for key {key}")
|
raise RuntimeError(f"Shared-Data lock registry for {factory_name} is corrupted for key {key}")
|
||||||
count += 1
|
count += 1
|
||||||
_lock_registry_count[combined_key] = count
|
_lock_registry_count[combined_key] = count
|
||||||
if count == 1 and combined_key in _lock_cleanup_data:
|
if count == 1 and combined_key in _lock_cleanup_data: # 把再次使用的锁添剔除出待清理字典
|
||||||
_lock_cleanup_data.pop(combined_key)
|
_lock_cleanup_data.pop(combined_key)
|
||||||
return raw
|
return raw
|
||||||
|
|
||||||
|
|
@ -292,18 +292,20 @@ def _release_shared_raw_mp_lock(factory_name: str, key: str):
|
||||||
|
|
||||||
count -= 1
|
count -= 1
|
||||||
if count < 0:
|
if count < 0:
|
||||||
raise RuntimeError(f"Attempting to remove lock for {key} but it is not in the registry")
|
raise RuntimeError(f"Attempting to release lock for {key} more times than it was acquired")
|
||||||
else:
|
|
||||||
_lock_registry_count[combined_key] = count
|
|
||||||
|
|
||||||
|
_lock_registry_count[combined_key] = count
|
||||||
|
|
||||||
|
current_time = time.time()
|
||||||
if count == 0:
|
if count == 0:
|
||||||
_lock_cleanup_data[combined_key] = time.time()
|
_lock_cleanup_data[combined_key] = current_time
|
||||||
|
|
||||||
for combined_key, value in list(_lock_cleanup_data.items()):
|
# 清理过期的锁
|
||||||
if time.time() - value > CLEANUP_KEYED_LOCKS_AFTER_SECONDS:
|
for cleanup_key, cleanup_time in list(_lock_cleanup_data.items()):
|
||||||
_lock_registry.pop(combined_key)
|
if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS:
|
||||||
_lock_registry_count.pop(combined_key)
|
_lock_registry.pop(cleanup_key, None)
|
||||||
_lock_cleanup_data.pop(combined_key)
|
_lock_registry_count.pop(cleanup_key, None)
|
||||||
|
_lock_cleanup_data.pop(cleanup_key, None)
|
||||||
|
|
||||||
|
|
||||||
# ─────────────────────────────────────────────────────────────────────────────
|
# ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
|
@ -355,15 +357,18 @@ class KeyedUnifiedLock:
|
||||||
def _release_async_lock(self, key: str):
|
def _release_async_lock(self, key: str):
|
||||||
count = self._async_lock_count.get(key, 0)
|
count = self._async_lock_count.get(key, 0)
|
||||||
count -= 1
|
count -= 1
|
||||||
|
|
||||||
|
current_time = time.time() # 优化:只调用一次 time.time()
|
||||||
if count == 0:
|
if count == 0:
|
||||||
self._async_lock_cleanup_data[key] = time.time()
|
self._async_lock_cleanup_data[key] = current_time
|
||||||
self._async_lock_count[key] = count
|
self._async_lock_count[key] = count
|
||||||
|
|
||||||
for key, value in list(self._async_lock_cleanup_data.items()):
|
# 使用缓存的时间戳进行清理,避免在循环中重复调用 time.time()
|
||||||
if time.time() - value > CLEANUP_KEYED_LOCKS_AFTER_SECONDS:
|
for cleanup_key, cleanup_time in list(self._async_lock_cleanup_data.items()):
|
||||||
self._async_lock.pop(key)
|
if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS:
|
||||||
self._async_lock_count.pop(key)
|
self._async_lock.pop(cleanup_key)
|
||||||
self._async_lock_cleanup_data.pop(key)
|
self._async_lock_count.pop(cleanup_key)
|
||||||
|
self._async_lock_cleanup_data.pop(cleanup_key)
|
||||||
|
|
||||||
|
|
||||||
def _get_lock_for_key(self, key: str, enable_logging: bool = False) -> UnifiedLock:
|
def _get_lock_for_key(self, key: str, enable_logging: bool = False) -> UnifiedLock:
|
||||||
|
|
|
||||||
|
|
@ -1094,9 +1094,8 @@ class LightRAG:
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
# Semphore is released here
|
# Semphore is released here
|
||||||
# Concurrency is controlled by graph db lock for individual entities and relationships
|
# Concurrency is controlled by graph db lock for individual entities and relationships
|
||||||
|
|
||||||
if file_extraction_stage_ok:
|
if file_extraction_stage_ok:
|
||||||
try:
|
try:
|
||||||
# Get chunk_results from entity_relation_task
|
# Get chunk_results from entity_relation_task
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue