diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py
index 2ce4267a..4947ff26 100644
--- a/lightrag/api/routers/document_routes.py
+++ b/lightrag/api/routers/document_routes.py
@@ -1619,6 +1619,7 @@ async def background_delete_documents(
             pipeline_status.update(
                 {
                     "busy": True,
+                    # The job name must not be changed; it is verified in adelete_by_doc_id()
                     "job_name": f"Deleting {total_docs} Documents",
                     "job_start": datetime.now().isoformat(),
                     "docs": total_docs,
diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index c3b3ec44..c0fa8627 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -3,6 +3,7 @@ from __future__ import annotations
 import traceback
 import asyncio
 import configparser
+import inspect
 import os
 import time
 import warnings
@@ -12,6 +13,7 @@ from functools import partial
 from typing import (
     Any,
     AsyncIterator,
+    Awaitable,
     Callable,
     Iterator,
     cast,
@@ -20,6 +22,7 @@ from typing import (
     Optional,
     List,
     Dict,
+    Union,
 )
 from lightrag.prompt import PROMPTS
 from lightrag.exceptions import PipelineCancelledException
@@ -61,9 +64,10 @@ from lightrag.kg import (
 from lightrag.kg.shared_storage import (
     get_namespace_data,
-    get_pipeline_status_lock,
-    get_graph_db_lock,
     get_data_init_lock,
+    get_default_workspace,
+    set_default_workspace,
+    get_namespace_lock,
 )
 
 from lightrag.base import (
@@ -243,11 +247,13 @@ class LightRAG:
             int,
             int,
         ],
-        List[Dict[str, Any]],
+        Union[List[Dict[str, Any]], Awaitable[List[Dict[str, Any]]]],
     ] = field(default_factory=lambda: chunking_by_token_size)
     """
     Custom chunking function for splitting text into chunks before processing.
 
+    The function can be either synchronous or asynchronous.
+
     The function should take the following parameters:
 
     - `tokenizer`: A Tokenizer instance to use for tokenization.
     ...
     - `chunk_token_size`: The maximum number of tokens per chunk.
     - `chunk_overlap_token_size`: The number of overlapping tokens between consecutive chunks.
 
-    The function should return a list of dictionaries, where each dictionary contains the following keys:
+    The function should return a list of dictionaries (or an awaitable that resolves to a list),
+    where each dictionary contains the following keys:
 
     - `tokens`: The number of tokens in the chunk.
     - `content`: The text content of the chunk.
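
Note: with the widened `Union[List[...], Awaitable[List[...]]]` type above, `chunking_func` may now be a coroutine function. A minimal sketch of a custom async chunker that satisfies the documented contract (all names here are illustrative, not part of the patch; `chunk_order_index` is assumed to follow the same convention as the built-in chunker):

```python
from typing import Any

async def my_async_chunking(
    tokenizer,
    content: str,
    split_by_character: str | None,
    split_by_character_only: bool,
    chunk_overlap_token_size: int,
    chunk_token_size: int,
) -> list[dict[str, Any]]:
    # Fixed-size token windows with overlap; assumes overlap < chunk size.
    tokens = tokenizer.encode(content)
    step = chunk_token_size - chunk_overlap_token_size
    chunks: list[dict[str, Any]] = []
    for index, start in enumerate(range(0, len(tokens), step)):
        window = tokens[start : start + chunk_token_size]
        chunks.append(
            {
                "tokens": len(window),
                "content": tokenizer.decode(window).strip(),
                "chunk_order_index": index,
            }
        )
    return chunks

# rag = LightRAG(..., chunking_func=my_async_chunking)
```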
@@ -270,6 +277,9 @@
     embedding_func: EmbeddingFunc | None = field(default=None)
     """Function for computing text embeddings. Must be set before use."""
 
+    embedding_token_limit: int | None = field(default=None, init=False)
+    """Token limit for the embedding model. Set automatically from embedding_func.max_token_size in __post_init__."""
+
     embedding_batch_num: int = field(default=int(os.getenv("EMBEDDING_BATCH_NUM", 10)))
     """Batch size for embedding computations."""
 
@@ -513,6 +523,16 @@ class LightRAG:
         logger.debug(f"LightRAG init with param:\n  {_print_config}\n")
 
         # Init Embedding
+        # Step 1: Capture max_token_size before applying the decorator (the decorator strips dataclass attributes)
+        embedding_max_token_size = None
+        if self.embedding_func and hasattr(self.embedding_func, "max_token_size"):
+            embedding_max_token_size = self.embedding_func.max_token_size
+            logger.debug(
+                f"Captured embedding max_token_size: {embedding_max_token_size}"
+            )
+        self.embedding_token_limit = embedding_max_token_size
+
+        # Step 2: Apply the priority wrapper decorator
         self.embedding_func = priority_limit_async_func_call(
             self.embedding_func_max_async,
             llm_timeout=self.default_embedding_timeout,
@@ -639,6 +659,22 @@ class LightRAG:
     async def initialize_storages(self):
         """Storage initialization must be called one by one to prevent deadlock"""
         if self._storages_status == StoragesStatus.CREATED:
+            # The first workspace to be initialized becomes the default workspace.
+            # This allows namespace operations without an explicit workspace, for backward compatibility.
+            default_workspace = get_default_workspace()
+            if default_workspace is None:
+                set_default_workspace(self.workspace)
+            elif default_workspace != self.workspace:
+                logger.info(
+                    f"Creating LightRAG instance with workspace='{self.workspace}' "
+                    f"while default workspace is set to '{default_workspace}'"
+                )
+
+            # Auto-initialize pipeline_status for this workspace
+            from lightrag.kg.shared_storage import initialize_pipeline_status
+
+            await initialize_pipeline_status(workspace=self.workspace)
+
             for storage in (
                 self.full_docs,
                 self.text_chunks,
@@ -1573,8 +1609,12 @@ class LightRAG:
         """
 
         # Get pipeline status shared data and lock
-        pipeline_status = await get_namespace_data("pipeline_status")
-        pipeline_status_lock = get_pipeline_status_lock()
+        pipeline_status = await get_namespace_data(
+            "pipeline_status", workspace=self.workspace
+        )
+        pipeline_status_lock = get_namespace_lock(
+            "pipeline_status", workspace=self.workspace
+        )
 
         # Check if another process is already processing the queue
         async with pipeline_status_lock:
@@ -1756,7 +1796,28 @@ class LightRAG:
                     )
                     content = content_data["content"]
 
-                    # Generate chunks from document
+                    # Call the chunking function, supporting both sync and async implementations
+                    chunking_result = self.chunking_func(
+                        self.tokenizer,
+                        content,
+                        split_by_character,
+                        split_by_character_only,
+                        self.chunk_overlap_token_size,
+                        self.chunk_token_size,
+                    )
+
+                    # If the result is awaitable, await it to get the actual result
+                    if inspect.isawaitable(chunking_result):
+                        chunking_result = await chunking_result
+
+                    # Validate the return type
+                    if not isinstance(chunking_result, (list, tuple)):
+                        raise TypeError(
+                            f"chunking_func must return a list or tuple of dicts, "
+                            f"got {type(chunking_result)}"
+                        )
+
+                    # Build the chunks dictionary
                     chunks: dict[str, Any] = {
                         compute_mdhash_id(dp["content"], prefix="chunk-"): {
                             **dp,
@@ -1764,14 +1825,7 @@ class LightRAG:
                             "file_path": file_path,  # Add file path to each chunk
                             "llm_cache_list": [],  # Initialize empty LLM cache list for each chunk
                         }
-                        for dp in self.chunking_func(
-                            self.tokenizer,
-                            content,
-                            split_by_character,
-                            split_by_character_only,
-                            self.chunk_overlap_token_size,
-                            self.chunk_token_size,
-                        )
+                        for dp in chunking_result
                     }
 
                     if not chunks:
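
The `inspect.isawaitable` dispatch used above is the standard pattern for accepting both plain and async callables; a self-contained demonstration (illustrative names only):

```python
import asyncio
import inspect

def sync_chunker(text: str) -> list[dict]:
    return [{"content": text}]

async def async_chunker(text: str) -> list[dict]:
    return [{"content": text}]

async def run_chunker(chunker, text: str) -> list[dict]:
    result = chunker(text)
    # Same dispatch as the patch: await only when the callable returned a coroutine
    if inspect.isawaitable(result):
        result = await result
    if not isinstance(result, (list, tuple)):
        raise TypeError(f"chunker must return a list or tuple, got {type(result)}")
    return result

assert asyncio.run(run_chunker(sync_chunker, "hi")) == [{"content": "hi"}]
assert asyncio.run(run_chunker(async_chunker, "hi")) == [{"content": "hi"}]
```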
@@ -1835,7 +1889,7 @@
                             chunks, pipeline_status, pipeline_status_lock
                         )
                     )
-                    await entity_relation_task
+                    chunk_results = await entity_relation_task
                     file_extraction_stage_ok = True
                 except Exception as e:
@@ -1917,8 +1971,7 @@ class LightRAG:
                                 "User cancelled"
                             )
 
-                        # Get chunk_results from entity_relation_task
-                        chunk_results = await entity_relation_task
+                        # Use chunk_results from entity_relation_task
                         await merge_nodes_and_edges(
                             chunk_results=chunk_results,  # result collected from entity_relation_task
                             knowledge_graph_inst=self.chunk_entity_relation_graph,
@@ -2895,6 +2948,26 @@ class LightRAG:
         data across different storage layers are removed or rebuilt. If entities or relationships are
         partially affected, they will be rebuilt using cached LLM extractions from the remaining documents.
 
+        **Concurrency Control Design:**
+
+        This function implements pipeline-based concurrency control to prevent data corruption:
+
+        1. **Single Document Deletion** (when WE acquire the pipeline):
+           - Sets job_name to "Single document deletion" (NOT starting with "deleting")
+           - Prevents other adelete_by_doc_id calls from running concurrently
+           - Ensures exclusive access to graph operations for this deletion
+
+        2. **Batch Document Deletion** (when background_delete_documents acquires the pipeline):
+           - Sets job_name to "Deleting {N} Documents" (starts with "deleting")
+           - Allows multiple adelete_by_doc_id calls to join the deletion queue
+           - Each call validates the job name to ensure it is part of a deletion operation
+
+        The validation logic `if not job_name.startswith("deleting") or "document" not in job_name`
+        ensures that:
+        - adelete_by_doc_id can only run when the pipeline is idle OR during a batch deletion
+        - concurrent single deletions, which could cause race conditions, are prevented
+        - operations are rejected when the pipeline is busy with a non-deletion task
+
         Args:
             doc_id (str): The unique identifier of the document to be deleted.
             delete_llm_cache (bool): Whether to delete cached LLM extraction results
@@ -2902,20 +2975,62 @@
 
         Returns:
             DeletionResult: An object containing the outcome of the deletion process.
-                - `status` (str): "success", "not_found", or "failure".
+                - `status` (str): "success", "not_found", "not_allowed", or "failure".
                 - `doc_id` (str): The ID of the document attempted to be deleted.
                 - `message` (str): A summary of the operation's result.
-                - `status_code` (int): HTTP status code (e.g., 200, 404, 500).
+                - `status_code` (int): HTTP status code (e.g., 200, 403, 404, 500).
                 - `file_path` (str | None): The file path of the deleted document, if available.
         """
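
The job-name gate described in the docstring reduces to a small predicate. A minimal reproduction for intuition (hypothetical helper, not part of the patch):

```python
def deletion_allowed(busy: bool, job_name: str) -> bool:
    if not busy:
        return True  # idle pipeline: the caller acquires it
    name = job_name.lower()
    return name.startswith("deleting") and "document" in name

assert deletion_allowed(False, "")                              # idle -> acquire
assert deletion_allowed(True, "Deleting 12 Documents")          # batch deletion -> join
assert not deletion_allowed(True, "Single document deletion")   # another single delete -> reject
assert not deletion_allowed(True, "Indexing files")             # unrelated job -> reject
```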
""" + # Get pipeline status shared data and lock for validation + pipeline_status = await get_namespace_data( + "pipeline_status", workspace=self.workspace + ) + pipeline_status_lock = get_namespace_lock( + "pipeline_status", workspace=self.workspace + ) + + # Track whether WE acquired the pipeline + we_acquired_pipeline = False + + # Check and acquire pipeline if needed + async with pipeline_status_lock: + if not pipeline_status.get("busy", False): + # Pipeline is idle - WE acquire it for this deletion + we_acquired_pipeline = True + pipeline_status.update( + { + "busy": True, + "job_name": "Single document deletion", + "job_start": datetime.now(timezone.utc).isoformat(), + "docs": 1, + "batchs": 1, + "cur_batch": 0, + "request_pending": False, + "cancellation_requested": False, + "latest_message": f"Starting deletion for document: {doc_id}", + } + ) + # Initialize history messages + pipeline_status["history_messages"][:] = [ + f"Starting deletion for document: {doc_id}" + ] + else: + # Pipeline already busy - verify it's a deletion job + job_name = pipeline_status.get("job_name", "").lower() + if not job_name.startswith("deleting") or "document" not in job_name: + return DeletionResult( + status="not_allowed", + doc_id=doc_id, + message=f"Deletion not allowed: current job '{pipeline_status.get('job_name')}' is not a document deletion job", + status_code=403, + file_path=None, + ) + # Pipeline is busy with deletion - proceed without acquiring + deletion_operations_started = False original_exception = None doc_llm_cache_ids: list[str] = [] - # Get pipeline status shared data and lock for status updates - pipeline_status = await get_namespace_data("pipeline_status") - pipeline_status_lock = get_pipeline_status_lock() - async with pipeline_status_lock: log_message = f"Starting deletion process for document {doc_id}" logger.info(log_message) @@ -3147,6 +3262,9 @@ class LightRAG: ] if not existing_sources: + # No chunk references means this entity should be deleted + entities_to_delete.add(node_label) + entity_chunk_updates[node_label] = [] continue remaining_sources = subtract_source_ids(existing_sources, chunk_ids) @@ -3168,6 +3286,7 @@ class LightRAG: # Process relationships for edge_data in affected_edges: + # source target is not in normalize order in graph db property src = edge_data.get("source") tgt = edge_data.get("target") @@ -3204,6 +3323,9 @@ class LightRAG: ] if not existing_sources: + # No chunk references means this relationship should be deleted + relationships_to_delete.add(edge_tuple) + relation_chunk_updates[edge_tuple] = [] continue remaining_sources = subtract_source_ids(existing_sources, chunk_ids) @@ -3229,38 +3351,31 @@ class LightRAG: if entity_chunk_updates and self.entity_chunks: entity_upsert_payload = {} - entity_delete_ids: set[str] = set() for entity_name, remaining in entity_chunk_updates.items(): if not remaining: - entity_delete_ids.add(entity_name) - else: - entity_upsert_payload[entity_name] = { - "chunk_ids": remaining, - "count": len(remaining), - "updated_at": current_time, - } - - if entity_delete_ids: - await self.entity_chunks.delete(list(entity_delete_ids)) + # Empty entities are deleted alongside graph nodes later + continue + entity_upsert_payload[entity_name] = { + "chunk_ids": remaining, + "count": len(remaining), + "updated_at": current_time, + } if entity_upsert_payload: await self.entity_chunks.upsert(entity_upsert_payload) if relation_chunk_updates and self.relation_chunks: relation_upsert_payload = {} - relation_delete_ids: set[str] = set() 
 
                 if relation_chunk_updates and self.relation_chunks:
                     relation_upsert_payload = {}
-                    relation_delete_ids: set[str] = set()
                     for edge_tuple, remaining in relation_chunk_updates.items():
-                        storage_key = make_relation_chunk_key(*edge_tuple)
                         if not remaining:
-                            relation_delete_ids.add(storage_key)
-                        else:
-                            relation_upsert_payload[storage_key] = {
-                                "chunk_ids": remaining,
-                                "count": len(remaining),
-                                "updated_at": current_time,
-                            }
+                            # Empty relations are deleted alongside their graph edges later
+                            continue
+                        storage_key = make_relation_chunk_key(*edge_tuple)
+                        relation_upsert_payload[storage_key] = {
+                            "chunk_ids": remaining,
+                            "count": len(remaining),
+                            "updated_at": current_time,
+                        }
 
-                    if relation_delete_ids:
-                        await self.relation_chunks.delete(list(relation_delete_ids))
                     if relation_upsert_payload:
                         await self.relation_chunks.upsert(relation_upsert_payload)
@@ -3268,31 +3383,111 @@ class LightRAG:
                 logger.error(f"Failed to process graph analysis results: {e}")
                 raise Exception(f"Failed to process graph dependencies: {e}") from e
 
-            # Use graph database lock to prevent dirty read
-            graph_db_lock = get_graph_db_lock(enable_logging=False)
-            async with graph_db_lock:
-                # 5. Delete chunks from storage
-                if chunk_ids:
-                    try:
-                        await self.chunks_vdb.delete(chunk_ids)
-                        await self.text_chunks.delete(chunk_ids)
+            # Data integrity is ensured by allowing only one process to hold the pipeline at a time (no graph db lock is needed anymore)
 
-                        async with pipeline_status_lock:
-                            log_message = f"Successfully deleted {len(chunk_ids)} chunks from storage"
-                            logger.info(log_message)
-                            pipeline_status["latest_message"] = log_message
-                            pipeline_status["history_messages"].append(log_message)
+            # 5. Delete chunks from storage
+            if chunk_ids:
+                try:
+                    await self.chunks_vdb.delete(chunk_ids)
+                    await self.text_chunks.delete(chunk_ids)
 
-                    except Exception as e:
-                        logger.error(f"Failed to delete chunks: {e}")
-                        raise Exception(f"Failed to delete document chunks: {e}") from e
+                    async with pipeline_status_lock:
+                        log_message = (
+                            f"Successfully deleted {len(chunk_ids)} chunks from storage"
+                        )
+                        logger.info(log_message)
+                        pipeline_status["latest_message"] = log_message
+                        pipeline_status["history_messages"].append(log_message)
 
-                    # 6. Delete relationships that have no remaining sources
-                    if relationships_to_delete:
-                        try:
-                            # Delete from vector database
+                except Exception as e:
+                    logger.error(f"Failed to delete chunks: {e}")
+                    raise Exception(f"Failed to delete document chunks: {e}") from e
+
+            # 6. Delete relationships that have no remaining sources
+            if relationships_to_delete:
+                try:
+                    # Delete from relation vdb
+                    rel_ids_to_delete = []
+                    for src, tgt in relationships_to_delete:
+                        rel_ids_to_delete.extend(
+                            [
+                                compute_mdhash_id(src + tgt, prefix="rel-"),
+                                compute_mdhash_id(tgt + src, prefix="rel-"),
+                            ]
+                        )
+                    await self.relationships_vdb.delete(rel_ids_to_delete)
+
+                    # Delete from graph
+                    await self.chunk_entity_relation_graph.remove_edges(
+                        list(relationships_to_delete)
+                    )
+
+                    # Delete from relation_chunks storage
+                    if self.relation_chunks:
+                        relation_storage_keys = [
+                            make_relation_chunk_key(src, tgt)
+                            for src, tgt in relationships_to_delete
+                        ]
+                        await self.relation_chunks.delete(relation_storage_keys)
+
+                    async with pipeline_status_lock:
+                        log_message = f"Successfully deleted {len(relationships_to_delete)} relations"
+                        logger.info(log_message)
+                        pipeline_status["latest_message"] = log_message
+                        pipeline_status["history_messages"].append(log_message)
+
+                except Exception as e:
+                    logger.error(f"Failed to delete relationships: {e}")
+                    raise Exception(f"Failed to delete relationships: {e}") from e
+
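
Because edge endpoints are not stored in a normalized order (see the comment added in the relationship loop above), the vector id may have been derived from either concatenation order, so both candidates are deleted; deleting a missing id is a no-op. A sketch of the id arithmetic, assuming `compute_mdhash_id` is the md5-based helper from `lightrag.utils`:

```python
from hashlib import md5

def compute_mdhash_id(content: str, prefix: str = "") -> str:
    # Assumed to mirror lightrag.utils.compute_mdhash_id.
    return prefix + md5(content.encode()).hexdigest()

src, tgt = "Alice", "Bob"
candidates = {
    compute_mdhash_id(src + tgt, prefix="rel-"),
    compute_mdhash_id(tgt + src, prefix="rel-"),
}
# Exactly one of the two ids was used at insert time; covering both is safe.
assert len(candidates) == 2
```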
+            # 7. Delete entities that have no remaining sources
+            if entities_to_delete:
+                try:
+                    # Batch-get all edges for the entities, to avoid the N+1 query problem
+                    nodes_edges_dict = (
+                        await self.chunk_entity_relation_graph.get_nodes_edges_batch(
+                            list(entities_to_delete)
+                        )
+                    )
+
+                    # Debug: check and log all edges before deleting nodes
+                    edges_to_delete = set()
+                    edges_still_exist = 0
+
+                    for entity, edges in nodes_edges_dict.items():
+                        if edges:
+                            for src, tgt in edges:
+                                # Normalize edge representation (sorted for consistency)
+                                edge_tuple = tuple(sorted((src, tgt)))
+                                edges_to_delete.add(edge_tuple)
+
+                                if (
+                                    src in entities_to_delete
+                                    and tgt in entities_to_delete
+                                ):
+                                    logger.warning(
+                                        f"Edge still exists: {src} <-> {tgt}"
+                                    )
+                                elif src in entities_to_delete:
+                                    logger.warning(
+                                        f"Edge still exists: {src} --> {tgt}"
+                                    )
+                                else:
+                                    logger.warning(
+                                        f"Edge still exists: {src} <-- {tgt}"
+                                    )
+                                edges_still_exist += 1
+
+                    if edges_still_exist:
+                        logger.warning(
+                            f"⚠️ {edges_still_exist} edges still attached to entities pending deletion"
+                        )
+
+                    # Clean residual edges from VDB and storage before deleting nodes
+                    if edges_to_delete:
+                        # Delete from relationships_vdb
                         rel_ids_to_delete = []
-                        for src, tgt in relationships_to_delete:
+                        for src, tgt in edges_to_delete:
                             rel_ids_to_delete.extend(
                                 [
                                     compute_mdhash_id(src + tgt, prefix="rel-"),
                                     compute_mdhash_id(tgt + src, prefix="rel-"),
                                 ]
                             )
                         await self.relationships_vdb.delete(rel_ids_to_delete)
 
-                            # Delete from graph
-                            await self.chunk_entity_relation_graph.remove_edges(
-                                list(relationships_to_delete)
-                            )
 
                         # Delete from relation_chunks storage
                         if self.relation_chunks:
                             relation_storage_keys = [
                                 make_relation_chunk_key(src, tgt)
-                                for src, tgt in relationships_to_delete
+                                for src, tgt in edges_to_delete
                             ]
                             await self.relation_chunks.delete(relation_storage_keys)
 
-                            async with pipeline_status_lock:
-                                log_message = f"Successfully deleted {len(relationships_to_delete)} relations"
-                                logger.info(log_message)
-                                pipeline_status["latest_message"] = log_message
-                                pipeline_status["history_messages"].append(log_message)
-
-                        except Exception as e:
-                            logger.error(f"Failed to delete relationships: {e}")
-                            raise Exception(f"Failed to delete relationships: {e}") from e
-
-                        # 7. Delete entities that have no remaining sources
-                        if entities_to_delete:
-                            try:
-                                # Batch get all edges for entities to avoid N+1 query problem
-                                nodes_edges_dict = await self.chunk_entity_relation_graph.get_nodes_edges_batch(
-                                    list(entities_to_delete)
+                        logger.info(
+                            f"Cleaned {len(edges_to_delete)} residual edges from VDB and chunk-tracking storage"
                         )
 
-                                # Debug: Check and log all edges before deleting nodes
-                                edges_to_delete = set()
-                                edges_still_exist = 0
+                    # Delete from graph (edges will be auto-deleted with nodes)
+                    await self.chunk_entity_relation_graph.remove_nodes(
+                        list(entities_to_delete)
+                    )
 
-                                for entity, edges in nodes_edges_dict.items():
-                                    if edges:
-                                        for src, tgt in edges:
-                                            # Normalize edge representation (sorted for consistency)
-                                            edge_tuple = tuple(sorted((src, tgt)))
-                                            edges_to_delete.add(edge_tuple)
+                    # Delete from entities vdb
+                    entity_vdb_ids = [
+                        compute_mdhash_id(entity, prefix="ent-")
+                        for entity in entities_to_delete
+                    ]
+                    await self.entities_vdb.delete(entity_vdb_ids)
 
-                                            if (
-                                                src in entities_to_delete
-                                                and tgt in entities_to_delete
-                                            ):
-                                                logger.warning(
-                                                    f"Edge still exists: {src} <-> {tgt}"
-                                                )
-                                            elif src in entities_to_delete:
-                                                logger.warning(
-                                                    f"Edge still exists: {src} --> {tgt}"
-                                                )
-                                            else:
-                                                logger.warning(
-                                                    f"Edge still exists: {src} <-- {tgt}"
-                                                )
-                                            edges_still_exist += 1
+                    # Delete from entity_chunks storage
+                    if self.entity_chunks:
+                        await self.entity_chunks.delete(list(entities_to_delete))
 
-                                if edges_still_exist:
-                                    logger.warning(
-                                        f"⚠️ {edges_still_exist} entities still has edges before deletion"
-                                    )
-
-                                # Clean residual edges from VDB and storage before deleting nodes
-                                if edges_to_delete:
-                                    # Delete from relationships_vdb
-                                    rel_ids_to_delete = []
-                                    for src, tgt in edges_to_delete:
-                                        rel_ids_to_delete.extend(
-                                            [
-                                                compute_mdhash_id(src + tgt, prefix="rel-"),
-                                                compute_mdhash_id(tgt + src, prefix="rel-"),
-                                            ]
-                                        )
-                                    await self.relationships_vdb.delete(rel_ids_to_delete)
-
-                                    # Delete from relation_chunks storage
-                                    if self.relation_chunks:
-                                        relation_storage_keys = [
-                                            make_relation_chunk_key(src, tgt)
-                                            for src, tgt in edges_to_delete
-                                        ]
-                                        await self.relation_chunks.delete(relation_storage_keys)
-
-                                    logger.info(
-                                        f"Cleaned {len(edges_to_delete)} residual edges from VDB and chunk-tracking storage"
-                                    )
-
-                                # Delete from graph (edges will be auto-deleted with nodes)
-                                await self.chunk_entity_relation_graph.remove_nodes(
-                                    list(entities_to_delete)
+                    async with pipeline_status_lock:
+                        log_message = (
+                            f"Successfully deleted {len(entities_to_delete)} entities"
                         )
+                        logger.info(log_message)
+                        pipeline_status["latest_message"] = log_message
+                        pipeline_status["history_messages"].append(log_message)
 
-                                # Delete from vector database
-                                entity_vdb_ids = [
-                                    compute_mdhash_id(entity, prefix="ent-")
-                                    for entity in entities_to_delete
-                                ]
-                                await self.entities_vdb.delete(entity_vdb_ids)
+                except Exception as e:
+                    logger.error(f"Failed to delete entities: {e}")
+                    raise Exception(f"Failed to delete entities: {e}") from e
 
-                                # Delete from entity_chunks storage
-                                if self.entity_chunks:
-                                    await self.entity_chunks.delete(list(entities_to_delete))
-
-                                async with pipeline_status_lock:
-                                    log_message = f"Successfully deleted {len(entities_to_delete)} entities"
-                                    logger.info(log_message)
-                                    pipeline_status["latest_message"] = log_message
-                                    pipeline_status["history_messages"].append(log_message)
-
-                            except Exception as e:
-                                logger.error(f"Failed to delete entities: {e}")
-                                raise Exception(f"Failed to delete entities: {e}") from e
-
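
Step 7 collapses both directions of each residual edge into a single entry by sorting the endpoints, and fetches all edges in one `get_nodes_edges_batch` call instead of one query per entity (the N+1 pattern). The normalization in miniature:

```python
# Sorted-tuple normalization, as used when collecting edges_to_delete.
edges = [("B", "A"), ("A", "B"), ("A", "C")]
edges_to_delete = {tuple(sorted(edge)) for edge in edges}
assert edges_to_delete == {("A", "B"), ("A", "C")}
```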
-            # Persist changes to graph database before releasing graph database lock
-            await self._insert_done()
+        # Persist changes to graph database before the entity and relationship rebuild
+        await self._insert_done()
 
         # 8. Rebuild entities and relationships from remaining chunks
         if entities_to_rebuild or relationships_to_rebuild:
@@ -3523,6 +3643,18 @@ class LightRAG:
                     f"No deletion operations were started for document {doc_id}, skipping persistence"
                 )
 
+            # Release the pipeline only if WE acquired it
+            if we_acquired_pipeline:
+                async with pipeline_status_lock:
+                    pipeline_status["busy"] = False
+                    pipeline_status["cancellation_requested"] = False
+                    completion_msg = (
+                        f"Deletion process completed for document: {doc_id}"
+                    )
+                    pipeline_status["latest_message"] = completion_msg
+                    pipeline_status["history_messages"].append(completion_msg)
+                    logger.info(completion_msg)
+
     async def adelete_by_entity(self, entity_name: str) -> DeletionResult:
         """Asynchronously delete an entity and all its relationships.