fix: critical bugs in workspace lock multiprocess synchronization
Bug 1a - RuntimeError when _registry_guard is None: - Added explicit check for _registry_guard initialization - Now raises clear RuntimeError instead of cryptic TypeError - Helps users understand they need to call initialize_share_data() first Bug 1b - Workspace async_locks not visible across processes: - Created new _workspace_async_locks dict for per-process storage - Fixed issue where async_locks modifications in one process were invisible to others - This is correct design since asyncio.Lock objects cannot be pickled/shared Why per-process async_locks: - asyncio.Lock objects cannot be shared across processes - Each process needs its own asyncio.Lock instances for coroutine sync - Cross-process sync is handled by Manager.RLock() in _sync_locks - Within-process async sync is handled by per-process asyncio.Lock Testing: - All 17 existing workspace lock tests pass - Added 3 new tests specifically for bug verification - Total 20 tests passing Impact: - Fixes potential race conditions in multiprocess scenarios - Ensures proper synchronization both across and within processes - Maintains backward compatibility
This commit is contained in:
parent
27de78113d
commit
9e3c64df03
2 changed files with 106 additions and 4 deletions
|
|
@ -94,6 +94,10 @@ _sync_locks: dict = {}
|
|||
# async locks for coroutine synchronization in multiprocess mode
|
||||
_async_locks: Optional[Dict[str, asyncio.Lock]] = None
|
||||
|
||||
# Per-process dictionary to store workspace-specific async locks
|
||||
# This MUST be per-process because asyncio.Lock objects cannot be shared across processes
|
||||
_workspace_async_locks: Dict[str, asyncio.Lock] = {}
|
||||
|
||||
_debug_n_locks_acquired: int = 0
|
||||
|
||||
|
||||
|
|
@ -1074,19 +1078,28 @@ def _get_workspace_lock(
|
|||
if workspace:
|
||||
# Workspace-specific lock (lazy creation with synchronization)
|
||||
if _is_multiprocess:
|
||||
# Check if shared data is initialized
|
||||
if _registry_guard is None:
|
||||
raise RuntimeError(
|
||||
"Shared data not initialized. Call initialize_share_data() first."
|
||||
)
|
||||
|
||||
# Use registry guard to protect lazy creation in multiprocess mode
|
||||
with _registry_guard:
|
||||
if lock_name not in _sync_locks:
|
||||
_sync_locks[lock_name] = _manager.RLock()
|
||||
# Create companion async lock for this workspace lock
|
||||
if _async_locks is not None and lock_name not in _async_locks:
|
||||
_async_locks[lock_name] = asyncio.Lock()
|
||||
|
||||
# Create per-process async lock for workspace (cannot be shared across processes)
|
||||
if lock_name not in _workspace_async_locks:
|
||||
_workspace_async_locks[lock_name] = asyncio.Lock()
|
||||
|
||||
async_lock = _workspace_async_locks[lock_name]
|
||||
else:
|
||||
# Single-process mode - no synchronization needed due to asyncio nature
|
||||
if lock_name not in _sync_locks:
|
||||
_sync_locks[lock_name] = asyncio.Lock()
|
||||
|
||||
async_lock = _async_locks.get(lock_name) if _is_multiprocess else None
|
||||
async_lock = None
|
||||
return UnifiedLock(
|
||||
lock=_sync_locks[lock_name],
|
||||
is_async=not _is_multiprocess,
|
||||
|
|
@ -1271,6 +1284,7 @@ def initialize_share_data(workers: int = 1):
|
|||
_async_locks, \
|
||||
_storage_keyed_lock, \
|
||||
_sync_locks, \
|
||||
_workspace_async_locks, \
|
||||
_earliest_mp_cleanup_time, \
|
||||
_last_mp_cleanup_time
|
||||
|
||||
|
|
@ -1311,6 +1325,9 @@ def initialize_share_data(workers: int = 1):
|
|||
"data_init_lock": asyncio.Lock(),
|
||||
}
|
||||
|
||||
# Initialize per-process workspace async locks
|
||||
_workspace_async_locks = {}
|
||||
|
||||
direct_log(
|
||||
f"Process {os.getpid()} Shared-Data created for Multiple Process (workers={workers})"
|
||||
)
|
||||
|
|
@ -1326,6 +1343,7 @@ def initialize_share_data(workers: int = 1):
|
|||
_update_flags = {}
|
||||
_sync_locks = {}
|
||||
_async_locks = None # No need for async locks in single process mode
|
||||
_workspace_async_locks = {} # Not used in single process mode, but initialize for consistency
|
||||
|
||||
_storage_keyed_lock = KeyedUnifiedLock()
|
||||
direct_log(f"Process {os.getpid()} Shared-Data created for Single Process")
|
||||
|
|
|
|||
84
tests/test_workspace_lock_bugfixes.py
Normal file
84
tests/test_workspace_lock_bugfixes.py
Normal file
|
|
@ -0,0 +1,84 @@
|
|||
"""
|
||||
Test bug fixes for workspace lock implementation.
|
||||
|
||||
This test module specifically verifies the bug fixes for:
|
||||
1. RuntimeError when _registry_guard is None
|
||||
2. Per-process workspace async locks in multiprocess mode
|
||||
"""
|
||||
|
||||
import pytest
|
||||
from unittest.mock import patch
|
||||
from lightrag.kg.shared_storage import (
|
||||
_get_workspace_lock,
|
||||
initialize_share_data,
|
||||
)
|
||||
|
||||
|
||||
def test_error_when_not_initialized():
    """Verify a clear RuntimeError is raised when shared data is uninitialized.

    Bug Fix: invoking _get_workspace_lock() before initialize_share_data()
    used to fail with a cryptic
    TypeError: 'NoneType' object does not support the context manager protocol
    because _registry_guard was still None. The fix converts this into an
    explicit RuntimeError with an actionable message.
    """
    # Force the multiprocess code path while the registry guard is still unset.
    with patch("lightrag.kg.shared_storage._is_multiprocess", True), patch(
        "lightrag.kg.shared_storage._registry_guard", None
    ):
        # Expect the explicit RuntimeError rather than the old TypeError.
        with pytest.raises(RuntimeError, match="Shared data not initialized"):
            _get_workspace_lock(
                "test_lock", None, workspace="tenant1", enable_logging=False
            )
|
||||
|
||||
|
||||
def test_workspace_async_locks_per_process():
    """Verify workspace async locks live in the per-process registry.

    Bug Fix: workspace async locks used to be placed into _async_locks, a
    plain dict in multiprocess mode, so entries created in one process were
    invisible to the others. They now live in _workspace_async_locks, which
    is intentionally per-process (asyncio.Lock instances cannot cross
    process boundaries).
    """
    from lightrag.kg import shared_storage

    # Bring up shared data in multiprocess mode (workers > 1).
    initialize_share_data(workers=2)

    # First acquisition should lazily register the workspace async lock.
    first = shared_storage.get_storage_lock(workspace="tenant1")
    assert "tenant1:storage_lock" in shared_storage._workspace_async_locks

    # A second acquisition within the same process must reuse the same
    # asyncio.Lock instance rather than creating a fresh one.
    second = shared_storage.get_storage_lock(workspace="tenant1")
    assert first._async_lock is second._async_lock

    # And that shared instance must be exactly the registered one.
    registered = shared_storage._workspace_async_locks["tenant1:storage_lock"]
    assert first._async_lock is registered
|
||||
|
||||
|
||||
def test_multiple_workspace_locks_different_async_locks():
    """Verify that distinct workspaces get distinct async lock instances."""
    from lightrag.kg import shared_storage

    # Bring up shared data in multiprocess mode (workers > 1).
    initialize_share_data(workers=2)

    # Acquire storage locks for two separate workspaces.
    tenant1_lock = shared_storage.get_storage_lock(workspace="tenant1")
    tenant2_lock = shared_storage.get_storage_lock(workspace="tenant2")

    # Each workspace must own its own asyncio.Lock — sharing one would
    # serialize unrelated tenants against each other.
    assert tenant1_lock._async_lock is not tenant2_lock._async_lock

    # Both entries must be present in the per-process registry.
    registry = shared_storage._workspace_async_locks
    assert "tenant1:storage_lock" in registry
    assert "tenant2:storage_lock" in registry
|
||||
Loading…
Add table
Reference in a new issue