diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py index 70993dfe..f6c9486d 100644 --- a/lightrag/kg/shared_storage.py +++ b/lightrag/kg/shared_storage.py @@ -1051,7 +1051,7 @@ def _get_workspace_lock( lock_type: str, global_lock: LockType, workspace: str = "", - enable_logging: bool = False + enable_logging: bool = False, ) -> UnifiedLock: """Internal common implementation for workspace-aware locks. @@ -1126,7 +1126,9 @@ def get_internal_lock(workspace: str = "", enable_logging: bool = False) -> Unif workspace: Optional workspace identifier for namespace isolation. enable_logging: Enable lock operation logging. """ - return _get_workspace_lock("internal_lock", _internal_lock, workspace, enable_logging) + return _get_workspace_lock( + "internal_lock", _internal_lock, workspace, enable_logging + ) def get_storage_lock(workspace: str = "", enable_logging: bool = False) -> UnifiedLock: @@ -1136,12 +1138,12 @@ def get_storage_lock(workspace: str = "", enable_logging: bool = False) -> Unifi workspace: Optional workspace identifier for namespace isolation. enable_logging: Enable lock operation logging. """ - return _get_workspace_lock( - "storage_lock", _storage_lock, workspace, enable_logging - ) + return _get_workspace_lock("storage_lock", _storage_lock, workspace, enable_logging) -def get_pipeline_status_lock(workspace: str = "", enable_logging: bool = False) -> UnifiedLock: +def get_pipeline_status_lock( + workspace: str = "", enable_logging: bool = False +) -> UnifiedLock: """Return unified pipeline status lock for concurrent processing control. Args: diff --git a/tests/test_finalization_cleanup.py b/tests/test_finalization_cleanup.py index 8c851422..29b65bf8 100644 --- a/tests/test_finalization_cleanup.py +++ b/tests/test_finalization_cleanup.py @@ -33,8 +33,8 @@ def test_finalization_clears_workspace_locks(): shared_storage.initialize_share_data(workers=2) # Create some workspace locks - lock1 = shared_storage.get_storage_lock(workspace="tenant1") - lock2 = shared_storage.get_pipeline_status_lock(workspace="tenant2") + _ = shared_storage.get_storage_lock(workspace="tenant1") + _ = shared_storage.get_pipeline_status_lock(workspace="tenant2") # Verify locks were created assert "tenant1:storage_lock" in shared_storage._sync_locks @@ -63,7 +63,7 @@ def test_reinitialize_after_finalization(): """ # First initialization shared_storage.initialize_share_data(workers=2) - lock1 = shared_storage.get_storage_lock(workspace="tenant1") + _ = shared_storage.get_storage_lock(workspace="tenant1") assert "tenant1:storage_lock" in shared_storage._sync_locks # Finalize @@ -74,7 +74,7 @@ def test_reinitialize_after_finalization(): shared_storage.initialize_share_data(workers=2) # Should work without EOFError/BrokenPipeError - lock2 = shared_storage.get_storage_lock(workspace="tenant2") + _ = shared_storage.get_storage_lock(workspace="tenant2") assert "tenant2:storage_lock" in shared_storage._sync_locks # Clean up @@ -90,7 +90,7 @@ def test_single_process_finalization(): shared_storage.initialize_share_data(workers=1) # Create some workspace locks - lock1 = shared_storage.get_storage_lock(workspace="tenant1") + _ = shared_storage.get_storage_lock(workspace="tenant1") assert "tenant1:storage_lock" in shared_storage._sync_locks # Finalize @@ -103,7 +103,7 @@ def test_single_process_finalization(): # Re-initialize should work shared_storage.initialize_share_data(workers=1) - lock2 = shared_storage.get_storage_lock(workspace="tenant2") + _ = shared_storage.get_storage_lock(workspace="tenant2") assert "tenant2:storage_lock" in shared_storage._sync_locks # Clean up