Raphaël MANSUY 2025-12-04 19:14:27 +08:00
parent 3558adae47
commit c83a76786a
3 changed files with 185 additions and 247 deletions

View file

@@ -445,6 +445,11 @@ def parse_args() -> argparse.Namespace:
         "EMBEDDING_BATCH_NUM", DEFAULT_EMBEDDING_BATCH_NUM, int
     )

+    # Embedding token limit configuration
+    args.embedding_token_limit = get_env_value(
+        "EMBEDDING_TOKEN_LIMIT", None, int, special_none=True
+    )
+
     ollama_server_infos.LIGHTRAG_NAME = args.simulated_model_name
     ollama_server_infos.LIGHTRAG_TAG = args.simulated_model_tag
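The special_none=True flag is what lets an unset EMBEDDING_TOKEN_LIMIT fall through as None instead of failing the int conversion. A minimal sketch of that behaviour, using a simplified re-implementation rather than LightRAG's actual helper (the body below is an assumption for illustration only):

    import os

    def get_env_value(name, default, value_type=str, special_none=False):
        # Simplified stand-in for the helper used above (assumption, not the real code).
        raw = os.getenv(name)
        if raw is None or (special_none and raw.strip().lower() == "none"):
            return default            # unset (or literal "none") -> default, here None
        return value_type(raw)        # e.g. int("8192") -> 8192

    token_limit = get_env_value("EMBEDDING_TOKEN_LIMIT", None, int, special_none=True)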

View file

@@ -777,6 +777,11 @@ def create_app(args):
         send_dimensions=send_dimensions,
     )

+    # Set max_token_size if EMBEDDING_TOKEN_LIMIT is provided
+    if args.embedding_token_limit is not None:
+        embedding_func.max_token_size = args.embedding_token_limit
+        logger.info(f"Set embedding max_token_size to {args.embedding_token_limit}")
+
     # Configure rerank function based on args.rerank_binding parameter
     rerank_model_func = None
     if args.rerank_binding != "null":
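Together with the previous hunk, the flow is: the environment variable lands on args.embedding_token_limit, and create_app copies it onto the embedding function's max_token_size attribute, which the LightRAG class then reads (see the property hunk below). A rough sketch of the object being mutated, using a simplified stand-in for lightrag.utils.EmbeddingFunc (field layout here is an assumption based on the diff, not the real class):

    from dataclasses import dataclass
    from typing import Awaitable, Callable, List, Optional

    @dataclass
    class EmbeddingFunc:  # simplified stand-in, not the real class
        embedding_dim: int
        func: Callable[..., Awaitable[List[List[float]]]]
        max_token_size: Optional[int] = None  # attribute the server overrides

    def apply_token_limit(embedding_func: EmbeddingFunc, limit: Optional[int]) -> None:
        # Mirrors what create_app() now does when EMBEDDING_TOKEN_LIMIT is set.
        if limit is not None:
            embedding_func.max_token_size = limit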

View file

@@ -64,10 +64,10 @@ from lightrag.kg import (

 from lightrag.kg.shared_storage import (
     get_namespace_data,
+    get_graph_db_lock,
     get_data_init_lock,
-    get_default_workspace,
-    set_default_workspace,
-    get_namespace_lock,
+    get_storage_keyed_lock,
+    initialize_pipeline_status,
 )

 from lightrag.base import (
@@ -260,15 +260,13 @@ class LightRAG:
         - `content`: The text to be split into chunks.
         - `split_by_character`: The character to split the text on. If None, the text is split into chunks of `chunk_token_size` tokens.
         - `split_by_character_only`: If True, the text is split only on the specified character.
+        - `chunk_overlap_token_size`: The number of overlapping tokens between consecutive chunks.
         - `chunk_token_size`: The maximum number of tokens per chunk.
-        - `chunk_overlap_token_size`: The number of overlapping tokens between consecutive chunks.

     The function should return a list of dictionaries (or an awaitable that resolves to a list),
     where each dictionary contains the following keys:
-        - `tokens` (int): The number of tokens in the chunk.
-        - `content` (str): The text content of the chunk.
-        - `chunk_order_index` (int): Zero-based index indicating the chunk's order in the document.
+        - `tokens`: The number of tokens in the chunk.
+        - `content`: The text content of the chunk.

     Defaults to `chunking_by_token_size` if not specified.
     """
@@ -279,8 +277,12 @@ class LightRAG:
     embedding_func: EmbeddingFunc | None = field(default=None)
     """Function for computing text embeddings. Must be set before use."""

-    embedding_token_limit: int | None = field(default=None, init=False)
-    """Token limit for embedding model. Set automatically from embedding_func.max_token_size in __post_init__."""
+    @property
+    def embedding_token_limit(self) -> int | None:
+        """Get the token limit for embedding model from embedding_func."""
+        if self.embedding_func and hasattr(self.embedding_func, "max_token_size"):
+            return self.embedding_func.max_token_size
+        return None

     embedding_batch_num: int = field(default=int(os.getenv("EMBEDDING_BATCH_NUM", 10)))
     """Batch size for embedding computations."""
@@ -525,16 +527,6 @@ class LightRAG:
         logger.debug(f"LightRAG init with param:\n {_print_config}\n")

         # Init Embedding
-        # Step 1: Capture max_token_size before applying decorator (decorator strips dataclass attributes)
-        embedding_max_token_size = None
-        if self.embedding_func and hasattr(self.embedding_func, "max_token_size"):
-            embedding_max_token_size = self.embedding_func.max_token_size
-            logger.debug(
-                f"Captured embedding max_token_size: {embedding_max_token_size}"
-            )
-        self.embedding_token_limit = embedding_max_token_size
-
-        # Step 2: Apply priority wrapper decorator
         self.embedding_func = priority_limit_async_func_call(
             self.embedding_func_max_async,
             llm_timeout=self.default_embedding_timeout,
@@ -661,21 +653,12 @@ class LightRAG:
     async def initialize_storages(self):
         """Storage initialization must be called one by one to prevent deadlock"""
         if self._storages_status == StoragesStatus.CREATED:
-            # Set the first initialized workspace will set the default workspace
-            # Allows namespace operation without specifying workspace for backward compatibility
-            default_workspace = get_default_workspace()
-            if default_workspace is None:
-                set_default_workspace(self.workspace)
-            elif default_workspace != self.workspace:
-                logger.info(
-                    f"Creating LightRAG instance with workspace='{self.workspace}' "
-                    f"while default workspace is set to '{default_workspace}'"
-                )
-
-            # Auto-initialize pipeline_status for this workspace
-            from lightrag.kg.shared_storage import initialize_pipeline_status
-
-            await initialize_pipeline_status(workspace=self.workspace)
+            # Set default workspace for backward compatibility
+            # This allows initialize_pipeline_status() called without parameters
+            # to use the correct workspace
+            from lightrag.kg.shared_storage import set_default_workspace
+
+            set_default_workspace(self.workspace)

             for storage in (
                 self.full_docs,
@@ -1611,13 +1594,23 @@ class LightRAG:
         """
         # Get pipeline status shared data and lock
-        pipeline_status = await get_namespace_data(
-            "pipeline_status", workspace=self.workspace
-        )
-        pipeline_status_lock = get_namespace_lock(
-            "pipeline_status", workspace=self.workspace
-        )
+        # Step 1: Get workspace
+        workspace = self.workspace
+
+        # Step 2: Construct namespace (following GraphDB pattern)
+        namespace = f"{workspace}:pipeline" if workspace else "pipeline_status"
+
+        # Step 3: Ensure initialization (on first access)
+        await initialize_pipeline_status(workspace)
+
+        # Step 4: Get lock
+        pipeline_status_lock = get_storage_keyed_lock(
+            keys="status", namespace=namespace, enable_logging=False
+        )
+
+        # Step 5: Get data
+        pipeline_status = await get_namespace_data(namespace)

         # Check if another process is already processing the queue
         async with pipeline_status_lock:
             # Ensure only one worker is processing documents
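The same five-step pattern reappears in adelete_by_doc_id below. A sketch of how a caller might use it to claim the pipeline, assuming the imported helpers behave as this hunk suggests (the function name, the rag parameter, and the busy-flag handling are illustrative):

    from lightrag.kg.shared_storage import (
        get_namespace_data,
        get_storage_keyed_lock,
        initialize_pipeline_status,
    )

    async def try_acquire_pipeline(rag) -> bool:
        # Derive the per-workspace namespace exactly as the hunk does.
        namespace = f"{rag.workspace}:pipeline" if rag.workspace else "pipeline_status"
        await initialize_pipeline_status(rag.workspace)  # ensure the namespace exists
        lock = get_storage_keyed_lock(keys="status", namespace=namespace, enable_logging=False)
        status = await get_namespace_data(namespace)
        async with lock:  # serialize access to the shared status dict
            if status.get("busy", False):
                return False  # another worker already owns the pipeline
            status["busy"] = True
            return True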
@@ -2950,26 +2943,6 @@ class LightRAG:
         data across different storage layers are removed or rebuiled. If entities or relationships
         are partially affected, they will be rebuilded using LLM cached from remaining documents.

-        **Concurrency Control Design:**
-        This function implements a pipeline-based concurrency control to prevent data corruption:
-
-        1. **Single Document Deletion** (when WE acquire pipeline):
-           - Sets job_name to "Single document deletion" (NOT starting with "deleting")
-           - Prevents other adelete_by_doc_id calls from running concurrently
-           - Ensures exclusive access to graph operations for this deletion
-
-        2. **Batch Document Deletion** (when background_delete_documents acquires pipeline):
-           - Sets job_name to "Deleting {N} Documents" (starts with "deleting")
-           - Allows multiple adelete_by_doc_id calls to join the deletion queue
-           - Each call validates the job name to ensure it's part of a deletion operation
-
-        The validation logic `if not job_name.startswith("deleting") or "document" not in job_name`
-        ensures that:
-        - adelete_by_doc_id can only run when pipeline is idle OR during batch deletion
-        - Prevents concurrent single deletions that could cause race conditions
-        - Rejects operations when pipeline is busy with non-deletion tasks
-
         Args:
             doc_id (str): The unique identifier of the document to be deleted.
             delete_llm_cache (bool): Whether to delete cached LLM extraction results
@@ -2977,62 +2950,34 @@ class LightRAG:
         Returns:
             DeletionResult: An object containing the outcome of the deletion process.
-                - `status` (str): "success", "not_found", "not_allowed", or "failure".
+                - `status` (str): "success", "not_found", or "failure".
                 - `doc_id` (str): The ID of the document attempted to be deleted.
                 - `message` (str): A summary of the operation's result.
-                - `status_code` (int): HTTP status code (e.g., 200, 404, 403, 500).
+                - `status_code` (int): HTTP status code (e.g., 200, 404, 500).
                 - `file_path` (str | None): The file path of the deleted document, if available.
         """
-        # Get pipeline status shared data and lock for validation
-        pipeline_status = await get_namespace_data(
-            "pipeline_status", workspace=self.workspace
-        )
-        pipeline_status_lock = get_namespace_lock(
-            "pipeline_status", workspace=self.workspace
-        )
-
-        # Track whether WE acquired the pipeline
-        we_acquired_pipeline = False
-
-        # Check and acquire pipeline if needed
-        async with pipeline_status_lock:
-            if not pipeline_status.get("busy", False):
-                # Pipeline is idle - WE acquire it for this deletion
-                we_acquired_pipeline = True
-                pipeline_status.update(
-                    {
-                        "busy": True,
-                        "job_name": "Single document deletion",
-                        "job_start": datetime.now(timezone.utc).isoformat(),
-                        "docs": 1,
-                        "batchs": 1,
-                        "cur_batch": 0,
-                        "request_pending": False,
-                        "cancellation_requested": False,
-                        "latest_message": f"Starting deletion for document: {doc_id}",
-                    }
-                )
-                # Initialize history messages
-                pipeline_status["history_messages"][:] = [
-                    f"Starting deletion for document: {doc_id}"
-                ]
-            else:
-                # Pipeline already busy - verify it's a deletion job
-                job_name = pipeline_status.get("job_name", "").lower()
-                if not job_name.startswith("deleting") or "document" not in job_name:
-                    return DeletionResult(
-                        status="not_allowed",
-                        doc_id=doc_id,
-                        message=f"Deletion not allowed: current job '{pipeline_status.get('job_name')}' is not a document deletion job",
-                        status_code=403,
-                        file_path=None,
-                    )
-                # Pipeline is busy with deletion - proceed without acquiring
-
         deletion_operations_started = False
         original_exception = None
         doc_llm_cache_ids: list[str] = []

+        # Get pipeline status shared data and lock for status updates
+        # Step 1: Get workspace
+        workspace = self.workspace
+
+        # Step 2: Construct namespace (following GraphDB pattern)
+        namespace = f"{workspace}:pipeline" if workspace else "pipeline_status"
+
+        # Step 3: Ensure initialization (on first access)
+        await initialize_pipeline_status(workspace)
+
+        # Step 4: Get lock
+        pipeline_status_lock = get_storage_keyed_lock(
+            keys="status", namespace=namespace, enable_logging=False
+        )
+
+        # Step 5: Get data
+        pipeline_status = await get_namespace_data(namespace)
+
         async with pipeline_status_lock:
             log_message = f"Starting deletion process for document {doc_id}"
             logger.info(log_message)
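For reference, the documented result shape corresponds to constructions like the one the removed validation branch used; a not-found outcome would look roughly like this (the message text is illustrative, the field names come from the docstring above):

    result = DeletionResult(
        status="not_found",
        doc_id=doc_id,
        message=f"Document {doc_id} not found.",
        status_code=404,
        file_path=None,
    )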
@@ -3385,111 +3330,31 @@ class LightRAG:
                 logger.error(f"Failed to process graph analysis results: {e}")
                 raise Exception(f"Failed to process graph dependencies: {e}") from e

-            # Data integrity is ensured by allowing only one process to hold pipeline at a time (no graph db lock is needed anymore)
+            # Use graph database lock to prevent dirty read
+            graph_db_lock = get_graph_db_lock(enable_logging=False)
+            async with graph_db_lock:
                 # 5. Delete chunks from storage
                 if chunk_ids:
                     try:
                         await self.chunks_vdb.delete(chunk_ids)
                         await self.text_chunks.delete(chunk_ids)

                         async with pipeline_status_lock:
                             log_message = f"Successfully deleted {len(chunk_ids)} chunks from storage"
                             logger.info(log_message)
                             pipeline_status["latest_message"] = log_message
                             pipeline_status["history_messages"].append(log_message)
                     except Exception as e:
                         logger.error(f"Failed to delete chunks: {e}")
                         raise Exception(f"Failed to delete document chunks: {e}") from e

                 # 6. Delete relationships that have no remaining sources
                 if relationships_to_delete:
                     try:
                         # Delete from relation vdb
                         rel_ids_to_delete = []
                         for src, tgt in relationships_to_delete:
                             rel_ids_to_delete.extend(
                                 [
                                     compute_mdhash_id(src + tgt, prefix="rel-"),
                                     compute_mdhash_id(tgt + src, prefix="rel-"),
                                 ]
                             )
                         await self.relationships_vdb.delete(rel_ids_to_delete)

@@ -3498,48 +3363,123 @@ class LightRAG:
                         # Delete from graph
                         await self.chunk_entity_relation_graph.remove_edges(
                             list(relationships_to_delete)
                         )

                         # Delete from relation_chunks storage
                         if self.relation_chunks:
                             relation_storage_keys = [
                                 make_relation_chunk_key(src, tgt)
                                 for src, tgt in relationships_to_delete
                             ]
                             await self.relation_chunks.delete(relation_storage_keys)

                         async with pipeline_status_lock:
                             log_message = f"Successfully deleted {len(relationships_to_delete)} relations"
                             logger.info(log_message)
                             pipeline_status["latest_message"] = log_message
                             pipeline_status["history_messages"].append(log_message)
                     except Exception as e:
                         logger.error(f"Failed to delete relationships: {e}")
                         raise Exception(f"Failed to delete relationships: {e}") from e

                 # 7. Delete entities that have no remaining sources
                 if entities_to_delete:
                     try:
                         # Batch get all edges for entities to avoid N+1 query problem
                         nodes_edges_dict = await self.chunk_entity_relation_graph.get_nodes_edges_batch(
                             list(entities_to_delete)
                         )

                         # Debug: Check and log all edges before deleting nodes
                         edges_to_delete = set()
                         edges_still_exist = 0

                         for entity, edges in nodes_edges_dict.items():
                             if edges:
                                 for src, tgt in edges:
                                     # Normalize edge representation (sorted for consistency)
                                     edge_tuple = tuple(sorted((src, tgt)))
                                     edges_to_delete.add(edge_tuple)
                                     if (
                                         src in entities_to_delete
                                         and tgt in entities_to_delete
                                     ):
                                         logger.warning(
                                             f"Edge still exists: {src} <-> {tgt}"
                                         )
                                     elif src in entities_to_delete:
                                         logger.warning(
                                             f"Edge still exists: {src} --> {tgt}"
                                         )
                                     else:
                                         logger.warning(
                                             f"Edge still exists: {src} <-- {tgt}"
                                         )
                                     edges_still_exist += 1

                         if edges_still_exist:
                             logger.warning(
                                 f"⚠️ {edges_still_exist} entities still has edges before deletion"
                             )

                         # Clean residual edges from VDB and storage before deleting nodes
                         if edges_to_delete:
                             # Delete from relationships_vdb
                             rel_ids_to_delete = []
                             for src, tgt in edges_to_delete:
                                 rel_ids_to_delete.extend(
                                     [
                                         compute_mdhash_id(src + tgt, prefix="rel-"),
                                         compute_mdhash_id(tgt + src, prefix="rel-"),
                                     ]
                                 )
                             await self.relationships_vdb.delete(rel_ids_to_delete)

                             # Delete from relation_chunks storage
                             if self.relation_chunks:
                                 relation_storage_keys = [
                                     make_relation_chunk_key(src, tgt)
                                     for src, tgt in edges_to_delete
                                 ]
                                 await self.relation_chunks.delete(relation_storage_keys)

                             logger.info(
                                 f"Cleaned {len(edges_to_delete)} residual edges from VDB and chunk-tracking storage"
                             )

                         # Delete from graph (edges will be auto-deleted with nodes)
                         await self.chunk_entity_relation_graph.remove_nodes(
                             list(entities_to_delete)
                         )

                         # Delete from vector vdb
                         entity_vdb_ids = [
                             compute_mdhash_id(entity, prefix="ent-")
                             for entity in entities_to_delete
                         ]
                         await self.entities_vdb.delete(entity_vdb_ids)

                         # Delete from entity_chunks storage
                         if self.entity_chunks:
                             await self.entity_chunks.delete(list(entities_to_delete))

                         async with pipeline_status_lock:
                             log_message = f"Successfully deleted {len(entities_to_delete)} entities"
                             logger.info(log_message)
                             pipeline_status["latest_message"] = log_message
                             pipeline_status["history_messages"].append(log_message)
                     except Exception as e:
                         logger.error(f"Failed to delete entities: {e}")
                         raise Exception(f"Failed to delete entities: {e}") from e

-            # Persist changes to graph database before entity and relationship rebuild
+                # Persist changes to graph database before releasing graph database lock
                 await self._insert_done()

             # 8. Rebuild entities and relationships from remaining chunks
             if entities_to_rebuild or relationships_to_rebuild:
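One detail worth noting in the body above: relationship vector-store IDs are direction-sensitive, so both the src+tgt and tgt+src hashes are deleted, while the in-memory edge set is deduplicated with tuple(sorted(...)). A tiny illustration of that bookkeeping, with compute_mdhash_id re-implemented here as an assumed MD5-based stand-in for LightRAG's helper:

    import hashlib

    def compute_mdhash_id(content: str, prefix: str = "") -> str:
        # Assumed stand-in for lightrag.utils.compute_mdhash_id.
        return prefix + hashlib.md5(content.encode()).hexdigest()

    edges = {("B", "A"), ("A", "B")}                  # the same edge seen in both directions
    normalized = {tuple(sorted(e)) for e in edges}    # -> {("A", "B")}: one entry per edge
    rel_ids = []
    for src, tgt in normalized:
        rel_ids += [compute_mdhash_id(src + tgt, prefix="rel-"),
                    compute_mdhash_id(tgt + src, prefix="rel-")]
    # both orderings' IDs are removed from the relationships vector store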
@@ -3645,18 +3585,6 @@ class LightRAG:
                 f"No deletion operations were started for document {doc_id}, skipping persistence"
             )

-            # Release pipeline only if WE acquired it
-            if we_acquired_pipeline:
-                async with pipeline_status_lock:
-                    pipeline_status["busy"] = False
-                    pipeline_status["cancellation_requested"] = False
-                    completion_msg = (
-                        f"Deletion process completed for document: {doc_id}"
-                    )
-                    pipeline_status["latest_message"] = completion_msg
-                    pipeline_status["history_messages"].append(completion_msg)
-                    logger.info(completion_msg)
-
     async def adelete_by_entity(self, entity_name: str) -> DeletionResult:
         """Asynchronously delete an entity and all its relationships.