From 83cf878548586440952575988d471a12354ac6bb Mon Sep 17 00:00:00 2001 From: yangdx Date: Mon, 17 Nov 2025 05:27:31 +0800 Subject: [PATCH] Fix NamespaceLock concurrent coroutine safety with ContextVar - Use ContextVar for per-coroutine storage - Prevent state interference between coroutines - Add re-entrance protection check --- lightrag/kg/shared_storage.py | 43 ++++++++++++++++++++++++----------- 1 file changed, 30 insertions(+), 13 deletions(-) diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py index f556e91f..32778ddc 100644 --- a/lightrag/kg/shared_storage.py +++ b/lightrag/kg/shared_storage.py @@ -6,6 +6,7 @@ from multiprocessing.synchronize import Lock as ProcessLock from multiprocessing import Manager import time import logging +from contextvars import ContextVar from typing import Any, Dict, List, Optional, Union, TypeVar, Generic from lightrag.exceptions import PipelineNotInitializedError @@ -1456,10 +1457,10 @@ class NamespaceLock: """ Reusable namespace lock wrapper that creates a fresh context on each use. - This class solves the lock re-entrance issue by implementing the async context - manager protocol. Each time it's used in an 'async with' statement, it creates - a new _KeyedLockContext internally, allowing the same NamespaceLock instance - to be used multiple times safely, even in concurrent scenarios. + This class solves the lock re-entrance and concurrent coroutine issues by using + contextvars.ContextVar to provide per-coroutine storage. Each coroutine gets its + own independent lock context, preventing state interference between concurrent + coroutines using the same NamespaceLock instance. Example: lock = NamespaceLock("my_namespace", "workspace1") @@ -1468,10 +1469,10 @@ class NamespaceLock: async with lock: await do_something() - # Can even be used concurrently (each creates its own context) + # Can even be used concurrently without deadlock await asyncio.gather( - use_lock_1(lock), - use_lock_2(lock) + coroutine_1(lock), # Each gets its own context + coroutine_2(lock) # No state interference ) """ @@ -1481,25 +1482,41 @@ class NamespaceLock: self._namespace = namespace self._workspace = workspace self._enable_logging = enable_logging - self._current_ctx = None + # Use ContextVar to provide per-coroutine storage for lock context + # This ensures each coroutine has its own independent context + self._ctx_var: ContextVar[Optional[_KeyedLockContext]] = ContextVar( + "lock_ctx", default=None + ) async def __aenter__(self): """Create a fresh context each time we enter""" + # Check if this coroutine already has an active lock context + if self._ctx_var.get() is not None: + raise RuntimeError( + "NamespaceLock already acquired in current coroutine context" + ) + final_namespace = get_final_namespace(self._namespace, self._workspace) - self._current_ctx = get_storage_keyed_lock( + ctx = get_storage_keyed_lock( ["default_key"], namespace=final_namespace, enable_logging=self._enable_logging, ) - return await self._current_ctx.__aenter__() + + # Store context in this coroutine's ContextVar + self._ctx_var.set(ctx) + return await ctx.__aenter__() async def __aexit__(self, exc_type, exc_val, exc_tb): """Exit the current context and clean up""" - if self._current_ctx is None: + # Retrieve this coroutine's context + ctx = self._ctx_var.get() + if ctx is None: raise RuntimeError("NamespaceLock exited without being entered") - result = await self._current_ctx.__aexit__(exc_type, exc_val, exc_tb) - self._current_ctx = None + result = await ctx.__aexit__(exc_type, exc_val, exc_tb) + # Clear this coroutine's context + self._ctx_var.set(None) return result