From f8149790e48000f4c781f3ee615171708437f692 Mon Sep 17 00:00:00 2001 From: Arjun Rao Date: Thu, 8 May 2025 11:35:10 +1000 Subject: [PATCH 01/14] Initial commit with keyed graph lock --- lightrag/kg/shared_storage.py | 260 +++++++++++++++++++++++++++++++++- lightrag/lightrag.py | 118 +++++++-------- lightrag/operate.py | 167 +++++++++++----------- 3 files changed, 405 insertions(+), 140 deletions(-) diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py index 6f36f2c4..d5780f2e 100644 --- a/lightrag/kg/shared_storage.py +++ b/lightrag/kg/shared_storage.py @@ -1,9 +1,12 @@ +from collections import defaultdict import os import sys import asyncio +import multiprocessing as mp from multiprocessing.synchronize import Lock as ProcessLock from multiprocessing import Manager -from typing import Any, Dict, Optional, Union, TypeVar, Generic +import time +from typing import Any, Callable, Dict, List, Optional, Union, TypeVar, Generic # Define a direct print function for critical logs that must be visible in all processes @@ -27,8 +30,14 @@ LockType = Union[ProcessLock, asyncio.Lock] _is_multiprocess = None _workers = None _manager = None +_lock_registry: Optional[Dict[str, mp.synchronize.Lock]] = None +_lock_registry_count: Optional[Dict[str, int]] = None +_lock_cleanup_data: Optional[Dict[str, time.time]] = None +_registry_guard = None _initialized = None +CLEANUP_KEYED_LOCKS_AFTER_SECONDS = 300 + # shared data for storage across processes _shared_dicts: Optional[Dict[str, Any]] = None _init_flags: Optional[Dict[str, bool]] = None # namespace -> initialized @@ -40,10 +49,31 @@ _internal_lock: Optional[LockType] = None _pipeline_status_lock: Optional[LockType] = None _graph_db_lock: Optional[LockType] = None _data_init_lock: Optional[LockType] = None +_graph_db_lock_keyed: Optional["KeyedUnifiedLock"] = None # async locks for coroutine synchronization in multiprocess mode _async_locks: Optional[Dict[str, asyncio.Lock]] = None +DEBUG_LOCKS = False +_debug_n_locks_acquired: int = 0 +def inc_debug_n_locks_acquired(): + global _debug_n_locks_acquired + if DEBUG_LOCKS: + _debug_n_locks_acquired += 1 + print(f"DEBUG: Keyed Lock acquired, total: {_debug_n_locks_acquired:>5}", end="\r", flush=True) + +def dec_debug_n_locks_acquired(): + global _debug_n_locks_acquired + if DEBUG_LOCKS: + if _debug_n_locks_acquired > 0: + _debug_n_locks_acquired -= 1 + print(f"DEBUG: Keyed Lock released, total: {_debug_n_locks_acquired:>5}", end="\r", flush=True) + else: + raise RuntimeError("Attempting to release lock when no locks are acquired") + +def get_debug_n_locks_acquired(): + global _debug_n_locks_acquired + return _debug_n_locks_acquired class UnifiedLock(Generic[T]): """Provide a unified lock interface type for asyncio.Lock and multiprocessing.Lock""" @@ -210,6 +240,207 @@ class UnifiedLock(Generic[T]): ) raise + def locked(self) -> bool: + if self._is_async: + return self._lock.locked() + else: + return self._lock.locked() + +# ───────────────────────────────────────────────────────────────────────────── +# 2. 
CROSS‑PROCESS FACTORY (one manager.Lock shared by *all* processes) +# ───────────────────────────────────────────────────────────────────────────── +def _get_combined_key(factory_name: str, key: str) -> str: + """Return the combined key for the factory and key.""" + return f"{factory_name}:{key}" + +def _get_or_create_shared_raw_mp_lock(factory_name: str, key: str) -> Optional[mp.synchronize.Lock]: + """Return the *singleton* manager.Lock() proxy for *key*, creating if needed.""" + if not _is_multiprocess: + return None + + with _registry_guard: + combined_key = _get_combined_key(factory_name, key) + raw = _lock_registry.get(combined_key) + count = _lock_registry_count.get(combined_key) + if raw is None: + raw = _manager.Lock() + _lock_registry[combined_key] = raw + _lock_registry_count[combined_key] = 0 + else: + if count is None: + raise RuntimeError(f"Shared-Data lock registry for {factory_name} is corrupted for key {key}") + count += 1 + _lock_registry_count[combined_key] = count + if count == 1 and combined_key in _lock_cleanup_data: + _lock_cleanup_data.pop(combined_key) + return raw + + +def _release_shared_raw_mp_lock(factory_name: str, key: str): + """Release the *singleton* manager.Lock() proxy for *key*.""" + if not _is_multiprocess: + return + + with _registry_guard: + combined_key = _get_combined_key(factory_name, key) + raw = _lock_registry.get(combined_key) + count = _lock_registry_count.get(combined_key) + if raw is None and count is None: + return + elif raw is None or count is None: + raise RuntimeError(f"Shared-Data lock registry for {factory_name} is corrupted for key {key}") + + count -= 1 + if count < 0: + raise RuntimeError(f"Attempting to remove lock for {key} but it is not in the registry") + else: + _lock_registry_count[combined_key] = count + + if count == 0: + _lock_cleanup_data[combined_key] = time.time() + + for combined_key, value in list(_lock_cleanup_data.items()): + if time.time() - value > CLEANUP_KEYED_LOCKS_AFTER_SECONDS: + _lock_registry.pop(combined_key) + _lock_registry_count.pop(combined_key) + _lock_cleanup_data.pop(combined_key) + + +# ───────────────────────────────────────────────────────────────────────────── +# 3. PARAMETER‑KEYED WRAPPER (unchanged except it *accepts a factory*) +# ───────────────────────────────────────────────────────────────────────────── +class KeyedUnifiedLock: + """ + Parameter‑keyed wrapper around `UnifiedLock`. + + • Keeps only a table of per‑key *asyncio* gates locally + • Fetches the shared process‑wide mutex on *every* acquire + • Builds a fresh `UnifiedLock` each time, so `enable_logging` + (or future options) can vary per call. + """ + + # ---------------- construction ---------------- + def __init__(self, factory_name: str, *, default_enable_logging: bool = True) -> None: + self._factory_name = factory_name + self._default_enable_logging = default_enable_logging + self._async_lock: Dict[str, asyncio.Lock] = {} # key → asyncio.Lock + self._async_lock_count: Dict[str, int] = {} # key → asyncio.Lock count + self._async_lock_cleanup_data: Dict[str, time.time] = {} # key → time.time + self._mp_locks: Dict[str, mp.synchronize.Lock] = {} # key → mp.synchronize.Lock + + # ---------------- public API ------------------ + def __call__(self, keys: list[str], *, enable_logging: Optional[bool] = None): + """ + Ergonomic helper so you can write: + + async with keyed_locks("alpha"): + ... 
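        or, for several keys at once (a sketch; the context manager sorts the
        keys, so overlapping key sets acquired concurrently cannot deadlock):

            async with keyed_locks(["beta", "alpha"]):
                ...

        Note that ``keys`` should be a list: a bare string is itself iterable,
        so sorting it would lock each character as a separate key.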
+ """ + if enable_logging is None: + enable_logging = self._default_enable_logging + return _KeyedLockContext(self, factory_name=self._factory_name, keys=keys, enable_logging=enable_logging) + + def _get_or_create_async_lock(self, key: str) -> asyncio.Lock: + async_lock = self._async_lock.get(key) + count = self._async_lock_count.get(key, 0) + if async_lock is None: + async_lock = asyncio.Lock() + self._async_lock[key] = async_lock + elif count == 0 and key in self._async_lock_cleanup_data: + self._async_lock_cleanup_data.pop(key) + count += 1 + self._async_lock_count[key] = count + return async_lock + + def _release_async_lock(self, key: str): + count = self._async_lock_count.get(key, 0) + count -= 1 + if count == 0: + self._async_lock_cleanup_data[key] = time.time() + self._async_lock_count[key] = count + + for key, value in list(self._async_lock_cleanup_data.items()): + if time.time() - value > CLEANUP_KEYED_LOCKS_AFTER_SECONDS: + self._async_lock.pop(key) + self._async_lock_count.pop(key) + self._async_lock_cleanup_data.pop(key) + + + def _get_lock_for_key(self, key: str, enable_logging: bool = False) -> UnifiedLock: + # 1. get (or create) the per‑process async gate for this key + # Is synchronous, so no need to acquire a lock + async_lock = self._get_or_create_async_lock(key) + + # 2. fetch the shared raw lock + raw_lock = _get_or_create_shared_raw_mp_lock(self._factory_name, key) + is_multiprocess = raw_lock is not None + if not is_multiprocess: + raw_lock = async_lock + + # 3. build a *fresh* UnifiedLock with the chosen logging flag + if is_multiprocess: + return UnifiedLock( + lock=raw_lock, + is_async=False, # manager.Lock is synchronous + name=f"key:{self._factory_name}:{key}", + enable_logging=enable_logging, + async_lock=async_lock, # prevents event‑loop blocking + ) + else: + return UnifiedLock( + lock=raw_lock, + is_async=True, + name=f"key:{self._factory_name}:{key}", + enable_logging=enable_logging, + async_lock=None, # No need for async lock in single process mode + ) + + def _release_lock_for_key(self, key: str): + self._release_async_lock(key) + _release_shared_raw_mp_lock(self._factory_name, key) + +class _KeyedLockContext: + def __init__( + self, + parent: KeyedUnifiedLock, + factory_name: str, + keys: list[str], + enable_logging: bool, + ) -> None: + self._parent = parent + self._factory_name = factory_name + + # The sorting is critical to ensure proper lock and release order + # to avoid deadlocks + self._keys = sorted(keys) + self._enable_logging = ( + enable_logging if enable_logging is not None + else parent._default_enable_logging + ) + self._ul: Optional[List["UnifiedLock"]] = None # set in __aenter__ + + # ----- enter ----- + async def __aenter__(self): + if self._ul is not None: + raise RuntimeError("KeyedUnifiedLock already acquired in current context") + + # 4. 
acquire it + self._ul = [] + for key in self._keys: + lock = self._parent._get_lock_for_key(key, enable_logging=self._enable_logging) + await lock.__aenter__() + inc_debug_n_locks_acquired() + self._ul.append(lock) + return self # or return self._key if you prefer + + # ----- exit ----- + async def __aexit__(self, exc_type, exc, tb): + # The UnifiedLock takes care of proper release order + for ul, key in zip(reversed(self._ul), reversed(self._keys)): + await ul.__aexit__(exc_type, exc, tb) + self._parent._release_lock_for_key(key) + dec_debug_n_locks_acquired() + self._ul = None def get_internal_lock(enable_logging: bool = False) -> UnifiedLock: """return unified storage lock for data consistency""" @@ -258,6 +489,14 @@ def get_graph_db_lock(enable_logging: bool = False) -> UnifiedLock: async_lock=async_lock, ) +def get_graph_db_lock_keyed(keys: str | list[str], enable_logging: bool = False) -> KeyedUnifiedLock: + """return unified graph database lock for ensuring atomic operations""" + global _graph_db_lock_keyed + if _graph_db_lock_keyed is None: + raise RuntimeError("Shared-Data is not initialized") + if isinstance(keys, str): + keys = [keys] + return _graph_db_lock_keyed(keys, enable_logging=enable_logging) def get_data_init_lock(enable_logging: bool = False) -> UnifiedLock: """return unified data initialization lock for ensuring atomic data initialization""" @@ -294,6 +533,10 @@ def initialize_share_data(workers: int = 1): _workers, \ _is_multiprocess, \ _storage_lock, \ + _lock_registry, \ + _lock_registry_count, \ + _lock_cleanup_data, \ + _registry_guard, \ _internal_lock, \ _pipeline_status_lock, \ _graph_db_lock, \ @@ -302,7 +545,8 @@ def initialize_share_data(workers: int = 1): _init_flags, \ _initialized, \ _update_flags, \ - _async_locks + _async_locks, \ + _graph_db_lock_keyed # Check if already initialized if _initialized: @@ -316,6 +560,10 @@ def initialize_share_data(workers: int = 1): if workers > 1: _is_multiprocess = True _manager = Manager() + _lock_registry = _manager.dict() + _lock_registry_count = _manager.dict() + _lock_cleanup_data = _manager.dict() + _registry_guard = _manager.RLock() _internal_lock = _manager.Lock() _storage_lock = _manager.Lock() _pipeline_status_lock = _manager.Lock() @@ -324,6 +572,10 @@ def initialize_share_data(workers: int = 1): _shared_dicts = _manager.dict() _init_flags = _manager.dict() _update_flags = _manager.dict() + + _graph_db_lock_keyed = KeyedUnifiedLock( + factory_name="graph_db_lock", + ) # Initialize async locks for multiprocess mode _async_locks = { @@ -348,6 +600,10 @@ def initialize_share_data(workers: int = 1): _init_flags = {} _update_flags = {} _async_locks = None # No need for async locks in single process mode + + _graph_db_lock_keyed = KeyedUnifiedLock( + factory_name="graph_db_lock", + ) direct_log(f"Process {os.getpid()} Shared-Data created for Single Process") # Mark as initialized diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 7a79da31..d4c28b17 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -1024,73 +1024,73 @@ class LightRAG: } ) - # Semphore was released here + # Semphore is NOT released here, however, the profile context is - if file_extraction_stage_ok: - try: - # Get chunk_results from entity_relation_task - chunk_results = await entity_relation_task - await merge_nodes_and_edges( - chunk_results=chunk_results, # result collected from entity_relation_task - knowledge_graph_inst=self.chunk_entity_relation_graph, - entity_vdb=self.entities_vdb, - 
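# ─────────────────────────────────────────────────────────────────────────────
# Illustration, not part of the patch: how the keyed lock introduced above is
# meant to be used. A minimal sketch assuming initialize_share_data() has been
# called; names match shared_storage.py as of this patch.
# ─────────────────────────────────────────────────────────────────────────────
import asyncio

from lightrag.kg.shared_storage import (
    initialize_share_data,
    get_graph_db_lock_keyed,
)


async def update_entities() -> None:
    # Both keys are held together; _KeyedLockContext sorts them, so a peer
    # locking ["beta", "alpha"] in the other order cannot deadlock with us.
    async with get_graph_db_lock_keyed(["alpha", "beta"], enable_logging=False):
        ...  # atomic read-modify-write on both entities


initialize_share_data(workers=1)  # single-process mode: plain asyncio.Lock gates
asyncio.run(update_entities())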
relationships_vdb=self.relationships_vdb, - global_config=asdict(self), - pipeline_status=pipeline_status, - pipeline_status_lock=pipeline_status_lock, - llm_response_cache=self.llm_response_cache, - current_file_number=current_file_number, - total_files=total_files, - file_path=file_path, - ) + if file_extraction_stage_ok: + try: + # Get chunk_results from entity_relation_task + chunk_results = await entity_relation_task + await merge_nodes_and_edges( + chunk_results=chunk_results, # result collected from entity_relation_task + knowledge_graph_inst=self.chunk_entity_relation_graph, + entity_vdb=self.entities_vdb, + relationships_vdb=self.relationships_vdb, + global_config=asdict(self), + pipeline_status=pipeline_status, + pipeline_status_lock=pipeline_status_lock, + llm_response_cache=self.llm_response_cache, + current_file_number=current_file_number, + total_files=total_files, + file_path=file_path, + ) - await self.doc_status.upsert( - { - doc_id: { - "status": DocStatus.PROCESSED, - "chunks_count": len(chunks), - "content": status_doc.content, - "content_summary": status_doc.content_summary, - "content_length": status_doc.content_length, - "created_at": status_doc.created_at, - "updated_at": datetime.now().isoformat(), - "file_path": file_path, + await self.doc_status.upsert( + { + doc_id: { + "status": DocStatus.PROCESSED, + "chunks_count": len(chunks), + "content": status_doc.content, + "content_summary": status_doc.content_summary, + "content_length": status_doc.content_length, + "created_at": status_doc.created_at, + "updated_at": datetime.now().isoformat(), + "file_path": file_path, + } } - } - ) + ) - # Call _insert_done after processing each file - await self._insert_done() + # Call _insert_done after processing each file + await self._insert_done() - async with pipeline_status_lock: - log_message = f"Completed processing file {current_file_number}/{total_files}: {file_path}" - logger.info(log_message) - pipeline_status["latest_message"] = log_message - pipeline_status["history_messages"].append(log_message) + async with pipeline_status_lock: + log_message = f"Completed processing file {current_file_number}/{total_files}: {file_path}" + logger.info(log_message) + pipeline_status["latest_message"] = log_message + pipeline_status["history_messages"].append(log_message) - except Exception as e: - # Log error and update pipeline status - error_msg = f"Merging stage failed in document {doc_id}: {traceback.format_exc()}" - logger.error(error_msg) - async with pipeline_status_lock: - pipeline_status["latest_message"] = error_msg - pipeline_status["history_messages"].append(error_msg) + except Exception as e: + # Log error and update pipeline status + error_msg = f"Merging stage failed in document {doc_id}: {traceback.format_exc()}" + logger.error(error_msg) + async with pipeline_status_lock: + pipeline_status["latest_message"] = error_msg + pipeline_status["history_messages"].append(error_msg) - # Update document status to failed - await self.doc_status.upsert( - { - doc_id: { - "status": DocStatus.FAILED, - "error": str(e), - "content": status_doc.content, - "content_summary": status_doc.content_summary, - "content_length": status_doc.content_length, - "created_at": status_doc.created_at, - "updated_at": datetime.now().isoformat(), - "file_path": file_path, + # Update document status to failed + await self.doc_status.upsert( + { + doc_id: { + "status": DocStatus.FAILED, + "error": str(e), + "content": status_doc.content, + "content_summary": status_doc.content_summary, + 
"content_length": status_doc.content_length, + "created_at": status_doc.created_at, + "updated_at": datetime.now().isoformat(), + "file_path": file_path, + } } - } - ) + ) # Create processing tasks for all documents doc_tasks = [] diff --git a/lightrag/operate.py b/lightrag/operate.py index d82965e2..a0a74c52 100644 --- a/lightrag/operate.py +++ b/lightrag/operate.py @@ -9,6 +9,8 @@ import os from typing import Any, AsyncIterator from collections import Counter, defaultdict +from .kg.shared_storage import get_graph_db_lock_keyed + from .utils import ( logger, clean_str, @@ -403,27 +405,31 @@ async def _merge_edges_then_upsert( ) for need_insert_id in [src_id, tgt_id]: - if not (await knowledge_graph_inst.has_node(need_insert_id)): - # # Discard this edge if the node does not exist - # if need_insert_id == src_id: - # logger.warning( - # f"Discard edge: {src_id} - {tgt_id} | Source node missing" - # ) - # else: - # logger.warning( - # f"Discard edge: {src_id} - {tgt_id} | Target node missing" - # ) - # return None - await knowledge_graph_inst.upsert_node( - need_insert_id, - node_data={ - "entity_id": need_insert_id, - "source_id": source_id, - "description": description, - "entity_type": "UNKNOWN", - "file_path": file_path, - }, - ) + if (await knowledge_graph_inst.has_node(need_insert_id)): + # This is so that the initial check for the existence of the node need not be locked + continue + async with get_graph_db_lock_keyed([need_insert_id], enable_logging=False): + if not (await knowledge_graph_inst.has_node(need_insert_id)): + # # Discard this edge if the node does not exist + # if need_insert_id == src_id: + # logger.warning( + # f"Discard edge: {src_id} - {tgt_id} | Source node missing" + # ) + # else: + # logger.warning( + # f"Discard edge: {src_id} - {tgt_id} | Target node missing" + # ) + # return None + await knowledge_graph_inst.upsert_node( + need_insert_id, + node_data={ + "entity_id": need_insert_id, + "source_id": source_id, + "description": description, + "entity_type": "UNKNOWN", + "file_path": file_path, + }, + ) force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"] @@ -523,23 +529,30 @@ async def merge_nodes_and_edges( all_edges[sorted_edge_key].extend(edges) # Centralized processing of all nodes and edges - entities_data = [] - relationships_data = [] + total_entities_count = len(all_nodes) + total_relations_count = len(all_edges) # Merge nodes and edges # Use graph database lock to ensure atomic merges and updates graph_db_lock = get_graph_db_lock(enable_logging=False) - async with graph_db_lock: + async with pipeline_status_lock: + log_message = ( + f"Merging stage {current_file_number}/{total_files}: {file_path}" + ) + logger.info(log_message) + pipeline_status["latest_message"] = log_message + pipeline_status["history_messages"].append(log_message) + + # Process and update all entities at once + log_message = f"Updating {total_entities_count} entities {current_file_number}/{total_files}: {file_path}" + logger.info(log_message) + if pipeline_status is not None: async with pipeline_status_lock: - log_message = ( - f"Merging stage {current_file_number}/{total_files}: {file_path}" - ) - logger.info(log_message) pipeline_status["latest_message"] = log_message pipeline_status["history_messages"].append(log_message) - # Process and update all entities at once - for entity_name, entities in all_nodes.items(): + async def _locked_process_entity_name(entity_name, entities): + async with get_graph_db_lock_keyed([entity_name], enable_logging=False): entity_data 
= await _merge_nodes_then_upsert( entity_name, entities, @@ -549,10 +562,34 @@ async def merge_nodes_and_edges( pipeline_status_lock, llm_response_cache, ) - entities_data.append(entity_data) + if entity_vdb is not None: + data_for_vdb = { + compute_mdhash_id(entity_data["entity_name"], prefix="ent-"): { + "entity_name": entity_data["entity_name"], + "entity_type": entity_data["entity_type"], + "content": f"{entity_data['entity_name']}\n{entity_data['description']}", + "source_id": entity_data["source_id"], + "file_path": entity_data.get("file_path", "unknown_source"), + } + } + await entity_vdb.upsert(data_for_vdb) + return entity_data - # Process and update all relationships at once - for edge_key, edges in all_edges.items(): + tasks = [] + for entity_name, entities in all_nodes.items(): + tasks.append(asyncio.create_task(_locked_process_entity_name(entity_name, entities))) + await asyncio.gather(*tasks) + + # Process and update all relationships at once + log_message = f"Updating {total_relations_count} relations {current_file_number}/{total_files}: {file_path}" + logger.info(log_message) + if pipeline_status is not None: + async with pipeline_status_lock: + pipeline_status["latest_message"] = log_message + pipeline_status["history_messages"].append(log_message) + + async def _locked_process_edges(edge_key, edges): + async with get_graph_db_lock_keyed(f"{edge_key[0]}-{edge_key[1]}", enable_logging=False): edge_data = await _merge_edges_then_upsert( edge_key[0], edge_key[1], @@ -563,55 +600,27 @@ async def merge_nodes_and_edges( pipeline_status_lock, llm_response_cache, ) - if edge_data is not None: - relationships_data.append(edge_data) + if edge_data is None: + return None - # Update total counts - total_entities_count = len(entities_data) - total_relations_count = len(relationships_data) - - log_message = f"Updating {total_entities_count} entities {current_file_number}/{total_files}: {file_path}" - logger.info(log_message) - if pipeline_status is not None: - async with pipeline_status_lock: - pipeline_status["latest_message"] = log_message - pipeline_status["history_messages"].append(log_message) - - # Update vector databases with all collected data - if entity_vdb is not None and entities_data: - data_for_vdb = { - compute_mdhash_id(dp["entity_name"], prefix="ent-"): { - "entity_name": dp["entity_name"], - "entity_type": dp["entity_type"], - "content": f"{dp['entity_name']}\n{dp['description']}", - "source_id": dp["source_id"], - "file_path": dp.get("file_path", "unknown_source"), + if relationships_vdb is not None: + data_for_vdb = { + compute_mdhash_id(edge_data["src_id"] + edge_data["tgt_id"], prefix="rel-"): { + "src_id": edge_data["src_id"], + "tgt_id": edge_data["tgt_id"], + "keywords": edge_data["keywords"], + "content": f"{edge_data['src_id']}\t{edge_data['tgt_id']}\n{edge_data['keywords']}\n{edge_data['description']}", + "source_id": edge_data["source_id"], + "file_path": edge_data.get("file_path", "unknown_source"), + } } - for dp in entities_data - } - await entity_vdb.upsert(data_for_vdb) - - log_message = f"Updating {total_relations_count} relations {current_file_number}/{total_files}: {file_path}" - logger.info(log_message) - if pipeline_status is not None: - async with pipeline_status_lock: - pipeline_status["latest_message"] = log_message - pipeline_status["history_messages"].append(log_message) - - if relationships_vdb is not None and relationships_data: - data_for_vdb = { - compute_mdhash_id(dp["src_id"] + dp["tgt_id"], prefix="rel-"): { - "src_id": dp["src_id"], - 
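# ─────────────────────────────────────────────────────────────────────────────
# Illustration, not part of the patch: the double-checked locking pattern from
# _merge_edges_then_upsert above, distilled. `graph` is a stand-in for any
# storage object exposing has_node/upsert_node coroutines.
# ─────────────────────────────────────────────────────────────────────────────
from lightrag.kg.shared_storage import get_graph_db_lock_keyed


async def ensure_node(graph, node_id: str, node_data: dict) -> None:
    # Fast path: an unlocked read avoids lock traffic for nodes that already
    # exist, which is the common case.
    if await graph.has_node(node_id):
        return
    # Slow path: re-check under the per-key lock, because another worker may
    # have inserted the node between the check above and lock acquisition.
    async with get_graph_db_lock_keyed([node_id], enable_logging=False):
        if not await graph.has_node(node_id):
            await graph.upsert_node(node_id, node_data=node_data)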
"tgt_id": dp["tgt_id"], - "keywords": dp["keywords"], - "content": f"{dp['src_id']}\t{dp['tgt_id']}\n{dp['keywords']}\n{dp['description']}", - "source_id": dp["source_id"], - "file_path": dp.get("file_path", "unknown_source"), - } - for dp in relationships_data - } - await relationships_vdb.upsert(data_for_vdb) + await relationships_vdb.upsert(data_for_vdb) + return edge_data + tasks = [] + for edge_key, edges in all_edges.items(): + tasks.append(asyncio.create_task(_locked_process_edges(edge_key, edges))) + await asyncio.gather(*tasks) async def extract_entities( chunks: dict[str, TextChunkSchema], From 6ad9d528b4013f8dc26bd55d2f47b3d4fa61a9db Mon Sep 17 00:00:00 2001 From: Arjun Rao Date: Thu, 8 May 2025 14:22:11 +1000 Subject: [PATCH 02/14] Updated semaphore release message --- lightrag/lightrag.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 9ae6178f..02ec060d 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -1046,7 +1046,9 @@ class LightRAG: } ) - # Semphore is NOT released here, however, the profile context is + # Semphore is NOT released here, because the merge_nodes_and_edges function is highly concurrent + # and more importantly, it is the bottleneck and needs to be resource controlled in massively + # parallel insertions if file_extraction_stage_ok: try: From cb3bfc0e5b4f15057b3f7c57ba33fa53e3767648 Mon Sep 17 00:00:00 2001 From: yangdx Date: Wed, 9 Jul 2025 09:24:44 +0800 Subject: [PATCH 03/14] Release semphore before merge stage --- lightrag/lightrag.py | 114 +++++++++++++++++++++---------------------- lightrag/operate.py | 3 +- 2 files changed, 58 insertions(+), 59 deletions(-) diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 7fde953b..01e513c5 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -1069,27 +1069,27 @@ class LightRAG: } ) - # Semphore is NOT released here, because the merge_nodes_and_edges function is highly concurrent - # and more importantly, it is the bottleneck and needs to be resource controlled in massively - # parallel insertions + # Semphore is released here + # Concurrency is controlled by graph db lock for individual entities and relationships - if file_extraction_stage_ok: - try: - # Get chunk_results from entity_relation_task - chunk_results = await entity_relation_task - await merge_nodes_and_edges( - chunk_results=chunk_results, # result collected from entity_relation_task - knowledge_graph_inst=self.chunk_entity_relation_graph, - entity_vdb=self.entities_vdb, - relationships_vdb=self.relationships_vdb, - global_config=asdict(self), - pipeline_status=pipeline_status, - pipeline_status_lock=pipeline_status_lock, - llm_response_cache=self.llm_response_cache, - current_file_number=current_file_number, - total_files=total_files, - file_path=file_path, - ) + + if file_extraction_stage_ok: + try: + # Get chunk_results from entity_relation_task + chunk_results = await entity_relation_task + await merge_nodes_and_edges( + chunk_results=chunk_results, # result collected from entity_relation_task + knowledge_graph_inst=self.chunk_entity_relation_graph, + entity_vdb=self.entities_vdb, + relationships_vdb=self.relationships_vdb, + global_config=asdict(self), + pipeline_status=pipeline_status, + pipeline_status_lock=pipeline_status_lock, + llm_response_cache=self.llm_response_cache, + current_file_number=current_file_number, + total_files=total_files, + file_path=file_path, + ) await self.doc_status.upsert( { @@ -1111,46 +1111,46 @@ class LightRAG: 
} ) - # Call _insert_done after processing each file - await self._insert_done() + # Call _insert_done after processing each file + await self._insert_done() - async with pipeline_status_lock: - log_message = f"Completed processing file {current_file_number}/{total_files}: {file_path}" - logger.info(log_message) - pipeline_status["latest_message"] = log_message - pipeline_status["history_messages"].append(log_message) + async with pipeline_status_lock: + log_message = f"Completed processing file {current_file_number}/{total_files}: {file_path}" + logger.info(log_message) + pipeline_status["latest_message"] = log_message + pipeline_status["history_messages"].append(log_message) - except Exception as e: - # Log error and update pipeline status - logger.error(traceback.format_exc()) - error_msg = f"Merging stage failed in document {current_file_number}/{total_files}: {file_path}" - logger.error(error_msg) - async with pipeline_status_lock: - pipeline_status["latest_message"] = error_msg - pipeline_status["history_messages"].append( - traceback.format_exc() - ) - pipeline_status["history_messages"].append(error_msg) - - # Persistent llm cache - if self.llm_response_cache: - await self.llm_response_cache.index_done_callback() - - # Update document status to failed - await self.doc_status.upsert( - { - doc_id: { - "status": DocStatus.FAILED, - "error": str(e), - "content": status_doc.content, - "content_summary": status_doc.content_summary, - "content_length": status_doc.content_length, - "created_at": status_doc.created_at, - "updated_at": datetime.now().isoformat(), - "file_path": file_path, - } - } + except Exception as e: + # Log error and update pipeline status + logger.error(traceback.format_exc()) + error_msg = f"Merging stage failed in document {current_file_number}/{total_files}: {file_path}" + logger.error(error_msg) + async with pipeline_status_lock: + pipeline_status["latest_message"] = error_msg + pipeline_status["history_messages"].append( + traceback.format_exc() ) + pipeline_status["history_messages"].append(error_msg) + + # Persistent llm cache + if self.llm_response_cache: + await self.llm_response_cache.index_done_callback() + + # Update document status to failed + await self.doc_status.upsert( + { + doc_id: { + "status": DocStatus.FAILED, + "error": str(e), + "content": status_doc.content, + "content_summary": status_doc.content_summary, + "content_length": status_doc.content_length, + "created_at": status_doc.created_at, + "updated_at": datetime.now().isoformat(), + "file_path": file_path, + } + } + ) # Create processing tasks for all documents doc_tasks = [] diff --git a/lightrag/operate.py b/lightrag/operate.py index cff40b0a..1a410469 100644 --- a/lightrag/operate.py +++ b/lightrag/operate.py @@ -36,6 +36,7 @@ from .base import ( ) from .prompt import PROMPTS from .constants import GRAPH_FIELD_SEP +from .kg.shared_storage import get_graph_db_lock_keyed import time from dotenv import load_dotenv @@ -1121,8 +1122,6 @@ async def merge_nodes_and_edges( pipeline_status_lock: Lock for pipeline status llm_response_cache: LLM response cache """ - # Get lock manager from shared storage - from .kg.shared_storage import get_graph_db_lock_keyed # Collect all nodes and edges from all chunks From ef4870fda51b8173ae0ef6ae6c41b65bc35f410b Mon Sep 17 00:00:00 2001 From: yangdx Date: Fri, 11 Jul 2025 16:34:54 +0800 Subject: [PATCH 04/14] Combined entity and edge processing tasks and optimize merging with semaphore --- lightrag/lightrag.py | 1 - lightrag/operate.py | 138 
++++++++++++++++++++++--------------------- 2 files changed, 72 insertions(+), 67 deletions(-) diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index e41e1d7c..6f04a43f 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -1078,7 +1078,6 @@ class LightRAG: # Semphore is released here # Concurrency is controlled by graph db lock for individual entities and relationships - if file_extraction_stage_ok: try: # Get chunk_results from entity_relation_task diff --git a/lightrag/operate.py b/lightrag/operate.py index c50a24be..dd65f031 100644 --- a/lightrag/operate.py +++ b/lightrag/operate.py @@ -1016,7 +1016,7 @@ async def _merge_edges_then_upsert( ) for need_insert_id in [src_id, tgt_id]: - if (await knowledge_graph_inst.has_node(need_insert_id)): + if await knowledge_graph_inst.has_node(need_insert_id): # This is so that the initial check for the existence of the node need not be locked continue async with get_graph_db_lock_keyed([need_insert_id], enable_logging=False): @@ -1124,7 +1124,6 @@ async def merge_nodes_and_edges( llm_response_cache: LLM response cache """ - # Collect all nodes and edges from all chunks all_nodes = defaultdict(list) all_edges = defaultdict(list) @@ -1145,92 +1144,99 @@ async def merge_nodes_and_edges( # Merge nodes and edges async with pipeline_status_lock: - log_message = ( - f"Merging stage {current_file_number}/{total_files}: {file_path}" - ) + log_message = f"Merging stage {current_file_number}/{total_files}: {file_path}" logger.info(log_message) pipeline_status["latest_message"] = log_message pipeline_status["history_messages"].append(log_message) - # Process and update all entities at once - log_message = f"Updating {total_entities_count} entities {current_file_number}/{total_files}: {file_path}" + # Process and update all entities and relationships in parallel + log_message = f"Updating {total_entities_count} entities and {total_relations_count} relations {current_file_number}/{total_files}: {file_path}" logger.info(log_message) if pipeline_status is not None: async with pipeline_status_lock: pipeline_status["latest_message"] = log_message pipeline_status["history_messages"].append(log_message) + # Get max async tasks limit from global_config for semaphore control + llm_model_max_async = global_config.get("llm_model_max_async", 4) + semaphore = asyncio.Semaphore(llm_model_max_async) + async def _locked_process_entity_name(entity_name, entities): - async with get_graph_db_lock_keyed([entity_name], enable_logging=False): - entity_data = await _merge_nodes_then_upsert( - entity_name, - entities, - knowledge_graph_inst, - global_config, - pipeline_status, - pipeline_status_lock, - llm_response_cache, - ) - if entity_vdb is not None: - data_for_vdb = { - compute_mdhash_id(entity_data["entity_name"], prefix="ent-"): { - "entity_name": entity_data["entity_name"], - "entity_type": entity_data["entity_type"], - "content": f"{entity_data['entity_name']}\n{entity_data['description']}", - "source_id": entity_data["source_id"], - "file_path": entity_data.get("file_path", "unknown_source"), + async with semaphore: + async with get_graph_db_lock_keyed([entity_name], enable_logging=False): + entity_data = await _merge_nodes_then_upsert( + entity_name, + entities, + knowledge_graph_inst, + global_config, + pipeline_status, + pipeline_status_lock, + llm_response_cache, + ) + if entity_vdb is not None: + data_for_vdb = { + compute_mdhash_id(entity_data["entity_name"], prefix="ent-"): { + "entity_name": entity_data["entity_name"], + "entity_type": 
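# ─────────────────────────────────────────────────────────────────────────────
# Illustration, not part of the patch: the semaphore-plus-keyed-lock shape used
# above, distilled. The semaphore bounds how many merges run at once while the
# keyed lock serializes work on any one entity; `process` is hypothetical.
# ─────────────────────────────────────────────────────────────────────────────
import asyncio

from lightrag.kg.shared_storage import get_graph_db_lock_keyed


async def merge_all(names: list[str], process, max_async: int = 4) -> list:
    semaphore = asyncio.Semaphore(max_async)

    async def worker(name: str):
        async with semaphore:  # at most max_async workers are active
            async with get_graph_db_lock_keyed([name], enable_logging=False):
                return await process(name)

    return await asyncio.gather(*(worker(n) for n in names))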
entity_data["entity_type"], + "content": f"{entity_data['entity_name']}\n{entity_data['description']}", + "source_id": entity_data["source_id"], + "file_path": entity_data.get("file_path", "unknown_source"), + } } - } - await entity_vdb.upsert(data_for_vdb) - return entity_data - - tasks = [] - for entity_name, entities in all_nodes.items(): - tasks.append(asyncio.create_task(_locked_process_entity_name(entity_name, entities))) - await asyncio.gather(*tasks) - - # Process and update all relationships at once - log_message = f"Updating {total_relations_count} relations {current_file_number}/{total_files}: {file_path}" - logger.info(log_message) - if pipeline_status is not None: - async with pipeline_status_lock: - pipeline_status["latest_message"] = log_message - pipeline_status["history_messages"].append(log_message) + await entity_vdb.upsert(data_for_vdb) + return entity_data async def _locked_process_edges(edge_key, edges): - async with get_graph_db_lock_keyed(f"{edge_key[0]}-{edge_key[1]}", enable_logging=False): - edge_data = await _merge_edges_then_upsert( - edge_key[0], - edge_key[1], - edges, - knowledge_graph_inst, - global_config, - pipeline_status, - pipeline_status_lock, - llm_response_cache, - ) - if edge_data is None: - return None + async with semaphore: + async with get_graph_db_lock_keyed( + f"{edge_key[0]}-{edge_key[1]}", enable_logging=False + ): + edge_data = await _merge_edges_then_upsert( + edge_key[0], + edge_key[1], + edges, + knowledge_graph_inst, + global_config, + pipeline_status, + pipeline_status_lock, + llm_response_cache, + ) + if edge_data is None: + return None - if relationships_vdb is not None: - data_for_vdb = { - compute_mdhash_id(edge_data["src_id"] + edge_data["tgt_id"], prefix="rel-"): { - "src_id": edge_data["src_id"], - "tgt_id": edge_data["tgt_id"], - "keywords": edge_data["keywords"], - "content": f"{edge_data['src_id']}\t{edge_data['tgt_id']}\n{edge_data['keywords']}\n{edge_data['description']}", - "source_id": edge_data["source_id"], - "file_path": edge_data.get("file_path", "unknown_source"), + if relationships_vdb is not None: + data_for_vdb = { + compute_mdhash_id( + edge_data["src_id"] + edge_data["tgt_id"], prefix="rel-" + ): { + "src_id": edge_data["src_id"], + "tgt_id": edge_data["tgt_id"], + "keywords": edge_data["keywords"], + "content": f"{edge_data['src_id']}\t{edge_data['tgt_id']}\n{edge_data['keywords']}\n{edge_data['description']}", + "source_id": edge_data["source_id"], + "file_path": edge_data.get("file_path", "unknown_source"), + } } - } - await relationships_vdb.upsert(data_for_vdb) - return edge_data + await relationships_vdb.upsert(data_for_vdb) + return edge_data + # Create a single task queue for both entities and edges tasks = [] + + # Add entity processing tasks + for entity_name, entities in all_nodes.items(): + tasks.append( + asyncio.create_task(_locked_process_entity_name(entity_name, entities)) + ) + + # Add edge processing tasks for edge_key, edges in all_edges.items(): tasks.append(asyncio.create_task(_locked_process_edges(edge_key, edges))) + + # Execute all tasks in parallel with semaphore control await asyncio.gather(*tasks) + async def extract_entities( chunks: dict[str, TextChunkSchema], global_config: dict[str, str], From 3afdd1b67cba27e492c67ec891445e929d7fd45a Mon Sep 17 00:00:00 2001 From: yangdx Date: Fri, 11 Jul 2025 20:39:08 +0800 Subject: [PATCH 05/14] Fix initial count error for multi-process lock with key --- lightrag/kg/shared_storage.py | 39 ++++++++++++++++++++--------------- lightrag/lightrag.py 
| 5 ++--- 2 files changed, 24 insertions(+), 20 deletions(-) diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py index d5780f2e..c9e26614 100644 --- a/lightrag/kg/shared_storage.py +++ b/lightrag/kg/shared_storage.py @@ -265,13 +265,13 @@ def _get_or_create_shared_raw_mp_lock(factory_name: str, key: str) -> Optional[m if raw is None: raw = _manager.Lock() _lock_registry[combined_key] = raw - _lock_registry_count[combined_key] = 0 + _lock_registry_count[combined_key] = 1 # 修复:新锁初始化为1,与释放逻辑保持一致 else: if count is None: raise RuntimeError(f"Shared-Data lock registry for {factory_name} is corrupted for key {key}") count += 1 _lock_registry_count[combined_key] = count - if count == 1 and combined_key in _lock_cleanup_data: + if count == 1 and combined_key in _lock_cleanup_data: # 把再次使用的锁添剔除出待清理字典 _lock_cleanup_data.pop(combined_key) return raw @@ -292,18 +292,20 @@ def _release_shared_raw_mp_lock(factory_name: str, key: str): count -= 1 if count < 0: - raise RuntimeError(f"Attempting to remove lock for {key} but it is not in the registry") - else: - _lock_registry_count[combined_key] = count + raise RuntimeError(f"Attempting to release lock for {key} more times than it was acquired") + _lock_registry_count[combined_key] = count + + current_time = time.time() if count == 0: - _lock_cleanup_data[combined_key] = time.time() + _lock_cleanup_data[combined_key] = current_time - for combined_key, value in list(_lock_cleanup_data.items()): - if time.time() - value > CLEANUP_KEYED_LOCKS_AFTER_SECONDS: - _lock_registry.pop(combined_key) - _lock_registry_count.pop(combined_key) - _lock_cleanup_data.pop(combined_key) + # 清理过期的锁 + for cleanup_key, cleanup_time in list(_lock_cleanup_data.items()): + if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS: + _lock_registry.pop(cleanup_key, None) + _lock_registry_count.pop(cleanup_key, None) + _lock_cleanup_data.pop(cleanup_key, None) # ───────────────────────────────────────────────────────────────────────────── @@ -355,15 +357,18 @@ class KeyedUnifiedLock: def _release_async_lock(self, key: str): count = self._async_lock_count.get(key, 0) count -= 1 + + current_time = time.time() # 优化:只调用一次 time.time() if count == 0: - self._async_lock_cleanup_data[key] = time.time() + self._async_lock_cleanup_data[key] = current_time self._async_lock_count[key] = count - for key, value in list(self._async_lock_cleanup_data.items()): - if time.time() - value > CLEANUP_KEYED_LOCKS_AFTER_SECONDS: - self._async_lock.pop(key) - self._async_lock_count.pop(key) - self._async_lock_cleanup_data.pop(key) + # 使用缓存的时间戳进行清理,避免在循环中重复调用 time.time() + for cleanup_key, cleanup_time in list(self._async_lock_cleanup_data.items()): + if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS: + self._async_lock.pop(cleanup_key) + self._async_lock_count.pop(cleanup_key) + self._async_lock_cleanup_data.pop(cleanup_key) def _get_lock_for_key(self, key: str, enable_logging: bool = False) -> UnifiedLock: diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index f7b1dcff..c07c6a53 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -1094,9 +1094,8 @@ class LightRAG: } ) - # Semphore is released here - # Concurrency is controlled by graph db lock for individual entities and relationships - + # Semphore is released here + # Concurrency is controlled by graph db lock for individual entities and relationships if file_extraction_stage_ok: try: # Get chunk_results from entity_relation_task From c52c451cf74ed20c3954449d981af07f4deef943 Mon Sep 17 
00:00:00 2001 From: yangdx Date: Fri, 11 Jul 2025 20:40:50 +0800 Subject: [PATCH 06/14] Fix linting --- lightrag/kg/shared_storage.py | 97 +++++++++++++++++++++++++---------- 1 file changed, 69 insertions(+), 28 deletions(-) diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py index c9e26614..fdd4adcd 100644 --- a/lightrag/kg/shared_storage.py +++ b/lightrag/kg/shared_storage.py @@ -1,4 +1,3 @@ -from collections import defaultdict import os import sys import asyncio @@ -6,7 +5,7 @@ import multiprocessing as mp from multiprocessing.synchronize import Lock as ProcessLock from multiprocessing import Manager import time -from typing import Any, Callable, Dict, List, Optional, Union, TypeVar, Generic +from typing import Any, Dict, List, Optional, Union, TypeVar, Generic # Define a direct print function for critical logs that must be visible in all processes @@ -56,25 +55,38 @@ _async_locks: Optional[Dict[str, asyncio.Lock]] = None DEBUG_LOCKS = False _debug_n_locks_acquired: int = 0 + + def inc_debug_n_locks_acquired(): global _debug_n_locks_acquired if DEBUG_LOCKS: _debug_n_locks_acquired += 1 - print(f"DEBUG: Keyed Lock acquired, total: {_debug_n_locks_acquired:>5}", end="\r", flush=True) + print( + f"DEBUG: Keyed Lock acquired, total: {_debug_n_locks_acquired:>5}", + end="\r", + flush=True, + ) + def dec_debug_n_locks_acquired(): global _debug_n_locks_acquired if DEBUG_LOCKS: if _debug_n_locks_acquired > 0: _debug_n_locks_acquired -= 1 - print(f"DEBUG: Keyed Lock released, total: {_debug_n_locks_acquired:>5}", end="\r", flush=True) + print( + f"DEBUG: Keyed Lock released, total: {_debug_n_locks_acquired:>5}", + end="\r", + flush=True, + ) else: raise RuntimeError("Attempting to release lock when no locks are acquired") + def get_debug_n_locks_acquired(): global _debug_n_locks_acquired return _debug_n_locks_acquired + class UnifiedLock(Generic[T]): """Provide a unified lock interface type for asyncio.Lock and multiprocessing.Lock""" @@ -246,6 +258,7 @@ class UnifiedLock(Generic[T]): else: return self._lock.locked() + # ───────────────────────────────────────────────────────────────────────────── # 2. 
CROSS‑PROCESS FACTORY (one manager.Lock shared by *all* processes) # ───────────────────────────────────────────────────────────────────────────── @@ -253,7 +266,10 @@ def _get_combined_key(factory_name: str, key: str) -> str: """Return the combined key for the factory and key.""" return f"{factory_name}:{key}" -def _get_or_create_shared_raw_mp_lock(factory_name: str, key: str) -> Optional[mp.synchronize.Lock]: + +def _get_or_create_shared_raw_mp_lock( + factory_name: str, key: str +) -> Optional[mp.synchronize.Lock]: """Return the *singleton* manager.Lock() proxy for *key*, creating if needed.""" if not _is_multiprocess: return None @@ -265,13 +281,19 @@ def _get_or_create_shared_raw_mp_lock(factory_name: str, key: str) -> Optional[m if raw is None: raw = _manager.Lock() _lock_registry[combined_key] = raw - _lock_registry_count[combined_key] = 1 # 修复:新锁初始化为1,与释放逻辑保持一致 + _lock_registry_count[combined_key] = ( + 1 # 修复:新锁初始化为1,与释放逻辑保持一致 + ) else: if count is None: - raise RuntimeError(f"Shared-Data lock registry for {factory_name} is corrupted for key {key}") + raise RuntimeError( + f"Shared-Data lock registry for {factory_name} is corrupted for key {key}" + ) count += 1 _lock_registry_count[combined_key] = count - if count == 1 and combined_key in _lock_cleanup_data: # 把再次使用的锁添剔除出待清理字典 + if ( + count == 1 and combined_key in _lock_cleanup_data + ): # 把再次使用的锁添剔除出待清理字典 _lock_cleanup_data.pop(combined_key) return raw @@ -288,25 +310,29 @@ def _release_shared_raw_mp_lock(factory_name: str, key: str): if raw is None and count is None: return elif raw is None or count is None: - raise RuntimeError(f"Shared-Data lock registry for {factory_name} is corrupted for key {key}") + raise RuntimeError( + f"Shared-Data lock registry for {factory_name} is corrupted for key {key}" + ) count -= 1 if count < 0: - raise RuntimeError(f"Attempting to release lock for {key} more times than it was acquired") - + raise RuntimeError( + f"Attempting to release lock for {key} more times than it was acquired" + ) + _lock_registry_count[combined_key] = count - + current_time = time.time() if count == 0: _lock_cleanup_data[combined_key] = current_time - + # 清理过期的锁 for cleanup_key, cleanup_time in list(_lock_cleanup_data.items()): if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS: _lock_registry.pop(cleanup_key, None) _lock_registry_count.pop(cleanup_key, None) _lock_cleanup_data.pop(cleanup_key, None) - + # ───────────────────────────────────────────────────────────────────────────── # 3. 
PARAMETER‑KEYED WRAPPER (unchanged except it *accepts a factory*) @@ -322,7 +348,9 @@ class KeyedUnifiedLock: """ # ---------------- construction ---------------- - def __init__(self, factory_name: str, *, default_enable_logging: bool = True) -> None: + def __init__( + self, factory_name: str, *, default_enable_logging: bool = True + ) -> None: self._factory_name = factory_name self._default_enable_logging = default_enable_logging self._async_lock: Dict[str, asyncio.Lock] = {} # key → asyncio.Lock @@ -340,7 +368,12 @@ class KeyedUnifiedLock: """ if enable_logging is None: enable_logging = self._default_enable_logging - return _KeyedLockContext(self, factory_name=self._factory_name, keys=keys, enable_logging=enable_logging) + return _KeyedLockContext( + self, + factory_name=self._factory_name, + keys=keys, + enable_logging=enable_logging, + ) def _get_or_create_async_lock(self, key: str) -> asyncio.Lock: async_lock = self._async_lock.get(key) @@ -357,7 +390,7 @@ class KeyedUnifiedLock: def _release_async_lock(self, key: str): count = self._async_lock_count.get(key, 0) count -= 1 - + current_time = time.time() # 优化:只调用一次 time.time() if count == 0: self._async_lock_cleanup_data[key] = current_time @@ -370,7 +403,6 @@ class KeyedUnifiedLock: self._async_lock_count.pop(cleanup_key) self._async_lock_cleanup_data.pop(cleanup_key) - def _get_lock_for_key(self, key: str, enable_logging: bool = False) -> UnifiedLock: # 1. get (or create) the per‑process async gate for this key # Is synchronous, so no need to acquire a lock @@ -381,15 +413,15 @@ class KeyedUnifiedLock: is_multiprocess = raw_lock is not None if not is_multiprocess: raw_lock = async_lock - + # 3. build a *fresh* UnifiedLock with the chosen logging flag if is_multiprocess: return UnifiedLock( lock=raw_lock, - is_async=False, # manager.Lock is synchronous + is_async=False, # manager.Lock is synchronous name=f"key:{self._factory_name}:{key}", enable_logging=enable_logging, - async_lock=async_lock, # prevents event‑loop blocking + async_lock=async_lock, # prevents event‑loop blocking ) else: return UnifiedLock( @@ -397,13 +429,14 @@ class KeyedUnifiedLock: is_async=True, name=f"key:{self._factory_name}:{key}", enable_logging=enable_logging, - async_lock=None, # No need for async lock in single process mode + async_lock=None, # No need for async lock in single process mode ) - + def _release_lock_for_key(self, key: str): self._release_async_lock(key) _release_shared_raw_mp_lock(self._factory_name, key) + class _KeyedLockContext: def __init__( self, @@ -419,7 +452,8 @@ class _KeyedLockContext: # to avoid deadlocks self._keys = sorted(keys) self._enable_logging = ( - enable_logging if enable_logging is not None + enable_logging + if enable_logging is not None else parent._default_enable_logging ) self._ul: Optional[List["UnifiedLock"]] = None # set in __aenter__ @@ -432,7 +466,9 @@ class _KeyedLockContext: # 4. 
acquire it self._ul = [] for key in self._keys: - lock = self._parent._get_lock_for_key(key, enable_logging=self._enable_logging) + lock = self._parent._get_lock_for_key( + key, enable_logging=self._enable_logging + ) await lock.__aenter__() inc_debug_n_locks_acquired() self._ul.append(lock) @@ -447,6 +483,7 @@ class _KeyedLockContext: dec_debug_n_locks_acquired() self._ul = None + def get_internal_lock(enable_logging: bool = False) -> UnifiedLock: """return unified storage lock for data consistency""" async_lock = _async_locks.get("internal_lock") if _is_multiprocess else None @@ -494,7 +531,10 @@ def get_graph_db_lock(enable_logging: bool = False) -> UnifiedLock: async_lock=async_lock, ) -def get_graph_db_lock_keyed(keys: str | list[str], enable_logging: bool = False) -> KeyedUnifiedLock: + +def get_graph_db_lock_keyed( + keys: str | list[str], enable_logging: bool = False +) -> KeyedUnifiedLock: """return unified graph database lock for ensuring atomic operations""" global _graph_db_lock_keyed if _graph_db_lock_keyed is None: @@ -503,6 +543,7 @@ def get_graph_db_lock_keyed(keys: str | list[str], enable_logging: bool = False) keys = [keys] return _graph_db_lock_keyed(keys, enable_logging=enable_logging) + def get_data_init_lock(enable_logging: bool = False) -> UnifiedLock: """return unified data initialization lock for ensuring atomic data initialization""" async_lock = _async_locks.get("data_init_lock") if _is_multiprocess else None @@ -577,7 +618,7 @@ def initialize_share_data(workers: int = 1): _shared_dicts = _manager.dict() _init_flags = _manager.dict() _update_flags = _manager.dict() - + _graph_db_lock_keyed = KeyedUnifiedLock( factory_name="graph_db_lock", ) @@ -605,7 +646,7 @@ def initialize_share_data(workers: int = 1): _init_flags = {} _update_flags = {} _async_locks = None # No need for async locks in single process mode - + _graph_db_lock_keyed = KeyedUnifiedLock( factory_name="graph_db_lock", ) From ad99d9ba5ab1bcc00f61cf42caa50b08f99adae9 Mon Sep 17 00:00:00 2001 From: yangdx Date: Fri, 11 Jul 2025 22:13:02 +0800 Subject: [PATCH 07/14] Improve code organization and comments --- lightrag/kg/shared_storage.py | 67 +++++++++++++++-------------------- 1 file changed, 29 insertions(+), 38 deletions(-) diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py index fdd4adcd..9330ac6a 100644 --- a/lightrag/kg/shared_storage.py +++ b/lightrag/kg/shared_storage.py @@ -29,14 +29,17 @@ LockType = Union[ProcessLock, asyncio.Lock] _is_multiprocess = None _workers = None _manager = None + +# Global singleton data for multi-process keyed locks _lock_registry: Optional[Dict[str, mp.synchronize.Lock]] = None _lock_registry_count: Optional[Dict[str, int]] = None _lock_cleanup_data: Optional[Dict[str, time.time]] = None _registry_guard = None -_initialized = None - +# Timeout for keyed locks in seconds CLEANUP_KEYED_LOCKS_AFTER_SECONDS = 300 +_initialized = None + # shared data for storage across processes _shared_dicts: Optional[Dict[str, Any]] = None _init_flags: Optional[Dict[str, bool]] = None # namespace -> initialized @@ -48,6 +51,7 @@ _internal_lock: Optional[LockType] = None _pipeline_status_lock: Optional[LockType] = None _graph_db_lock: Optional[LockType] = None _data_init_lock: Optional[LockType] = None +# Manager for all keyed locks _graph_db_lock_keyed: Optional["KeyedUnifiedLock"] = None # async locks for coroutine synchronization in multiprocess mode @@ -56,7 +60,6 @@ _async_locks: Optional[Dict[str, asyncio.Lock]] = None DEBUG_LOCKS = False 
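# ─────────────────────────────────────────────────────────────────────────────
# Illustration, not part of the patch: the reference-counted, deferred-cleanup
# lifecycle of the keyed-lock registry, in miniature. An entry whose count
# drops to zero is only timestamped; a prompt re-acquire rescues it, so a hot
# key keeps one stable lock object instead of being torn down and recreated.
# ─────────────────────────────────────────────────────────────────────────────
import time

registry: dict[str, object] = {}
counts: dict[str, int] = {}
pending: dict[str, float] = {}


def acquire(key: str) -> object:
    if key not in registry:
        registry[key] = object()  # stand-in for _manager.Lock()
        counts[key] = 0
    if counts[key] == 0:
        pending.pop(key, None)  # rescue a key that was awaiting cleanup
    counts[key] += 1
    return registry[key]


def release(key: str) -> None:
    counts[key] -= 1
    if counts[key] == 0:
        pending[key] = time.time()  # eligible for later cleanup, not removed now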
_debug_n_locks_acquired: int = 0
 
-
 def inc_debug_n_locks_acquired():
     global _debug_n_locks_acquired
     if DEBUG_LOCKS:
@@ -259,9 +262,6 @@ class UnifiedLock(Generic[T]):
             return self._lock.locked()
 
 
-# ─────────────────────────────────────────────────────────────────────────────
-# 2. CROSS‑PROCESS FACTORY (one manager.Lock shared by *all* processes)
-# ─────────────────────────────────────────────────────────────────────────────
 def _get_combined_key(factory_name: str, key: str) -> str:
     """Return the combined key for the factory and key."""
     return f"{factory_name}:{key}"
@@ -270,7 +270,7 @@ def _get_combined_key(factory_name: str, key: str) -> str:
 def _get_or_create_shared_raw_mp_lock(
     factory_name: str, key: str
 ) -> Optional[mp.synchronize.Lock]:
-    """Return the *singleton* manager.Lock() proxy for *key*, creating if needed."""
+    """Return the *singleton* manager.Lock() proxy for keyed lock, creating if needed."""
     if not _is_multiprocess:
         return None
 
@@ -281,20 +281,18 @@ def _get_or_create_shared_raw_mp_lock(
         if raw is None:
             raw = _manager.Lock()
             _lock_registry[combined_key] = raw
-            _lock_registry_count[combined_key] = (
-                1  # 修复:新锁初始化为1,与释放逻辑保持一致
-            )
+            count = 0
         else:
             if count is None:
                 raise RuntimeError(
                     f"Shared-Data lock registry for {factory_name} is corrupted for key {key}"
                 )
-            count += 1
-            _lock_registry_count[combined_key] = count
             if (
                 count == 1 and combined_key in _lock_cleanup_data
-            ):  # 把再次使用的锁添剔除出待清理字典
+            ):  # Reusing a key waiting for cleanup, remove it from the cleanup list
                 _lock_cleanup_data.pop(combined_key)
+        count += 1
+        _lock_registry_count[combined_key] = count
         return raw
@@ -326,7 +324,6 @@ def _release_shared_raw_mp_lock(factory_name: str, key: str):
         if count == 0:
             _lock_cleanup_data[combined_key] = current_time
 
-        # 清理过期的锁
         for cleanup_key, cleanup_time in list(_lock_cleanup_data.items()):
             if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS:
                 _lock_registry.pop(cleanup_key, None)
@@ -334,31 +331,26 @@ def _release_shared_raw_mp_lock(factory_name: str, key: str):
                 _lock_cleanup_data.pop(cleanup_key, None)
 
 
-# ─────────────────────────────────────────────────────────────────────────────
-# 3. PARAMETER‑KEYED WRAPPER (unchanged except it *accepts a factory*)
-# ─────────────────────────────────────────────────────────────────────────────
 class KeyedUnifiedLock:
     """
-    Parameter‑keyed wrapper around `UnifiedLock`.
+    Manager for unified keyed locks, supporting both single-process and multi-process modes
 
-    • Keeps only a table of per‑key *asyncio* gates locally
-    • Fetches the shared process‑wide mutex on *every* acquire
+    • Keeps only a table of async keyed locks locally
+    • Fetches the multi-process keyed lock on every acquire
     • Builds a fresh `UnifiedLock` each time, so `enable_logging`
       (or future options) can vary per call.
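
    A sketch of the per-call option variance the last bullet refers to:

        keyed = KeyedUnifiedLock("graph_db_lock")
        async with keyed(["entity-A"], enable_logging=True):
            ...  # this acquisition is logged
        async with keyed(["entity-A"]):
            ...  # same key, default logging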
""" - # ---------------- construction ---------------- def __init__( self, factory_name: str, *, default_enable_logging: bool = True ) -> None: self._factory_name = factory_name self._default_enable_logging = default_enable_logging - self._async_lock: Dict[str, asyncio.Lock] = {} # key → asyncio.Lock - self._async_lock_count: Dict[str, int] = {} # key → asyncio.Lock count - self._async_lock_cleanup_data: Dict[str, time.time] = {} # key → time.time - self._mp_locks: Dict[str, mp.synchronize.Lock] = {} # key → mp.synchronize.Lock + self._async_lock: Dict[str, asyncio.Lock] = {} # local keyed locks + self._async_lock_count: Dict[str, int] = {} # local keyed locks referenced count + self._async_lock_cleanup_data: Dict[str, time.time] = {} # local keyed locks timeout + self._mp_locks: Dict[str, mp.synchronize.Lock] = {} # multi-process lock proxies - # ---------------- public API ------------------ def __call__(self, keys: list[str], *, enable_logging: Optional[bool] = None): """ Ergonomic helper so you can write: @@ -391,12 +383,11 @@ class KeyedUnifiedLock: count = self._async_lock_count.get(key, 0) count -= 1 - current_time = time.time() # 优化:只调用一次 time.time() + current_time = time.time() if count == 0: self._async_lock_cleanup_data[key] = current_time self._async_lock_count[key] = count - # 使用缓存的时间戳进行清理,避免在循环中重复调用 time.time() for cleanup_key, cleanup_time in list(self._async_lock_cleanup_data.items()): if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS: self._async_lock.pop(cleanup_key) @@ -417,19 +408,19 @@ class KeyedUnifiedLock: # 3. build a *fresh* UnifiedLock with the chosen logging flag if is_multiprocess: return UnifiedLock( - lock=raw_lock, - is_async=False, # manager.Lock is synchronous - name=f"key:{self._factory_name}:{key}", - enable_logging=enable_logging, - async_lock=async_lock, # prevents event‑loop blocking + lock = raw_lock, + is_async = False, # manager.Lock is synchronous + name = _get_combined_key(self._factory_name, key), + enable_logging = enable_logging, + async_lock = async_lock, # prevents event‑loop blocking ) else: return UnifiedLock( - lock=raw_lock, - is_async=True, - name=f"key:{self._factory_name}:{key}", - enable_logging=enable_logging, - async_lock=None, # No need for async lock in single process mode + lock = raw_lock, + is_async = True, + name = _get_combined_key(self._factory_name, key), + enable_logging = enable_logging, + async_lock = None, # No need for async lock in single process mode ) def _release_lock_for_key(self, key: str): From a64c767298d15e9361d63e90b98ec240e1db7105 Mon Sep 17 00:00:00 2001 From: yangdx Date: Fri, 11 Jul 2025 23:43:40 +0800 Subject: [PATCH 08/14] optimize: improve lock cleanup performance with threshold-based strategy - Add CLEANUP_THRESHOLD constant (100) to control cleanup frequency - Modify _release_shared_raw_mp_lock to only scan when cleanup list exceeds threshold - Modify _release_async_lock to only scan when cleanup list exceeds threshold --- lightrag/kg/shared_storage.py | 61 ++++++++++++++++++++++------------- 1 file changed, 38 insertions(+), 23 deletions(-) diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py index 9330ac6a..0f961687 100644 --- a/lightrag/kg/shared_storage.py +++ b/lightrag/kg/shared_storage.py @@ -37,6 +37,8 @@ _lock_cleanup_data: Optional[Dict[str, time.time]] = None _registry_guard = None # Timeout for keyed locks in seconds CLEANUP_KEYED_LOCKS_AFTER_SECONDS = 300 +# Threshold for triggering cleanup - only clean when pending list exceeds this size 
+CLEANUP_THRESHOLD = 100 _initialized = None @@ -60,6 +62,7 @@ _async_locks: Optional[Dict[str, asyncio.Lock]] = None DEBUG_LOCKS = False _debug_n_locks_acquired: int = 0 + def inc_debug_n_locks_acquired(): global _debug_n_locks_acquired if DEBUG_LOCKS: @@ -324,11 +327,13 @@ def _release_shared_raw_mp_lock(factory_name: str, key: str): if count == 0: _lock_cleanup_data[combined_key] = current_time - for cleanup_key, cleanup_time in list(_lock_cleanup_data.items()): - if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS: - _lock_registry.pop(cleanup_key, None) - _lock_registry_count.pop(cleanup_key, None) - _lock_cleanup_data.pop(cleanup_key, None) + # Only perform cleanup when the pending cleanup list exceeds threshold + if len(_lock_cleanup_data) > CLEANUP_THRESHOLD: + for cleanup_key, cleanup_time in list(_lock_cleanup_data.items()): + if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS: + _lock_registry.pop(cleanup_key, None) + _lock_registry_count.pop(cleanup_key, None) + _lock_cleanup_data.pop(cleanup_key, None) class KeyedUnifiedLock: @@ -347,9 +352,15 @@ class KeyedUnifiedLock: self._factory_name = factory_name self._default_enable_logging = default_enable_logging self._async_lock: Dict[str, asyncio.Lock] = {} # local keyed locks - self._async_lock_count: Dict[str, int] = {} # local keyed locks referenced count - self._async_lock_cleanup_data: Dict[str, time.time] = {} # local keyed locks timeout - self._mp_locks: Dict[str, mp.synchronize.Lock] = {} # multi-process lock proxies + self._async_lock_count: Dict[ + str, int + ] = {} # local keyed locks referenced count + self._async_lock_cleanup_data: Dict[ + str, time.time + ] = {} # local keyed locks timeout + self._mp_locks: Dict[ + str, mp.synchronize.Lock + ] = {} # multi-process lock proxies def __call__(self, keys: list[str], *, enable_logging: Optional[bool] = None): """ @@ -388,11 +399,15 @@ class KeyedUnifiedLock: self._async_lock_cleanup_data[key] = current_time self._async_lock_count[key] = count - for cleanup_key, cleanup_time in list(self._async_lock_cleanup_data.items()): - if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS: - self._async_lock.pop(cleanup_key) - self._async_lock_count.pop(cleanup_key) - self._async_lock_cleanup_data.pop(cleanup_key) + # Only perform cleanup when the pending cleanup list exceeds threshold + if len(self._async_lock_cleanup_data) > CLEANUP_THRESHOLD: + for cleanup_key, cleanup_time in list( + self._async_lock_cleanup_data.items() + ): + if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS: + self._async_lock.pop(cleanup_key) + self._async_lock_count.pop(cleanup_key) + self._async_lock_cleanup_data.pop(cleanup_key) def _get_lock_for_key(self, key: str, enable_logging: bool = False) -> UnifiedLock: # 1. get (or create) the per‑process async gate for this key @@ -408,19 +423,19 @@ class KeyedUnifiedLock: # 3. 
build a *fresh* UnifiedLock with the chosen logging flag
         if is_multiprocess:
             return UnifiedLock(
-                lock = raw_lock,
-                is_async = False,  # manager.Lock is synchronous
-                name = _get_combined_key(self._factory_name, key),
-                enable_logging = enable_logging,
-                async_lock = async_lock,  # prevents event‑loop blocking
+                lock=raw_lock,
+                is_async=False,  # manager.Lock is synchronous
+                name=_get_combined_key(self._factory_name, key),
+                enable_logging=enable_logging,
+                async_lock=async_lock,  # prevents event‑loop blocking
             )
         else:
             return UnifiedLock(
-                lock = raw_lock,
-                is_async = True,
-                name = _get_combined_key(self._factory_name, key),
-                enable_logging = enable_logging,
-                async_lock = None,  # No need for async lock in single process mode
+                lock=raw_lock,
+                is_async=True,
+                name=_get_combined_key(self._factory_name, key),
+                enable_logging=enable_logging,
+                async_lock=None,  # No need for async lock in single process mode
             )

     def _release_lock_for_key(self, key: str):

From 22c36f2fd2095e11d2d8344e1d24d1d59de12759 Mon Sep 17 00:00:00 2001
From: yangdx
Date: Sat, 12 Jul 2025 02:41:31 +0800
Subject: [PATCH 09/14] Optimize log messages

---
 lightrag/kg/shared_storage.py | 72 ++++++++++++++++++++++-------------
 lightrag/operate.py           | 13 +++----
 2 files changed, 52 insertions(+), 33 deletions(-)

diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py
index 0f961687..fe7e7e00 100644
--- a/lightrag/kg/shared_storage.py
+++ b/lightrag/kg/shared_storage.py
@@ -5,21 +5,42 @@ import multiprocessing as mp
 from multiprocessing.synchronize import Lock as ProcessLock
 from multiprocessing import Manager
 import time
+import logging
 from typing import Any, Dict, List, Optional, Union, TypeVar, Generic


 # Define a direct print function for critical logs that must be visible in all processes
-def direct_log(message, level="INFO", enable_output: bool = True):
+def direct_log(message, enable_output: bool = True, level: str = "DEBUG"):
     """
     Log a message directly to stderr to ensure visibility in all processes,
     including the Gunicorn master process.

     Args:
         message: The message to log
-        level: Log level (default: "INFO")
+        level: Log level (default: "DEBUG")
         enable_output: Whether to actually output the log (default: True)
     """
-    if enable_output:
+    # Get the current logger level from the lightrag logger
+    try:
+        from lightrag.utils import logger
+
+        current_level = logger.getEffectiveLevel()
+    except ImportError:
+        # Fallback if lightrag.utils is not available
+        current_level = logging.INFO
+
+    # Convert string level to numeric level for comparison
+    level_mapping = {
+        "DEBUG": logging.DEBUG,  # 10
+        "INFO": logging.INFO,  # 20
+        "WARNING": logging.WARNING,  # 30
+        "ERROR": logging.ERROR,  # 40
+        "CRITICAL": logging.CRITICAL,  # 50
+    }
+    message_level = level_mapping.get(level.upper(), logging.DEBUG)
+
+    # print(f"Direct_log: {level.upper()} {message_level} ?
{current_level}", file=sys.stderr, flush=True) + if enable_output or (message_level >= current_level): print(f"{level}: {message}", file=sys.stderr, flush=True) @@ -67,11 +88,7 @@ def inc_debug_n_locks_acquired(): global _debug_n_locks_acquired if DEBUG_LOCKS: _debug_n_locks_acquired += 1 - print( - f"DEBUG: Keyed Lock acquired, total: {_debug_n_locks_acquired:>5}", - end="\r", - flush=True, - ) + print(f"DEBUG: Keyed Lock acquired, total: {_debug_n_locks_acquired:>5}") def dec_debug_n_locks_acquired(): @@ -79,11 +96,7 @@ def dec_debug_n_locks_acquired(): if DEBUG_LOCKS: if _debug_n_locks_acquired > 0: _debug_n_locks_acquired -= 1 - print( - f"DEBUG: Keyed Lock released, total: {_debug_n_locks_acquired:>5}", - end="\r", - flush=True, - ) + print(f"DEBUG: Keyed Lock released, total: {_debug_n_locks_acquired:>5}") else: raise RuntimeError("Attempting to release lock when no locks are acquired") @@ -113,17 +126,8 @@ class UnifiedLock(Generic[T]): async def __aenter__(self) -> "UnifiedLock[T]": try: - # direct_log( - # f"== Lock == Process {self._pid}: Acquiring lock '{self._name}' (async={self._is_async})", - # enable_output=self._enable_logging, - # ) - # If in multiprocess mode and async lock exists, acquire it first if not self._is_async and self._async_lock is not None: - # direct_log( - # f"== Lock == Process {self._pid}: Acquiring async lock for '{self._name}'", - # enable_output=self._enable_logging, - # ) await self._async_lock.acquire() direct_log( f"== Lock == Process {self._pid}: Async lock for '{self._name}' acquired", @@ -328,12 +332,20 @@ def _release_shared_raw_mp_lock(factory_name: str, key: str): _lock_cleanup_data[combined_key] = current_time # Only perform cleanup when the pending cleanup list exceeds threshold - if len(_lock_cleanup_data) > CLEANUP_THRESHOLD: + total_cleanup_len = len(_lock_cleanup_data) + if total_cleanup_len >= CLEANUP_THRESHOLD: + cleaned_count = 0 for cleanup_key, cleanup_time in list(_lock_cleanup_data.items()): if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS: _lock_registry.pop(cleanup_key, None) _lock_registry_count.pop(cleanup_key, None) _lock_cleanup_data.pop(cleanup_key, None) + cleaned_count += 1 + direct_log( + f"== mp Lock == Cleaned up {cleaned_count}/{total_cleanup_len} expired locks", + enable_output=False, + level="DEBUG", + ) class KeyedUnifiedLock: @@ -400,7 +412,9 @@ class KeyedUnifiedLock: self._async_lock_count[key] = count # Only perform cleanup when the pending cleanup list exceeds threshold - if len(self._async_lock_cleanup_data) > CLEANUP_THRESHOLD: + total_cleanup_len = len(self._async_lock_cleanup_data) + if total_cleanup_len >= CLEANUP_THRESHOLD: + cleaned_count = 0 for cleanup_key, cleanup_time in list( self._async_lock_cleanup_data.items() ): @@ -408,6 +422,12 @@ class KeyedUnifiedLock: self._async_lock.pop(cleanup_key) self._async_lock_count.pop(cleanup_key) self._async_lock_cleanup_data.pop(cleanup_key) + cleaned_count += 1 + direct_log( + f"== async Lock == Cleaned up {cleaned_count}/{total_cleanup_len} expired async locks", + enable_output=False, + level="DEBUG", + ) def _get_lock_for_key(self, key: str, enable_logging: bool = False) -> UnifiedLock: # 1. 
get (or create) the per‑process async gate for this key @@ -626,7 +646,7 @@ def initialize_share_data(workers: int = 1): _update_flags = _manager.dict() _graph_db_lock_keyed = KeyedUnifiedLock( - factory_name="graph_db_lock", + factory_name="GraphDB", ) # Initialize async locks for multiprocess mode @@ -654,7 +674,7 @@ def initialize_share_data(workers: int = 1): _async_locks = None # No need for async locks in single process mode _graph_db_lock_keyed = KeyedUnifiedLock( - factory_name="graph_db_lock", + factory_name="GraphDB", ) direct_log(f"Process {os.getpid()} Shared-Data created for Single Process") diff --git a/lightrag/operate.py b/lightrag/operate.py index 84e99c14..2008cb51 100644 --- a/lightrag/operate.py +++ b/lightrag/operate.py @@ -1143,19 +1143,18 @@ async def merge_nodes_and_edges( total_relations_count = len(all_edges) # Merge nodes and edges + log_message = f"Merging stage {current_file_number}/{total_files}: {file_path}" + logger.info(log_message) async with pipeline_status_lock: - log_message = f"Merging stage {current_file_number}/{total_files}: {file_path}" - logger.info(log_message) pipeline_status["latest_message"] = log_message pipeline_status["history_messages"].append(log_message) # Process and update all entities and relationships in parallel - log_message = f"Updating {total_entities_count} entities and {total_relations_count} relations {current_file_number}/{total_files}: {file_path}" + log_message = f"Processing: {total_entities_count} entities and {total_relations_count} relations" logger.info(log_message) - if pipeline_status is not None: - async with pipeline_status_lock: - pipeline_status["latest_message"] = log_message - pipeline_status["history_messages"].append(log_message) + async with pipeline_status_lock: + pipeline_status["latest_message"] = log_message + pipeline_status["history_messages"].append(log_message) # Get max async tasks limit from global_config for semaphore control llm_model_max_async = global_config.get("llm_model_max_async", 4) From 3d8e6924bc849a7789de5830b78f460973f874a6 Mon Sep 17 00:00:00 2001 From: yangdx Date: Sat, 12 Jul 2025 02:58:05 +0800 Subject: [PATCH 10/14] Show lock clean up message --- lightrag/kg/shared_storage.py | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py index fe7e7e00..02b6c245 100644 --- a/lightrag/kg/shared_storage.py +++ b/lightrag/kg/shared_storage.py @@ -341,11 +341,12 @@ def _release_shared_raw_mp_lock(factory_name: str, key: str): _lock_registry_count.pop(cleanup_key, None) _lock_cleanup_data.pop(cleanup_key, None) cleaned_count += 1 - direct_log( - f"== mp Lock == Cleaned up {cleaned_count}/{total_cleanup_len} expired locks", - enable_output=False, - level="DEBUG", - ) + if cleaned_count > 0: + direct_log( + f"== mp Lock == Cleaned up {cleaned_count}/{total_cleanup_len} expired locks", + enable_output=False, + level="INFO", + ) class KeyedUnifiedLock: @@ -423,11 +424,12 @@ class KeyedUnifiedLock: self._async_lock_count.pop(cleanup_key) self._async_lock_cleanup_data.pop(cleanup_key) cleaned_count += 1 - direct_log( - f"== async Lock == Cleaned up {cleaned_count}/{total_cleanup_len} expired async locks", - enable_output=False, - level="DEBUG", - ) + if cleaned_count > 0: + direct_log( + f"== async Lock == Cleaned up {cleaned_count}/{total_cleanup_len} expired async locks", + enable_output=False, + level="INFO", + ) def _get_lock_for_key(self, key: str, enable_logging: bool = False) -> UnifiedLock: # 1. 
get (or create) the per‑process async gate for this key

From 7490a18481a45ffa77e68953697cbefe803ab2f3 Mon Sep 17 00:00:00 2001
From: yangdx
Date: Sat, 12 Jul 2025 03:10:03 +0800
Subject: [PATCH 11/14] Optimize lock cleanup parameters

---
 lightrag/kg/shared_storage.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py
index 02b6c245..0961da16 100644
--- a/lightrag/kg/shared_storage.py
+++ b/lightrag/kg/shared_storage.py
@@ -57,9 +57,9 @@ _lock_registry_count: Optional[Dict[str, int]] = None
 _lock_cleanup_data: Optional[Dict[str, time.time]] = None
 _registry_guard = None
 # Timeout for keyed locks in seconds
-CLEANUP_KEYED_LOCKS_AFTER_SECONDS = 300
+CLEANUP_KEYED_LOCKS_AFTER_SECONDS = 150
 # Threshold for triggering cleanup - only clean when pending list exceeds this size
-CLEANUP_THRESHOLD = 100
+CLEANUP_THRESHOLD = 200

From 39965d7ded0a025f047ad81ddd7438784504ac0f Mon Sep 17 00:00:00 2001
From: yangdx
Date: Sat, 12 Jul 2025 03:32:08 +0800
Subject: [PATCH 12/14] Move merging stage back under control of the max
 parallel insert semaphore

---
 lightrag/kg/shared_storage.py |   4 +-
 lightrag/lightrag.py          | 157 +++++++++++++++++-----------------
 2 files changed, 82 insertions(+), 79 deletions(-)

diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py
index 0961da16..9871d4aa 100644
--- a/lightrag/kg/shared_storage.py
+++ b/lightrag/kg/shared_storage.py
@@ -57,9 +57,9 @@ _lock_registry_count: Optional[Dict[str, int]] = None
 _lock_cleanup_data: Optional[Dict[str, time.time]] = None
 _registry_guard = None
 # Timeout for keyed locks in seconds
-CLEANUP_KEYED_LOCKS_AFTER_SECONDS = 150
+CLEANUP_KEYED_LOCKS_AFTER_SECONDS = 300
 # Threshold for triggering cleanup - only clean when pending list exceeds this size
-CLEANUP_THRESHOLD = 200
+CLEANUP_THRESHOLD = 500

diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index c07c6a53..feb7ab16 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -1094,86 +1094,89 @@ class LightRAG:
                             }
                         )

-                    # Semaphore is released here
-                    # Concurrency is controlled by graph db lock for individual entities and relationships
-                    if file_extraction_stage_ok:
-                        try:
-                            # Get chunk_results from entity_relation_task
-                            chunk_results = await entity_relation_task
-                            await merge_nodes_and_edges(
-                                chunk_results=chunk_results,  # result collected from entity_relation_task
-                                knowledge_graph_inst=self.chunk_entity_relation_graph,
-                                entity_vdb=self.entities_vdb,
-                                relationships_vdb=self.relationships_vdb,
-                                global_config=asdict(self),
-                                pipeline_status=pipeline_status,
-                                pipeline_status_lock=pipeline_status_lock,
-                                llm_response_cache=self.llm_response_cache,
-                                current_file_number=current_file_number,
-                                total_files=total_files,
-                                file_path=file_path,
-                            )
-
-                            await self.doc_status.upsert(
-                                {
-                                    doc_id: {
-                                        "status": DocStatus.PROCESSED,
-                                        "chunks_count": len(chunks),
-                                        "chunks_list": list(
-                                            chunks.keys()
-                                        ),  # keep chunks_list
-                                        "content": status_doc.content,
-                                        "content_summary": status_doc.content_summary,
-                                        "content_length": status_doc.content_length,
-                                        "created_at": status_doc.created_at,
-                                        "updated_at": datetime.now(
-                                            timezone.utc
-                                        ).isoformat(),
-                                        "file_path": file_path,
-                                    }
-                                }
-                            )
-
-                            # Call _insert_done after processing each file
-                            await self._insert_done()
-
-                            async with pipeline_status_lock:
-                                log_message = f"Completed processing file {current_file_number}/{total_files}: {file_path}"
-                                logger.info(log_message)
-                                pipeline_status["latest_message"] = log_message
-                                pipeline_status["history_messages"].append(log_message)
-
-                        except Exception as e:
-                            # Log error and update pipeline status
-                            logger.error(traceback.format_exc())
-                            error_msg = f"Merging stage failed in document {current_file_number}/{total_files}: {file_path}"
-                            logger.error(error_msg)
-                            async with pipeline_status_lock:
-                                pipeline_status["latest_message"] = error_msg
-                                pipeline_status["history_messages"].append(
-                                    traceback.format_exc()
-                                )
-                                pipeline_status["history_messages"].append(error_msg)
-
-                            # Persistent llm cache
-                            if self.llm_response_cache:
-                                await self.llm_response_cache.index_done_callback()
-
-                            # Update document status to failed
-                            await self.doc_status.upsert(
-                                {
-                                    doc_id: {
-                                        "status": DocStatus.FAILED,
-                                        "error": str(e),
-                                        "content": status_doc.content,
-                                        "content_summary": status_doc.content_summary,
-                                        "content_length": status_doc.content_length,
-                                        "created_at": status_doc.created_at,
-                                        "updated_at": datetime.now().isoformat(),
-                                        "file_path": file_path,
-                                    }
-                                }
-                            )
+                        # Concurrency is controlled by graph db lock for individual entities and relationships
+                        if file_extraction_stage_ok:
+                            try:
+                                # Get chunk_results from entity_relation_task
+                                chunk_results = await entity_relation_task
+                                await merge_nodes_and_edges(
+                                    chunk_results=chunk_results,  # result collected from entity_relation_task
+                                    knowledge_graph_inst=self.chunk_entity_relation_graph,
+                                    entity_vdb=self.entities_vdb,
+                                    relationships_vdb=self.relationships_vdb,
+                                    global_config=asdict(self),
+                                    pipeline_status=pipeline_status,
+                                    pipeline_status_lock=pipeline_status_lock,
+                                    llm_response_cache=self.llm_response_cache,
+                                    current_file_number=current_file_number,
+                                    total_files=total_files,
+                                    file_path=file_path,
+                                )
+
+                                await self.doc_status.upsert(
+                                    {
+                                        doc_id: {
+                                            "status": DocStatus.PROCESSED,
+                                            "chunks_count": len(chunks),
+                                            "chunks_list": list(
+                                                chunks.keys()
+                                            ),  # keep chunks_list
+                                            "content": status_doc.content,
+                                            "content_summary": status_doc.content_summary,
+                                            "content_length": status_doc.content_length,
+                                            "created_at": status_doc.created_at,
+                                            "updated_at": datetime.now(
+                                                timezone.utc
+                                            ).isoformat(),
+                                            "file_path": file_path,
+                                        }
+                                    }
+                                )
+
+                                # Call _insert_done after processing each file
+                                await self._insert_done()
+
+                                async with pipeline_status_lock:
+                                    log_message = f"Completed processing file {current_file_number}/{total_files}: {file_path}"
+                                    logger.info(log_message)
+                                    pipeline_status["latest_message"] = log_message
+                                    pipeline_status["history_messages"].append(
+                                        log_message
+                                    )
+
+                            except Exception as e:
+                                # Log error and update pipeline status
+                                logger.error(traceback.format_exc())
+                                error_msg = f"Merging stage failed in document {current_file_number}/{total_files}: {file_path}"
+                                logger.error(error_msg)
+                                async with pipeline_status_lock:
+                                    pipeline_status["latest_message"] = error_msg
+                                    pipeline_status["history_messages"].append(
+                                        traceback.format_exc()
+                                    )
+                                    pipeline_status["history_messages"].append(
+                                        error_msg
+                                    )
+
+                                # Persistent llm cache
+                                if self.llm_response_cache:
+                                    await self.llm_response_cache.index_done_callback()
+
+                                # Update document status to failed
+                                await self.doc_status.upsert(
+                                    {
+                                        doc_id: {
+                                            "status": DocStatus.FAILED,
+                                            "error": str(e),
+                                            "content": status_doc.content,
+                                            "content_summary": status_doc.content_summary,
+                                            "content_length": status_doc.content_length,
+                                            "created_at": status_doc.created_at,
+                                            "updated_at": datetime.now().isoformat(),
+                                            "file_path": file_path,
+                                        }
+                                    }
+                                )

             # Create processing tasks for all documents
             doc_tasks = []
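Illustrative sketch (not part of the patch series): PATCH 12 above re-indents the merging stage so that it runs inside the same semaphore that already gates the extraction stage, so max_parallel_insert bounds the whole per-document pipeline rather than extraction alone. A minimal asyncio sketch of that pattern follows; process_file, extract, and merge are hypothetical stand-ins, not LightRAG's real APIs.

import asyncio

MAX_PARALLEL_INSERT = 2  # assumed counterpart of LightRAG's max_parallel_insert


async def extract(doc: str) -> list[str]:
    await asyncio.sleep(0.1)  # stand-in for LLM-based entity/relation extraction
    return [f"{doc}:chunk{i}" for i in range(3)]


async def merge(chunks: list[str]) -> None:
    await asyncio.sleep(0.1)  # stand-in for the graph-db merge under keyed locks


async def process_file(sem: asyncio.Semaphore, doc: str) -> None:
    async with sem:
        # Merging stays inside the semaphore: releasing the slot after
        # extraction (the previous behaviour) would let unmerged results
        # pile up without bound.
        chunks = await extract(doc)
        await merge(chunks)
        print(f"processed {doc}")


async def main() -> None:
    sem = asyncio.Semaphore(MAX_PARALLEL_INSERT)
    await asyncio.gather(*(process_file(sem, f"doc{i}") for i in range(5)))


asyncio.run(main())

Keeping the merge inside the semaphore trades some parallelism for bounded memory and graph-db contention; per-entity serialization is still handled by the keyed graph lock rather than by this outer semaphore.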
From 964293f21b7e4d916b7127150a5d96715f586644 Mon Sep 17 00:00:00 2001
From: yangdx
Date: Sat, 12 Jul 2025 04:34:26 +0800
Subject: [PATCH 13/14] Optimize lock cleanup with time tracking and intervals

- Add cleanup time tracking variables
- Implement minimum cleanup intervals
- Track earliest cleanup times
- Handle time rollback cases
- Improve cleanup logging
---
 lightrag/kg/shared_storage.py | 172 +++++++++++++++++++++++++++++-----
 1 file changed, 147 insertions(+), 25 deletions(-)

diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py
index 9871d4aa..dd40c700 100644
--- a/lightrag/kg/shared_storage.py
+++ b/lightrag/kg/shared_storage.py
@@ -60,6 +60,12 @@ _registry_guard = None
 CLEANUP_KEYED_LOCKS_AFTER_SECONDS = 300
 # Threshold for triggering cleanup - only clean when pending list exceeds this size
 CLEANUP_THRESHOLD = 500
+# Minimum interval between cleanup operations in seconds
+MIN_CLEANUP_INTERVAL_SECONDS = 30
+# Track the earliest cleanup time for efficient cleanup triggering (multiprocess locks only)
+_earliest_mp_cleanup_time: Optional[float] = None
+# Track the last cleanup time to enforce minimum interval (multiprocess locks only)
+_last_mp_cleanup_time: Optional[float] = None

 _initialized = None

@@ -308,6 +314,8 @@ def _release_shared_raw_mp_lock(factory_name: str, key: str):
     if not _is_multiprocess:
         return

+    global _earliest_mp_cleanup_time, _last_mp_cleanup_time
+
     with _registry_guard:
         combined_key = _get_combined_key(factory_name, key)
         raw = _lock_registry.get(combined_key)
@@ -330,23 +338,77 @@ def _release_shared_raw_mp_lock(factory_name: str, key: str):
         current_time = time.time()
         if count == 0:
             _lock_cleanup_data[combined_key] = current_time
+
+            # Update earliest multiprocess cleanup time (only when earlier)
+            if _earliest_mp_cleanup_time is None or current_time < _earliest_mp_cleanup_time:
+                _earliest_mp_cleanup_time = current_time

-        # Only perform cleanup when the pending cleanup list exceeds threshold
+        # Efficient cleanup triggering with minimum interval control
         total_cleanup_len = len(_lock_cleanup_data)
         if total_cleanup_len >= CLEANUP_THRESHOLD:
-            cleaned_count = 0
-            for cleanup_key, cleanup_time in list(_lock_cleanup_data.items()):
-                if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS:
-                    _lock_registry.pop(cleanup_key, None)
-                    _lock_registry_count.pop(cleanup_key, None)
-                    _lock_cleanup_data.pop(cleanup_key, None)
-                    cleaned_count += 1
-            if cleaned_count > 0:
+            # Time rollback detection
+            if _last_mp_cleanup_time is not None and current_time < _last_mp_cleanup_time:
                 direct_log(
-                    f"== mp Lock == Cleaned up {cleaned_count}/{total_cleanup_len} expired locks",
+                    "== mp Lock == Time rollback detected, resetting cleanup time",
+                    level="WARNING",
                     enable_output=False,
-                    level="INFO",
                 )
+                _last_mp_cleanup_time = None
+
+            # Check cleanup conditions
+            has_expired_locks = (
+                _earliest_mp_cleanup_time is not None and
+                current_time - _earliest_mp_cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS
+            )
+
+            interval_satisfied = (
+                _last_mp_cleanup_time is None or
+                current_time - _last_mp_cleanup_time > MIN_CLEANUP_INTERVAL_SECONDS
+            )
+
+            if has_expired_locks and interval_satisfied:
+                try:
+                    cleaned_count = 0
+                    new_earliest_time = None
+
+                    # Perform cleanup while maintaining the new earliest time
+                    for cleanup_key, cleanup_time in list(_lock_cleanup_data.items()):
+                        if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS:
+                            # Clean expired locks
+                            _lock_registry.pop(cleanup_key, None)
+                            _lock_registry_count.pop(cleanup_key, None)
+
_lock_cleanup_data.pop(cleanup_key, None) + cleaned_count += 1 + else: + # Track the earliest time among remaining locks + if new_earliest_time is None or cleanup_time < new_earliest_time: + new_earliest_time = cleanup_time + + # Update state only after successful cleanup + _earliest_mp_cleanup_time = new_earliest_time + _last_mp_cleanup_time = current_time + + if cleaned_count > 0: + next_cleanup_in = ( + max( + (new_earliest_time + CLEANUP_KEYED_LOCKS_AFTER_SECONDS - current_time) if new_earliest_time else float('inf'), + MIN_CLEANUP_INTERVAL_SECONDS + ) + ) + direct_log( + f"== mp Lock == Cleaned up {cleaned_count}/{total_cleanup_len} expired locks, " + f"next cleanup in {next_cleanup_in:.1f}s", + enable_output=False, + level="INFO", + ) + + except Exception as e: + direct_log( + f"== mp Lock == Cleanup failed: {e}", + level="ERROR", + enable_output=False, + ) + # Don't update _last_mp_cleanup_time to allow retry class KeyedUnifiedLock: @@ -374,6 +436,8 @@ class KeyedUnifiedLock: self._mp_locks: Dict[ str, mp.synchronize.Lock ] = {} # multi-process lock proxies + self._earliest_async_cleanup_time: Optional[float] = None # track earliest async cleanup time + self._last_async_cleanup_time: Optional[float] = None # track last async cleanup time for minimum interval def __call__(self, keys: list[str], *, enable_logging: Optional[bool] = None): """ @@ -410,26 +474,78 @@ class KeyedUnifiedLock: current_time = time.time() if count == 0: self._async_lock_cleanup_data[key] = current_time + + # Update earliest async cleanup time (only when earlier) + if self._earliest_async_cleanup_time is None or current_time < self._earliest_async_cleanup_time: + self._earliest_async_cleanup_time = current_time self._async_lock_count[key] = count - # Only perform cleanup when the pending cleanup list exceeds threshold + # Efficient cleanup triggering with minimum interval control total_cleanup_len = len(self._async_lock_cleanup_data) if total_cleanup_len >= CLEANUP_THRESHOLD: - cleaned_count = 0 - for cleanup_key, cleanup_time in list( - self._async_lock_cleanup_data.items() - ): - if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS: - self._async_lock.pop(cleanup_key) - self._async_lock_count.pop(cleanup_key) - self._async_lock_cleanup_data.pop(cleanup_key) - cleaned_count += 1 - if cleaned_count > 0: + # Time rollback detection + if self._last_async_cleanup_time is not None and current_time < self._last_async_cleanup_time: direct_log( - f"== async Lock == Cleaned up {cleaned_count}/{total_cleanup_len} expired async locks", + "== async Lock == Time rollback detected, resetting cleanup time", + level="WARNING", enable_output=False, - level="INFO", ) + self._last_async_cleanup_time = None + + # Check cleanup conditions + has_expired_locks = ( + self._earliest_async_cleanup_time is not None and + current_time - self._earliest_async_cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS + ) + + interval_satisfied = ( + self._last_async_cleanup_time is None or + current_time - self._last_async_cleanup_time > MIN_CLEANUP_INTERVAL_SECONDS + ) + + if has_expired_locks and interval_satisfied: + try: + cleaned_count = 0 + new_earliest_time = None + + # Perform cleanup while maintaining the new earliest time + for cleanup_key, cleanup_time in list(self._async_lock_cleanup_data.items()): + if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS: + # Clean expired async locks + self._async_lock.pop(cleanup_key) + self._async_lock_count.pop(cleanup_key) + self._async_lock_cleanup_data.pop(cleanup_key) + 
cleaned_count += 1 + else: + # Track the earliest time among remaining locks + if new_earliest_time is None or cleanup_time < new_earliest_time: + new_earliest_time = cleanup_time + + # Update state only after successful cleanup + self._earliest_async_cleanup_time = new_earliest_time + self._last_async_cleanup_time = current_time + + if cleaned_count > 0: + next_cleanup_in = ( + max( + (new_earliest_time + CLEANUP_KEYED_LOCKS_AFTER_SECONDS - current_time) if new_earliest_time else float('inf'), + MIN_CLEANUP_INTERVAL_SECONDS + ) + ) + direct_log( + f"== async Lock == Cleaned up {cleaned_count}/{total_cleanup_len} expired async locks, " + f"next cleanup in {next_cleanup_in:.1f}s", + enable_output=False, + level="INFO", + ) + + except Exception as e: + direct_log( + f"== async Lock == Cleanup failed: {e}", + level="ERROR", + enable_output=False, + ) + # Don't update _last_async_cleanup_time to allow retry def _get_lock_for_key(self, key: str, enable_logging: bool = False) -> UnifiedLock: # 1. get (or create) the per‑process async gate for this key @@ -620,7 +736,9 @@ def initialize_share_data(workers: int = 1): _initialized, \ _update_flags, \ _async_locks, \ - _graph_db_lock_keyed + _graph_db_lock_keyed, \ + _earliest_mp_cleanup_time, \ + _last_mp_cleanup_time # Check if already initialized if _initialized: @@ -680,6 +798,10 @@ def initialize_share_data(workers: int = 1): ) direct_log(f"Process {os.getpid()} Shared-Data created for Single Process") + # Initialize multiprocess cleanup times + _earliest_mp_cleanup_time = None + _last_mp_cleanup_time = None + # Mark as initialized _initialized = True From 5ee509e67101509330537009da034bfe02af4cc9 Mon Sep 17 00:00:00 2001 From: yangdx Date: Sat, 12 Jul 2025 05:17:44 +0800 Subject: [PATCH 14/14] Fix linting --- lightrag/kg/shared_storage.py | 129 ++++++++++++++++++++++------------ 1 file changed, 85 insertions(+), 44 deletions(-) diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py index dd40c700..b83e058c 100644 --- a/lightrag/kg/shared_storage.py +++ b/lightrag/kg/shared_storage.py @@ -338,42 +338,52 @@ def _release_shared_raw_mp_lock(factory_name: str, key: str): current_time = time.time() if count == 0: _lock_cleanup_data[combined_key] = current_time - + # Update earliest multiprocess cleanup time (only when earlier) - if _earliest_mp_cleanup_time is None or current_time < _earliest_mp_cleanup_time: + if ( + _earliest_mp_cleanup_time is None + or current_time < _earliest_mp_cleanup_time + ): _earliest_mp_cleanup_time = current_time # Efficient cleanup triggering with minimum interval control total_cleanup_len = len(_lock_cleanup_data) if total_cleanup_len >= CLEANUP_THRESHOLD: # Time rollback detection - if _last_mp_cleanup_time is not None and current_time < _last_mp_cleanup_time: + if ( + _last_mp_cleanup_time is not None + and current_time < _last_mp_cleanup_time + ): direct_log( "== mp Lock == Time rollback detected, resetting cleanup time", level="WARNING", enable_output=False, ) _last_mp_cleanup_time = None - + # Check cleanup conditions has_expired_locks = ( - _earliest_mp_cleanup_time is not None and - current_time - _earliest_mp_cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS + _earliest_mp_cleanup_time is not None + and current_time - _earliest_mp_cleanup_time + > CLEANUP_KEYED_LOCKS_AFTER_SECONDS ) - + interval_satisfied = ( - _last_mp_cleanup_time is None or - current_time - _last_mp_cleanup_time > MIN_CLEANUP_INTERVAL_SECONDS + _last_mp_cleanup_time is None + or current_time - _last_mp_cleanup_time > 
MIN_CLEANUP_INTERVAL_SECONDS ) - + if has_expired_locks and interval_satisfied: try: cleaned_count = 0 new_earliest_time = None - + # Perform cleanup while maintaining the new earliest time for cleanup_key, cleanup_time in list(_lock_cleanup_data.items()): - if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS: + if ( + current_time - cleanup_time + > CLEANUP_KEYED_LOCKS_AFTER_SECONDS + ): # Clean expired locks _lock_registry.pop(cleanup_key, None) _lock_registry_count.pop(cleanup_key, None) @@ -381,19 +391,26 @@ def _release_shared_raw_mp_lock(factory_name: str, key: str): cleaned_count += 1 else: # Track the earliest time among remaining locks - if new_earliest_time is None or cleanup_time < new_earliest_time: + if ( + new_earliest_time is None + or cleanup_time < new_earliest_time + ): new_earliest_time = cleanup_time - + # Update state only after successful cleanup _earliest_mp_cleanup_time = new_earliest_time _last_mp_cleanup_time = current_time - + if cleaned_count > 0: - next_cleanup_in = ( - max( - (new_earliest_time + CLEANUP_KEYED_LOCKS_AFTER_SECONDS - current_time) if new_earliest_time else float('inf'), - MIN_CLEANUP_INTERVAL_SECONDS + next_cleanup_in = max( + ( + new_earliest_time + + CLEANUP_KEYED_LOCKS_AFTER_SECONDS + - current_time ) + if new_earliest_time + else float("inf"), + MIN_CLEANUP_INTERVAL_SECONDS, ) direct_log( f"== mp Lock == Cleaned up {cleaned_count}/{total_cleanup_len} expired locks, " @@ -401,7 +418,7 @@ def _release_shared_raw_mp_lock(factory_name: str, key: str): enable_output=False, level="INFO", ) - + except Exception as e: direct_log( f"== mp Lock == Cleanup failed: {e}", @@ -436,8 +453,12 @@ class KeyedUnifiedLock: self._mp_locks: Dict[ str, mp.synchronize.Lock ] = {} # multi-process lock proxies - self._earliest_async_cleanup_time: Optional[float] = None # track earliest async cleanup time - self._last_async_cleanup_time: Optional[float] = None # track last async cleanup time for minimum interval + self._earliest_async_cleanup_time: Optional[float] = ( + None # track earliest async cleanup time + ) + self._last_async_cleanup_time: Optional[float] = ( + None # track last async cleanup time for minimum interval + ) def __call__(self, keys: list[str], *, enable_logging: Optional[bool] = None): """ @@ -474,9 +495,12 @@ class KeyedUnifiedLock: current_time = time.time() if count == 0: self._async_lock_cleanup_data[key] = current_time - + # Update earliest async cleanup time (only when earlier) - if self._earliest_async_cleanup_time is None or current_time < self._earliest_async_cleanup_time: + if ( + self._earliest_async_cleanup_time is None + or current_time < self._earliest_async_cleanup_time + ): self._earliest_async_cleanup_time = current_time self._async_lock_count[key] = count @@ -484,33 +508,43 @@ class KeyedUnifiedLock: total_cleanup_len = len(self._async_lock_cleanup_data) if total_cleanup_len >= CLEANUP_THRESHOLD: # Time rollback detection - if self._last_async_cleanup_time is not None and current_time < self._last_async_cleanup_time: + if ( + self._last_async_cleanup_time is not None + and current_time < self._last_async_cleanup_time + ): direct_log( "== async Lock == Time rollback detected, resetting cleanup time", level="WARNING", enable_output=False, ) self._last_async_cleanup_time = None - + # Check cleanup conditions has_expired_locks = ( - self._earliest_async_cleanup_time is not None and - current_time - self._earliest_async_cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS + self._earliest_async_cleanup_time is not None + 
and current_time - self._earliest_async_cleanup_time + > CLEANUP_KEYED_LOCKS_AFTER_SECONDS ) - + interval_satisfied = ( - self._last_async_cleanup_time is None or - current_time - self._last_async_cleanup_time > MIN_CLEANUP_INTERVAL_SECONDS + self._last_async_cleanup_time is None + or current_time - self._last_async_cleanup_time + > MIN_CLEANUP_INTERVAL_SECONDS ) - + if has_expired_locks and interval_satisfied: try: cleaned_count = 0 new_earliest_time = None - + # Perform cleanup while maintaining the new earliest time - for cleanup_key, cleanup_time in list(self._async_lock_cleanup_data.items()): - if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS: + for cleanup_key, cleanup_time in list( + self._async_lock_cleanup_data.items() + ): + if ( + current_time - cleanup_time + > CLEANUP_KEYED_LOCKS_AFTER_SECONDS + ): # Clean expired async locks self._async_lock.pop(cleanup_key) self._async_lock_count.pop(cleanup_key) @@ -518,19 +552,26 @@ class KeyedUnifiedLock: cleaned_count += 1 else: # Track the earliest time among remaining locks - if new_earliest_time is None or cleanup_time < new_earliest_time: + if ( + new_earliest_time is None + or cleanup_time < new_earliest_time + ): new_earliest_time = cleanup_time - + # Update state only after successful cleanup self._earliest_async_cleanup_time = new_earliest_time self._last_async_cleanup_time = current_time - + if cleaned_count > 0: - next_cleanup_in = ( - max( - (new_earliest_time + CLEANUP_KEYED_LOCKS_AFTER_SECONDS - current_time) if new_earliest_time else float('inf'), - MIN_CLEANUP_INTERVAL_SECONDS + next_cleanup_in = max( + ( + new_earliest_time + + CLEANUP_KEYED_LOCKS_AFTER_SECONDS + - current_time ) + if new_earliest_time + else float("inf"), + MIN_CLEANUP_INTERVAL_SECONDS, ) direct_log( f"== async Lock == Cleaned up {cleaned_count}/{total_cleanup_len} expired async locks, " @@ -538,7 +579,7 @@ class KeyedUnifiedLock: enable_output=False, level="INFO", ) - + except Exception as e: direct_log( f"== async Lock == Cleanup failed: {e}", @@ -801,7 +842,7 @@ def initialize_share_data(workers: int = 1): # Initialize multiprocess cleanup times _earliest_mp_cleanup_time = None _last_mp_cleanup_time = None - + # Mark as initialized _initialized = True
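Illustrative sketch (not part of the patch series): a single-process reduction of the keyed-lock scheme the series converges on, namely per-key asyncio locks with reference counts, plus deferred cleanup that only scans once the pending list reaches CLEANUP_THRESHOLD and MIN_CLEANUP_INTERVAL_SECONDS has elapsed since the last scan. The constants mirror shared_storage.py; KeyedLocks itself is a simplified stand-in for KeyedUnifiedLock (no manager.Lock proxies, no time-rollback handling).

import asyncio
import time
from contextlib import asynccontextmanager
from typing import Dict, Optional

CLEANUP_KEYED_LOCKS_AFTER_SECONDS = 300
CLEANUP_THRESHOLD = 500
MIN_CLEANUP_INTERVAL_SECONDS = 30


class KeyedLocks:
    """Per-key asyncio locks with refcounts and deferred, thresholded cleanup."""

    def __init__(self) -> None:
        self._locks: Dict[str, asyncio.Lock] = {}
        self._counts: Dict[str, int] = {}
        self._cleanup_at: Dict[str, float] = {}  # key -> time its refcount hit 0
        self._last_cleanup: Optional[float] = None

    @asynccontextmanager
    async def lock(self, key: str):
        self._counts[key] = self._counts.get(key, 0) + 1
        self._cleanup_at.pop(key, None)  # key is live again; cancel pending cleanup
        gate = self._locks.setdefault(key, asyncio.Lock())
        try:
            async with gate:
                yield
        finally:
            self._release(key)

    def _release(self, key: str) -> None:
        now = time.time()
        self._counts[key] -= 1
        if self._counts[key] == 0:
            self._cleanup_at[key] = now
        # Scan only when enough keys are pending and the minimum interval has passed
        if len(self._cleanup_at) >= CLEANUP_THRESHOLD and (
            self._last_cleanup is None
            or now - self._last_cleanup > MIN_CLEANUP_INTERVAL_SECONDS
        ):
            for k, t in list(self._cleanup_at.items()):
                if now - t > CLEANUP_KEYED_LOCKS_AFTER_SECONDS:
                    self._locks.pop(k, None)
                    self._counts.pop(k, None)
                    self._cleanup_at.pop(k, None)
            self._last_cleanup = now


async def main() -> None:
    locks = KeyedLocks()

    async def touch(entity: str) -> None:
        async with locks.lock(entity):  # same key => writers are serialized
            await asyncio.sleep(0.05)

    await asyncio.gather(touch("A"), touch("A"), touch("B"))


asyncio.run(main())

Deferring the scan keeps each release cheap in the common case; the price, as in the patched code, is that idle locks can outlive their timeout until both the threshold and the interval condition are met.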