Optimize log messages

This commit is contained in:
yangdx 2025-07-12 02:41:31 +08:00
parent a64c767298
commit 22c36f2fd2
2 changed files with 52 additions and 33 deletions

View file

@ -5,21 +5,42 @@ import multiprocessing as mp
from multiprocessing.synchronize import Lock as ProcessLock from multiprocessing.synchronize import Lock as ProcessLock
from multiprocessing import Manager from multiprocessing import Manager
import time import time
import logging
from typing import Any, Dict, List, Optional, Union, TypeVar, Generic from typing import Any, Dict, List, Optional, Union, TypeVar, Generic
# Define a direct print function for critical logs that must be visible in all processes # Define a direct print function for critical logs that must be visible in all processes
def direct_log(message, level="INFO", enable_output: bool = True): def direct_log(message, enable_output: bool = True, level: str = "DEBUG"):
""" """
Log a message directly to stderr to ensure visibility in all processes, Log a message directly to stderr to ensure visibility in all processes,
including the Gunicorn master process. including the Gunicorn master process.
Args: Args:
message: The message to log message: The message to log
level: Log level (default: "INFO") level: Log level (default: "DEBUG")
enable_output: Whether to actually output the log (default: True) enable_output: Whether to actually output the log (default: True)
""" """
if enable_output: # Get the current logger level from the lightrag logger
try:
from lightrag.utils import logger
current_level = logger.getEffectiveLevel()
except ImportError:
# Fallback if lightrag.utils is not available
current_level = logging.INFO
# Convert string level to numeric level for comparison
level_mapping = {
"DEBUG": logging.DEBUG, # 10
"INFO": logging.INFO, # 20
"WARNING": logging.WARNING, # 30
"ERROR": logging.ERROR, # 40
"CRITICAL": logging.CRITICAL, # 50
}
message_level = level_mapping.get(level.upper(), logging.DEBUG)
# print(f"Diret_log: {level.upper()} {message_level} ? {current_level}", file=sys.stderr, flush=True)
if enable_output or (message_level >= current_level):
print(f"{level}: {message}", file=sys.stderr, flush=True) print(f"{level}: {message}", file=sys.stderr, flush=True)
@ -67,11 +88,7 @@ def inc_debug_n_locks_acquired():
global _debug_n_locks_acquired global _debug_n_locks_acquired
if DEBUG_LOCKS: if DEBUG_LOCKS:
_debug_n_locks_acquired += 1 _debug_n_locks_acquired += 1
print( print(f"DEBUG: Keyed Lock acquired, total: {_debug_n_locks_acquired:>5}")
f"DEBUG: Keyed Lock acquired, total: {_debug_n_locks_acquired:>5}",
end="\r",
flush=True,
)
def dec_debug_n_locks_acquired(): def dec_debug_n_locks_acquired():
@ -79,11 +96,7 @@ def dec_debug_n_locks_acquired():
if DEBUG_LOCKS: if DEBUG_LOCKS:
if _debug_n_locks_acquired > 0: if _debug_n_locks_acquired > 0:
_debug_n_locks_acquired -= 1 _debug_n_locks_acquired -= 1
print( print(f"DEBUG: Keyed Lock released, total: {_debug_n_locks_acquired:>5}")
f"DEBUG: Keyed Lock released, total: {_debug_n_locks_acquired:>5}",
end="\r",
flush=True,
)
else: else:
raise RuntimeError("Attempting to release lock when no locks are acquired") raise RuntimeError("Attempting to release lock when no locks are acquired")
@ -113,17 +126,8 @@ class UnifiedLock(Generic[T]):
async def __aenter__(self) -> "UnifiedLock[T]": async def __aenter__(self) -> "UnifiedLock[T]":
try: try:
# direct_log(
# f"== Lock == Process {self._pid}: Acquiring lock '{self._name}' (async={self._is_async})",
# enable_output=self._enable_logging,
# )
# If in multiprocess mode and async lock exists, acquire it first # If in multiprocess mode and async lock exists, acquire it first
if not self._is_async and self._async_lock is not None: if not self._is_async and self._async_lock is not None:
# direct_log(
# f"== Lock == Process {self._pid}: Acquiring async lock for '{self._name}'",
# enable_output=self._enable_logging,
# )
await self._async_lock.acquire() await self._async_lock.acquire()
direct_log( direct_log(
f"== Lock == Process {self._pid}: Async lock for '{self._name}' acquired", f"== Lock == Process {self._pid}: Async lock for '{self._name}' acquired",
@ -328,12 +332,20 @@ def _release_shared_raw_mp_lock(factory_name: str, key: str):
_lock_cleanup_data[combined_key] = current_time _lock_cleanup_data[combined_key] = current_time
# Only perform cleanup when the pending cleanup list exceeds threshold # Only perform cleanup when the pending cleanup list exceeds threshold
if len(_lock_cleanup_data) > CLEANUP_THRESHOLD: total_cleanup_len = len(_lock_cleanup_data)
if total_cleanup_len >= CLEANUP_THRESHOLD:
cleaned_count = 0
for cleanup_key, cleanup_time in list(_lock_cleanup_data.items()): for cleanup_key, cleanup_time in list(_lock_cleanup_data.items()):
if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS: if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS:
_lock_registry.pop(cleanup_key, None) _lock_registry.pop(cleanup_key, None)
_lock_registry_count.pop(cleanup_key, None) _lock_registry_count.pop(cleanup_key, None)
_lock_cleanup_data.pop(cleanup_key, None) _lock_cleanup_data.pop(cleanup_key, None)
cleaned_count += 1
direct_log(
f"== mp Lock == Cleaned up {cleaned_count}/{total_cleanup_len} expired locks",
enable_output=False,
level="DEBUG",
)
class KeyedUnifiedLock: class KeyedUnifiedLock:
@ -400,7 +412,9 @@ class KeyedUnifiedLock:
self._async_lock_count[key] = count self._async_lock_count[key] = count
# Only perform cleanup when the pending cleanup list exceeds threshold # Only perform cleanup when the pending cleanup list exceeds threshold
if len(self._async_lock_cleanup_data) > CLEANUP_THRESHOLD: total_cleanup_len = len(self._async_lock_cleanup_data)
if total_cleanup_len >= CLEANUP_THRESHOLD:
cleaned_count = 0
for cleanup_key, cleanup_time in list( for cleanup_key, cleanup_time in list(
self._async_lock_cleanup_data.items() self._async_lock_cleanup_data.items()
): ):
@ -408,6 +422,12 @@ class KeyedUnifiedLock:
self._async_lock.pop(cleanup_key) self._async_lock.pop(cleanup_key)
self._async_lock_count.pop(cleanup_key) self._async_lock_count.pop(cleanup_key)
self._async_lock_cleanup_data.pop(cleanup_key) self._async_lock_cleanup_data.pop(cleanup_key)
cleaned_count += 1
direct_log(
f"== async Lock == Cleaned up {cleaned_count}/{total_cleanup_len} expired async locks",
enable_output=False,
level="DEBUG",
)
def _get_lock_for_key(self, key: str, enable_logging: bool = False) -> UnifiedLock: def _get_lock_for_key(self, key: str, enable_logging: bool = False) -> UnifiedLock:
# 1. get (or create) the perprocess async gate for this key # 1. get (or create) the perprocess async gate for this key
@ -626,7 +646,7 @@ def initialize_share_data(workers: int = 1):
_update_flags = _manager.dict() _update_flags = _manager.dict()
_graph_db_lock_keyed = KeyedUnifiedLock( _graph_db_lock_keyed = KeyedUnifiedLock(
factory_name="graph_db_lock", factory_name="GraphDB",
) )
# Initialize async locks for multiprocess mode # Initialize async locks for multiprocess mode
@ -654,7 +674,7 @@ def initialize_share_data(workers: int = 1):
_async_locks = None # No need for async locks in single process mode _async_locks = None # No need for async locks in single process mode
_graph_db_lock_keyed = KeyedUnifiedLock( _graph_db_lock_keyed = KeyedUnifiedLock(
factory_name="graph_db_lock", factory_name="GraphDB",
) )
direct_log(f"Process {os.getpid()} Shared-Data created for Single Process") direct_log(f"Process {os.getpid()} Shared-Data created for Single Process")

View file

@ -1143,19 +1143,18 @@ async def merge_nodes_and_edges(
total_relations_count = len(all_edges) total_relations_count = len(all_edges)
# Merge nodes and edges # Merge nodes and edges
log_message = f"Merging stage {current_file_number}/{total_files}: {file_path}"
logger.info(log_message)
async with pipeline_status_lock: async with pipeline_status_lock:
log_message = f"Merging stage {current_file_number}/{total_files}: {file_path}"
logger.info(log_message)
pipeline_status["latest_message"] = log_message pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message) pipeline_status["history_messages"].append(log_message)
# Process and update all entities and relationships in parallel # Process and update all entities and relationships in parallel
log_message = f"Updating {total_entities_count} entities and {total_relations_count} relations {current_file_number}/{total_files}: {file_path}" log_message = f"Processing: {total_entities_count} entities and {total_relations_count} relations"
logger.info(log_message) logger.info(log_message)
if pipeline_status is not None: async with pipeline_status_lock:
async with pipeline_status_lock: pipeline_status["latest_message"] = log_message
pipeline_status["latest_message"] = log_message pipeline_status["history_messages"].append(log_message)
pipeline_status["history_messages"].append(log_message)
# Get max async tasks limit from global_config for semaphore control # Get max async tasks limit from global_config for semaphore control
llm_model_max_async = global_config.get("llm_model_max_async", 4) llm_model_max_async = global_config.get("llm_model_max_async", 4)