Fix NamespaceLock concurrent coroutine safety with ContextVar
- Use ContextVar for per-coroutine storage
- Prevent state interference between coroutines
- Add re-entrance protection check
(cherry picked from commit b6a5a90eaf)
This commit is contained in:
parent
f6a45245bd
commit
1e7bd654d8
1 changed files with 93 additions and 13 deletions
|
|
@ -6,6 +6,7 @@ from multiprocessing.synchronize import Lock as ProcessLock
|
||||||
from multiprocessing import Manager
|
from multiprocessing import Manager
|
||||||
import time
|
import time
|
||||||
import logging
|
import logging
|
||||||
|
from contextvars import ContextVar
|
||||||
from typing import Any, Dict, List, Optional, Union, TypeVar, Generic
|
from typing import Any, Dict, List, Optional, Union, TypeVar, Generic
|
||||||
|
|
||||||
from lightrag.exceptions import PipelineNotInitializedError
|
from lightrag.exceptions import PipelineNotInitializedError
|
||||||
|
|
@ -1365,7 +1366,7 @@ async def get_all_update_flags_status(workspace: str | None = None) -> Dict[str,
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
if workspace is None:
|
if workspace is None:
|
||||||
workspace = get_default_workspace
|
workspace = get_default_workspace()
|
||||||
|
|
||||||
result = {}
|
result = {}
|
||||||
async with get_internal_lock():
|
async with get_internal_lock():
|
||||||
|
|
@ -1441,33 +1442,112 @@ async def get_namespace_data(
|
||||||
if final_namespace.endswith(":pipeline_status") and not first_init:
|
if final_namespace.endswith(":pipeline_status") and not first_init:
|
||||||
# Check if pipeline_status should have been initialized but wasn't
|
# Check if pipeline_status should have been initialized but wasn't
|
||||||
# This helps users to call initialize_pipeline_status() before get_namespace_data()
|
# This helps users to call initialize_pipeline_status() before get_namespace_data()
|
||||||
raise PipelineNotInitializedError(namespace)
|
raise PipelineNotInitializedError(final_namespace)
|
||||||
|
|
||||||
# For other namespaces or when allow_create=True, create them dynamically
|
# For other namespaces or when allow_create=True, create them dynamically
|
||||||
if _is_multiprocess and _manager is not None:
|
if _is_multiprocess and _manager is not None:
|
||||||
_shared_dicts[namespace] = _manager.dict()
|
_shared_dicts[final_namespace] = _manager.dict()
|
||||||
else:
|
else:
|
||||||
_shared_dicts[namespace] = {}
|
_shared_dicts[final_namespace] = {}
|
||||||
|
|
||||||
return _shared_dicts[namespace]
|
return _shared_dicts[final_namespace]
|
||||||
|
|
||||||
|
|
||||||
|
class NamespaceLock:
|
||||||
|
"""
|
||||||
|
Reusable namespace lock wrapper that creates a fresh context on each use.
|
||||||
|
|
||||||
|
This class solves the lock re-entrance and concurrent coroutine issues by using
|
||||||
|
contextvars.ContextVar to provide per-coroutine storage. Each coroutine gets its
|
||||||
|
own independent lock context, preventing state interference between concurrent
|
||||||
|
coroutines using the same NamespaceLock instance.
|
||||||
|
|
||||||
|
Example:
|
||||||
|
lock = NamespaceLock("my_namespace", "workspace1")
|
||||||
|
|
||||||
|
# Can be used multiple times safely
|
||||||
|
async with lock:
|
||||||
|
await do_something()
|
||||||
|
|
||||||
|
# Can even be used concurrently without deadlock
|
||||||
|
await asyncio.gather(
|
||||||
|
coroutine_1(lock), # Each gets its own context
|
||||||
|
coroutine_2(lock) # No state interference
|
||||||
|
)
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self, namespace: str, workspace: str | None = None, enable_logging: bool = False
|
||||||
|
):
|
||||||
|
self._namespace = namespace
|
||||||
|
self._workspace = workspace
|
||||||
|
self._enable_logging = enable_logging
|
||||||
|
# Use ContextVar to provide per-coroutine storage for lock context
|
||||||
|
# This ensures each coroutine has its own independent context
|
||||||
|
self._ctx_var: ContextVar[Optional[_KeyedLockContext]] = ContextVar(
|
||||||
|
"lock_ctx", default=None
|
||||||
|
)
|
||||||
|
|
||||||
|
async def __aenter__(self):
|
||||||
|
"""Create a fresh context each time we enter"""
|
||||||
|
# Check if this coroutine already has an active lock context
|
||||||
|
if self._ctx_var.get() is not None:
|
||||||
|
raise RuntimeError(
|
||||||
|
"NamespaceLock already acquired in current coroutine context"
|
||||||
|
)
|
||||||
|
|
||||||
|
final_namespace = get_final_namespace(self._namespace, self._workspace)
|
||||||
|
ctx = get_storage_keyed_lock(
|
||||||
|
["default_key"],
|
||||||
|
namespace=final_namespace,
|
||||||
|
enable_logging=self._enable_logging,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Store context in this coroutine's ContextVar
|
||||||
|
self._ctx_var.set(ctx)
|
||||||
|
return await ctx.__aenter__()
|
||||||
|
|
||||||
|
async def __aexit__(self, exc_type, exc_val, exc_tb):
|
||||||
|
"""Exit the current context and clean up"""
|
||||||
|
# Retrieve this coroutine's context
|
||||||
|
ctx = self._ctx_var.get()
|
||||||
|
if ctx is None:
|
||||||
|
raise RuntimeError("NamespaceLock exited without being entered")
|
||||||
|
|
||||||
|
result = await ctx.__aexit__(exc_type, exc_val, exc_tb)
|
||||||
|
# Clear this coroutine's context
|
||||||
|
self._ctx_var.set(None)
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
def get_namespace_lock(
|
def get_namespace_lock(
|
||||||
namespace: str, workspace: str | None = None, enable_logging: bool = False
|
namespace: str, workspace: str | None = None, enable_logging: bool = False
|
||||||
) -> str:
|
) -> NamespaceLock:
|
||||||
"""Get the lock key for a namespace.
|
"""Get a reusable namespace lock wrapper.
|
||||||
|
|
||||||
|
This function returns a NamespaceLock instance that can be used multiple times
|
||||||
|
safely, even in concurrent scenarios. Each use creates a fresh lock context
|
||||||
|
internally, preventing lock re-entrance errors.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
namespace: The namespace to get the lock key for.
|
namespace: The namespace to get the lock for.
|
||||||
workspace: Workspace identifier (may be empty string for global namespace)
|
workspace: Workspace identifier (may be empty string for global namespace)
|
||||||
|
enable_logging: Whether to enable lock operation logging
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
str: The lock key for the namespace.
|
NamespaceLock: A reusable lock wrapper that can be used with 'async with'
|
||||||
|
|
||||||
|
Example:
|
||||||
|
lock = get_namespace_lock("pipeline_status", workspace="space1")
|
||||||
|
|
||||||
|
# Can be used multiple times
|
||||||
|
async with lock:
|
||||||
|
await do_something()
|
||||||
|
|
||||||
|
async with lock:
|
||||||
|
await do_something_else()
|
||||||
"""
|
"""
|
||||||
final_namespace = get_final_namespace(namespace, workspace)
|
return NamespaceLock(namespace, workspace, enable_logging)
|
||||||
return get_storage_keyed_lock(
|
|
||||||
["default_key"], namespace=final_namespace, enable_logging=enable_logging
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def finalize_share_data():
|
def finalize_share_data():
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue