Fix linting

This commit is contained in:
yangdx 2025-07-12 05:17:44 +08:00
parent 964293f21b
commit 5ee509e671

View file

@ -338,42 +338,52 @@ def _release_shared_raw_mp_lock(factory_name: str, key: str):
current_time = time.time() current_time = time.time()
if count == 0: if count == 0:
_lock_cleanup_data[combined_key] = current_time _lock_cleanup_data[combined_key] = current_time
# Update earliest multiprocess cleanup time (only when earlier) # Update earliest multiprocess cleanup time (only when earlier)
if _earliest_mp_cleanup_time is None or current_time < _earliest_mp_cleanup_time: if (
_earliest_mp_cleanup_time is None
or current_time < _earliest_mp_cleanup_time
):
_earliest_mp_cleanup_time = current_time _earliest_mp_cleanup_time = current_time
# Efficient cleanup triggering with minimum interval control # Efficient cleanup triggering with minimum interval control
total_cleanup_len = len(_lock_cleanup_data) total_cleanup_len = len(_lock_cleanup_data)
if total_cleanup_len >= CLEANUP_THRESHOLD: if total_cleanup_len >= CLEANUP_THRESHOLD:
# Time rollback detection # Time rollback detection
if _last_mp_cleanup_time is not None and current_time < _last_mp_cleanup_time: if (
_last_mp_cleanup_time is not None
and current_time < _last_mp_cleanup_time
):
direct_log( direct_log(
"== mp Lock == Time rollback detected, resetting cleanup time", "== mp Lock == Time rollback detected, resetting cleanup time",
level="WARNING", level="WARNING",
enable_output=False, enable_output=False,
) )
_last_mp_cleanup_time = None _last_mp_cleanup_time = None
# Check cleanup conditions # Check cleanup conditions
has_expired_locks = ( has_expired_locks = (
_earliest_mp_cleanup_time is not None and _earliest_mp_cleanup_time is not None
current_time - _earliest_mp_cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS and current_time - _earliest_mp_cleanup_time
> CLEANUP_KEYED_LOCKS_AFTER_SECONDS
) )
interval_satisfied = ( interval_satisfied = (
_last_mp_cleanup_time is None or _last_mp_cleanup_time is None
current_time - _last_mp_cleanup_time > MIN_CLEANUP_INTERVAL_SECONDS or current_time - _last_mp_cleanup_time > MIN_CLEANUP_INTERVAL_SECONDS
) )
if has_expired_locks and interval_satisfied: if has_expired_locks and interval_satisfied:
try: try:
cleaned_count = 0 cleaned_count = 0
new_earliest_time = None new_earliest_time = None
# Perform cleanup while maintaining the new earliest time # Perform cleanup while maintaining the new earliest time
for cleanup_key, cleanup_time in list(_lock_cleanup_data.items()): for cleanup_key, cleanup_time in list(_lock_cleanup_data.items()):
if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS: if (
current_time - cleanup_time
> CLEANUP_KEYED_LOCKS_AFTER_SECONDS
):
# Clean expired locks # Clean expired locks
_lock_registry.pop(cleanup_key, None) _lock_registry.pop(cleanup_key, None)
_lock_registry_count.pop(cleanup_key, None) _lock_registry_count.pop(cleanup_key, None)
@ -381,19 +391,26 @@ def _release_shared_raw_mp_lock(factory_name: str, key: str):
cleaned_count += 1 cleaned_count += 1
else: else:
# Track the earliest time among remaining locks # Track the earliest time among remaining locks
if new_earliest_time is None or cleanup_time < new_earliest_time: if (
new_earliest_time is None
or cleanup_time < new_earliest_time
):
new_earliest_time = cleanup_time new_earliest_time = cleanup_time
# Update state only after successful cleanup # Update state only after successful cleanup
_earliest_mp_cleanup_time = new_earliest_time _earliest_mp_cleanup_time = new_earliest_time
_last_mp_cleanup_time = current_time _last_mp_cleanup_time = current_time
if cleaned_count > 0: if cleaned_count > 0:
next_cleanup_in = ( next_cleanup_in = max(
max( (
(new_earliest_time + CLEANUP_KEYED_LOCKS_AFTER_SECONDS - current_time) if new_earliest_time else float('inf'), new_earliest_time
MIN_CLEANUP_INTERVAL_SECONDS + CLEANUP_KEYED_LOCKS_AFTER_SECONDS
- current_time
) )
if new_earliest_time
else float("inf"),
MIN_CLEANUP_INTERVAL_SECONDS,
) )
direct_log( direct_log(
f"== mp Lock == Cleaned up {cleaned_count}/{total_cleanup_len} expired locks, " f"== mp Lock == Cleaned up {cleaned_count}/{total_cleanup_len} expired locks, "
@ -401,7 +418,7 @@ def _release_shared_raw_mp_lock(factory_name: str, key: str):
enable_output=False, enable_output=False,
level="INFO", level="INFO",
) )
except Exception as e: except Exception as e:
direct_log( direct_log(
f"== mp Lock == Cleanup failed: {e}", f"== mp Lock == Cleanup failed: {e}",
@ -436,8 +453,12 @@ class KeyedUnifiedLock:
self._mp_locks: Dict[ self._mp_locks: Dict[
str, mp.synchronize.Lock str, mp.synchronize.Lock
] = {} # multi-process lock proxies ] = {} # multi-process lock proxies
self._earliest_async_cleanup_time: Optional[float] = None # track earliest async cleanup time self._earliest_async_cleanup_time: Optional[float] = (
self._last_async_cleanup_time: Optional[float] = None # track last async cleanup time for minimum interval None # track earliest async cleanup time
)
self._last_async_cleanup_time: Optional[float] = (
None # track last async cleanup time for minimum interval
)
def __call__(self, keys: list[str], *, enable_logging: Optional[bool] = None): def __call__(self, keys: list[str], *, enable_logging: Optional[bool] = None):
""" """
@ -474,9 +495,12 @@ class KeyedUnifiedLock:
current_time = time.time() current_time = time.time()
if count == 0: if count == 0:
self._async_lock_cleanup_data[key] = current_time self._async_lock_cleanup_data[key] = current_time
# Update earliest async cleanup time (only when earlier) # Update earliest async cleanup time (only when earlier)
if self._earliest_async_cleanup_time is None or current_time < self._earliest_async_cleanup_time: if (
self._earliest_async_cleanup_time is None
or current_time < self._earliest_async_cleanup_time
):
self._earliest_async_cleanup_time = current_time self._earliest_async_cleanup_time = current_time
self._async_lock_count[key] = count self._async_lock_count[key] = count
@ -484,33 +508,43 @@ class KeyedUnifiedLock:
total_cleanup_len = len(self._async_lock_cleanup_data) total_cleanup_len = len(self._async_lock_cleanup_data)
if total_cleanup_len >= CLEANUP_THRESHOLD: if total_cleanup_len >= CLEANUP_THRESHOLD:
# Time rollback detection # Time rollback detection
if self._last_async_cleanup_time is not None and current_time < self._last_async_cleanup_time: if (
self._last_async_cleanup_time is not None
and current_time < self._last_async_cleanup_time
):
direct_log( direct_log(
"== async Lock == Time rollback detected, resetting cleanup time", "== async Lock == Time rollback detected, resetting cleanup time",
level="WARNING", level="WARNING",
enable_output=False, enable_output=False,
) )
self._last_async_cleanup_time = None self._last_async_cleanup_time = None
# Check cleanup conditions # Check cleanup conditions
has_expired_locks = ( has_expired_locks = (
self._earliest_async_cleanup_time is not None and self._earliest_async_cleanup_time is not None
current_time - self._earliest_async_cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS and current_time - self._earliest_async_cleanup_time
> CLEANUP_KEYED_LOCKS_AFTER_SECONDS
) )
interval_satisfied = ( interval_satisfied = (
self._last_async_cleanup_time is None or self._last_async_cleanup_time is None
current_time - self._last_async_cleanup_time > MIN_CLEANUP_INTERVAL_SECONDS or current_time - self._last_async_cleanup_time
> MIN_CLEANUP_INTERVAL_SECONDS
) )
if has_expired_locks and interval_satisfied: if has_expired_locks and interval_satisfied:
try: try:
cleaned_count = 0 cleaned_count = 0
new_earliest_time = None new_earliest_time = None
# Perform cleanup while maintaining the new earliest time # Perform cleanup while maintaining the new earliest time
for cleanup_key, cleanup_time in list(self._async_lock_cleanup_data.items()): for cleanup_key, cleanup_time in list(
if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS: self._async_lock_cleanup_data.items()
):
if (
current_time - cleanup_time
> CLEANUP_KEYED_LOCKS_AFTER_SECONDS
):
# Clean expired async locks # Clean expired async locks
self._async_lock.pop(cleanup_key) self._async_lock.pop(cleanup_key)
self._async_lock_count.pop(cleanup_key) self._async_lock_count.pop(cleanup_key)
@ -518,19 +552,26 @@ class KeyedUnifiedLock:
cleaned_count += 1 cleaned_count += 1
else: else:
# Track the earliest time among remaining locks # Track the earliest time among remaining locks
if new_earliest_time is None or cleanup_time < new_earliest_time: if (
new_earliest_time is None
or cleanup_time < new_earliest_time
):
new_earliest_time = cleanup_time new_earliest_time = cleanup_time
# Update state only after successful cleanup # Update state only after successful cleanup
self._earliest_async_cleanup_time = new_earliest_time self._earliest_async_cleanup_time = new_earliest_time
self._last_async_cleanup_time = current_time self._last_async_cleanup_time = current_time
if cleaned_count > 0: if cleaned_count > 0:
next_cleanup_in = ( next_cleanup_in = max(
max( (
(new_earliest_time + CLEANUP_KEYED_LOCKS_AFTER_SECONDS - current_time) if new_earliest_time else float('inf'), new_earliest_time
MIN_CLEANUP_INTERVAL_SECONDS + CLEANUP_KEYED_LOCKS_AFTER_SECONDS
- current_time
) )
if new_earliest_time
else float("inf"),
MIN_CLEANUP_INTERVAL_SECONDS,
) )
direct_log( direct_log(
f"== async Lock == Cleaned up {cleaned_count}/{total_cleanup_len} expired async locks, " f"== async Lock == Cleaned up {cleaned_count}/{total_cleanup_len} expired async locks, "
@ -538,7 +579,7 @@ class KeyedUnifiedLock:
enable_output=False, enable_output=False,
level="INFO", level="INFO",
) )
except Exception as e: except Exception as e:
direct_log( direct_log(
f"== async Lock == Cleanup failed: {e}", f"== async Lock == Cleanup failed: {e}",
@ -801,7 +842,7 @@ def initialize_share_data(workers: int = 1):
# Initialize multiprocess cleanup times # Initialize multiprocess cleanup times
_earliest_mp_cleanup_time = None _earliest_mp_cleanup_time = None
_last_mp_cleanup_time = None _last_mp_cleanup_time = None
# Mark as initialized # Mark as initialized
_initialized = True _initialized = True