From 1e7bd654d8e1fda1fc3c4931a52151c4caad1d8b Mon Sep 17 00:00:00 2001 From: yangdx Date: Mon, 17 Nov 2025 05:27:31 +0800 Subject: [PATCH] Fix NamespaceLock concurrent coroutine safety with ContextVar - Use ContextVar for per-coroutine storage - Prevent state interference between coroutines - Add re-entrance protection check (cherry picked from commit b6a5a90eaf743c961a36d6a35a22f5fb91b9c222) --- lightrag/kg/shared_storage.py | 106 +++++++++++++++++++++++++++++----- 1 file changed, 93 insertions(+), 13 deletions(-) diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py index 3ccb0f52..32778ddc 100644 --- a/lightrag/kg/shared_storage.py +++ b/lightrag/kg/shared_storage.py @@ -6,6 +6,7 @@ from multiprocessing.synchronize import Lock as ProcessLock from multiprocessing import Manager import time import logging +from contextvars import ContextVar from typing import Any, Dict, List, Optional, Union, TypeVar, Generic from lightrag.exceptions import PipelineNotInitializedError @@ -1365,7 +1366,7 @@ async def get_all_update_flags_status(workspace: str | None = None) -> Dict[str, return {} if workspace is None: - workspace = get_default_workspace + workspace = get_default_workspace() result = {} async with get_internal_lock(): @@ -1441,33 +1442,112 @@ async def get_namespace_data( if final_namespace.endswith(":pipeline_status") and not first_init: # Check if pipeline_status should have been initialized but wasn't # This helps users to call initialize_pipeline_status() before get_namespace_data() - raise PipelineNotInitializedError(namespace) + raise PipelineNotInitializedError(final_namespace) # For other namespaces or when allow_create=True, create them dynamically if _is_multiprocess and _manager is not None: - _shared_dicts[namespace] = _manager.dict() + _shared_dicts[final_namespace] = _manager.dict() else: - _shared_dicts[namespace] = {} + _shared_dicts[final_namespace] = {} - return _shared_dicts[namespace] + return _shared_dicts[final_namespace] + + +class NamespaceLock: + """ + Reusable namespace lock wrapper that creates a fresh context on each use. + + This class solves the lock re-entrance and concurrent coroutine issues by using + contextvars.ContextVar to provide per-coroutine storage. Each coroutine gets its + own independent lock context, preventing state interference between concurrent + coroutines using the same NamespaceLock instance. + + Example: + lock = NamespaceLock("my_namespace", "workspace1") + + # Can be used multiple times safely + async with lock: + await do_something() + + # Can even be used concurrently without deadlock + await asyncio.gather( + coroutine_1(lock), # Each gets its own context + coroutine_2(lock) # No state interference + ) + """ + + def __init__( + self, namespace: str, workspace: str | None = None, enable_logging: bool = False + ): + self._namespace = namespace + self._workspace = workspace + self._enable_logging = enable_logging + # Use ContextVar to provide per-coroutine storage for lock context + # This ensures each coroutine has its own independent context + self._ctx_var: ContextVar[Optional[_KeyedLockContext]] = ContextVar( + "lock_ctx", default=None + ) + + async def __aenter__(self): + """Create a fresh context each time we enter""" + # Check if this coroutine already has an active lock context + if self._ctx_var.get() is not None: + raise RuntimeError( + "NamespaceLock already acquired in current coroutine context" + ) + + final_namespace = get_final_namespace(self._namespace, self._workspace) + ctx = get_storage_keyed_lock( + ["default_key"], + namespace=final_namespace, + enable_logging=self._enable_logging, + ) + + # Store context in this coroutine's ContextVar + self._ctx_var.set(ctx) + return await ctx.__aenter__() + + async def __aexit__(self, exc_type, exc_val, exc_tb): + """Exit the current context and clean up""" + # Retrieve this coroutine's context + ctx = self._ctx_var.get() + if ctx is None: + raise RuntimeError("NamespaceLock exited without being entered") + + result = await ctx.__aexit__(exc_type, exc_val, exc_tb) + # Clear this coroutine's context + self._ctx_var.set(None) + return result def get_namespace_lock( namespace: str, workspace: str | None = None, enable_logging: bool = False -) -> str: - """Get the lock key for a namespace. +) -> NamespaceLock: + """Get a reusable namespace lock wrapper. + + This function returns a NamespaceLock instance that can be used multiple times + safely, even in concurrent scenarios. Each use creates a fresh lock context + internally, preventing lock re-entrance errors. Args: - namespace: The namespace to get the lock key for. + namespace: The namespace to get the lock for. workspace: Workspace identifier (may be empty string for global namespace) + enable_logging: Whether to enable lock operation logging Returns: - str: The lock key for the namespace. + NamespaceLock: A reusable lock wrapper that can be used with 'async with' + + Example: + lock = get_namespace_lock("pipeline_status", workspace="space1") + + # Can be used multiple times + async with lock: + await do_something() + + async with lock: + await do_something_else() """ - final_namespace = get_final_namespace(namespace, workspace) - return get_storage_keyed_lock( - ["default_key"], namespace=final_namespace, enable_logging=enable_logging - ) + return NamespaceLock(namespace, workspace, enable_logging) def finalize_share_data():