Raphaël MANSUY 2025-12-04 19:14:27 +08:00
parent 778e6d57c4
commit 759980e522
2 changed files with 16 additions and 59 deletions

View file

@@ -917,6 +917,11 @@ def create_app(args):
    else:
        logger.info("Embedding max_token_size: not set (90% token warning disabled)")

+    # Set max_token_size if EMBEDDING_TOKEN_LIMIT is provided
+    if args.embedding_token_limit is not None:
+        embedding_func.max_token_size = args.embedding_token_limit
+        logger.info(f"Set embedding max_token_size to {args.embedding_token_limit}")

    # Configure rerank function based on the args.rerank_binding parameter
    rerank_model_func = None
    if args.rerank_binding != "null":
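The added lines let the server push a configured token limit onto the embedding function object before the rest of the stack reads it back through max_token_size. A minimal runnable sketch of the same flow, with SimpleNamespace standing in for both the parsed CLI args and the embedding function (only embedding_token_limit and max_token_size appear in this diff; everything else is illustrative):

```python
# Sketch: apply a configured token limit to an embedding function object.
from types import SimpleNamespace

embedding_func = SimpleNamespace(max_token_size=None)
args = SimpleNamespace(embedding_token_limit=8192)  # e.g. EMBEDDING_TOKEN_LIMIT=8192

# Mirrors the added block: only override when a limit was actually provided.
if args.embedding_token_limit is not None:
    embedding_func.max_token_size = args.embedding_token_limit

assert embedding_func.max_token_size == 8192
```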

View file

@@ -64,10 +64,9 @@ from lightrag.kg import (
from lightrag.kg.shared_storage import (
    get_namespace_data,
+    get_pipeline_status_lock,
    get_graph_db_lock,
    get_data_init_lock,
-    get_storage_keyed_lock,
-    initialize_pipeline_status,
)
from lightrag.base import (
@@ -277,8 +276,12 @@ class LightRAG:
    embedding_func: EmbeddingFunc | None = field(default=None)
    """Function for computing text embeddings. Must be set before use."""

-    embedding_token_limit: int | None = field(default=None, init=False)
-    """Token limit for embedding model. Set automatically from embedding_func.max_token_size in __post_init__."""

+    @property
+    def embedding_token_limit(self) -> int | None:
+        """Get the token limit for embedding model from embedding_func."""
+        if self.embedding_func and hasattr(self.embedding_func, "max_token_size"):
+            return self.embedding_func.max_token_size
+        return None

    embedding_batch_num: int = field(default=int(os.getenv("EMBEDDING_BATCH_NUM", 10)))
    """Batch size for embedding computations."""
@@ -523,28 +526,12 @@ class LightRAG:
logger.debug(f"LightRAG init with param:\n {_print_config}\n")
# Init Embedding
# Step 1: Capture max_token_size before applying decorator (decorator strips dataclass attributes)
embedding_max_token_size = None
if self.embedding_func and hasattr(self.embedding_func, "max_token_size"):
embedding_max_token_size = self.embedding_func.max_token_size
logger.debug(
f"Captured embedding max_token_size: {embedding_max_token_size}"
)
self.embedding_token_limit = embedding_max_token_size
# Step 2: Apply priority wrapper decorator
self.embedding_func = priority_limit_async_func_call(
self.embedding_func_max_async,
llm_timeout=self.default_embedding_timeout,
queue_name="Embedding func",
)(self.embedding_func)
# Initialize embedding_token_limit from embedding_func
if self.embedding_func and hasattr(self.embedding_func, "max_token_size"):
self.embedding_token_limit = self.embedding_func.max_token_size
else:
self.embedding_token_limit = None
# Initialize all storages
self.key_string_value_json_storage_cls: type[BaseKVStorage] = (
self._get_storage_class(self.kv_storage)
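This whole capture-before/assign-after dance becomes redundant once the limit is exposed as a property: nothing needs to be stashed around the decoration step, provided the wrapper returned by priority_limit_async_func_call still carries max_token_size. A hedged sketch of that requirement, with limit_calls as a hypothetical stand-in for the real decorator:

```python
import asyncio

class FakeEmbedding:
    """Stand-in for an EmbeddingFunc-like callable carrying a token limit."""
    max_token_size = 8192

    async def __call__(self, texts):
        return [[0.0, 0.0, 0.0] for _ in texts]

def limit_calls(func):
    # Hypothetical stand-in for priority_limit_async_func_call.
    async def wrapper(*args, **kwargs):
        return await func(*args, **kwargs)
    # Carry the attribute over explicitly; if the wrapper dropped it, a
    # property reading embedding_func.max_token_size would return None.
    if hasattr(func, "max_token_size"):
        wrapper.max_token_size = func.max_token_size
    return wrapper

wrapped = limit_calls(FakeEmbedding())
assert wrapped.max_token_size == 8192
asyncio.run(wrapped(["hello"]))
```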
@@ -665,13 +652,6 @@ class LightRAG:
    async def initialize_storages(self):
        """Storage initialization must be called one by one to prevent deadlock"""
        if self._storages_status == StoragesStatus.CREATED:
-            # Set default workspace for backward compatibility
-            # This allows initialize_pipeline_status() called without parameters
-            # to use the correct workspace
-            from lightrag.kg.shared_storage import set_default_workspace
-            set_default_workspace(self.workspace)

            for storage in (
                self.full_docs,
                self.text_chunks,
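With the implicit set_default_workspace call gone, initialization stays explicit on the caller's side. A sketch of the usual order from LightRAG's examples (working_dir is illustrative, and real setups also pass LLM and embedding functions to the constructor):

```python
import asyncio
from lightrag import LightRAG
from lightrag.kg.shared_storage import initialize_pipeline_status

async def main():
    rag = LightRAG(working_dir="./rag_storage")
    await rag.initialize_storages()        # storages first, one by one
    await initialize_pipeline_status()     # then the shared pipeline status

asyncio.run(main())
```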
@@ -1606,22 +1586,8 @@ class LightRAG:
"""
# Get pipeline status shared data and lock
# Step 1: Get workspace
workspace = self.workspace
# Step 2: Construct namespace (following GraphDB pattern)
namespace = f"{workspace}:pipeline" if workspace else "pipeline_status"
# Step 3: Ensure initialization (on first access)
await initialize_pipeline_status(workspace)
# Step 4: Get lock
pipeline_status_lock = get_storage_keyed_lock(
keys="status", namespace=namespace, enable_logging=False
)
# Step 5: Get data
pipeline_status = await get_namespace_data(namespace)
pipeline_status = await get_namespace_data("pipeline_status")
pipeline_status_lock = get_pipeline_status_lock()
# Check if another process is already processing the queue
async with pipeline_status_lock:
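The five-step workspace/namespace dance collapses to two calls because get_namespace_data("pipeline_status") and get_pipeline_status_lock() already encapsulate the shared-storage details. The async with that follows is a check-and-set guard so only one worker processes the queue at a time; a generic asyncio sketch of that pattern (the busy flag is an assumption, not taken from this diff):

```python
import asyncio

pipeline_status = {"busy": False}
pipeline_status_lock = asyncio.Lock()

async def process_queue():
    # Check-and-set under the lock: exactly one caller gets to proceed.
    async with pipeline_status_lock:
        if pipeline_status["busy"]:
            return  # another task is already draining the queue
        pipeline_status["busy"] = True
    try:
        await asyncio.sleep(0)  # placeholder for the real processing
    finally:
        async with pipeline_status_lock:
            pipeline_status["busy"] = False

asyncio.run(process_queue())
```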
@@ -2973,22 +2939,8 @@ class LightRAG:
        doc_llm_cache_ids: list[str] = []

        # Get pipeline status shared data and lock for status updates
-        # Step 1: Get workspace
-        workspace = self.workspace
-        # Step 2: Construct namespace (following GraphDB pattern)
-        namespace = f"{workspace}:pipeline" if workspace else "pipeline_status"
-        # Step 3: Ensure initialization (on first access)
-        await initialize_pipeline_status(workspace)
-        # Step 4: Get lock
-        pipeline_status_lock = get_storage_keyed_lock(
-            keys="status", namespace=namespace, enable_logging=False
-        )
-        # Step 5: Get data
-        pipeline_status = await get_namespace_data(namespace)
+        pipeline_status = await get_namespace_data("pipeline_status")
+        pipeline_status_lock = get_pipeline_status_lock()

        async with pipeline_status_lock:
            log_message = f"Starting deletion process for document {doc_id}"