Merge pull request #1773 from danielaskdd/cleanup-lock-in-healthcheck
Feat: Added reranker config and lock status to status card of WebUI
This commit is contained in:
commit
0e52582bf2
9 changed files with 447 additions and 176 deletions
|
|
@ -52,6 +52,7 @@ from lightrag.kg.shared_storage import (
|
||||||
get_namespace_data,
|
get_namespace_data,
|
||||||
get_pipeline_status_lock,
|
get_pipeline_status_lock,
|
||||||
initialize_pipeline_status,
|
initialize_pipeline_status,
|
||||||
|
cleanup_keyed_lock,
|
||||||
)
|
)
|
||||||
from fastapi.security import OAuth2PasswordRequestForm
|
from fastapi.security import OAuth2PasswordRequestForm
|
||||||
from lightrag.api.auth import auth_handler
|
from lightrag.api.auth import auth_handler
|
||||||
|
|
@ -486,6 +487,9 @@ def create_app(args):
|
||||||
else:
|
else:
|
||||||
auth_mode = "enabled"
|
auth_mode = "enabled"
|
||||||
|
|
||||||
|
# Cleanup expired keyed locks and get status
|
||||||
|
keyed_lock_info = cleanup_keyed_lock()
|
||||||
|
|
||||||
return {
|
return {
|
||||||
"status": "healthy",
|
"status": "healthy",
|
||||||
"working_directory": str(args.working_dir),
|
"working_directory": str(args.working_dir),
|
||||||
|
|
@ -517,6 +521,7 @@ def create_app(args):
|
||||||
},
|
},
|
||||||
"auth_mode": auth_mode,
|
"auth_mode": auth_mode,
|
||||||
"pipeline_busy": pipeline_status.get("busy", False),
|
"pipeline_busy": pipeline_status.get("busy", False),
|
||||||
|
"keyed_locks": keyed_lock_info,
|
||||||
"core_version": core_version,
|
"core_version": core_version,
|
||||||
"api_version": __api_version__,
|
"api_version": __api_version__,
|
||||||
"webui_title": webui_title,
|
"webui_title": webui_title,
|
||||||
|
|
|
||||||
|
|
@ -280,6 +280,125 @@ def _get_combined_key(factory_name: str, key: str) -> str:
|
||||||
return f"{factory_name}:{key}"
|
return f"{factory_name}:{key}"
|
||||||
|
|
||||||
|
|
||||||
|
def _perform_lock_cleanup(
    lock_type: str,
    cleanup_data: Dict[str, float],
    lock_registry: Optional[Dict[str, Any]],
    lock_count: Optional[Dict[str, int]],
    earliest_cleanup_time: Optional[float],
    last_cleanup_time: Optional[float],
    current_time: float,
    threshold_check: bool = True,
) -> tuple[int, Optional[float], Optional[float]]:
    """
    Generic lock cleanup function to unify cleanup logic for both multiprocess and async locks.

    Entries in ``cleanup_data`` whose recorded time is more than
    CLEANUP_KEYED_LOCKS_AFTER_SECONDS before ``current_time`` are removed from
    ``cleanup_data`` and, when provided, from ``lock_registry`` and ``lock_count``.
    All three mappings are mutated in place. This function performs no locking
    of its own.

    Args:
        lock_type: Lock type identifier ("mp" or "async"); used in log messages
        cleanup_data: Mapping of lock key -> time at which it became eligible for cleanup
        lock_registry: Lock registry dictionary (skipped when None)
        lock_count: Lock count dictionary (skipped when None)
        earliest_cleanup_time: Earliest cleanup time tracked by the caller
        last_cleanup_time: Time of the caller's last successful cleanup
        current_time: Current time
        threshold_check: Whether to require at least CLEANUP_THRESHOLD pending
            entries before cleaning (default True, set to False in cleanup_expired_locks)

    Returns:
        tuple: (cleaned_count, new_earliest_time, new_last_cleanup_time).
        When nothing was cleaned (early exit, no expired entry, or failure) the
        tuple is (0, earliest_cleanup_time, last_cleanup_time), where the last
        element is None if a time rollback was detected.
    """
    # Snapshot the pending count BEFORE anything is removed so that the
    # "cleaned/total" log line reports the pre-cleanup total.
    total_cleanup_len = len(cleanup_data)
    if total_cleanup_len == 0:
        return 0, earliest_cleanup_time, last_cleanup_time

    # If threshold check is needed and threshold not reached, return directly
    if threshold_check and total_cleanup_len < CLEANUP_THRESHOLD:
        return 0, earliest_cleanup_time, last_cleanup_time

    # Time rollback detection: a last-cleanup stamp in the future would block
    # the interval check below, so it is discarded.
    if last_cleanup_time is not None and current_time < last_cleanup_time:
        direct_log(
            f"== {lock_type} Lock == Time rollback detected, resetting cleanup time",
            level="WARNING",
            enable_output=False,
        )
        last_cleanup_time = None

    # Check cleanup conditions
    has_expired_locks = (
        earliest_cleanup_time is not None
        and current_time - earliest_cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS
    )
    interval_satisfied = (
        last_cleanup_time is None
        or current_time - last_cleanup_time > MIN_CLEANUP_INTERVAL_SECONDS
    )
    if not (has_expired_locks and interval_satisfied):
        return 0, earliest_cleanup_time, last_cleanup_time

    try:
        cleaned_count = 0
        new_earliest_time: Optional[float] = None

        # Iterate over a snapshot because entries are popped during the scan
        for cleanup_key, cleanup_time in list(cleanup_data.items()):
            if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS:
                # Remove from cleanup data
                cleanup_data.pop(cleanup_key, None)

                # Remove from lock registry / counters if they exist
                if lock_registry is not None:
                    lock_registry.pop(cleanup_key, None)
                if lock_count is not None:
                    lock_count.pop(cleanup_key, None)

                cleaned_count += 1
            elif new_earliest_time is None or cleanup_time < new_earliest_time:
                # Track the earliest time among remaining locks
                new_earliest_time = cleanup_time

        if cleaned_count == 0:
            # Nothing expired: hand the caller's state back unchanged
            return 0, earliest_cleanup_time, last_cleanup_time

        # Log cleanup results. Compare against None explicitly so that a
        # legitimate timestamp is never mistaken for "no remaining entries".
        if new_earliest_time is not None:
            until_next_expiry = (
                new_earliest_time + CLEANUP_KEYED_LOCKS_AFTER_SECONDS - current_time
            )
        else:
            until_next_expiry = float("inf")
        next_cleanup_in = max(until_next_expiry, MIN_CLEANUP_INTERVAL_SECONDS)

        # Async messages name the lock type; mp messages just say "locks"
        lock_label = f"{lock_type} locks" if lock_type == "async" else "locks"
        direct_log(
            f"== {lock_type} Lock == Cleaned up {cleaned_count}/{total_cleanup_len} expired {lock_label}, "
            f"next cleanup in {next_cleanup_in:.1f}s",
            enable_output=False,
            level="INFO",
        )

        # State is only advanced after a successful cleanup
        return cleaned_count, new_earliest_time, current_time

    except Exception as e:
        direct_log(
            f"== {lock_type} Lock == Cleanup failed: {e}",
            level="ERROR",
            enable_output=False,
        )
        # Report "nothing cleaned" so the caller keeps its state and can retry
        return 0, earliest_cleanup_time, last_cleanup_time
|
||||||
|
|
||||||
|
|
||||||
def _get_or_create_shared_raw_mp_lock(
|
def _get_or_create_shared_raw_mp_lock(
|
||||||
factory_name: str, key: str
|
factory_name: str, key: str
|
||||||
) -> Optional[mp.synchronize.Lock]:
|
) -> Optional[mp.synchronize.Lock]:
|
||||||
|
|
@ -346,86 +465,22 @@ def _release_shared_raw_mp_lock(factory_name: str, key: str):
|
||||||
):
|
):
|
||||||
_earliest_mp_cleanup_time = current_time
|
_earliest_mp_cleanup_time = current_time
|
||||||
|
|
||||||
# Efficient cleanup triggering with minimum interval control
|
# Use generic cleanup function
|
||||||
total_cleanup_len = len(_lock_cleanup_data)
|
cleaned_count, new_earliest_time, new_last_cleanup_time = _perform_lock_cleanup(
|
||||||
if total_cleanup_len >= CLEANUP_THRESHOLD:
|
lock_type="mp",
|
||||||
# Time rollback detection
|
cleanup_data=_lock_cleanup_data,
|
||||||
if (
|
lock_registry=_lock_registry,
|
||||||
_last_mp_cleanup_time is not None
|
lock_count=_lock_registry_count,
|
||||||
and current_time < _last_mp_cleanup_time
|
earliest_cleanup_time=_earliest_mp_cleanup_time,
|
||||||
):
|
last_cleanup_time=_last_mp_cleanup_time,
|
||||||
direct_log(
|
current_time=current_time,
|
||||||
"== mp Lock == Time rollback detected, resetting cleanup time",
|
threshold_check=True,
|
||||||
level="WARNING",
|
)
|
||||||
enable_output=False,
|
|
||||||
)
|
|
||||||
_last_mp_cleanup_time = None
|
|
||||||
|
|
||||||
# Check cleanup conditions
|
# Update global state if cleanup was performed
|
||||||
has_expired_locks = (
|
if cleaned_count > 0:
|
||||||
_earliest_mp_cleanup_time is not None
|
_earliest_mp_cleanup_time = new_earliest_time
|
||||||
and current_time - _earliest_mp_cleanup_time
|
_last_mp_cleanup_time = new_last_cleanup_time
|
||||||
> CLEANUP_KEYED_LOCKS_AFTER_SECONDS
|
|
||||||
)
|
|
||||||
|
|
||||||
interval_satisfied = (
|
|
||||||
_last_mp_cleanup_time is None
|
|
||||||
or current_time - _last_mp_cleanup_time > MIN_CLEANUP_INTERVAL_SECONDS
|
|
||||||
)
|
|
||||||
|
|
||||||
if has_expired_locks and interval_satisfied:
|
|
||||||
try:
|
|
||||||
cleaned_count = 0
|
|
||||||
new_earliest_time = None
|
|
||||||
|
|
||||||
# Perform cleanup while maintaining the new earliest time
|
|
||||||
# Clean expired locks from all namespaces
|
|
||||||
for cleanup_key, cleanup_time in list(_lock_cleanup_data.items()):
|
|
||||||
if (
|
|
||||||
current_time - cleanup_time
|
|
||||||
> CLEANUP_KEYED_LOCKS_AFTER_SECONDS
|
|
||||||
):
|
|
||||||
_lock_registry.pop(cleanup_key, None)
|
|
||||||
_lock_registry_count.pop(cleanup_key, None)
|
|
||||||
_lock_cleanup_data.pop(cleanup_key, None)
|
|
||||||
cleaned_count += 1
|
|
||||||
else:
|
|
||||||
# Track the earliest time among remaining locks
|
|
||||||
if (
|
|
||||||
new_earliest_time is None
|
|
||||||
or cleanup_time < new_earliest_time
|
|
||||||
):
|
|
||||||
new_earliest_time = cleanup_time
|
|
||||||
|
|
||||||
# Update state only after successful cleanup
|
|
||||||
_earliest_mp_cleanup_time = new_earliest_time
|
|
||||||
_last_mp_cleanup_time = current_time
|
|
||||||
|
|
||||||
if cleaned_count > 0:
|
|
||||||
next_cleanup_in = max(
|
|
||||||
(
|
|
||||||
new_earliest_time
|
|
||||||
+ CLEANUP_KEYED_LOCKS_AFTER_SECONDS
|
|
||||||
- current_time
|
|
||||||
)
|
|
||||||
if new_earliest_time
|
|
||||||
else float("inf"),
|
|
||||||
MIN_CLEANUP_INTERVAL_SECONDS,
|
|
||||||
)
|
|
||||||
direct_log(
|
|
||||||
f"== mp Lock == Cleaned up {cleaned_count}/{total_cleanup_len} expired locks, "
|
|
||||||
f"next cleanup in {next_cleanup_in:.1f}s",
|
|
||||||
enable_output=False,
|
|
||||||
level="INFO",
|
|
||||||
)
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
direct_log(
|
|
||||||
f"== mp Lock == Cleanup failed: {e}",
|
|
||||||
level="ERROR",
|
|
||||||
enable_output=False,
|
|
||||||
)
|
|
||||||
# Don't update _last_mp_cleanup_time to allow retry
|
|
||||||
|
|
||||||
|
|
||||||
class KeyedUnifiedLock:
|
class KeyedUnifiedLock:
|
||||||
|
|
@ -504,89 +559,22 @@ class KeyedUnifiedLock:
|
||||||
self._earliest_async_cleanup_time = current_time
|
self._earliest_async_cleanup_time = current_time
|
||||||
self._async_lock_count[combined_key] = count
|
self._async_lock_count[combined_key] = count
|
||||||
|
|
||||||
# Efficient cleanup triggering with minimum interval control
|
# Use generic cleanup function
|
||||||
total_cleanup_len = len(self._async_lock_cleanup_data)
|
cleaned_count, new_earliest_time, new_last_cleanup_time = _perform_lock_cleanup(
|
||||||
if total_cleanup_len >= CLEANUP_THRESHOLD:
|
lock_type="async",
|
||||||
# Time rollback detection
|
cleanup_data=self._async_lock_cleanup_data,
|
||||||
if (
|
lock_registry=self._async_lock,
|
||||||
self._last_async_cleanup_time is not None
|
lock_count=self._async_lock_count,
|
||||||
and current_time < self._last_async_cleanup_time
|
earliest_cleanup_time=self._earliest_async_cleanup_time,
|
||||||
):
|
last_cleanup_time=self._last_async_cleanup_time,
|
||||||
direct_log(
|
current_time=current_time,
|
||||||
"== async Lock == Time rollback detected, resetting cleanup time",
|
threshold_check=True,
|
||||||
level="WARNING",
|
)
|
||||||
enable_output=False,
|
|
||||||
)
|
|
||||||
self._last_async_cleanup_time = None
|
|
||||||
|
|
||||||
# Check cleanup conditions
|
# Update instance state if cleanup was performed
|
||||||
has_expired_locks = (
|
if cleaned_count > 0:
|
||||||
self._earliest_async_cleanup_time is not None
|
self._earliest_async_cleanup_time = new_earliest_time
|
||||||
and current_time - self._earliest_async_cleanup_time
|
self._last_async_cleanup_time = new_last_cleanup_time
|
||||||
> CLEANUP_KEYED_LOCKS_AFTER_SECONDS
|
|
||||||
)
|
|
||||||
|
|
||||||
interval_satisfied = (
|
|
||||||
self._last_async_cleanup_time is None
|
|
||||||
or current_time - self._last_async_cleanup_time
|
|
||||||
> MIN_CLEANUP_INTERVAL_SECONDS
|
|
||||||
)
|
|
||||||
|
|
||||||
if has_expired_locks and interval_satisfied:
|
|
||||||
try:
|
|
||||||
cleaned_count = 0
|
|
||||||
new_earliest_time = None
|
|
||||||
|
|
||||||
# Perform cleanup while maintaining the new earliest time
|
|
||||||
# Clean expired async locks from all namespaces
|
|
||||||
for cleanup_key, cleanup_time in list(
|
|
||||||
self._async_lock_cleanup_data.items()
|
|
||||||
):
|
|
||||||
if (
|
|
||||||
current_time - cleanup_time
|
|
||||||
> CLEANUP_KEYED_LOCKS_AFTER_SECONDS
|
|
||||||
):
|
|
||||||
self._async_lock.pop(cleanup_key)
|
|
||||||
self._async_lock_count.pop(cleanup_key)
|
|
||||||
self._async_lock_cleanup_data.pop(cleanup_key)
|
|
||||||
cleaned_count += 1
|
|
||||||
else:
|
|
||||||
# Track the earliest time among remaining locks
|
|
||||||
if (
|
|
||||||
new_earliest_time is None
|
|
||||||
or cleanup_time < new_earliest_time
|
|
||||||
):
|
|
||||||
new_earliest_time = cleanup_time
|
|
||||||
|
|
||||||
# Update state only after successful cleanup
|
|
||||||
self._earliest_async_cleanup_time = new_earliest_time
|
|
||||||
self._last_async_cleanup_time = current_time
|
|
||||||
|
|
||||||
if cleaned_count > 0:
|
|
||||||
next_cleanup_in = max(
|
|
||||||
(
|
|
||||||
new_earliest_time
|
|
||||||
+ CLEANUP_KEYED_LOCKS_AFTER_SECONDS
|
|
||||||
- current_time
|
|
||||||
)
|
|
||||||
if new_earliest_time
|
|
||||||
else float("inf"),
|
|
||||||
MIN_CLEANUP_INTERVAL_SECONDS,
|
|
||||||
)
|
|
||||||
direct_log(
|
|
||||||
f"== async Lock == Cleaned up {cleaned_count}/{total_cleanup_len} expired async locks, "
|
|
||||||
f"next cleanup in {next_cleanup_in:.1f}s",
|
|
||||||
enable_output=False,
|
|
||||||
level="INFO",
|
|
||||||
)
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
direct_log(
|
|
||||||
f"== async Lock == Cleanup failed: {e}",
|
|
||||||
level="ERROR",
|
|
||||||
enable_output=False,
|
|
||||||
)
|
|
||||||
# Don't update _last_async_cleanup_time to allow retry
|
|
||||||
|
|
||||||
def _get_lock_for_key(
|
def _get_lock_for_key(
|
||||||
self, namespace: str, key: str, enable_logging: bool = False
|
self, namespace: str, key: str, enable_logging: bool = False
|
||||||
|
|
@ -627,6 +615,171 @@ class KeyedUnifiedLock:
|
||||||
self._release_async_lock(combined_key)
|
self._release_async_lock(combined_key)
|
||||||
_release_shared_raw_mp_lock(namespace, key)
|
_release_shared_raw_mp_lock(namespace, key)
|
||||||
|
|
||||||
|
def cleanup_expired_locks(self) -> Dict[str, Any]:
    """
    Clean up expired multiprocess and async keyed locks and report the result.

    Both passes delegate to _perform_lock_cleanup with the threshold check
    disabled; that helper still requires the has_expired_locks and
    interval_satisfied conditions, so cleanup is not run too frequently.
    A failure in one pass is logged and does not prevent the other pass
    or the status report.

    Returns:
        Dict containing cleanup statistics and current status:
        {
            "process_id": 12345,
            "cleanup_performed": {
                "mp_cleaned": 5,
                "async_cleaned": 3
            },
            "current_status": {
                "total_mp_locks": 10,
                "pending_mp_cleanup": 2,
                "total_async_locks": 8,
                "pending_async_cleanup": 1
            }
        }
    """
    global _lock_registry, _lock_registry_count, _lock_cleanup_data
    global _registry_guard, _earliest_mp_cleanup_time, _last_mp_cleanup_time

    now = time.time()
    mp_cleaned = 0
    async_cleaned = 0

    # 1. Multiprocess locks: only when the shared registry and its guard exist
    mp_available = (
        _is_multiprocess
        and _lock_registry is not None
        and _registry_guard is not None
    )
    if mp_available:
        try:
            with _registry_guard:
                if _lock_cleanup_data is not None:
                    removed, earliest, last = _perform_lock_cleanup(
                        lock_type="mp",
                        cleanup_data=_lock_cleanup_data,
                        lock_registry=_lock_registry,
                        lock_count=_lock_registry_count,
                        earliest_cleanup_time=_earliest_mp_cleanup_time,
                        last_cleanup_time=_last_mp_cleanup_time,
                        current_time=now,
                        threshold_check=False,  # no threshold gate here
                    )

                    # Module-level bookkeeping only moves when something was removed
                    if removed > 0:
                        _earliest_mp_cleanup_time = earliest
                        _last_mp_cleanup_time = last
                        mp_cleaned = removed
        except Exception as e:
            direct_log(
                f"Error during multiprocess lock cleanup: {e}",
                level="ERROR",
                enable_output=False,
            )

    # 2. Async locks owned by this instance
    try:
        removed, earliest, last = _perform_lock_cleanup(
            lock_type="async",
            cleanup_data=self._async_lock_cleanup_data,
            lock_registry=self._async_lock,
            lock_count=self._async_lock_count,
            earliest_cleanup_time=self._earliest_async_cleanup_time,
            last_cleanup_time=self._last_async_cleanup_time,
            current_time=now,
            threshold_check=False,  # no threshold gate here
        )

        # Instance bookkeeping only moves when something was removed
        if removed > 0:
            self._earliest_async_cleanup_time = earliest
            self._last_async_cleanup_time = last
            async_cleaned = removed
    except Exception as e:
        direct_log(
            f"Error during async lock cleanup: {e}",
            level="ERROR",
            enable_output=False,
        )

    cleanup_stats = {"mp_cleaned": mp_cleaned, "async_cleaned": async_cleaned}

    # Emit a summary line only when at least one lock was removed
    total_cleaned = mp_cleaned + async_cleaned
    if total_cleaned > 0:
        direct_log(
            f"Keyed lock cleanup completed: {total_cleaned} locks cleaned "
            f"(MP: {cleanup_stats['mp_cleaned']}, Async: {cleanup_stats['async_cleaned']})",
            level="INFO",
            enable_output=False,
        )

    # 3. Snapshot the lock counts as they stand after cleanup
    current_status = self.get_lock_status()

    return {
        "process_id": os.getpid(),
        "cleanup_performed": cleanup_stats,
        "current_status": current_status,
    }
|
||||||
|
|
||||||
|
def get_lock_status(self) -> Dict[str, int]:
    """
    Report how many multiprocess and async keyed locks currently exist.

    This is a read-only count; nothing is cleaned up. Multiprocess counts
    are read under the registry guard and stay 0 when multiprocess mode is
    off or the shared structures are not set. Any error is logged and the
    counts gathered so far are returned.

    Returns:
        Dict containing lock counts:
        {
            "total_mp_locks": 10,
            "pending_mp_cleanup": 2,
            "total_async_locks": 8,
            "pending_async_cleanup": 1
        }
    """
    global _lock_registry_count, _lock_cleanup_data, _registry_guard

    # Start from all-zero counters; key order matches the documented shape
    status = dict.fromkeys(
        (
            "total_mp_locks",
            "pending_mp_cleanup",
            "total_async_locks",
            "pending_async_cleanup",
        ),
        0,
    )

    try:
        # Multiprocess counts need multiprocess mode, the counter dict and the guard
        mp_readable = (
            _is_multiprocess
            and _lock_registry_count is not None
            and _registry_guard is not None
        )
        if mp_readable:
            with _registry_guard:
                status["total_mp_locks"] = len(_lock_registry_count)
                if _lock_cleanup_data is not None:
                    status["pending_mp_cleanup"] = len(_lock_cleanup_data)

        # Async counts come straight from this instance's dictionaries
        status["total_async_locks"] = len(self._async_lock_count)
        status["pending_async_cleanup"] = len(self._async_lock_cleanup_data)
    except Exception as e:
        direct_log(
            f"Error getting keyed lock status: {e}",
            level="ERROR",
            enable_output=False,
        )

    return status
|
||||||
|
|
||||||
|
|
||||||
class _KeyedLockContext:
|
class _KeyedLockContext:
|
||||||
def __init__(
|
def __init__(
|
||||||
|
|
@ -747,6 +900,61 @@ def get_data_init_lock(enable_logging: bool = False) -> UnifiedLock:
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def cleanup_keyed_lock() -> Dict[str, Any]:
    """
    Clean up expired keyed locks and return cleanup statistics plus lock status.

    Delegates to the storage keyed lock's cleanup_expired_locks(). When shared
    storage has not been initialized, an all-zero report for the current
    process is returned instead.

    Returns:
        Same as cleanup_expired_locks in KeyedUnifiedLock
    """
    global _storage_keyed_lock

    # Normal path: shared storage is ready, let the keyed lock do the work
    if _initialized and _storage_keyed_lock is not None:
        return _storage_keyed_lock.cleanup_expired_locks()

    # Shared storage not initialized: report an empty state
    no_cleanup = {"mp_cleaned": 0, "async_cleaned": 0}
    empty_status = {
        "total_mp_locks": 0,
        "pending_mp_cleanup": 0,
        "total_async_locks": 0,
        "pending_async_cleanup": 0,
    }
    return {
        "process_id": os.getpid(),
        "cleanup_performed": no_cleanup,
        "current_status": empty_status,
    }
|
||||||
|
|
||||||
|
|
||||||
|
def get_keyed_lock_status() -> Dict[str, Any]:
    """
    Return the current keyed-lock counts without performing any cleanup.

    Delegates to the storage keyed lock's get_lock_status() and adds the
    current process id. When shared storage has not been initialized, an
    all-zero report for the current process is returned instead.

    Returns:
        Same as get_lock_status in KeyedUnifiedLock, plus "process_id"
    """
    global _storage_keyed_lock

    # Normal path: shared storage is ready, read counts and tag with the pid
    if _initialized and _storage_keyed_lock is not None:
        report = _storage_keyed_lock.get_lock_status()
        report["process_id"] = os.getpid()
        return report

    # Shared storage not initialized: report an empty state
    return {
        "process_id": os.getpid(),
        "total_mp_locks": 0,
        "pending_mp_cleanup": 0,
        "total_async_locks": 0,
        "pending_async_cleanup": 0,
    }
|
||||||
|
|
||||||
|
|
||||||
def initialize_share_data(workers: int = 1):
|
def initialize_share_data(workers: int = 1):
|
||||||
"""
|
"""
|
||||||
Initialize shared storage data for single or multi-process mode.
|
Initialize shared storage data for single or multi-process mode.
|
||||||
|
|
|
||||||
|
|
@ -42,12 +42,28 @@ export type LightragStatus = {
|
||||||
vector_storage: string
|
vector_storage: string
|
||||||
workspace?: string
|
workspace?: string
|
||||||
max_graph_nodes?: string
|
max_graph_nodes?: string
|
||||||
|
enable_rerank?: boolean
|
||||||
|
rerank_model?: string | null
|
||||||
|
rerank_binding_host?: string | null
|
||||||
}
|
}
|
||||||
update_status?: Record<string, any>
|
update_status?: Record<string, any>
|
||||||
core_version?: string
|
core_version?: string
|
||||||
api_version?: string
|
api_version?: string
|
||||||
auth_mode?: 'enabled' | 'disabled'
|
auth_mode?: 'enabled' | 'disabled'
|
||||||
pipeline_busy: boolean
|
pipeline_busy: boolean
|
||||||
|
keyed_locks?: {
|
||||||
|
process_id: number
|
||||||
|
cleanup_performed: {
|
||||||
|
mp_cleaned: number
|
||||||
|
async_cleaned: number
|
||||||
|
}
|
||||||
|
current_status: {
|
||||||
|
total_mp_locks: number
|
||||||
|
pending_mp_cleanup: number
|
||||||
|
total_async_locks: number
|
||||||
|
pending_async_cleanup: number
|
||||||
|
}
|
||||||
|
}
|
||||||
webui_title?: string
|
webui_title?: string
|
||||||
webui_description?: string
|
webui_description?: string
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -45,6 +45,18 @@ const StatusCard = ({ status }: { status: LightragStatus | null }) => {
|
||||||
</div>
|
</div>
|
||||||
</div>
|
</div>
|
||||||
|
|
||||||
|
{status.configuration.enable_rerank && (
|
||||||
|
<div className="space-y-1">
|
||||||
|
<h4 className="font-medium">{t('graphPanel.statusCard.rerankerConfig')}</h4>
|
||||||
|
<div className="text-foreground grid grid-cols-[120px_1fr] gap-1">
|
||||||
|
<span>{t('graphPanel.statusCard.rerankerBindingHost')}:</span>
|
||||||
|
<span>{status.configuration.rerank_binding_host || '-'}</span>
|
||||||
|
<span>{t('graphPanel.statusCard.rerankerModel')}:</span>
|
||||||
|
<span>{status.configuration.rerank_model || '-'}</span>
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
|
)}
|
||||||
|
|
||||||
<div className="space-y-1">
|
<div className="space-y-1">
|
||||||
<h4 className="font-medium">{t('graphPanel.statusCard.storageConfig')}</h4>
|
<h4 className="font-medium">{t('graphPanel.statusCard.storageConfig')}</h4>
|
||||||
<div className="text-foreground grid grid-cols-[120px_1fr] gap-1">
|
<div className="text-foreground grid grid-cols-[120px_1fr] gap-1">
|
||||||
|
|
@ -60,6 +72,16 @@ const StatusCard = ({ status }: { status: LightragStatus | null }) => {
|
||||||
<span>{status.configuration.workspace || '-'}</span>
|
<span>{status.configuration.workspace || '-'}</span>
|
||||||
<span>{t('graphPanel.statusCard.maxGraphNodes')}:</span>
|
<span>{t('graphPanel.statusCard.maxGraphNodes')}:</span>
|
||||||
<span>{status.configuration.max_graph_nodes || '-'}</span>
|
<span>{status.configuration.max_graph_nodes || '-'}</span>
|
||||||
|
{status.keyed_locks && (
|
||||||
|
<>
|
||||||
|
<span>{t('graphPanel.statusCard.lockStatus')}:</span>
|
||||||
|
<span>
|
||||||
|
mp {status.keyed_locks.current_status.pending_mp_cleanup}/{status.keyed_locks.current_status.total_mp_locks} |
|
||||||
|
async {status.keyed_locks.current_status.pending_async_cleanup}/{status.keyed_locks.current_status.total_async_locks}
|
||||||
|
(pid: {status.keyed_locks.process_id})
|
||||||
|
</span>
|
||||||
|
</>
|
||||||
|
)}
|
||||||
</div>
|
</div>
|
||||||
</div>
|
</div>
|
||||||
</div>
|
</div>
|
||||||
|
|
|
||||||
|
|
@ -252,12 +252,12 @@
|
||||||
"inputDirectory": "دليل الإدخال",
|
"inputDirectory": "دليل الإدخال",
|
||||||
"llmConfig": "تكوين نموذج اللغة الكبير",
|
"llmConfig": "تكوين نموذج اللغة الكبير",
|
||||||
"llmBinding": "ربط نموذج اللغة الكبير",
|
"llmBinding": "ربط نموذج اللغة الكبير",
|
||||||
"llmBindingHost": "مضيف ربط نموذج اللغة الكبير",
|
"llmBindingHost": "نقطة نهاية نموذج اللغة الكبير",
|
||||||
"llmModel": "نموذج اللغة الكبير",
|
"llmModel": "نموذج اللغة الكبير",
|
||||||
"maxTokens": "أقصى عدد من الرموز",
|
"maxTokens": "أقصى عدد من الرموز",
|
||||||
"embeddingConfig": "تكوين التضمين",
|
"embeddingConfig": "تكوين التضمين",
|
||||||
"embeddingBinding": "ربط التضمين",
|
"embeddingBinding": "ربط التضمين",
|
||||||
"embeddingBindingHost": "مضيف ربط التضمين",
|
"embeddingBindingHost": "نقطة نهاية التضمين",
|
||||||
"embeddingModel": "نموذج التضمين",
|
"embeddingModel": "نموذج التضمين",
|
||||||
"storageConfig": "تكوين التخزين",
|
"storageConfig": "تكوين التخزين",
|
||||||
"kvStorage": "تخزين المفتاح-القيمة",
|
"kvStorage": "تخزين المفتاح-القيمة",
|
||||||
|
|
@ -265,7 +265,11 @@
|
||||||
"graphStorage": "تخزين الرسم البياني",
|
"graphStorage": "تخزين الرسم البياني",
|
||||||
"vectorStorage": "تخزين المتجهات",
|
"vectorStorage": "تخزين المتجهات",
|
||||||
"workspace": "مساحة العمل",
|
"workspace": "مساحة العمل",
|
||||||
"maxGraphNodes": "الحد الأقصى لعقد الرسم البياني"
|
"maxGraphNodes": "الحد الأقصى لعقد الرسم البياني",
|
||||||
|
"rerankerConfig": "تكوين إعادة الترتيب",
|
||||||
|
"rerankerBindingHost": "نقطة نهاية إعادة الترتيب",
|
||||||
|
"rerankerModel": "نموذج إعادة الترتيب",
|
||||||
|
"lockStatus": "حالة القفل"
|
||||||
},
|
},
|
||||||
"propertiesView": {
|
"propertiesView": {
|
||||||
"editProperty": "تعديل {{property}}",
|
"editProperty": "تعديل {{property}}",
|
||||||
|
|
|
||||||
|
|
@ -252,12 +252,12 @@
|
||||||
"inputDirectory": "Input Directory",
|
"inputDirectory": "Input Directory",
|
||||||
"llmConfig": "LLM Configuration",
|
"llmConfig": "LLM Configuration",
|
||||||
"llmBinding": "LLM Binding",
|
"llmBinding": "LLM Binding",
|
||||||
"llmBindingHost": "LLM Binding Host",
|
"llmBindingHost": "LLM Endpoint",
|
||||||
"llmModel": "LLM Model",
|
"llmModel": "LLM Model",
|
||||||
"maxTokens": "Max Tokens",
|
"maxTokens": "Max Tokens",
|
||||||
"embeddingConfig": "Embedding Configuration",
|
"embeddingConfig": "Embedding Configuration",
|
||||||
"embeddingBinding": "Embedding Binding",
|
"embeddingBinding": "Embedding Binding",
|
||||||
"embeddingBindingHost": "Embedding Binding Host",
|
"embeddingBindingHost": "Embedding Endpoint",
|
||||||
"embeddingModel": "Embedding Model",
|
"embeddingModel": "Embedding Model",
|
||||||
"storageConfig": "Storage Configuration",
|
"storageConfig": "Storage Configuration",
|
||||||
"kvStorage": "KV Storage",
|
"kvStorage": "KV Storage",
|
||||||
|
|
@ -265,7 +265,11 @@
|
||||||
"graphStorage": "Graph Storage",
|
"graphStorage": "Graph Storage",
|
||||||
"vectorStorage": "Vector Storage",
|
"vectorStorage": "Vector Storage",
|
||||||
"workspace": "Workspace",
|
"workspace": "Workspace",
|
||||||
"maxGraphNodes": "Max Graph Nodes"
|
"maxGraphNodes": "Max Graph Nodes",
|
||||||
|
"rerankerConfig": "Reranker Configuration",
|
||||||
|
"rerankerBindingHost": "Reranker Endpoint",
|
||||||
|
"rerankerModel": "Reranker Model",
|
||||||
|
"lockStatus": "Lock Status"
|
||||||
},
|
},
|
||||||
"propertiesView": {
|
"propertiesView": {
|
||||||
"editProperty": "Edit {{property}}",
|
"editProperty": "Edit {{property}}",
|
||||||
|
|
|
||||||
|
|
@ -252,12 +252,12 @@
|
||||||
"inputDirectory": "Répertoire d'entrée",
|
"inputDirectory": "Répertoire d'entrée",
|
||||||
"llmConfig": "Configuration du modèle de langage",
|
"llmConfig": "Configuration du modèle de langage",
|
||||||
"llmBinding": "Liaison du modèle de langage",
|
"llmBinding": "Liaison du modèle de langage",
|
||||||
"llmBindingHost": "Hôte de liaison du modèle de langage",
|
"llmBindingHost": "Point de terminaison LLM",
|
||||||
"llmModel": "Modèle de langage",
|
"llmModel": "Modèle de langage",
|
||||||
"maxTokens": "Nombre maximum de jetons",
|
"maxTokens": "Nombre maximum de jetons",
|
||||||
"embeddingConfig": "Configuration d'incorporation",
|
"embeddingConfig": "Configuration d'incorporation",
|
||||||
"embeddingBinding": "Liaison d'incorporation",
|
"embeddingBinding": "Liaison d'incorporation",
|
||||||
"embeddingBindingHost": "Hôte de liaison d'incorporation",
|
"embeddingBindingHost": "Point de terminaison d'incorporation",
|
||||||
"embeddingModel": "Modèle d'incorporation",
|
"embeddingModel": "Modèle d'incorporation",
|
||||||
"storageConfig": "Configuration de stockage",
|
"storageConfig": "Configuration de stockage",
|
||||||
"kvStorage": "Stockage clé-valeur",
|
"kvStorage": "Stockage clé-valeur",
|
||||||
|
|
@ -265,7 +265,11 @@
|
||||||
"graphStorage": "Stockage du graphe",
|
"graphStorage": "Stockage du graphe",
|
||||||
"vectorStorage": "Stockage vectoriel",
|
"vectorStorage": "Stockage vectoriel",
|
||||||
"workspace": "Espace de travail",
|
"workspace": "Espace de travail",
|
||||||
"maxGraphNodes": "Nombre maximum de nœuds du graphe"
|
"maxGraphNodes": "Nombre maximum de nœuds du graphe",
|
||||||
|
"rerankerConfig": "Configuration du reclassement",
|
||||||
|
"rerankerBindingHost": "Point de terminaison de reclassement",
|
||||||
|
"rerankerModel": "Modèle de reclassement",
|
||||||
|
"lockStatus": "État des verrous"
|
||||||
},
|
},
|
||||||
"propertiesView": {
|
"propertiesView": {
|
||||||
"editProperty": "Modifier {{property}}",
|
"editProperty": "Modifier {{property}}",
|
||||||
|
|
|
||||||
|
|
@ -252,12 +252,12 @@
|
||||||
"inputDirectory": "输入目录",
|
"inputDirectory": "输入目录",
|
||||||
"llmConfig": "LLM配置",
|
"llmConfig": "LLM配置",
|
||||||
"llmBinding": "LLM绑定",
|
"llmBinding": "LLM绑定",
|
||||||
"llmBindingHost": "LLM绑定主机",
|
"llmBindingHost": "LLM端点",
|
||||||
"llmModel": "LLM模型",
|
"llmModel": "LLM模型",
|
||||||
"maxTokens": "最大令牌数",
|
"maxTokens": "最大令牌数",
|
||||||
"embeddingConfig": "嵌入配置",
|
"embeddingConfig": "嵌入配置",
|
||||||
"embeddingBinding": "嵌入绑定",
|
"embeddingBinding": "嵌入绑定",
|
||||||
"embeddingBindingHost": "嵌入绑定主机",
|
"embeddingBindingHost": "嵌入端点",
|
||||||
"embeddingModel": "嵌入模型",
|
"embeddingModel": "嵌入模型",
|
||||||
"storageConfig": "存储配置",
|
"storageConfig": "存储配置",
|
||||||
"kvStorage": "KV存储",
|
"kvStorage": "KV存储",
|
||||||
|
|
@ -265,7 +265,11 @@
|
||||||
"graphStorage": "图存储",
|
"graphStorage": "图存储",
|
||||||
"vectorStorage": "向量存储",
|
"vectorStorage": "向量存储",
|
||||||
"workspace": "工作空间",
|
"workspace": "工作空间",
|
||||||
"maxGraphNodes": "最大图节点数"
|
"maxGraphNodes": "最大图节点数",
|
||||||
|
"rerankerConfig": "重排序配置",
|
||||||
|
"rerankerBindingHost": "重排序端点",
|
||||||
|
"rerankerModel": "重排序模型",
|
||||||
|
"lockStatus": "锁状态"
|
||||||
},
|
},
|
||||||
"propertiesView": {
|
"propertiesView": {
|
||||||
"editProperty": "编辑{{property}}",
|
"editProperty": "编辑{{property}}",
|
||||||
|
|
|
||||||
|
|
@ -252,12 +252,12 @@
|
||||||
"inputDirectory": "輸入目錄",
|
"inputDirectory": "輸入目錄",
|
||||||
"llmConfig": "LLM 設定",
|
"llmConfig": "LLM 設定",
|
||||||
"llmBinding": "LLM 綁定",
|
"llmBinding": "LLM 綁定",
|
||||||
"llmBindingHost": "LLM 綁定主機",
|
"llmBindingHost": "LLM 端點",
|
||||||
"llmModel": "LLM 模型",
|
"llmModel": "LLM 模型",
|
||||||
"maxTokens": "最大權杖數",
|
"maxTokens": "最大權杖數",
|
||||||
"embeddingConfig": "嵌入設定",
|
"embeddingConfig": "嵌入設定",
|
||||||
"embeddingBinding": "嵌入綁定",
|
"embeddingBinding": "嵌入綁定",
|
||||||
"embeddingBindingHost": "嵌入綁定主機",
|
"embeddingBindingHost": "嵌入端點",
|
||||||
"embeddingModel": "嵌入模型",
|
"embeddingModel": "嵌入模型",
|
||||||
"storageConfig": "儲存設定",
|
"storageConfig": "儲存設定",
|
||||||
"kvStorage": "KV 儲存",
|
"kvStorage": "KV 儲存",
|
||||||
|
|
@ -265,7 +265,11 @@
|
||||||
"graphStorage": "圖形儲存",
|
"graphStorage": "圖形儲存",
|
||||||
"vectorStorage": "向量儲存",
|
"vectorStorage": "向量儲存",
|
||||||
"workspace": "工作空間",
|
"workspace": "工作空間",
|
||||||
"maxGraphNodes": "最大圖形節點數"
|
"maxGraphNodes": "最大圖形節點數",
|
||||||
|
"rerankerConfig": "重排序設定",
|
||||||
|
"rerankerBindingHost": "重排序端點",
|
||||||
|
"rerankerModel": "重排序模型",
|
||||||
|
"lockStatus": "鎖定狀態"
|
||||||
},
|
},
|
||||||
"propertiesView": {
|
"propertiesView": {
|
||||||
"editProperty": "編輯{{property}}",
|
"editProperty": "編輯{{property}}",
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue