From 0bd162a416fe497b0e12cbae95c8e1c5aff86c93 Mon Sep 17 00:00:00 2001 From: chengjie Date: Tue, 11 Nov 2025 00:23:42 +0800 Subject: [PATCH] fix: ensure finalize_share_data properly cleans up workspace locks Why this change is needed: The finalize_share_data() function was not properly cleaning up workspace lock-related global variables (_sync_locks, _workspace_async_locks, and lock registry variables). This caused stale references to remain after finalization, leading to EOFError or BrokenPipeError when trying to re-initialize or when processes tried to use locks after the Manager was shut down. How it solves it: 1. Added comprehensive cleanup of all Manager.dict proxies before Manager shutdown (_sync_locks, _lock_registry, _lock_registry_count, _lock_cleanup_data) 2. Added cleanup of per-process _workspace_async_locks dictionary 3. Reset all lock-related globals to None at end of finalization: - _workers, _lock_registry, _lock_registry_count, _lock_cleanup_data - _registry_guard, _storage_keyed_lock, _sync_locks - _workspace_async_locks, _earliest_mp_cleanup_time, _last_mp_cleanup_time Impact: - Prevents EOFError/BrokenPipeError in production deployments - Enables safe re-initialization after finalization - Critical for proper resource cleanup in multi-process deployments - Fixes memory leaks from stale lock references Testing: - Added 3 comprehensive tests in test_finalization_cleanup.py - All 23 workspace lock tests pass (17 original + 3 bug fixes + 3 finalization) - Tests verify clean re-initialization after finalization in both single-process and multiprocess modes --- lightrag/kg/shared_storage.py | 55 ++++++++++++- tests/test_finalization_cleanup.py | 110 ++++++++++++++++++++++++++ tests/test_workspace_lock_bugfixes.py | 8 ++ 3 files changed, 171 insertions(+), 2 deletions(-) create mode 100644 tests/test_finalization_cleanup.py diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py index 3c24bbdc..70993dfe 100644 --- a/lightrag/kg/shared_storage.py +++ b/lightrag/kg/shared_storage.py @@ -1542,8 +1542,13 @@ def finalize_share_data(): """ global \ _manager, \ + _workers, \ _is_multiprocess, \ _storage_lock, \ + _lock_registry, \ + _lock_registry_count, \ + _lock_cleanup_data, \ + _registry_guard, \ _internal_lock, \ _pipeline_status_lock, \ _graph_db_lock, \ @@ -1552,7 +1557,12 @@ def finalize_share_data(): _init_flags, \ _initialized, \ _update_flags, \ - _async_locks + _async_locks, \ + _storage_keyed_lock, \ + _sync_locks, \ + _workspace_async_locks, \ + _earliest_mp_cleanup_time, \ + _last_mp_cleanup_time # Check if already initialized if not _initialized: @@ -1597,6 +1607,30 @@ def finalize_share_data(): pass # Ignore any errors during update flags cleanup _update_flags.clear() + # Clear workspace locks (Manager.dict proxy) + if _sync_locks is not None: + try: + _sync_locks.clear() + except Exception: + pass # Ignore any errors during sync locks cleanup + + # Clear lock registry data + if _lock_registry is not None: + try: + _lock_registry.clear() + except Exception: + pass + if _lock_registry_count is not None: + try: + _lock_registry_count.clear() + except Exception: + pass + if _lock_cleanup_data is not None: + try: + _lock_cleanup_data.clear() + except Exception: + pass + # Shut down the Manager - this will automatically clean up all shared resources _manager.shutdown() direct_log(f"Process {os.getpid()} Manager shutdown complete") @@ -1605,8 +1639,16 @@ def finalize_share_data(): f"Process {os.getpid()} Error shutting down Manager: {e}", level="ERROR" ) - # Reset global variables + # Clear per-process workspace async locks (not a Manager object, so clear separately) + if _workspace_async_locks is not None: + try: + _workspace_async_locks.clear() + except Exception: + pass # Ignore any errors during workspace async locks cleanup + + # Reset global variables to None _manager = None + _workers = None _initialized = None _is_multiprocess = None _shared_dicts = None @@ -1618,5 +1660,14 @@ def finalize_share_data(): _data_init_lock = None _update_flags = None _async_locks = None + _storage_keyed_lock = None + _sync_locks = None + _workspace_async_locks = None + _lock_registry = None + _lock_registry_count = None + _lock_cleanup_data = None + _registry_guard = None + _earliest_mp_cleanup_time = None + _last_mp_cleanup_time = None direct_log(f"Process {os.getpid()} storage data finalization complete") diff --git a/tests/test_finalization_cleanup.py b/tests/test_finalization_cleanup.py new file mode 100644 index 00000000..8c851422 --- /dev/null +++ b/tests/test_finalization_cleanup.py @@ -0,0 +1,110 @@ +""" +Test finalization cleanup for workspace locks. + +This test module verifies that finalize_share_data() properly cleans up +all lock-related global variables, including: +- _sync_locks (Manager.dict in multiprocess mode) +- _workspace_async_locks (per-process dict) +- _lock_registry, _lock_registry_count, _lock_cleanup_data +- _storage_keyed_lock + +Bug: Previously, these weren't properly cleaned up, causing EOFError/BrokenPipeError +when re-initializing after finalization. +""" + +import pytest +from lightrag.kg import shared_storage + + +@pytest.fixture(autouse=True) +def cleanup_shared_storage(): + """Ensure shared storage is cleaned up after each test.""" + yield + shared_storage.finalize_share_data() + + +def test_finalization_clears_workspace_locks(): + """Test that finalize_share_data() clears workspace lock dictionaries. + + Bug Fix: Previously, _sync_locks and _workspace_async_locks were not + cleared during finalization, causing stale references to shut-down Manager. + """ + # Initialize in multiprocess mode + shared_storage.initialize_share_data(workers=2) + + # Create some workspace locks + lock1 = shared_storage.get_storage_lock(workspace="tenant1") + lock2 = shared_storage.get_pipeline_status_lock(workspace="tenant2") + + # Verify locks were created + assert "tenant1:storage_lock" in shared_storage._sync_locks + assert "tenant2:pipeline_status_lock" in shared_storage._sync_locks + assert "tenant1:storage_lock" in shared_storage._workspace_async_locks + + # Finalize + shared_storage.finalize_share_data() + + # Verify all lock-related globals are None + assert shared_storage._sync_locks is None + assert shared_storage._workspace_async_locks is None + assert shared_storage._lock_registry is None + assert shared_storage._lock_registry_count is None + assert shared_storage._lock_cleanup_data is None + assert shared_storage._registry_guard is None + assert shared_storage._storage_keyed_lock is None + assert shared_storage._manager is None + + +def test_reinitialize_after_finalization(): + """Test that re-initialization works after finalization. + + Bug Fix: Previously, stale references to shut-down Manager caused + EOFError/BrokenPipeError when creating locks after re-initialization. + """ + # First initialization + shared_storage.initialize_share_data(workers=2) + lock1 = shared_storage.get_storage_lock(workspace="tenant1") + assert "tenant1:storage_lock" in shared_storage._sync_locks + + # Finalize + shared_storage.finalize_share_data() + assert shared_storage._manager is None + + # Re-initialize + shared_storage.initialize_share_data(workers=2) + + # Should work without EOFError/BrokenPipeError + lock2 = shared_storage.get_storage_lock(workspace="tenant2") + assert "tenant2:storage_lock" in shared_storage._sync_locks + + # Clean up + shared_storage.finalize_share_data() + + +def test_single_process_finalization(): + """Test finalization in single-process mode. + + Ensures finalization works correctly when not using multiprocess Manager. + """ + # Initialize in single-process mode + shared_storage.initialize_share_data(workers=1) + + # Create some workspace locks + lock1 = shared_storage.get_storage_lock(workspace="tenant1") + assert "tenant1:storage_lock" in shared_storage._sync_locks + + # Finalize + shared_storage.finalize_share_data() + + # Verify globals are None + assert shared_storage._sync_locks is None + assert shared_storage._workspace_async_locks is None + assert shared_storage._manager is None # Should be None even in single-process + + # Re-initialize should work + shared_storage.initialize_share_data(workers=1) + lock2 = shared_storage.get_storage_lock(workspace="tenant2") + assert "tenant2:storage_lock" in shared_storage._sync_locks + + # Clean up + shared_storage.finalize_share_data() diff --git a/tests/test_workspace_lock_bugfixes.py b/tests/test_workspace_lock_bugfixes.py index f6fc1f0a..1d26981e 100644 --- a/tests/test_workspace_lock_bugfixes.py +++ b/tests/test_workspace_lock_bugfixes.py @@ -11,9 +11,17 @@ from unittest.mock import patch from lightrag.kg.shared_storage import ( _get_workspace_lock, initialize_share_data, + finalize_share_data, ) +@pytest.fixture(autouse=True) +def cleanup_shared_storage(): + """Ensure shared storage is cleaned up after each test.""" + yield + finalize_share_data() + + def test_error_when_not_initialized(): """Test that _get_workspace_lock raises RuntimeError when called before initialize_share_data().