diff --git a/lightrag/api/config.py b/lightrag/api/config.py
index 95ab9f70..4f59d3c1 100644
--- a/lightrag/api/config.py
+++ b/lightrag/api/config.py
@@ -445,6 +445,11 @@ def parse_args() -> argparse.Namespace:
         "EMBEDDING_BATCH_NUM", DEFAULT_EMBEDDING_BATCH_NUM, int
     )
 
+    # Embedding token limit configuration
+    args.embedding_token_limit = get_env_value(
+        "EMBEDDING_TOKEN_LIMIT", None, int, special_none=True
+    )
+
     ollama_server_infos.LIGHTRAG_NAME = args.simulated_model_name
     ollama_server_infos.LIGHTRAG_TAG = args.simulated_model_tag
 
diff --git a/lightrag/api/lightrag_server.py b/lightrag/api/lightrag_server.py
index 01ccbbdf..939129c0 100644
--- a/lightrag/api/lightrag_server.py
+++ b/lightrag/api/lightrag_server.py
@@ -777,6 +777,11 @@ def create_app(args):
             send_dimensions=send_dimensions,
         )
 
+        # Set max_token_size if EMBEDDING_TOKEN_LIMIT is provided
+        if args.embedding_token_limit is not None:
+            embedding_func.max_token_size = args.embedding_token_limit
+            logger.info(f"Set embedding max_token_size to {args.embedding_token_limit}")
+
         # Configure rerank function based on args.rerank_binding parameter
         rerank_model_func = None
         if args.rerank_binding != "null":
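Note: a minimal sketch of the intended end-to-end behavior of the new knob, assuming `get_env_value(..., special_none=True)` treats an unset variable or the literal string "None" as `None` (that semantic is inferred from the call site, not confirmed by this diff). The `lightrag.py` hunk below then exposes the applied value through the new `embedding_token_limit` property:

```python
import os

def read_embedding_token_limit(env: dict[str, str] | None = None) -> int | None:
    """Assumed EMBEDDING_TOKEN_LIMIT parsing: unset or the literal "None"
    yields None; anything else must parse as an int (mirrors the assumed
    special_none=True behavior)."""
    env = dict(os.environ) if env is None else env
    raw = env.get("EMBEDDING_TOKEN_LIMIT")
    if raw is None or raw.strip().lower() == "none":
        return None
    return int(raw)

class FakeEmbeddingFunc:
    """Stand-in for lightrag's EmbeddingFunc; only max_token_size matters here."""
    max_token_size: int | None = None

if __name__ == "__main__":
    embedding_func = FakeEmbeddingFunc()
    limit = read_embedding_token_limit({"EMBEDDING_TOKEN_LIMIT": "8192"})
    if limit is not None:  # same override create_app() performs above
        embedding_func.max_token_size = limit
    assert embedding_func.max_token_size == 8192
```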
diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index 8a638759..67ec2308 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -64,10 +64,10 @@ from lightrag.kg import (
 
 from lightrag.kg.shared_storage import (
     get_namespace_data,
+    get_graph_db_lock,
     get_data_init_lock,
-    get_default_workspace,
-    set_default_workspace,
-    get_namespace_lock,
+    get_storage_keyed_lock,
+    initialize_pipeline_status,
 )
 
 from lightrag.base import (
@@ -260,15 +260,13 @@ class LightRAG:
     - `content`: The text to be split into chunks.
     - `split_by_character`: The character to split the text on. If None, the text is split into chunks of `chunk_token_size` tokens.
     - `split_by_character_only`: If True, the text is split only on the specified character.
-    - `chunk_overlap_token_size`: The number of overlapping tokens between consecutive chunks.
     - `chunk_token_size`: The maximum number of tokens per chunk.
-
+    - `chunk_overlap_token_size`: The number of overlapping tokens between consecutive chunks.
     The function should return a list of dictionaries (or an awaitable that resolves to a list), where each dictionary contains the following keys:
-    - `tokens` (int): The number of tokens in the chunk.
-    - `content` (str): The text content of the chunk.
-    - `chunk_order_index` (int): Zero-based index indicating the chunk's order in the document.
+    - `tokens`: The number of tokens in the chunk.
+    - `content`: The text content of the chunk.
 
     Defaults to `chunking_by_token_size` if not specified.
     """
@@ -279,8 +277,12 @@ class LightRAG:
     embedding_func: EmbeddingFunc | None = field(default=None)
     """Function for computing text embeddings. Must be set before use."""
 
-    embedding_token_limit: int | None = field(default=None, init=False)
-    """Token limit for embedding model. Set automatically from embedding_func.max_token_size in __post_init__."""
+    @property
+    def embedding_token_limit(self) -> int | None:
+        """Get the token limit for embedding model from embedding_func."""
+        if self.embedding_func and hasattr(self.embedding_func, "max_token_size"):
+            return self.embedding_func.max_token_size
+        return None
 
     embedding_batch_num: int = field(default=int(os.getenv("EMBEDDING_BATCH_NUM", 10)))
     """Batch size for embedding computations."""
@@ -525,16 +527,6 @@ class LightRAG:
         logger.debug(f"LightRAG init with param:\n {_print_config}\n")
 
         # Init Embedding
-        # Step 1: Capture max_token_size before applying decorator (decorator strips dataclass attributes)
-        embedding_max_token_size = None
-        if self.embedding_func and hasattr(self.embedding_func, "max_token_size"):
-            embedding_max_token_size = self.embedding_func.max_token_size
-            logger.debug(
-                f"Captured embedding max_token_size: {embedding_max_token_size}"
-            )
-        self.embedding_token_limit = embedding_max_token_size
-
-        # Step 2: Apply priority wrapper decorator
         self.embedding_func = priority_limit_async_func_call(
             self.embedding_func_max_async,
             llm_timeout=self.default_embedding_timeout,
@@ -661,21 +653,12 @@ class LightRAG:
     async def initialize_storages(self):
         """Storage initialization must be called one by one to prevent deadlock"""
         if self._storages_status == StoragesStatus.CREATED:
-            # Set the first initialized workspace will set the default workspace
-            # Allows namespace operation without specifying workspace for backward compatibility
-            default_workspace = get_default_workspace()
-            if default_workspace is None:
-                set_default_workspace(self.workspace)
-            elif default_workspace != self.workspace:
-                logger.info(
-                    f"Creating LightRAG instance with workspace='{self.workspace}' "
-                    f"while default workspace is set to '{default_workspace}'"
-                )
+            # Set default workspace for backward compatibility
+            # This allows initialize_pipeline_status() to be called without
+            # parameters and still resolve the correct workspace
+            from lightrag.kg.shared_storage import set_default_workspace
 
-            # Auto-initialize pipeline_status for this workspace
-            from lightrag.kg.shared_storage import initialize_pipeline_status
-
-            await initialize_pipeline_status(workspace=self.workspace)
+            set_default_workspace(self.workspace)
 
             for storage in (
                 self.full_docs,
@@ -1611,13 +1594,23 @@ class LightRAG:
         """
 
         # Get pipeline status shared data and lock
-        pipeline_status = await get_namespace_data(
-            "pipeline_status", workspace=self.workspace
-        )
-        pipeline_status_lock = get_namespace_lock(
-            "pipeline_status", workspace=self.workspace
+        # Step 1: Get workspace
+        workspace = self.workspace
+
+        # Step 2: Construct namespace (following GraphDB pattern)
+        namespace = f"{workspace}:pipeline" if workspace else "pipeline_status"
+
+        # Step 3: Ensure initialization (on first access)
+        await initialize_pipeline_status(workspace)
+
+        # Step 4: Get lock
+        pipeline_status_lock = get_storage_keyed_lock(
+            keys="status", namespace=namespace, enable_logging=False
        )
 
+        # Step 5: Get data
+        pipeline_status = await get_namespace_data(namespace)
+
         # Check if another process is already processing the queue
         async with pipeline_status_lock:
             # Ensure only one worker is processing documents
@@ -2950,26 +2943,6 @@ class LightRAG:
        data across different storage layers are removed or rebuilt. If entities or relationships
        are partially affected, they will be rebuilt using cached LLM results from the remaining documents.
- **Concurrency Control Design:** - - This function implements a pipeline-based concurrency control to prevent data corruption: - - 1. **Single Document Deletion** (when WE acquire pipeline): - - Sets job_name to "Single document deletion" (NOT starting with "deleting") - - Prevents other adelete_by_doc_id calls from running concurrently - - Ensures exclusive access to graph operations for this deletion - - 2. **Batch Document Deletion** (when background_delete_documents acquires pipeline): - - Sets job_name to "Deleting {N} Documents" (starts with "deleting") - - Allows multiple adelete_by_doc_id calls to join the deletion queue - - Each call validates the job name to ensure it's part of a deletion operation - - The validation logic `if not job_name.startswith("deleting") or "document" not in job_name` - ensures that: - - adelete_by_doc_id can only run when pipeline is idle OR during batch deletion - - Prevents concurrent single deletions that could cause race conditions - - Rejects operations when pipeline is busy with non-deletion tasks - Args: doc_id (str): The unique identifier of the document to be deleted. delete_llm_cache (bool): Whether to delete cached LLM extraction results @@ -2977,62 +2950,34 @@ class LightRAG: Returns: DeletionResult: An object containing the outcome of the deletion process. - - `status` (str): "success", "not_found", "not_allowed", or "failure". + - `status` (str): "success", "not_found", or "failure". - `doc_id` (str): The ID of the document attempted to be deleted. - `message` (str): A summary of the operation's result. - - `status_code` (int): HTTP status code (e.g., 200, 404, 403, 500). + - `status_code` (int): HTTP status code (e.g., 200, 404, 500). - `file_path` (str | None): The file path of the deleted document, if available. 
""" - # Get pipeline status shared data and lock for validation - pipeline_status = await get_namespace_data( - "pipeline_status", workspace=self.workspace - ) - pipeline_status_lock = get_namespace_lock( - "pipeline_status", workspace=self.workspace - ) - - # Track whether WE acquired the pipeline - we_acquired_pipeline = False - - # Check and acquire pipeline if needed - async with pipeline_status_lock: - if not pipeline_status.get("busy", False): - # Pipeline is idle - WE acquire it for this deletion - we_acquired_pipeline = True - pipeline_status.update( - { - "busy": True, - "job_name": "Single document deletion", - "job_start": datetime.now(timezone.utc).isoformat(), - "docs": 1, - "batchs": 1, - "cur_batch": 0, - "request_pending": False, - "cancellation_requested": False, - "latest_message": f"Starting deletion for document: {doc_id}", - } - ) - # Initialize history messages - pipeline_status["history_messages"][:] = [ - f"Starting deletion for document: {doc_id}" - ] - else: - # Pipeline already busy - verify it's a deletion job - job_name = pipeline_status.get("job_name", "").lower() - if not job_name.startswith("deleting") or "document" not in job_name: - return DeletionResult( - status="not_allowed", - doc_id=doc_id, - message=f"Deletion not allowed: current job '{pipeline_status.get('job_name')}' is not a document deletion job", - status_code=403, - file_path=None, - ) - # Pipeline is busy with deletion - proceed without acquiring - deletion_operations_started = False original_exception = None doc_llm_cache_ids: list[str] = [] + # Get pipeline status shared data and lock for status updates + # Step 1: Get workspace + workspace = self.workspace + + # Step 2: Construct namespace (following GraphDB pattern) + namespace = f"{workspace}:pipeline" if workspace else "pipeline_status" + + # Step 3: Ensure initialization (on first access) + await initialize_pipeline_status(workspace) + + # Step 4: Get lock + pipeline_status_lock = get_storage_keyed_lock( + keys="status", namespace=namespace, enable_logging=False + ) + + # Step 5: Get data + pipeline_status = await get_namespace_data(namespace) + async with pipeline_status_lock: log_message = f"Starting deletion process for document {doc_id}" logger.info(log_message) @@ -3385,111 +3330,31 @@ class LightRAG: logger.error(f"Failed to process graph analysis results: {e}") raise Exception(f"Failed to process graph dependencies: {e}") from e - # Data integrity is ensured by allowing only one process to hold pipeline at a time(no graph db lock is needed anymore) + # Use graph database lock to prevent dirty read + graph_db_lock = get_graph_db_lock(enable_logging=False) + async with graph_db_lock: + # 5. Delete chunks from storage + if chunk_ids: + try: + await self.chunks_vdb.delete(chunk_ids) + await self.text_chunks.delete(chunk_ids) - # 5. 
Delete chunks from storage - if chunk_ids: - try: - await self.chunks_vdb.delete(chunk_ids) - await self.text_chunks.delete(chunk_ids) + async with pipeline_status_lock: + log_message = f"Successfully deleted {len(chunk_ids)} chunks from storage" + logger.info(log_message) + pipeline_status["latest_message"] = log_message + pipeline_status["history_messages"].append(log_message) - async with pipeline_status_lock: - log_message = ( - f"Successfully deleted {len(chunk_ids)} chunks from storage" - ) - logger.info(log_message) - pipeline_status["latest_message"] = log_message - pipeline_status["history_messages"].append(log_message) + except Exception as e: + logger.error(f"Failed to delete chunks: {e}") + raise Exception(f"Failed to delete document chunks: {e}") from e - except Exception as e: - logger.error(f"Failed to delete chunks: {e}") - raise Exception(f"Failed to delete document chunks: {e}") from e - - # 6. Delete relationships that have no remaining sources - if relationships_to_delete: - try: - # Delete from relation vdb - rel_ids_to_delete = [] - for src, tgt in relationships_to_delete: - rel_ids_to_delete.extend( - [ - compute_mdhash_id(src + tgt, prefix="rel-"), - compute_mdhash_id(tgt + src, prefix="rel-"), - ] - ) - await self.relationships_vdb.delete(rel_ids_to_delete) - - # Delete from graph - await self.chunk_entity_relation_graph.remove_edges( - list(relationships_to_delete) - ) - - # Delete from relation_chunks storage - if self.relation_chunks: - relation_storage_keys = [ - make_relation_chunk_key(src, tgt) - for src, tgt in relationships_to_delete - ] - await self.relation_chunks.delete(relation_storage_keys) - - async with pipeline_status_lock: - log_message = f"Successfully deleted {len(relationships_to_delete)} relations" - logger.info(log_message) - pipeline_status["latest_message"] = log_message - pipeline_status["history_messages"].append(log_message) - - except Exception as e: - logger.error(f"Failed to delete relationships: {e}") - raise Exception(f"Failed to delete relationships: {e}") from e - - # 7. Delete entities that have no remaining sources - if entities_to_delete: - try: - # Batch get all edges for entities to avoid N+1 query problem - nodes_edges_dict = ( - await self.chunk_entity_relation_graph.get_nodes_edges_batch( - list(entities_to_delete) - ) - ) - - # Debug: Check and log all edges before deleting nodes - edges_to_delete = set() - edges_still_exist = 0 - - for entity, edges in nodes_edges_dict.items(): - if edges: - for src, tgt in edges: - # Normalize edge representation (sorted for consistency) - edge_tuple = tuple(sorted((src, tgt))) - edges_to_delete.add(edge_tuple) - - if ( - src in entities_to_delete - and tgt in entities_to_delete - ): - logger.warning( - f"Edge still exists: {src} <-> {tgt}" - ) - elif src in entities_to_delete: - logger.warning( - f"Edge still exists: {src} --> {tgt}" - ) - else: - logger.warning( - f"Edge still exists: {src} <-- {tgt}" - ) - edges_still_exist += 1 - - if edges_still_exist: - logger.warning( - f"⚠️ {edges_still_exist} entities still has edges before deletion" - ) - - # Clean residual edges from VDB and storage before deleting nodes - if edges_to_delete: - # Delete from relationships_vdb + # 6. 
Delete relationships that have no remaining sources
+            if relationships_to_delete:
+                try:
+                    # Delete from relation vdb
                     rel_ids_to_delete = []
-                    for src, tgt in edges_to_delete:
+                    for src, tgt in relationships_to_delete:
                         rel_ids_to_delete.extend(
                             [
                                 compute_mdhash_id(src + tgt, prefix="rel-"),
@@ -3498,48 +3363,123 @@ class LightRAG:
                     )
                     await self.relationships_vdb.delete(rel_ids_to_delete)
 
+                    # Delete from graph
+                    await self.chunk_entity_relation_graph.remove_edges(
+                        list(relationships_to_delete)
+                    )
+
                     # Delete from relation_chunks storage
                     if self.relation_chunks:
                         relation_storage_keys = [
                             make_relation_chunk_key(src, tgt)
-                            for src, tgt in edges_to_delete
+                            for src, tgt in relationships_to_delete
                         ]
                         await self.relation_chunks.delete(relation_storage_keys)
 
+                    async with pipeline_status_lock:
+                        log_message = f"Successfully deleted {len(relationships_to_delete)} relations"
+                        logger.info(log_message)
+                        pipeline_status["latest_message"] = log_message
+                        pipeline_status["history_messages"].append(log_message)
+
+                except Exception as e:
+                    logger.error(f"Failed to delete relationships: {e}")
+                    raise Exception(f"Failed to delete relationships: {e}") from e
+
+            # 7. Delete entities that have no remaining sources
+            if entities_to_delete:
+                try:
+                    # Batch get all edges for entities to avoid N+1 query problem
+                    nodes_edges_dict = await self.chunk_entity_relation_graph.get_nodes_edges_batch(
                         list(entities_to_delete)
                     )
 
-                    # Delete from graph (edges will be auto-deleted with nodes)
-                    await self.chunk_entity_relation_graph.remove_nodes(
-                        list(entities_to_delete)
-                    )
+                    # Debug: Check and log all edges before deleting nodes
+                    edges_to_delete = set()
+                    edges_still_exist = 0
 
-                    # Delete from vector vdb
-                    entity_vdb_ids = [
-                        compute_mdhash_id(entity, prefix="ent-")
-                        for entity in entities_to_delete
-                    ]
-                    await self.entities_vdb.delete(entity_vdb_ids)
+                    for entity, edges in nodes_edges_dict.items():
+                        if edges:
+                            for src, tgt in edges:
+                                # Normalize edge representation (sorted for consistency)
+                                edge_tuple = tuple(sorted((src, tgt)))
+                                edges_to_delete.add(edge_tuple)
 
-                    # Delete from entity_chunks storage
-                    if self.entity_chunks:
-                        await self.entity_chunks.delete(list(entities_to_delete))
+                                if (
+                                    src in entities_to_delete
+                                    and tgt in entities_to_delete
+                                ):
+                                    logger.warning(
+                                        f"Edge still exists: {src} <-> {tgt}"
+                                    )
+                                elif src in entities_to_delete:
+                                    logger.warning(
+                                        f"Edge still exists: {src} --> {tgt}"
+                                    )
+                                else:
+                                    logger.warning(
+                                        f"Edge still exists: {src} <-- {tgt}"
+                                    )
+                                edges_still_exist += 1
 
-                    async with pipeline_status_lock:
-                        log_message = (
-                            f"Successfully deleted {len(entities_to_delete)} entities"
+                    if edges_still_exist:
+                        logger.warning(
+                            f"⚠️ {edges_still_exist} entities still have edges before deletion"
+                        )
+
+                    # Clean residual edges from VDB and storage before deleting nodes
+                    if edges_to_delete:
+                        # Delete from relationships_vdb
+                        rel_ids_to_delete = []
+                        for src, tgt in edges_to_delete:
+                            rel_ids_to_delete.extend(
+                                [
+                                    compute_mdhash_id(src + tgt, prefix="rel-"),
+                                    compute_mdhash_id(tgt + src, prefix="rel-"),
+                                ]
+                            )
+                        await self.relationships_vdb.delete(rel_ids_to_delete)
+
+                        # Delete from relation_chunks storage
+                        if self.relation_chunks:
+                            relation_storage_keys = [
+                                make_relation_chunk_key(src, tgt)
+                                for src, tgt in edges_to_delete
+                            ]
+                            await self.relation_chunks.delete(relation_storage_keys)
+
+                        logger.info(
+                            f"Cleaned {len(edges_to_delete)} residual edges from VDB and 
chunk-tracking storage" + ) + + # Delete from graph (edges will be auto-deleted with nodes) + await self.chunk_entity_relation_graph.remove_nodes( + list(entities_to_delete) ) - logger.info(log_message) - pipeline_status["latest_message"] = log_message - pipeline_status["history_messages"].append(log_message) - except Exception as e: - logger.error(f"Failed to delete entities: {e}") - raise Exception(f"Failed to delete entities: {e}") from e + # Delete from vector vdb + entity_vdb_ids = [ + compute_mdhash_id(entity, prefix="ent-") + for entity in entities_to_delete + ] + await self.entities_vdb.delete(entity_vdb_ids) - # Persist changes to graph database before entity and relationship rebuild - await self._insert_done() + # Delete from entity_chunks storage + if self.entity_chunks: + await self.entity_chunks.delete(list(entities_to_delete)) + + async with pipeline_status_lock: + log_message = f"Successfully deleted {len(entities_to_delete)} entities" + logger.info(log_message) + pipeline_status["latest_message"] = log_message + pipeline_status["history_messages"].append(log_message) + + except Exception as e: + logger.error(f"Failed to delete entities: {e}") + raise Exception(f"Failed to delete entities: {e}") from e + + # Persist changes to graph database before releasing graph database lock + await self._insert_done() # 8. Rebuild entities and relationships from remaining chunks if entities_to_rebuild or relationships_to_rebuild: @@ -3645,18 +3585,6 @@ class LightRAG: f"No deletion operations were started for document {doc_id}, skipping persistence" ) - # Release pipeline only if WE acquired it - if we_acquired_pipeline: - async with pipeline_status_lock: - pipeline_status["busy"] = False - pipeline_status["cancellation_requested"] = False - completion_msg = ( - f"Deletion process completed for document: {doc_id}" - ) - pipeline_status["latest_message"] = completion_msg - pipeline_status["history_messages"].append(completion_msg) - logger.info(completion_msg) - async def adelete_by_entity(self, entity_name: str) -> DeletionResult: """Asynchronously delete an entity and all its relationships.