Optimize lock cleanup with time tracking and intervals
- Add cleanup time tracking variables - Implement minimum cleanup intervals - Track earliest cleanup times - Handle time rollback cases - Improve cleanup logging
This commit is contained in:
parent
39965d7ded
commit
964293f21b
1 changed files with 147 additions and 25 deletions
|
|
@ -60,6 +60,12 @@ _registry_guard = None
|
||||||
CLEANUP_KEYED_LOCKS_AFTER_SECONDS = 300
|
CLEANUP_KEYED_LOCKS_AFTER_SECONDS = 300
|
||||||
# Threshold for triggering cleanup - only clean when pending list exceeds this size
|
# Threshold for triggering cleanup - only clean when pending list exceeds this size
|
||||||
CLEANUP_THRESHOLD = 500
|
CLEANUP_THRESHOLD = 500
|
||||||
|
# Minimum interval between cleanup operations in seconds
|
||||||
|
MIN_CLEANUP_INTERVAL_SECONDS = 30
|
||||||
|
# Track the earliest cleanup time for efficient cleanup triggering (multiprocess locks only)
|
||||||
|
_earliest_mp_cleanup_time: Optional[float] = None
|
||||||
|
# Track the last cleanup time to enforce minimum interval (multiprocess locks only)
|
||||||
|
_last_mp_cleanup_time: Optional[float] = None
|
||||||
|
|
||||||
_initialized = None
|
_initialized = None
|
||||||
|
|
||||||
|
|
@ -308,6 +314,8 @@ def _release_shared_raw_mp_lock(factory_name: str, key: str):
|
||||||
if not _is_multiprocess:
|
if not _is_multiprocess:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
global _earliest_mp_cleanup_time, _last_mp_cleanup_time
|
||||||
|
|
||||||
with _registry_guard:
|
with _registry_guard:
|
||||||
combined_key = _get_combined_key(factory_name, key)
|
combined_key = _get_combined_key(factory_name, key)
|
||||||
raw = _lock_registry.get(combined_key)
|
raw = _lock_registry.get(combined_key)
|
||||||
|
|
@ -330,23 +338,77 @@ def _release_shared_raw_mp_lock(factory_name: str, key: str):
|
||||||
current_time = time.time()
|
current_time = time.time()
|
||||||
if count == 0:
|
if count == 0:
|
||||||
_lock_cleanup_data[combined_key] = current_time
|
_lock_cleanup_data[combined_key] = current_time
|
||||||
|
|
||||||
|
# Update earliest multiprocess cleanup time (only when earlier)
|
||||||
|
if _earliest_mp_cleanup_time is None or current_time < _earliest_mp_cleanup_time:
|
||||||
|
_earliest_mp_cleanup_time = current_time
|
||||||
|
|
||||||
# Only perform cleanup when the pending cleanup list exceeds threshold
|
# Efficient cleanup triggering with minimum interval control
|
||||||
total_cleanup_len = len(_lock_cleanup_data)
|
total_cleanup_len = len(_lock_cleanup_data)
|
||||||
if total_cleanup_len >= CLEANUP_THRESHOLD:
|
if total_cleanup_len >= CLEANUP_THRESHOLD:
|
||||||
cleaned_count = 0
|
# Time rollback detection
|
||||||
for cleanup_key, cleanup_time in list(_lock_cleanup_data.items()):
|
if _last_mp_cleanup_time is not None and current_time < _last_mp_cleanup_time:
|
||||||
if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS:
|
|
||||||
_lock_registry.pop(cleanup_key, None)
|
|
||||||
_lock_registry_count.pop(cleanup_key, None)
|
|
||||||
_lock_cleanup_data.pop(cleanup_key, None)
|
|
||||||
cleaned_count += 1
|
|
||||||
if cleaned_count > 0:
|
|
||||||
direct_log(
|
direct_log(
|
||||||
f"== mp Lock == Cleaned up {cleaned_count}/{total_cleanup_len} expired locks",
|
"== mp Lock == Time rollback detected, resetting cleanup time",
|
||||||
|
level="WARNING",
|
||||||
enable_output=False,
|
enable_output=False,
|
||||||
level="INFO",
|
|
||||||
)
|
)
|
||||||
|
_last_mp_cleanup_time = None
|
||||||
|
|
||||||
|
# Check cleanup conditions
|
||||||
|
has_expired_locks = (
|
||||||
|
_earliest_mp_cleanup_time is not None and
|
||||||
|
current_time - _earliest_mp_cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS
|
||||||
|
)
|
||||||
|
|
||||||
|
interval_satisfied = (
|
||||||
|
_last_mp_cleanup_time is None or
|
||||||
|
current_time - _last_mp_cleanup_time > MIN_CLEANUP_INTERVAL_SECONDS
|
||||||
|
)
|
||||||
|
|
||||||
|
if has_expired_locks and interval_satisfied:
|
||||||
|
try:
|
||||||
|
cleaned_count = 0
|
||||||
|
new_earliest_time = None
|
||||||
|
|
||||||
|
# Perform cleanup while maintaining the new earliest time
|
||||||
|
for cleanup_key, cleanup_time in list(_lock_cleanup_data.items()):
|
||||||
|
if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS:
|
||||||
|
# Clean expired locks
|
||||||
|
_lock_registry.pop(cleanup_key, None)
|
||||||
|
_lock_registry_count.pop(cleanup_key, None)
|
||||||
|
_lock_cleanup_data.pop(cleanup_key, None)
|
||||||
|
cleaned_count += 1
|
||||||
|
else:
|
||||||
|
# Track the earliest time among remaining locks
|
||||||
|
if new_earliest_time is None or cleanup_time < new_earliest_time:
|
||||||
|
new_earliest_time = cleanup_time
|
||||||
|
|
||||||
|
# Update state only after successful cleanup
|
||||||
|
_earliest_mp_cleanup_time = new_earliest_time
|
||||||
|
_last_mp_cleanup_time = current_time
|
||||||
|
|
||||||
|
if cleaned_count > 0:
|
||||||
|
next_cleanup_in = (
|
||||||
|
max(
|
||||||
|
(new_earliest_time + CLEANUP_KEYED_LOCKS_AFTER_SECONDS - current_time) if new_earliest_time else float('inf'),
|
||||||
|
MIN_CLEANUP_INTERVAL_SECONDS
|
||||||
|
)
|
||||||
|
)
|
||||||
|
direct_log(
|
||||||
|
f"== mp Lock == Cleaned up {cleaned_count}/{total_cleanup_len} expired locks, "
|
||||||
|
f"next cleanup in {next_cleanup_in:.1f}s",
|
||||||
|
enable_output=False,
|
||||||
|
level="INFO",
|
||||||
|
)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
direct_log(
|
||||||
|
f"== mp Lock == Cleanup failed: {e}",
|
||||||
|
level="ERROR",
|
||||||
|
enable_output=False,
|
||||||
|
)
|
||||||
|
# Don't update _last_mp_cleanup_time to allow retry
|
||||||
|
|
||||||
|
|
||||||
class KeyedUnifiedLock:
|
class KeyedUnifiedLock:
|
||||||
|
|
@ -374,6 +436,8 @@ class KeyedUnifiedLock:
|
||||||
self._mp_locks: Dict[
|
self._mp_locks: Dict[
|
||||||
str, mp.synchronize.Lock
|
str, mp.synchronize.Lock
|
||||||
] = {} # multi-process lock proxies
|
] = {} # multi-process lock proxies
|
||||||
|
self._earliest_async_cleanup_time: Optional[float] = None # track earliest async cleanup time
|
||||||
|
self._last_async_cleanup_time: Optional[float] = None # track last async cleanup time for minimum interval
|
||||||
|
|
||||||
def __call__(self, keys: list[str], *, enable_logging: Optional[bool] = None):
|
def __call__(self, keys: list[str], *, enable_logging: Optional[bool] = None):
|
||||||
"""
|
"""
|
||||||
|
|
@ -410,26 +474,78 @@ class KeyedUnifiedLock:
|
||||||
current_time = time.time()
|
current_time = time.time()
|
||||||
if count == 0:
|
if count == 0:
|
||||||
self._async_lock_cleanup_data[key] = current_time
|
self._async_lock_cleanup_data[key] = current_time
|
||||||
|
|
||||||
|
# Update earliest async cleanup time (only when earlier)
|
||||||
|
if self._earliest_async_cleanup_time is None or current_time < self._earliest_async_cleanup_time:
|
||||||
|
self._earliest_async_cleanup_time = current_time
|
||||||
self._async_lock_count[key] = count
|
self._async_lock_count[key] = count
|
||||||
|
|
||||||
# Only perform cleanup when the pending cleanup list exceeds threshold
|
# Efficient cleanup triggering with minimum interval control
|
||||||
total_cleanup_len = len(self._async_lock_cleanup_data)
|
total_cleanup_len = len(self._async_lock_cleanup_data)
|
||||||
if total_cleanup_len >= CLEANUP_THRESHOLD:
|
if total_cleanup_len >= CLEANUP_THRESHOLD:
|
||||||
cleaned_count = 0
|
# Time rollback detection
|
||||||
for cleanup_key, cleanup_time in list(
|
if self._last_async_cleanup_time is not None and current_time < self._last_async_cleanup_time:
|
||||||
self._async_lock_cleanup_data.items()
|
|
||||||
):
|
|
||||||
if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS:
|
|
||||||
self._async_lock.pop(cleanup_key)
|
|
||||||
self._async_lock_count.pop(cleanup_key)
|
|
||||||
self._async_lock_cleanup_data.pop(cleanup_key)
|
|
||||||
cleaned_count += 1
|
|
||||||
if cleaned_count > 0:
|
|
||||||
direct_log(
|
direct_log(
|
||||||
f"== async Lock == Cleaned up {cleaned_count}/{total_cleanup_len} expired async locks",
|
"== async Lock == Time rollback detected, resetting cleanup time",
|
||||||
|
level="WARNING",
|
||||||
enable_output=False,
|
enable_output=False,
|
||||||
level="INFO",
|
|
||||||
)
|
)
|
||||||
|
self._last_async_cleanup_time = None
|
||||||
|
|
||||||
|
# Check cleanup conditions
|
||||||
|
has_expired_locks = (
|
||||||
|
self._earliest_async_cleanup_time is not None and
|
||||||
|
current_time - self._earliest_async_cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS
|
||||||
|
)
|
||||||
|
|
||||||
|
interval_satisfied = (
|
||||||
|
self._last_async_cleanup_time is None or
|
||||||
|
current_time - self._last_async_cleanup_time > MIN_CLEANUP_INTERVAL_SECONDS
|
||||||
|
)
|
||||||
|
|
||||||
|
if has_expired_locks and interval_satisfied:
|
||||||
|
try:
|
||||||
|
cleaned_count = 0
|
||||||
|
new_earliest_time = None
|
||||||
|
|
||||||
|
# Perform cleanup while maintaining the new earliest time
|
||||||
|
for cleanup_key, cleanup_time in list(self._async_lock_cleanup_data.items()):
|
||||||
|
if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS:
|
||||||
|
# Clean expired async locks
|
||||||
|
self._async_lock.pop(cleanup_key)
|
||||||
|
self._async_lock_count.pop(cleanup_key)
|
||||||
|
self._async_lock_cleanup_data.pop(cleanup_key)
|
||||||
|
cleaned_count += 1
|
||||||
|
else:
|
||||||
|
# Track the earliest time among remaining locks
|
||||||
|
if new_earliest_time is None or cleanup_time < new_earliest_time:
|
||||||
|
new_earliest_time = cleanup_time
|
||||||
|
|
||||||
|
# Update state only after successful cleanup
|
||||||
|
self._earliest_async_cleanup_time = new_earliest_time
|
||||||
|
self._last_async_cleanup_time = current_time
|
||||||
|
|
||||||
|
if cleaned_count > 0:
|
||||||
|
next_cleanup_in = (
|
||||||
|
max(
|
||||||
|
(new_earliest_time + CLEANUP_KEYED_LOCKS_AFTER_SECONDS - current_time) if new_earliest_time else float('inf'),
|
||||||
|
MIN_CLEANUP_INTERVAL_SECONDS
|
||||||
|
)
|
||||||
|
)
|
||||||
|
direct_log(
|
||||||
|
f"== async Lock == Cleaned up {cleaned_count}/{total_cleanup_len} expired async locks, "
|
||||||
|
f"next cleanup in {next_cleanup_in:.1f}s",
|
||||||
|
enable_output=False,
|
||||||
|
level="INFO",
|
||||||
|
)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
direct_log(
|
||||||
|
f"== async Lock == Cleanup failed: {e}",
|
||||||
|
level="ERROR",
|
||||||
|
enable_output=False,
|
||||||
|
)
|
||||||
|
# Don't update _last_async_cleanup_time to allow retry
|
||||||
|
|
||||||
def _get_lock_for_key(self, key: str, enable_logging: bool = False) -> UnifiedLock:
|
def _get_lock_for_key(self, key: str, enable_logging: bool = False) -> UnifiedLock:
|
||||||
# 1. get (or create) the per‑process async gate for this key
|
# 1. get (or create) the per‑process async gate for this key
|
||||||
|
|
@ -620,7 +736,9 @@ def initialize_share_data(workers: int = 1):
|
||||||
_initialized, \
|
_initialized, \
|
||||||
_update_flags, \
|
_update_flags, \
|
||||||
_async_locks, \
|
_async_locks, \
|
||||||
_graph_db_lock_keyed
|
_graph_db_lock_keyed, \
|
||||||
|
_earliest_mp_cleanup_time, \
|
||||||
|
_last_mp_cleanup_time
|
||||||
|
|
||||||
# Check if already initialized
|
# Check if already initialized
|
||||||
if _initialized:
|
if _initialized:
|
||||||
|
|
@ -680,6 +798,10 @@ def initialize_share_data(workers: int = 1):
|
||||||
)
|
)
|
||||||
direct_log(f"Process {os.getpid()} Shared-Data created for Single Process")
|
direct_log(f"Process {os.getpid()} Shared-Data created for Single Process")
|
||||||
|
|
||||||
|
# Initialize multiprocess cleanup times
|
||||||
|
_earliest_mp_cleanup_time = None
|
||||||
|
_last_mp_cleanup_time = None
|
||||||
|
|
||||||
# Mark as initialized
|
# Mark as initialized
|
||||||
_initialized = True
|
_initialized = True
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue