fix: ensure finalize_share_data properly cleans up workspace locks
Why this change is needed:
The finalize_share_data() function was not properly cleaning up workspace
lock-related global variables (_sync_locks, _workspace_async_locks, and
lock registry variables). This caused stale references to remain after
finalization, leading to EOFError or BrokenPipeError when trying to
re-initialize, or when processes tried to use locks after the Manager
had been shut down.
How it solves it:
1. Added comprehensive cleanup of all Manager.dict proxies before Manager
shutdown (_sync_locks, _lock_registry, _lock_registry_count,
_lock_cleanup_data)
2. Added cleanup of per-process _workspace_async_locks dictionary
3. Reset all lock-related globals to None at end of finalization:
- _workers, _lock_registry, _lock_registry_count, _lock_cleanup_data
- _registry_guard, _storage_keyed_lock, _sync_locks
- _workspace_async_locks, _earliest_mp_cleanup_time,
_last_mp_cleanup_time
Impact:
- Prevents EOFError/BrokenPipeError in production deployments
- Enables safe re-initialization after finalization
- Critical for proper resource cleanup in multi-process deployments
- Fixes memory leaks from stale lock references
Testing:
- Added 3 comprehensive tests in test_finalization_cleanup.py
- All 23 workspace lock tests pass (17 original + 3 bug fixes + 3 finalization)
- Tests verify clean re-initialization after finalization in both
single-process and multiprocess modes
This commit is contained in:
parent
9e3c64df03
commit
0bd162a416
3 changed files with 171 additions and 2 deletions
|
|
@ -1542,8 +1542,13 @@ def finalize_share_data():
|
|||
"""
|
||||
global \
|
||||
_manager, \
|
||||
_workers, \
|
||||
_is_multiprocess, \
|
||||
_storage_lock, \
|
||||
_lock_registry, \
|
||||
_lock_registry_count, \
|
||||
_lock_cleanup_data, \
|
||||
_registry_guard, \
|
||||
_internal_lock, \
|
||||
_pipeline_status_lock, \
|
||||
_graph_db_lock, \
|
||||
|
|
@ -1552,7 +1557,12 @@ def finalize_share_data():
|
|||
_init_flags, \
|
||||
_initialized, \
|
||||
_update_flags, \
|
||||
_async_locks
|
||||
_async_locks, \
|
||||
_storage_keyed_lock, \
|
||||
_sync_locks, \
|
||||
_workspace_async_locks, \
|
||||
_earliest_mp_cleanup_time, \
|
||||
_last_mp_cleanup_time
|
||||
|
||||
# Check if already initialized
|
||||
if not _initialized:
|
||||
|
|
@ -1597,6 +1607,30 @@ def finalize_share_data():
|
|||
pass # Ignore any errors during update flags cleanup
|
||||
_update_flags.clear()
|
||||
|
||||
# Clear workspace locks (Manager.dict proxy)
|
||||
if _sync_locks is not None:
|
||||
try:
|
||||
_sync_locks.clear()
|
||||
except Exception:
|
||||
pass # Ignore any errors during sync locks cleanup
|
||||
|
||||
# Clear lock registry data
|
||||
if _lock_registry is not None:
|
||||
try:
|
||||
_lock_registry.clear()
|
||||
except Exception:
|
||||
pass
|
||||
if _lock_registry_count is not None:
|
||||
try:
|
||||
_lock_registry_count.clear()
|
||||
except Exception:
|
||||
pass
|
||||
if _lock_cleanup_data is not None:
|
||||
try:
|
||||
_lock_cleanup_data.clear()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Shut down the Manager - this will automatically clean up all shared resources
|
||||
_manager.shutdown()
|
||||
direct_log(f"Process {os.getpid()} Manager shutdown complete")
|
||||
|
|
@ -1605,8 +1639,16 @@ def finalize_share_data():
|
|||
f"Process {os.getpid()} Error shutting down Manager: {e}", level="ERROR"
|
||||
)
|
||||
|
||||
# Reset global variables
|
||||
# Clear per-process workspace async locks (not a Manager object, so clear separately)
|
||||
if _workspace_async_locks is not None:
|
||||
try:
|
||||
_workspace_async_locks.clear()
|
||||
except Exception:
|
||||
pass # Ignore any errors during workspace async locks cleanup
|
||||
|
||||
# Reset global variables to None
|
||||
_manager = None
|
||||
_workers = None
|
||||
_initialized = None
|
||||
_is_multiprocess = None
|
||||
_shared_dicts = None
|
||||
|
|
@ -1618,5 +1660,14 @@ def finalize_share_data():
|
|||
_data_init_lock = None
|
||||
_update_flags = None
|
||||
_async_locks = None
|
||||
_storage_keyed_lock = None
|
||||
_sync_locks = None
|
||||
_workspace_async_locks = None
|
||||
_lock_registry = None
|
||||
_lock_registry_count = None
|
||||
_lock_cleanup_data = None
|
||||
_registry_guard = None
|
||||
_earliest_mp_cleanup_time = None
|
||||
_last_mp_cleanup_time = None
|
||||
|
||||
direct_log(f"Process {os.getpid()} storage data finalization complete")
|
||||
|
|
|
|||
110
tests/test_finalization_cleanup.py
Normal file
110
tests/test_finalization_cleanup.py
Normal file
|
|
@ -0,0 +1,110 @@
|
|||
"""
|
||||
Test finalization cleanup for workspace locks.
|
||||
|
||||
This test module verifies that finalize_share_data() properly cleans up
|
||||
all lock-related global variables, including:
|
||||
- _sync_locks (Manager.dict in multiprocess mode)
|
||||
- _workspace_async_locks (per-process dict)
|
||||
- _lock_registry, _lock_registry_count, _lock_cleanup_data
|
||||
- _storage_keyed_lock
|
||||
|
||||
Bug: Previously, these weren't properly cleaned up, causing EOFError/BrokenPipeError
|
||||
when re-initializing after finalization.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
from lightrag.kg import shared_storage
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
def cleanup_shared_storage():
    """Run each test, then unconditionally finalize shared storage.

    Autouse teardown fixture: control returns here after the test body
    completes (pass or fail), and finalize_share_data() is called so that
    no Manager-backed state leaks from one test into the next.
    """
    yield
    shared_storage.finalize_share_data()
|
||||
|
||||
|
||||
def test_finalization_clears_workspace_locks():
    """finalize_share_data() must reset every workspace-lock global to None.

    Regression test: stale _sync_locks / _workspace_async_locks entries used
    to survive finalization, leaving references to a shut-down Manager.
    """
    # Bring up shared storage with a multiprocess Manager.
    shared_storage.initialize_share_data(workers=2)

    # Touch two workspaces so both lock dictionaries get populated.
    shared_storage.get_storage_lock(workspace="tenant1")
    shared_storage.get_pipeline_status_lock(workspace="tenant2")

    # The Manager-backed dict and the per-process dict must both hold entries.
    for key in ("tenant1:storage_lock", "tenant2:pipeline_status_lock"):
        assert key in shared_storage._sync_locks
    assert "tenant1:storage_lock" in shared_storage._workspace_async_locks

    shared_storage.finalize_share_data()

    # After teardown, every lock-related module global must be None.
    for attr in (
        "_sync_locks",
        "_workspace_async_locks",
        "_lock_registry",
        "_lock_registry_count",
        "_lock_cleanup_data",
        "_registry_guard",
        "_storage_keyed_lock",
        "_manager",
    ):
        assert getattr(shared_storage, attr) is None
|
||||
|
||||
|
||||
def test_reinitialize_after_finalization():
    """A full init -> finalize -> init cycle must leave a working state.

    Regression test: leftover references to a shut-down Manager previously
    caused EOFError/BrokenPipeError during the second round of lock creation.
    """
    # Round one: initialize and create a lock for the first tenant.
    shared_storage.initialize_share_data(workers=2)
    shared_storage.get_storage_lock(workspace="tenant1")
    assert "tenant1:storage_lock" in shared_storage._sync_locks

    # Tear everything down; the Manager reference must be gone afterwards.
    shared_storage.finalize_share_data()
    assert shared_storage._manager is None

    # Round two: a fresh initialization followed by lock creation must not
    # raise EOFError/BrokenPipeError.
    shared_storage.initialize_share_data(workers=2)
    shared_storage.get_storage_lock(workspace="tenant2")
    assert "tenant2:storage_lock" in shared_storage._sync_locks

    # Explicit teardown (the autouse fixture would also cover this).
    shared_storage.finalize_share_data()
|
||||
|
||||
|
||||
def test_single_process_finalization():
    """Finalization must also work in single-process mode (no Manager).

    Verifies the cleanup path when workers=1, where the multiprocess
    Manager machinery is not in play.
    """
    # Single-process initialization.
    shared_storage.initialize_share_data(workers=1)

    # Create one workspace lock and confirm it was registered.
    shared_storage.get_storage_lock(workspace="tenant1")
    assert "tenant1:storage_lock" in shared_storage._sync_locks

    shared_storage.finalize_share_data()

    # Lock dictionaries and the Manager handle must all be reset to None
    # (the Manager handle even in single-process mode).
    for attr in ("_sync_locks", "_workspace_async_locks", "_manager"):
        assert getattr(shared_storage, attr) is None

    # Re-initialization after finalization must work cleanly.
    shared_storage.initialize_share_data(workers=1)
    shared_storage.get_storage_lock(workspace="tenant2")
    assert "tenant2:storage_lock" in shared_storage._sync_locks

    # Explicit teardown (the autouse fixture would also cover this).
    shared_storage.finalize_share_data()
|
||||
|
|
@ -11,9 +11,17 @@ from unittest.mock import patch
|
|||
from lightrag.kg.shared_storage import (
|
||||
_get_workspace_lock,
|
||||
initialize_share_data,
|
||||
finalize_share_data,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def cleanup_shared_storage():
|
||||
"""Ensure shared storage is cleaned up after each test."""
|
||||
yield
|
||||
finalize_share_data()
|
||||
|
||||
|
||||
def test_error_when_not_initialized():
|
||||
"""Test that _get_workspace_lock raises RuntimeError when called before initialize_share_data().
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue