Refactor namespace lock to support reusable async context manager

• Add NamespaceLock class wrapper
• Fix lock re-entrance issues
• Enable concurrent lock usage
• Fresh context per async with block
• Update get_namespace_lock API
This commit is contained in:
yangdx 2025-11-17 04:07:37 +08:00
parent 52c812b9a0
commit 7deb9a64b9

View file

@ -1452,22 +1452,85 @@ async def get_namespace_data(
return _shared_dicts[final_namespace]
class NamespaceLock:
"""
Reusable namespace lock wrapper that creates a fresh context on each use.
This class solves the lock re-entrance issue by implementing the async context
manager protocol. Each time it's used in an 'async with' statement, it creates
a new _KeyedLockContext internally, allowing the same NamespaceLock instance
to be used multiple times safely, even in concurrent scenarios.
Example:
lock = NamespaceLock("my_namespace", "workspace1")
# Can be used multiple times safely
async with lock:
await do_something()
# Can even be used concurrently (each creates its own context)
await asyncio.gather(
use_lock_1(lock),
use_lock_2(lock)
)
"""
def __init__(
self, namespace: str, workspace: str | None = None, enable_logging: bool = False
):
self._namespace = namespace
self._workspace = workspace
self._enable_logging = enable_logging
self._current_ctx = None
async def __aenter__(self):
"""Create a fresh context each time we enter"""
final_namespace = get_final_namespace(self._namespace, self._workspace)
self._current_ctx = get_storage_keyed_lock(
["default_key"],
namespace=final_namespace,
enable_logging=self._enable_logging,
)
return await self._current_ctx.__aenter__()
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Exit the current context and clean up"""
if self._current_ctx is None:
raise RuntimeError("NamespaceLock exited without being entered")
result = await self._current_ctx.__aexit__(exc_type, exc_val, exc_tb)
self._current_ctx = None
return result
def get_namespace_lock(
namespace: str, workspace: str | None = None, enable_logging: bool = False
) -> str:
"""Get the lock key for a namespace.
) -> NamespaceLock:
"""Get a reusable namespace lock wrapper.
This function returns a NamespaceLock instance that can be used multiple times
safely, even in concurrent scenarios. Each use creates a fresh lock context
internally, preventing lock re-entrance errors.
Args:
namespace: The namespace to get the lock key for.
namespace: The namespace to get the lock for.
workspace: Workspace identifier (may be empty string for global namespace)
enable_logging: Whether to enable lock operation logging
Returns:
str: The lock key for the namespace.
NamespaceLock: A reusable lock wrapper that can be used with 'async with'
Example:
lock = get_namespace_lock("pipeline_status", workspace="space1")
# Can be used multiple times
async with lock:
await do_something()
async with lock:
await do_something_else()
"""
final_namespace = get_final_namespace(namespace, workspace)
return get_storage_keyed_lock(
["default_key"], namespace=final_namespace, enable_logging=enable_logging
)
return NamespaceLock(namespace, workspace, enable_logging)
def finalize_share_data():