diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py index 113bda1c..87f0f9a9 100644 --- a/lightrag/kg/shared_storage.py +++ b/lightrag/kg/shared_storage.py @@ -1452,22 +1452,85 @@ async def get_namespace_data( return _shared_dicts[final_namespace] +class NamespaceLock: + """ + Reusable namespace lock wrapper that creates a fresh context on each use. + + This class solves the lock re-entrance issue by implementing the async context + manager protocol. Each time it's used in an 'async with' statement, it creates + a new _KeyedLockContext internally, allowing the same NamespaceLock instance + to be used multiple times safely, even in concurrent scenarios. + + Example: + lock = NamespaceLock("my_namespace", "workspace1") + + # Can be used multiple times safely + async with lock: + await do_something() + + # Can even be used concurrently (each creates its own context) + await asyncio.gather( + use_lock_1(lock), + use_lock_2(lock) + ) + """ + + def __init__( + self, namespace: str, workspace: str | None = None, enable_logging: bool = False + ): + self._namespace = namespace + self._workspace = workspace + self._enable_logging = enable_logging + self._current_ctx = None + + async def __aenter__(self): + """Create a fresh context each time we enter""" + final_namespace = get_final_namespace(self._namespace, self._workspace) + self._current_ctx = get_storage_keyed_lock( + ["default_key"], + namespace=final_namespace, + enable_logging=self._enable_logging, + ) + return await self._current_ctx.__aenter__() + + async def __aexit__(self, exc_type, exc_val, exc_tb): + """Exit the current context and clean up""" + if self._current_ctx is None: + raise RuntimeError("NamespaceLock exited without being entered") + + result = await self._current_ctx.__aexit__(exc_type, exc_val, exc_tb) + self._current_ctx = None + return result + + def get_namespace_lock( namespace: str, workspace: str | None = None, enable_logging: bool = False -) -> str: - """Get the lock key for a namespace. +) -> NamespaceLock: + """Get a reusable namespace lock wrapper. + + This function returns a NamespaceLock instance that can be used multiple times + safely, even in concurrent scenarios. Each use creates a fresh lock context + internally, preventing lock re-entrance errors. Args: - namespace: The namespace to get the lock key for. + namespace: The namespace to get the lock for. workspace: Workspace identifier (may be empty string for global namespace) + enable_logging: Whether to enable lock operation logging Returns: - str: The lock key for the namespace. + NamespaceLock: A reusable lock wrapper that can be used with 'async with' + + Example: + lock = get_namespace_lock("pipeline_status", workspace="space1") + + # Can be used multiple times + async with lock: + await do_something() + + async with lock: + await do_something_else() """ - final_namespace = get_final_namespace(namespace, workspace) - return get_storage_keyed_lock( - ["default_key"], namespace=final_namespace, enable_logging=enable_logging - ) + return NamespaceLock(namespace, workspace, enable_logging) def finalize_share_data():