# Review annotation. Text placed before the first "diff --git" header is
# skipped by patch tools, so these lines do not affect the hunks below.
#
# lightrag/kg/shared_storage.py
#   - Adds the module-level dict `_workspace_async_locks`, mapping a workspace
#     lock name to an asyncio.Lock.
#   - `_get_workspace_lock`, multiprocess branch: raises RuntimeError when
#     `_registry_guard` is None instead of entering `with _registry_guard:`;
#     the companion async lock is now created in and read from
#     `_workspace_async_locks` rather than `_async_locks`.
#   - `_get_workspace_lock`, single-process branch: sets `async_lock = None`.
#   - `initialize_share_data`: declares `_workspace_async_locks` in its
#     `global` statement and resets it to {} in both the "Multiple Process"
#     and the "Single Process" branch.
#
# tests/test_workspace_lock_bugfixes.py (new file)
#   - Three tests: the RuntimeError path, identity of the async lock returned
#     for the same workspace, and distinct async locks for distinct workspaces.
#
# NOTE(review): line breaks and indentation were reconstructed from a
#   whitespace-collapsed copy of this patch. Added/removed/context line counts
#   were checked against every @@ header, but exact indentation and the
#   position of blank context lines are inferred - verify against the target
#   file (or apply with whitespace tolerance) before relying on it.
# NOTE(review): the workspace RLock is still cached in `_sync_locks`, which the
#   first hunk heading shows declared as a plain `dict = {}`. Presumably that
#   dict is per-process too - confirm that a `_manager.RLock()` created lazily
#   in one worker is actually the lock other workers obtain for the same name.
# NOTE(review): the check-then-set on `_workspace_async_locks` is not guarded
#   by a lock in this patch - TODO confirm `_get_workspace_lock` is only
#   called from the event-loop thread.
# NOTE(review): two tests call `initialize_share_data(workers=2)` and perform
#   no teardown. They appear to assume shared data was not already initialized
#   in the same pytest process - confirm, and consider resetting shared state
#   around each test.
diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py
index 87318c07..3c24bbdc 100644
--- a/lightrag/kg/shared_storage.py
+++ b/lightrag/kg/shared_storage.py
@@ -94,6 +94,10 @@ _sync_locks: dict = {}
 # async locks for coroutine synchronization in multiprocess mode
 _async_locks: Optional[Dict[str, asyncio.Lock]] = None
 
+# Per-process dictionary to store workspace-specific async locks
+# This MUST be per-process because asyncio.Lock objects cannot be shared across processes
+_workspace_async_locks: Dict[str, asyncio.Lock] = {}
+
 _debug_n_locks_acquired: int = 0
 
 
@@ -1074,19 +1078,28 @@ def _get_workspace_lock(
     if workspace:
         # Workspace-specific lock (lazy creation with synchronization)
         if _is_multiprocess:
+            # Check if shared data is initialized
+            if _registry_guard is None:
+                raise RuntimeError(
+                    "Shared data not initialized. Call initialize_share_data() first."
+                )
+
             # Use registry guard to protect lazy creation in multiprocess mode
             with _registry_guard:
                 if lock_name not in _sync_locks:
                     _sync_locks[lock_name] = _manager.RLock()
-                    # Create companion async lock for this workspace lock
-                    if _async_locks is not None and lock_name not in _async_locks:
-                        _async_locks[lock_name] = asyncio.Lock()
+
+            # Create per-process async lock for workspace (cannot be shared across processes)
+            if lock_name not in _workspace_async_locks:
+                _workspace_async_locks[lock_name] = asyncio.Lock()
+
+            async_lock = _workspace_async_locks[lock_name]
         else:
             # Single-process mode - no synchronization needed due to asyncio nature
             if lock_name not in _sync_locks:
                 _sync_locks[lock_name] = asyncio.Lock()
 
-        async_lock = _async_locks.get(lock_name) if _is_multiprocess else None
+            async_lock = None
         return UnifiedLock(
             lock=_sync_locks[lock_name],
             is_async=not _is_multiprocess,
@@ -1271,6 +1284,7 @@ def initialize_share_data(workers: int = 1):
         _async_locks, \
         _storage_keyed_lock, \
         _sync_locks, \
+        _workspace_async_locks, \
         _earliest_mp_cleanup_time, \
         _last_mp_cleanup_time
 
@@ -1311,6 +1325,9 @@ def initialize_share_data(workers: int = 1):
             "data_init_lock": asyncio.Lock(),
         }
 
+        # Initialize per-process workspace async locks
+        _workspace_async_locks = {}
+
         direct_log(
             f"Process {os.getpid()} Shared-Data created for Multiple Process (workers={workers})"
         )
@@ -1326,6 +1343,7 @@ def initialize_share_data(workers: int = 1):
         _update_flags = {}
         _sync_locks = {}
         _async_locks = None  # No need for async locks in single process mode
+        _workspace_async_locks = {}  # Not used in single process mode, but initialize for consistency
 
         _storage_keyed_lock = KeyedUnifiedLock()
         direct_log(f"Process {os.getpid()} Shared-Data created for Single Process")
diff --git a/tests/test_workspace_lock_bugfixes.py b/tests/test_workspace_lock_bugfixes.py
new file mode 100644
index 00000000..f6fc1f0a
--- /dev/null
+++ b/tests/test_workspace_lock_bugfixes.py
@@ -0,0 +1,84 @@
+"""
+Test bug fixes for workspace lock implementation.
+
+This test module specifically verifies the bug fixes for:
+1. RuntimeError when _registry_guard is None
+2. Per-process workspace async locks in multiprocess mode
+"""
+
+import pytest
+from unittest.mock import patch
+from lightrag.kg.shared_storage import (
+    _get_workspace_lock,
+    initialize_share_data,
+)
+
+
+def test_error_when_not_initialized():
+    """Test that _get_workspace_lock raises RuntimeError when called before initialize_share_data().
+
+    Bug Fix: Previously, calling _get_workspace_lock() before initialize_share_data()
+    would cause TypeError: 'NoneType' object does not support the context manager protocol
+    because _registry_guard was None.
+
+    Now it should raise a clear RuntimeError with helpful message.
+    """
+    # Mock _is_multiprocess to be True to trigger the bug path
+    with patch("lightrag.kg.shared_storage._is_multiprocess", True):
+        with patch("lightrag.kg.shared_storage._registry_guard", None):
+            with pytest.raises(RuntimeError, match="Shared data not initialized"):
+                # This should raise RuntimeError, not TypeError
+                _get_workspace_lock(
+                    "test_lock", None, workspace="tenant1", enable_logging=False
+                )
+
+
+def test_workspace_async_locks_per_process():
+    """Test that workspace async locks are stored per-process.
+
+    Bug Fix: Previously, workspace async locks were added to _async_locks dict,
+    which is a regular Python dict in multiprocess mode. This meant modifications
+    in one process were not visible to other processes.
+
+    Now workspace async locks are stored in _workspace_async_locks, which is
+    per-process by design (since asyncio.Lock cannot be shared across processes).
+    """
+    from lightrag.kg import shared_storage
+
+    # Initialize in multiprocess mode
+    initialize_share_data(workers=2)
+
+    # Get a workspace lock
+    lock1 = shared_storage.get_storage_lock(workspace="tenant1")
+
+    # Verify the workspace async lock exists in _workspace_async_locks
+    assert "tenant1:storage_lock" in shared_storage._workspace_async_locks
+
+    # Get the same workspace lock again
+    lock2 = shared_storage.get_storage_lock(workspace="tenant1")
+
+    # Both locks should reference the same async_lock instance
+    # (within the same process)
+    assert lock1._async_lock is lock2._async_lock
+
+    # Verify it's the same as the one stored in _workspace_async_locks
+    assert lock1._async_lock is shared_storage._workspace_async_locks["tenant1:storage_lock"]
+
+
+def test_multiple_workspace_locks_different_async_locks():
+    """Test that different workspaces have different async locks."""
+    from lightrag.kg import shared_storage
+
+    # Initialize in multiprocess mode
+    initialize_share_data(workers=2)
+
+    # Get locks for different workspaces
+    lock1 = shared_storage.get_storage_lock(workspace="tenant1")
+    lock2 = shared_storage.get_storage_lock(workspace="tenant2")
+
+    # They should have different async_lock instances
+    assert lock1._async_lock is not lock2._async_lock
+
+    # Verify both are in _workspace_async_locks
+    assert "tenant1:storage_lock" in shared_storage._workspace_async_locks
+    assert "tenant2:storage_lock" in shared_storage._workspace_async_locks