Fix NamespaceLock concurrent coroutine safety with ContextVar
- Use ContextVar for per-coroutine storage - Prevent state interference between coroutines - Add re-entrance protection check
This commit is contained in:
parent
91be4ccecb
commit
83cf878548
1 changed files with 30 additions and 13 deletions
|
|
@ -6,6 +6,7 @@ from multiprocessing.synchronize import Lock as ProcessLock
|
|||
from multiprocessing import Manager
|
||||
import time
|
||||
import logging
|
||||
from contextvars import ContextVar
|
||||
from typing import Any, Dict, List, Optional, Union, TypeVar, Generic
|
||||
|
||||
from lightrag.exceptions import PipelineNotInitializedError
|
||||
|
|
@ -1456,10 +1457,10 @@ class NamespaceLock:
|
|||
"""
|
||||
Reusable namespace lock wrapper that creates a fresh context on each use.
|
||||
|
||||
This class solves the lock re-entrance issue by implementing the async context
|
||||
manager protocol. Each time it's used in an 'async with' statement, it creates
|
||||
a new _KeyedLockContext internally, allowing the same NamespaceLock instance
|
||||
to be used multiple times safely, even in concurrent scenarios.
|
||||
This class solves the lock re-entrance and concurrent coroutine issues by using
|
||||
contextvars.ContextVar to provide per-coroutine storage. Each coroutine gets its
|
||||
own independent lock context, preventing state interference between concurrent
|
||||
coroutines using the same NamespaceLock instance.
|
||||
|
||||
Example:
|
||||
lock = NamespaceLock("my_namespace", "workspace1")
|
||||
|
|
@ -1468,10 +1469,10 @@ class NamespaceLock:
|
|||
async with lock:
|
||||
await do_something()
|
||||
|
||||
# Can even be used concurrently (each creates its own context)
|
||||
# Can even be used concurrently without deadlock
|
||||
await asyncio.gather(
|
||||
use_lock_1(lock),
|
||||
use_lock_2(lock)
|
||||
coroutine_1(lock), # Each gets its own context
|
||||
coroutine_2(lock) # No state interference
|
||||
)
|
||||
"""
|
||||
|
||||
|
|
@ -1481,25 +1482,41 @@ class NamespaceLock:
|
|||
self._namespace = namespace
|
||||
self._workspace = workspace
|
||||
self._enable_logging = enable_logging
|
||||
self._current_ctx = None
|
||||
# Use ContextVar to provide per-coroutine storage for lock context
|
||||
# This ensures each coroutine has its own independent context
|
||||
self._ctx_var: ContextVar[Optional[_KeyedLockContext]] = ContextVar(
|
||||
"lock_ctx", default=None
|
||||
)
|
||||
|
||||
async def __aenter__(self):
|
||||
"""Create a fresh context each time we enter"""
|
||||
# Check if this coroutine already has an active lock context
|
||||
if self._ctx_var.get() is not None:
|
||||
raise RuntimeError(
|
||||
"NamespaceLock already acquired in current coroutine context"
|
||||
)
|
||||
|
||||
final_namespace = get_final_namespace(self._namespace, self._workspace)
|
||||
self._current_ctx = get_storage_keyed_lock(
|
||||
ctx = get_storage_keyed_lock(
|
||||
["default_key"],
|
||||
namespace=final_namespace,
|
||||
enable_logging=self._enable_logging,
|
||||
)
|
||||
return await self._current_ctx.__aenter__()
|
||||
|
||||
# Store context in this coroutine's ContextVar
|
||||
self._ctx_var.set(ctx)
|
||||
return await ctx.__aenter__()
|
||||
|
||||
async def __aexit__(self, exc_type, exc_val, exc_tb):
|
||||
"""Exit the current context and clean up"""
|
||||
if self._current_ctx is None:
|
||||
# Retrieve this coroutine's context
|
||||
ctx = self._ctx_var.get()
|
||||
if ctx is None:
|
||||
raise RuntimeError("NamespaceLock exited without being entered")
|
||||
|
||||
result = await self._current_ctx.__aexit__(exc_type, exc_val, exc_tb)
|
||||
self._current_ctx = None
|
||||
result = await ctx.__aexit__(exc_type, exc_val, exc_tb)
|
||||
# Clear this coroutine's context
|
||||
self._ctx_var.set(None)
|
||||
return result
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue