From 0e3aaa318fda50f281e4c1671d3ea2be42785280 Mon Sep 17 00:00:00 2001 From: yangdx Date: Sun, 13 Jul 2025 00:09:00 +0800 Subject: [PATCH] Feat: Add keyed lock cleanup and status monitoring --- lightrag/api/lightrag_server.py | 5 + lightrag/kg/shared_storage.py | 530 ++++++++++++++++++++++---------- 2 files changed, 374 insertions(+), 161 deletions(-) diff --git a/lightrag/api/lightrag_server.py b/lightrag/api/lightrag_server.py index b43c66d9..693ed48f 100644 --- a/lightrag/api/lightrag_server.py +++ b/lightrag/api/lightrag_server.py @@ -52,6 +52,7 @@ from lightrag.kg.shared_storage import ( get_namespace_data, get_pipeline_status_lock, initialize_pipeline_status, + cleanup_keyed_lock, ) from fastapi.security import OAuth2PasswordRequestForm from lightrag.api.auth import auth_handler @@ -486,6 +487,9 @@ def create_app(args): else: auth_mode = "enabled" + # Cleanup expired keyed locks and get status + keyed_lock_info = cleanup_keyed_lock() + return { "status": "healthy", "working_directory": str(args.working_dir), @@ -517,6 +521,7 @@ def create_app(args): }, "auth_mode": auth_mode, "pipeline_busy": pipeline_status.get("busy", False), + "keyed_locks": keyed_lock_info, "core_version": core_version, "api_version": __api_version__, "webui_title": webui_title, diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py index 9b917850..ce1068cc 100644 --- a/lightrag/kg/shared_storage.py +++ b/lightrag/kg/shared_storage.py @@ -280,6 +280,125 @@ def _get_combined_key(factory_name: str, key: str) -> str: return f"{factory_name}:{key}" +def _perform_lock_cleanup( + lock_type: str, + cleanup_data: Dict[str, float], + lock_registry: Optional[Dict[str, Any]], + lock_count: Optional[Dict[str, int]], + earliest_cleanup_time: Optional[float], + last_cleanup_time: Optional[float], + current_time: float, + threshold_check: bool = True, +) -> tuple[int, Optional[float], Optional[float]]: + """ + Generic lock cleanup function to unify cleanup logic for both multiprocess and async locks. + + Args: + lock_type: Lock type identifier ("mp" or "async") + cleanup_data: Cleanup data dictionary + lock_registry: Lock registry dictionary (can be None for async locks) + lock_count: Lock count dictionary (can be None for async locks) + earliest_cleanup_time: Earliest cleanup time + last_cleanup_time: Last cleanup time + current_time: Current time + threshold_check: Whether to check threshold condition (default True, set to False in cleanup_expired_locks) + + Returns: + tuple: (cleaned_count, new_earliest_time, new_last_cleanup_time) + """ + if len(cleanup_data) == 0: + return 0, earliest_cleanup_time, last_cleanup_time + + # If threshold check is needed and threshold not reached, return directly + if threshold_check and len(cleanup_data) < CLEANUP_THRESHOLD: + return 0, earliest_cleanup_time, last_cleanup_time + + # Time rollback detection + if last_cleanup_time is not None and current_time < last_cleanup_time: + direct_log( + f"== {lock_type} Lock == Time rollback detected, resetting cleanup time", + level="WARNING", + enable_output=False, + ) + last_cleanup_time = None + + # Check cleanup conditions + has_expired_locks = ( + earliest_cleanup_time is not None + and current_time - earliest_cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS + ) + + interval_satisfied = ( + last_cleanup_time is None + or current_time - last_cleanup_time > MIN_CLEANUP_INTERVAL_SECONDS + ) + + if not (has_expired_locks and interval_satisfied): + return 0, earliest_cleanup_time, last_cleanup_time + + try: + cleaned_count = 0 + new_earliest_time = None + + # Perform cleanup operation + for cleanup_key, cleanup_time in list(cleanup_data.items()): + if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS: + # Remove from cleanup data + cleanup_data.pop(cleanup_key, None) + + # Remove from lock registry if exists + if lock_registry is not None: + lock_registry.pop(cleanup_key, None) + if lock_count is not None: + lock_count.pop(cleanup_key, None) + + cleaned_count += 1 + else: + # Track the earliest time among remaining locks + if new_earliest_time is None or cleanup_time < new_earliest_time: + new_earliest_time = cleanup_time + + # Update state only after successful cleanup + if cleaned_count > 0: + new_last_cleanup_time = current_time + + # Log cleanup results + next_cleanup_in = max( + (new_earliest_time + CLEANUP_KEYED_LOCKS_AFTER_SECONDS - current_time) + if new_earliest_time + else float("inf"), + MIN_CLEANUP_INTERVAL_SECONDS, + ) + total_cleanup_len = len(cleanup_data) + + if lock_type == "async": + direct_log( + f"== {lock_type} Lock == Cleaned up {cleaned_count}/{total_cleanup_len} expired {lock_type} locks, " + f"next cleanup in {next_cleanup_in:.1f}s", + enable_output=False, + level="INFO", + ) + else: + direct_log( + f"== {lock_type} Lock == Cleaned up {cleaned_count}/{total_cleanup_len} expired locks, " + f"next cleanup in {next_cleanup_in:.1f}s", + enable_output=False, + level="INFO", + ) + + return cleaned_count, new_earliest_time, new_last_cleanup_time + else: + return 0, earliest_cleanup_time, last_cleanup_time + + except Exception as e: + direct_log( + f"== {lock_type} Lock == Cleanup failed: {e}", + level="ERROR", + enable_output=False, + ) + return 0, earliest_cleanup_time, last_cleanup_time + + def _get_or_create_shared_raw_mp_lock( factory_name: str, key: str ) -> Optional[mp.synchronize.Lock]: @@ -346,86 +465,22 @@ def _release_shared_raw_mp_lock(factory_name: str, key: str): ): _earliest_mp_cleanup_time = current_time - # Efficient cleanup triggering with minimum interval control - total_cleanup_len = len(_lock_cleanup_data) - if total_cleanup_len >= CLEANUP_THRESHOLD: - # Time rollback detection - if ( - _last_mp_cleanup_time is not None - and current_time < _last_mp_cleanup_time - ): - direct_log( - "== mp Lock == Time rollback detected, resetting cleanup time", - level="WARNING", - enable_output=False, - ) - _last_mp_cleanup_time = None + # Use generic cleanup function + cleaned_count, new_earliest_time, new_last_cleanup_time = _perform_lock_cleanup( + lock_type="mp", + cleanup_data=_lock_cleanup_data, + lock_registry=_lock_registry, + lock_count=_lock_registry_count, + earliest_cleanup_time=_earliest_mp_cleanup_time, + last_cleanup_time=_last_mp_cleanup_time, + current_time=current_time, + threshold_check=True, + ) - # Check cleanup conditions - has_expired_locks = ( - _earliest_mp_cleanup_time is not None - and current_time - _earliest_mp_cleanup_time - > CLEANUP_KEYED_LOCKS_AFTER_SECONDS - ) - - interval_satisfied = ( - _last_mp_cleanup_time is None - or current_time - _last_mp_cleanup_time > MIN_CLEANUP_INTERVAL_SECONDS - ) - - if has_expired_locks and interval_satisfied: - try: - cleaned_count = 0 - new_earliest_time = None - - # Perform cleanup while maintaining the new earliest time - # Clean expired locks from all namespaces - for cleanup_key, cleanup_time in list(_lock_cleanup_data.items()): - if ( - current_time - cleanup_time - > CLEANUP_KEYED_LOCKS_AFTER_SECONDS - ): - _lock_registry.pop(cleanup_key, None) - _lock_registry_count.pop(cleanup_key, None) - _lock_cleanup_data.pop(cleanup_key, None) - cleaned_count += 1 - else: - # Track the earliest time among remaining locks - if ( - new_earliest_time is None - or cleanup_time < new_earliest_time - ): - new_earliest_time = cleanup_time - - # Update state only after successful cleanup - _earliest_mp_cleanup_time = new_earliest_time - _last_mp_cleanup_time = current_time - - if cleaned_count > 0: - next_cleanup_in = max( - ( - new_earliest_time - + CLEANUP_KEYED_LOCKS_AFTER_SECONDS - - current_time - ) - if new_earliest_time - else float("inf"), - MIN_CLEANUP_INTERVAL_SECONDS, - ) - direct_log( - f"== mp Lock == Cleaned up {cleaned_count}/{total_cleanup_len} expired locks, " - f"next cleanup in {next_cleanup_in:.1f}s", - enable_output=False, - level="INFO", - ) - - except Exception as e: - direct_log( - f"== mp Lock == Cleanup failed: {e}", - level="ERROR", - enable_output=False, - ) - # Don't update _last_mp_cleanup_time to allow retry + # Update global state if cleanup was performed + if cleaned_count > 0: + _earliest_mp_cleanup_time = new_earliest_time + _last_mp_cleanup_time = new_last_cleanup_time class KeyedUnifiedLock: @@ -504,89 +559,22 @@ class KeyedUnifiedLock: self._earliest_async_cleanup_time = current_time self._async_lock_count[combined_key] = count - # Efficient cleanup triggering with minimum interval control - total_cleanup_len = len(self._async_lock_cleanup_data) - if total_cleanup_len >= CLEANUP_THRESHOLD: - # Time rollback detection - if ( - self._last_async_cleanup_time is not None - and current_time < self._last_async_cleanup_time - ): - direct_log( - "== async Lock == Time rollback detected, resetting cleanup time", - level="WARNING", - enable_output=False, - ) - self._last_async_cleanup_time = None + # Use generic cleanup function + cleaned_count, new_earliest_time, new_last_cleanup_time = _perform_lock_cleanup( + lock_type="async", + cleanup_data=self._async_lock_cleanup_data, + lock_registry=self._async_lock, + lock_count=self._async_lock_count, + earliest_cleanup_time=self._earliest_async_cleanup_time, + last_cleanup_time=self._last_async_cleanup_time, + current_time=current_time, + threshold_check=True, + ) - # Check cleanup conditions - has_expired_locks = ( - self._earliest_async_cleanup_time is not None - and current_time - self._earliest_async_cleanup_time - > CLEANUP_KEYED_LOCKS_AFTER_SECONDS - ) - - interval_satisfied = ( - self._last_async_cleanup_time is None - or current_time - self._last_async_cleanup_time - > MIN_CLEANUP_INTERVAL_SECONDS - ) - - if has_expired_locks and interval_satisfied: - try: - cleaned_count = 0 - new_earliest_time = None - - # Perform cleanup while maintaining the new earliest time - # Clean expired async locks from all namespaces - for cleanup_key, cleanup_time in list( - self._async_lock_cleanup_data.items() - ): - if ( - current_time - cleanup_time - > CLEANUP_KEYED_LOCKS_AFTER_SECONDS - ): - self._async_lock.pop(cleanup_key) - self._async_lock_count.pop(cleanup_key) - self._async_lock_cleanup_data.pop(cleanup_key) - cleaned_count += 1 - else: - # Track the earliest time among remaining locks - if ( - new_earliest_time is None - or cleanup_time < new_earliest_time - ): - new_earliest_time = cleanup_time - - # Update state only after successful cleanup - self._earliest_async_cleanup_time = new_earliest_time - self._last_async_cleanup_time = current_time - - if cleaned_count > 0: - next_cleanup_in = max( - ( - new_earliest_time - + CLEANUP_KEYED_LOCKS_AFTER_SECONDS - - current_time - ) - if new_earliest_time - else float("inf"), - MIN_CLEANUP_INTERVAL_SECONDS, - ) - direct_log( - f"== async Lock == Cleaned up {cleaned_count}/{total_cleanup_len} expired async locks, " - f"next cleanup in {next_cleanup_in:.1f}s", - enable_output=False, - level="INFO", - ) - - except Exception as e: - direct_log( - f"== async Lock == Cleanup failed: {e}", - level="ERROR", - enable_output=False, - ) - # Don't update _last_async_cleanup_time to allow retry + # Update instance state if cleanup was performed + if cleaned_count > 0: + self._earliest_async_cleanup_time = new_earliest_time + self._last_async_cleanup_time = new_last_cleanup_time def _get_lock_for_key( self, namespace: str, key: str, enable_logging: bool = False @@ -627,6 +615,171 @@ class KeyedUnifiedLock: self._release_async_lock(combined_key) _release_shared_raw_mp_lock(namespace, key) + def cleanup_expired_locks(self) -> Dict[str, Any]: + """ + Cleanup expired locks for both async and multiprocess locks following the same + conditions as _release_shared_raw_mp_lock and _release_async_lock functions. + + Only performs cleanup when both has_expired_locks and interval_satisfied conditions are met + to avoid too frequent cleanup operations. + + Since async and multiprocess locks work together, this method cleans up + both types of expired locks and returns comprehensive statistics. + + Returns: + Dict containing cleanup statistics and current status: + { + "process_id": 12345, + "cleanup_performed": { + "mp_cleaned": 5, + "async_cleaned": 3 + }, + "current_status": { + "total_mp_locks": 10, + "pending_mp_cleanup": 2, + "total_async_locks": 8, + "pending_async_cleanup": 1 + } + } + """ + global _lock_registry, _lock_registry_count, _lock_cleanup_data + global _registry_guard, _earliest_mp_cleanup_time, _last_mp_cleanup_time + + cleanup_stats = {"mp_cleaned": 0, "async_cleaned": 0} + + current_time = time.time() + + # 1. Cleanup multiprocess locks using generic function + if ( + _is_multiprocess + and _lock_registry is not None + and _registry_guard is not None + ): + try: + with _registry_guard: + if _lock_cleanup_data is not None: + # Use generic cleanup function without threshold check + cleaned_count, new_earliest_time, new_last_cleanup_time = ( + _perform_lock_cleanup( + lock_type="mp", + cleanup_data=_lock_cleanup_data, + lock_registry=_lock_registry, + lock_count=_lock_registry_count, + earliest_cleanup_time=_earliest_mp_cleanup_time, + last_cleanup_time=_last_mp_cleanup_time, + current_time=current_time, + threshold_check=False, # Force cleanup in cleanup_expired_locks + ) + ) + + # Update global state if cleanup was performed + if cleaned_count > 0: + _earliest_mp_cleanup_time = new_earliest_time + _last_mp_cleanup_time = new_last_cleanup_time + cleanup_stats["mp_cleaned"] = cleaned_count + + except Exception as e: + direct_log( + f"Error during multiprocess lock cleanup: {e}", + level="ERROR", + enable_output=False, + ) + + # 2. Cleanup async locks using generic function + try: + # Use generic cleanup function without threshold check + cleaned_count, new_earliest_time, new_last_cleanup_time = ( + _perform_lock_cleanup( + lock_type="async", + cleanup_data=self._async_lock_cleanup_data, + lock_registry=self._async_lock, + lock_count=self._async_lock_count, + earliest_cleanup_time=self._earliest_async_cleanup_time, + last_cleanup_time=self._last_async_cleanup_time, + current_time=current_time, + threshold_check=False, # Force cleanup in cleanup_expired_locks + ) + ) + + # Update instance state if cleanup was performed + if cleaned_count > 0: + self._earliest_async_cleanup_time = new_earliest_time + self._last_async_cleanup_time = new_last_cleanup_time + cleanup_stats["async_cleaned"] = cleaned_count + + except Exception as e: + direct_log( + f"Error during async lock cleanup: {e}", + level="ERROR", + enable_output=False, + ) + + # Log cleanup results if any locks were cleaned + total_cleaned = cleanup_stats["mp_cleaned"] + cleanup_stats["async_cleaned"] + if total_cleaned > 0: + direct_log( + f"Keyed lock cleanup completed: {total_cleaned} locks cleaned " + f"(MP: {cleanup_stats['mp_cleaned']}, Async: {cleanup_stats['async_cleaned']})", + level="INFO", + enable_output=False, + ) + + # 3. Get current status after cleanup + current_status = self.get_lock_status() + + return { + "process_id": os.getpid(), + "cleanup_performed": cleanup_stats, + "current_status": current_status, + } + + def get_lock_status(self) -> Dict[str, int]: + """ + Get current status of both async and multiprocess locks. + + Returns comprehensive lock counts for both types of locks since + they work together in the keyed lock system. + + Returns: + Dict containing lock counts: + { + "total_mp_locks": 10, + "pending_mp_cleanup": 2, + "total_async_locks": 8, + "pending_async_cleanup": 1 + } + """ + global _lock_registry_count, _lock_cleanup_data, _registry_guard + + status = { + "total_mp_locks": 0, + "pending_mp_cleanup": 0, + "total_async_locks": 0, + "pending_async_cleanup": 0, + } + + try: + # Count multiprocess locks + if _is_multiprocess and _lock_registry_count is not None: + if _registry_guard is not None: + with _registry_guard: + status["total_mp_locks"] = len(_lock_registry_count) + if _lock_cleanup_data is not None: + status["pending_mp_cleanup"] = len(_lock_cleanup_data) + + # Count async locks + status["total_async_locks"] = len(self._async_lock_count) + status["pending_async_cleanup"] = len(self._async_lock_cleanup_data) + + except Exception as e: + direct_log( + f"Error getting keyed lock status: {e}", + level="ERROR", + enable_output=False, + ) + + return status + class _KeyedLockContext: def __init__( @@ -747,6 +900,61 @@ def get_data_init_lock(enable_logging: bool = False) -> UnifiedLock: ) +def cleanup_keyed_lock() -> Dict[str, Any]: + """ + Force cleanup of expired keyed locks and return comprehensive status information. + + This function actively cleans up expired locks for both async and multiprocess locks, + then returns detailed statistics about the cleanup operation and current lock status. + + Returns: + Same as cleanup_expired_locks in KeyedUnifiedLock + """ + global _storage_keyed_lock + + # Check if shared storage is initialized + if not _initialized or _storage_keyed_lock is None: + return { + "process_id": os.getpid(), + "cleanup_performed": {"mp_cleaned": 0, "async_cleaned": 0}, + "current_status": { + "total_mp_locks": 0, + "pending_mp_cleanup": 0, + "total_async_locks": 0, + "pending_async_cleanup": 0, + }, + } + + return _storage_keyed_lock.cleanup_expired_locks() + + +def get_keyed_lock_status() -> Dict[str, Any]: + """ + Get current status of keyed locks without performing cleanup. + + This function provides a read-only view of the current lock counts + for both multiprocess and async locks, including pending cleanup counts. + + Returns: + Same as get_lock_status in KeyedUnifiedLock + """ + global _storage_keyed_lock + + # Check if shared storage is initialized + if not _initialized or _storage_keyed_lock is None: + return { + "process_id": os.getpid(), + "total_mp_locks": 0, + "pending_mp_cleanup": 0, + "total_async_locks": 0, + "pending_async_cleanup": 0, + } + + status = _storage_keyed_lock.get_lock_status() + status["process_id"] = os.getpid() + return status + + def initialize_share_data(workers: int = 1): """ Initialize shared storage data for single or multi-process mode.