diff --git a/lightrag/api/lightrag_server.py b/lightrag/api/lightrag_server.py
index 8f3fbae1..eab614c3 100644
--- a/lightrag/api/lightrag_server.py
+++ b/lightrag/api/lightrag_server.py
@@ -917,6 +917,11 @@ def create_app(args):
     else:
         logger.info("Embedding max_token_size: not set (90% token warning disabled)")
 
+    # Set max_token_size if EMBEDDING_TOKEN_LIMIT is provided
+    if args.embedding_token_limit is not None:
+        embedding_func.max_token_size = args.embedding_token_limit
+        logger.info(f"Set embedding max_token_size to {args.embedding_token_limit}")
+
     # Configure rerank function based on args.rerank_binding parameter
     rerank_model_func = None
     if args.rerank_binding != "null":
diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index a8a183df..72a4dc6d 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -64,10 +64,9 @@ from lightrag.kg import (
 
 from lightrag.kg.shared_storage import (
     get_namespace_data,
+    get_pipeline_status_lock,
     get_graph_db_lock,
     get_data_init_lock,
-    get_storage_keyed_lock,
-    initialize_pipeline_status,
 )
 
 from lightrag.base import (
@@ -277,8 +276,12 @@ class LightRAG:
     embedding_func: EmbeddingFunc | None = field(default=None)
     """Function for computing text embeddings. Must be set before use."""
 
-    embedding_token_limit: int | None = field(default=None, init=False)
-    """Token limit for embedding model. Set automatically from embedding_func.max_token_size in __post_init__."""
+    @property
+    def embedding_token_limit(self) -> int | None:
+        """Get the token limit for embedding model from embedding_func."""
+        if self.embedding_func and hasattr(self.embedding_func, "max_token_size"):
+            return self.embedding_func.max_token_size
+        return None
 
     embedding_batch_num: int = field(default=int(os.getenv("EMBEDDING_BATCH_NUM", 10)))
     """Batch size for embedding computations."""
@@ -523,28 +526,12 @@ class LightRAG:
         logger.debug(f"LightRAG init with param:\n {_print_config}\n")
 
         # Init Embedding
-        # Step 1: Capture max_token_size before applying decorator (decorator strips dataclass attributes)
-        embedding_max_token_size = None
-        if self.embedding_func and hasattr(self.embedding_func, "max_token_size"):
-            embedding_max_token_size = self.embedding_func.max_token_size
-            logger.debug(
-                f"Captured embedding max_token_size: {embedding_max_token_size}"
-            )
-        self.embedding_token_limit = embedding_max_token_size
-
-        # Step 2: Apply priority wrapper decorator
         self.embedding_func = priority_limit_async_func_call(
             self.embedding_func_max_async,
             llm_timeout=self.default_embedding_timeout,
             queue_name="Embedding func",
         )(self.embedding_func)
-
-        # Initialize embedding_token_limit from embedding_func
-        if self.embedding_func and hasattr(self.embedding_func, "max_token_size"):
-            self.embedding_token_limit = self.embedding_func.max_token_size
-        else:
-            self.embedding_token_limit = None
-
         # Initialize all storages
         self.key_string_value_json_storage_cls: type[BaseKVStorage] = (
             self._get_storage_class(self.kv_storage)
@@ -665,13 +652,6 @@ class LightRAG:
     async def initialize_storages(self):
         """Storage initialization must be called one by one to prevent deadlock"""
         if self._storages_status == StoragesStatus.CREATED:
-            # Set default workspace for backward compatibility
-            # This allows initialize_pipeline_status() called without parameters
-            # to use the correct workspace
-            from lightrag.kg.shared_storage import set_default_workspace
-
-            set_default_workspace(self.workspace)
-
             for storage in (
                 self.full_docs,
                 self.text_chunks,
@@ -1606,22 +1586,8 @@ class LightRAG:
         """
 
         # Get pipeline status shared data and lock
-        # Step 1: Get workspace
-        workspace = self.workspace
-
-        # Step 2: Construct namespace (following GraphDB pattern)
-        namespace = f"{workspace}:pipeline" if workspace else "pipeline_status"
-
-        # Step 3: Ensure initialization (on first access)
-        await initialize_pipeline_status(workspace)
-
-        # Step 4: Get lock
-        pipeline_status_lock = get_storage_keyed_lock(
-            keys="status", namespace=namespace, enable_logging=False
-        )
-
-        # Step 5: Get data
-        pipeline_status = await get_namespace_data(namespace)
+        pipeline_status = await get_namespace_data("pipeline_status")
+        pipeline_status_lock = get_pipeline_status_lock()
 
         # Check if another process is already processing the queue
         async with pipeline_status_lock:
@@ -2973,22 +2939,8 @@ class LightRAG:
         doc_llm_cache_ids: list[str] = []
 
         # Get pipeline status shared data and lock for status updates
-        # Step 1: Get workspace
-        workspace = self.workspace
-
-        # Step 2: Construct namespace (following GraphDB pattern)
-        namespace = f"{workspace}:pipeline" if workspace else "pipeline_status"
-
-        # Step 3: Ensure initialization (on first access)
-        await initialize_pipeline_status(workspace)
-
-        # Step 4: Get lock
-        pipeline_status_lock = get_storage_keyed_lock(
-            keys="status", namespace=namespace, enable_logging=False
-        )
-
-        # Step 5: Get data
-        pipeline_status = await get_namespace_data(namespace)
+        pipeline_status = await get_namespace_data("pipeline_status")
+        pipeline_status_lock = get_pipeline_status_lock()
 
         async with pipeline_status_lock:
             log_message = f"Starting deletion process for document {doc_id}"
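
Note on the embedding_token_limit change: the dataclass field is replaced by a read-through property, so the limit always reflects the current embedding_func.max_token_size (including a value applied later from EMBEDDING_TOKEN_LIMIT in lightrag_server.py) rather than a copy captured in __post_init__ that could go stale once priority_limit_async_func_call rewraps the function. Below is a minimal sketch of that behavior using simplified stand-in classes, not the real EmbeddingFunc or LightRAG:

from dataclasses import dataclass, field


@dataclass
class FakeEmbeddingFunc:
    # Stand-in for lightrag.utils.EmbeddingFunc; only the attribute the
    # property reads is modeled here.
    max_token_size: int | None = None


@dataclass
class RAGStub:
    embedding_func: FakeEmbeddingFunc | None = field(default=None)

    @property
    def embedding_token_limit(self) -> int | None:
        # Same logic as the property added in the diff: delegate to the
        # embedding function instead of caching a copy at init time.
        if self.embedding_func and hasattr(self.embedding_func, "max_token_size"):
            return self.embedding_func.max_token_size
        return None


rag = RAGStub(FakeEmbeddingFunc(max_token_size=8192))
print(rag.embedding_token_limit)  # 8192
rag.embedding_func.max_token_size = 512  # e.g. server applies EMBEDDING_TOKEN_LIMIT
print(rag.embedding_token_limit)  # 512 -- stays in sync, no stale cached value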
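
Note on the pipeline-status change: both call sites revert from per-workspace keyed locks (get_storage_keyed_lock over a "{workspace}:pipeline" namespace) to the shared "pipeline_status" namespace guarded by get_pipeline_status_lock(). A minimal usage sketch of that pattern follows; the demo coroutine and the status keys written here are illustrative, and it assumes shared storage is prepared via initialize_share_data()/initialize_pipeline_status() as in LightRAG's documented startup flow:

import asyncio

from lightrag.kg.shared_storage import (
    get_namespace_data,
    get_pipeline_status_lock,
    initialize_pipeline_status,
    initialize_share_data,
)


async def demo() -> None:
    # Shared storage must exist before the namespace can be read; LightRAG
    # normally does this during server/pipeline initialization.
    initialize_share_data()
    await initialize_pipeline_status()

    # The two calls the diff restores: fetch the shared dict and its lock.
    pipeline_status = await get_namespace_data("pipeline_status")
    pipeline_status_lock = get_pipeline_status_lock()

    async with pipeline_status_lock:
        # All reads/writes of the shared status happen under the lock, so
        # concurrent pipelines never observe partial updates.
        pipeline_status["latest_message"] = "Starting deletion process"
        pipeline_status.setdefault("history_messages", []).append(
            "Starting deletion process"
        )


asyncio.run(demo())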