diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index 8a638759..d288685e 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -3,7 +3,6 @@ from __future__ import annotations
 import traceback
 import asyncio
 import configparser
-import inspect
 import os
 import time
 import warnings
@@ -13,7 +12,6 @@ from functools import partial
 from typing import (
     Any,
     AsyncIterator,
-    Awaitable,
     Callable,
     Iterator,
     cast,
@@ -22,10 +20,7 @@ from typing import (
     Optional,
     List,
     Dict,
-    Union,
 )
-from lightrag.prompt import PROMPTS
-from lightrag.exceptions import PipelineCancelledException
 from lightrag.constants import (
     DEFAULT_MAX_GLEANING,
     DEFAULT_FORCE_LLM_SUMMARY_ON_MERGE,
@@ -44,15 +39,10 @@ from lightrag.constants import (
     DEFAULT_MAX_ASYNC,
     DEFAULT_MAX_PARALLEL_INSERT,
     DEFAULT_MAX_GRAPH_NODES,
-    DEFAULT_MAX_SOURCE_IDS_PER_ENTITY,
-    DEFAULT_MAX_SOURCE_IDS_PER_RELATION,
     DEFAULT_ENTITY_TYPES,
     DEFAULT_SUMMARY_LANGUAGE,
     DEFAULT_LLM_TIMEOUT,
     DEFAULT_EMBEDDING_TIMEOUT,
-    DEFAULT_SOURCE_IDS_LIMIT_METHOD,
-    DEFAULT_MAX_FILE_PATHS,
-    DEFAULT_FILE_PATH_MORE_PLACEHOLDER,
 )
 from lightrag.utils import get_env_value
@@ -64,10 +54,9 @@ from lightrag.kg import (
 from lightrag.kg.shared_storage import (
     get_namespace_data,
+    get_pipeline_status_lock,
+    get_graph_db_lock,
     get_data_init_lock,
-    get_default_workspace,
-    set_default_workspace,
-    get_namespace_lock,
 )
 from lightrag.base import (
@@ -91,7 +80,7 @@ from lightrag.operate import (
     merge_nodes_and_edges,
     kg_query,
     naive_query,
-    rebuild_knowledge_from_chunks,
+    _rebuild_knowledge_from_chunks,
 )
 from lightrag.constants import GRAPH_FIELD_SEP
 from lightrag.utils import (
@@ -108,9 +97,6 @@ from lightrag.utils import (
     generate_track_id,
     convert_to_user_format,
     logger,
-    subtract_source_ids,
-    make_relation_chunk_key,
-    normalize_source_ids_limit_method,
 )
 from lightrag.types import KnowledgeGraph
 from dotenv import load_dotenv
@@ -247,28 +233,23 @@ class LightRAG:
             int,
             int,
         ],
-        Union[List[Dict[str, Any]], Awaitable[List[Dict[str, Any]]]],
+        List[Dict[str, Any]],
     ] = field(default_factory=lambda: chunking_by_token_size)
     """
     Custom chunking function for splitting text into chunks before processing.
-    The function can be either synchronous or asynchronous.
-
     The function should take the following parameters:
         - `tokenizer`: A Tokenizer instance to use for tokenization.
         - `content`: The text to be split into chunks.
         - `split_by_character`: The character to split the text on. If None, the text is split into chunks of `chunk_token_size` tokens.
        - `split_by_character_only`: If True, the text is split only on the specified character.
-        - `chunk_overlap_token_size`: The number of overlapping tokens between consecutive chunks.
        - `chunk_token_size`: The maximum number of tokens per chunk.
+        - `chunk_overlap_token_size`: The number of overlapping tokens between consecutive chunks.
-
-    The function should return a list of dictionaries (or an awaitable that resolves to a list),
-    where each dictionary contains the following keys:
-        - `tokens` (int): The number of tokens in the chunk.
-        - `content` (str): The text content of the chunk.
-        - `chunk_order_index` (int): Zero-based index indicating the chunk's order in the document.
+    The function should return a list of dictionaries, where each dictionary contains the following keys:
+        - `tokens`: The number of tokens in the chunk.
+        - `content`: The text content of the chunk.

     Defaults to `chunking_by_token_size` if not specified.
     """
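Note: with the Awaitable variant removed from the type above, a custom chunking_func must now be a plain synchronous callable. A minimal sketch of a conforming implementation (the paragraph-splitting logic is illustrative only; tokenizer.encode is assumed to return a token sequence, as LightRAG's Tokenizer does):

    def paragraph_chunker(
        tokenizer,
        content: str,
        split_by_character,
        split_by_character_only,
        chunk_overlap_token_size: int,
        chunk_token_size: int,
    ) -> list[dict]:
        # Split on blank lines and report token counts per chunk
        parts = [p for p in content.split("\n\n") if p.strip()]
        return [
            {
                "tokens": len(tokenizer.encode(part)),
                "content": part,
                "chunk_order_index": index,
            }
            for index, part in enumerate(parts)
        ]

The default chunking_by_token_size also emits chunk_order_index, and the chunk-assembly code later in this patch preserves any extra keys, so custom implementations should keep producing it.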
""" @@ -279,9 +260,6 @@ class LightRAG: embedding_func: EmbeddingFunc | None = field(default=None) """Function for computing text embeddings. Must be set before use.""" - embedding_token_limit: int | None = field(default=None, init=False) - """Token limit for embedding model. Set automatically from embedding_func.max_token_size in __post_init__.""" - embedding_batch_num: int = field(default=int(os.getenv("EMBEDDING_BATCH_NUM", 10))) """Batch size for embedding computations.""" @@ -381,41 +359,6 @@ class LightRAG: ) """Maximum number of graph nodes to return in knowledge graph queries.""" - max_source_ids_per_entity: int = field( - default=get_env_value( - "MAX_SOURCE_IDS_PER_ENTITY", DEFAULT_MAX_SOURCE_IDS_PER_ENTITY, int - ) - ) - """Maximum number of source (chunk) ids in entity Grpah + VDB.""" - - max_source_ids_per_relation: int = field( - default=get_env_value( - "MAX_SOURCE_IDS_PER_RELATION", - DEFAULT_MAX_SOURCE_IDS_PER_RELATION, - int, - ) - ) - """Maximum number of source (chunk) ids in relation Graph + VDB.""" - - source_ids_limit_method: str = field( - default_factory=lambda: normalize_source_ids_limit_method( - get_env_value( - "SOURCE_IDS_LIMIT_METHOD", - DEFAULT_SOURCE_IDS_LIMIT_METHOD, - str, - ) - ) - ) - """Strategy for enforcing source_id limits: IGNORE_NEW or FIFO.""" - - max_file_paths: int = field( - default=get_env_value("MAX_FILE_PATHS", DEFAULT_MAX_FILE_PATHS, int) - ) - """Maximum number of file paths to store in entity/relation file_path field.""" - - file_path_more_placeholder: str = field(default=DEFAULT_FILE_PATH_MORE_PLACEHOLDER) - """Placeholder text when file paths exceed max_file_paths limit.""" - addon_params: dict[str, Any] = field( default_factory=lambda: { "language": get_env_value( @@ -525,16 +468,6 @@ class LightRAG: logger.debug(f"LightRAG init with param:\n {_print_config}\n") # Init Embedding - # Step 1: Capture max_token_size before applying decorator (decorator strips dataclass attributes) - embedding_max_token_size = None - if self.embedding_func and hasattr(self.embedding_func, "max_token_size"): - embedding_max_token_size = self.embedding_func.max_token_size - logger.debug( - f"Captured embedding max_token_size: {embedding_max_token_size}" - ) - self.embedding_token_limit = embedding_max_token_size - - # Step 2: Apply priority wrapper decorator self.embedding_func = priority_limit_async_func_call( self.embedding_func_max_async, llm_timeout=self.default_embedding_timeout, @@ -595,18 +528,6 @@ class LightRAG: embedding_func=self.embedding_func, ) - self.entity_chunks: BaseKVStorage = self.key_string_value_json_storage_cls( # type: ignore - namespace=NameSpace.KV_STORE_ENTITY_CHUNKS, - workspace=self.workspace, - embedding_func=self.embedding_func, - ) - - self.relation_chunks: BaseKVStorage = self.key_string_value_json_storage_cls( # type: ignore - namespace=NameSpace.KV_STORE_RELATION_CHUNKS, - workspace=self.workspace, - embedding_func=self.embedding_func, - ) - self.chunk_entity_relation_graph: BaseGraphStorage = self.graph_storage_cls( # type: ignore namespace=NameSpace.GRAPH_STORE_CHUNK_ENTITY_RELATION, workspace=self.workspace, @@ -661,29 +582,11 @@ class LightRAG: async def initialize_storages(self): """Storage initialization must be called one by one to prevent deadlock""" if self._storages_status == StoragesStatus.CREATED: - # Set the first initialized workspace will set the default workspace - # Allows namespace operation without specifying workspace for backward compatibility - default_workspace = get_default_workspace() - if 
@@ -661,29 +582,11 @@ class LightRAG:
     async def initialize_storages(self):
         """Storage initialization must be called one by one to prevent deadlock"""
         if self._storages_status == StoragesStatus.CREATED:
-            # The first workspace to initialize becomes the default workspace
-            # Allows namespace operations without specifying a workspace for backward compatibility
-            default_workspace = get_default_workspace()
-            if default_workspace is None:
-                set_default_workspace(self.workspace)
-            elif default_workspace != self.workspace:
-                logger.info(
-                    f"Creating LightRAG instance with workspace='{self.workspace}' "
-                    f"while default workspace is set to '{default_workspace}'"
-                )
-
-            # Auto-initialize pipeline_status for this workspace
-            from lightrag.kg.shared_storage import initialize_pipeline_status
-
-            await initialize_pipeline_status(workspace=self.workspace)
-
             for storage in (
                 self.full_docs,
                 self.text_chunks,
                 self.full_entities,
                 self.full_relations,
-                self.entity_chunks,
-                self.relation_chunks,
                 self.entities_vdb,
                 self.relationships_vdb,
                 self.chunks_vdb,
@@ -706,8 +609,6 @@ class LightRAG:
                 ("text_chunks", self.text_chunks),
                 ("full_entities", self.full_entities),
                 ("full_relations", self.full_relations),
-                ("entity_chunks", self.entity_chunks),
-                ("relation_chunks", self.relation_chunks),
                 ("entities_vdb", self.entities_vdb),
                 ("relationships_vdb", self.relationships_vdb),
                 ("chunks_vdb", self.chunks_vdb),
@@ -748,7 +649,7 @@ class LightRAG:

     async def check_and_migrate_data(self):
         """Check if data migration is needed and perform migration if necessary"""
-        async with get_data_init_lock():
+        async with get_data_init_lock(enable_logging=True):
             try:
                 # Check if migration is needed:
                 # 1. chunk_entity_relation_graph has entities and relations (count > 0)
@@ -763,13 +664,6 @@ class LightRAG:
                     logger.debug("No entities found in graph, skipping migration check")
                     return

-                try:
-                    # Initialize chunk tracking storage after migration
-                    await self._migrate_chunk_tracking_storage()
-                except Exception as e:
-                    logger.error(f"Error during chunk_tracking migration: {e}")
-                    raise e
-
                 # Check if full_entities and full_relations are empty
                 # Get all processed documents to check their entity/relation data
                 try:
@@ -810,11 +704,11 @@ class LightRAG:

             except Exception as e:
                 logger.error(f"Error during migration check: {e}")
-                raise e
+                # Don't raise the error, just log it to avoid breaking initialization

         except Exception as e:
             logger.error(f"Error in data migration check: {e}")
-            raise e
+            # Don't raise the error to avoid breaking initialization

     async def _migrate_entity_relation_data(self, processed_docs: dict):
         """Migrate existing entity and relation data to full_entities and full_relations storage"""
@@ -913,140 +807,6 @@ class LightRAG:
             f"Data migration completed: migrated {migration_count} documents with entities/relations"
         )

-    async def _migrate_chunk_tracking_storage(self) -> None:
-        """Ensure entity/relation chunk tracking KV stores exist and are seeded."""
-
-        if not self.entity_chunks or not self.relation_chunks:
-            return
-
-        need_entity_migration = False
-        need_relation_migration = False
-
-        try:
-            need_entity_migration = await self.entity_chunks.is_empty()
-        except Exception as exc:  # pragma: no cover - defensive logging
-            logger.error(f"Failed to check entity chunks storage: {exc}")
-            raise exc
-
-        try:
-            need_relation_migration = await self.relation_chunks.is_empty()
-        except Exception as exc:  # pragma: no cover - defensive logging
-            logger.error(f"Failed to check relation chunks storage: {exc}")
-            raise exc
-
-        if not need_entity_migration and not need_relation_migration:
-            return
-
-        BATCH_SIZE = 500  # Process 500 records per batch
-
-        if need_entity_migration:
-            try:
-                nodes = await self.chunk_entity_relation_graph.get_all_nodes()
-            except Exception as exc:
-                logger.error(f"Failed to fetch nodes for chunk migration: {exc}")
-                nodes = []
-
-            logger.info(f"Starting chunk_tracking data migration: {len(nodes)} nodes")
-
-            # Process nodes in batches
-            total_nodes = len(nodes)
-            total_batches = (total_nodes + BATCH_SIZE - 1) // BATCH_SIZE
-            total_migrated = 0
-
-            for batch_idx in range(total_batches):
-                start_idx = batch_idx * BATCH_SIZE
-                end_idx = min((batch_idx + 1) * BATCH_SIZE, total_nodes)
-                batch_nodes = nodes[start_idx:end_idx]
-
-                upsert_payload: dict[str, dict[str, object]] = {}
-                for node in batch_nodes:
-                    entity_id = node.get("entity_id") or node.get("id")
-                    if not entity_id:
-                        continue
-
-                    raw_source = node.get("source_id") or ""
-                    chunk_ids = [
-                        chunk_id
-                        for chunk_id in raw_source.split(GRAPH_FIELD_SEP)
-                        if chunk_id
-                    ]
-                    if not chunk_ids:
-                        continue
-
-                    upsert_payload[entity_id] = {
-                        "chunk_ids": chunk_ids,
-                        "count": len(chunk_ids),
-                    }
-
-                if upsert_payload:
-                    await self.entity_chunks.upsert(upsert_payload)
-                    total_migrated += len(upsert_payload)
-                    logger.info(
-                        f"Processed entity batch {batch_idx + 1}/{total_batches}: {len(upsert_payload)} records (total: {total_migrated}/{total_nodes})"
-                    )
-
-            if total_migrated > 0:
-                # Persist entity_chunks data to disk
-                await self.entity_chunks.index_done_callback()
-                logger.info(
-                    f"Entity chunk_tracking migration completed: {total_migrated} records persisted"
-                )
-
-        if need_relation_migration:
-            try:
-                edges = await self.chunk_entity_relation_graph.get_all_edges()
-            except Exception as exc:
-                logger.error(f"Failed to fetch edges for chunk migration: {exc}")
-                edges = []
-
-            logger.info(f"Starting chunk_tracking data migration: {len(edges)} edges")
-
-            # Process edges in batches
-            total_edges = len(edges)
-            total_batches = (total_edges + BATCH_SIZE - 1) // BATCH_SIZE
-            total_migrated = 0
-
-            for batch_idx in range(total_batches):
-                start_idx = batch_idx * BATCH_SIZE
-                end_idx = min((batch_idx + 1) * BATCH_SIZE, total_edges)
-                batch_edges = edges[start_idx:end_idx]
-
-                upsert_payload: dict[str, dict[str, object]] = {}
-                for edge in batch_edges:
-                    src = edge.get("source") or edge.get("src_id") or edge.get("src")
-                    tgt = edge.get("target") or edge.get("tgt_id") or edge.get("tgt")
-                    if not src or not tgt:
-                        continue
-
-                    raw_source = edge.get("source_id") or ""
-                    chunk_ids = [
-                        chunk_id
-                        for chunk_id in raw_source.split(GRAPH_FIELD_SEP)
-                        if chunk_id
-                    ]
-                    if not chunk_ids:
-                        continue
-
-                    storage_key = make_relation_chunk_key(src, tgt)
-                    upsert_payload[storage_key] = {
-                        "chunk_ids": chunk_ids,
-                        "count": len(chunk_ids),
-                    }
-
-                if upsert_payload:
-                    await self.relation_chunks.upsert(upsert_payload)
-                    total_migrated += len(upsert_payload)
-                    logger.info(
-                        f"Processed relation batch {batch_idx + 1}/{total_batches}: {len(upsert_payload)} records (total: {total_migrated}/{total_edges})"
-                    )
-
-            if total_migrated > 0:
-                # Persist relation_chunks data to disk
-                await self.relation_chunks.index_done_callback()
-                logger.info(
-                    f"Relation chunk_tracking migration completed: {total_migrated} records persisted"
-                )
-
     async def get_graph_labels(self):
         text = await self.chunk_entity_relation_graph.get_all_labels()
         return text
@@ -1611,12 +1371,8 @@ class LightRAG:
         """
         # Get pipeline status shared data and lock
-        pipeline_status = await get_namespace_data(
-            "pipeline_status", workspace=self.workspace
-        )
-        pipeline_status_lock = get_namespace_lock(
-            "pipeline_status", workspace=self.workspace
-        )
+        pipeline_status = await get_namespace_data("pipeline_status")
+        pipeline_status_lock = get_pipeline_status_lock()

         # Check if another process is already processing the queue
         async with pipeline_status_lock:
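Note: the two restored helpers above are the whole coordination surface for the pipeline status: a shared dict plus a lock guarding it. A hedged usage sketch (assumes both names are importable from lightrag.kg.shared_storage exactly as in the import hunk earlier in this patch):

    from lightrag.kg.shared_storage import (
        get_namespace_data,
        get_pipeline_status_lock,
    )

    async def pipeline_is_busy() -> bool:
        # Always read/update the shared dict under its lock
        pipeline_status = await get_namespace_data("pipeline_status")
        async with get_pipeline_status_lock():
            return bool(pipeline_status.get("busy", False))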
@@ -1646,7 +1402,6 @@ class LightRAG:
                         "batchs": 0,  # Total number of files to be processed
                         "cur_batch": 0,  # Number of files already processed
                         "request_pending": False,  # Clear any previous request
-                        "cancellation_requested": False,  # Initialize cancellation flag
                         "latest_message": "",
                     }
                 )
@@ -1663,22 +1418,6 @@ class LightRAG:
         try:
             # Process documents until no more documents or requests
             while True:
-                # Check for cancellation request at the start of main loop
-                async with pipeline_status_lock:
-                    if pipeline_status.get("cancellation_requested", False):
-                        # Clear pending request
-                        pipeline_status["request_pending"] = False
-                        # Clear cancellation flag
-                        pipeline_status["cancellation_requested"] = False
-
-                        log_message = "Pipeline cancelled by user"
-                        logger.info(log_message)
-                        pipeline_status["latest_message"] = log_message
-                        pipeline_status["history_messages"].append(log_message)
-
-                        # Exit directly, skipping request_pending check
-                        return
-
                 if not to_process_docs:
                     log_message = "All enqueued documents have been processed"
                     logger.info(log_message)
@@ -1741,25 +1480,14 @@ class LightRAG:
             semaphore: asyncio.Semaphore,
         ) -> None:
             """Process single document"""
-            # Initialize variables at the start to prevent UnboundLocalError in error handling
-            file_path = "unknown_source"
-            current_file_number = 0
             file_extraction_stage_ok = False
-            processing_start_time = int(time.time())
-            first_stage_tasks = []
-            entity_relation_task = None
-
             async with semaphore:
                 nonlocal processed_count
+                current_file_number = 0  # Initialize to prevent UnboundLocalError in error handling
                 first_stage_tasks = []
                 entity_relation_task = None

                 try:
-                    # Check for cancellation before starting document processing
-                    async with pipeline_status_lock:
-                        if pipeline_status.get("cancellation_requested", False):
-                            raise PipelineCancelledException("User cancelled")
-
                     # Get file path from status document
                     file_path = getattr(
                         status_doc, "file_path", "unknown_source"
@@ -1798,28 +1526,7 @@ class LightRAG:
                     )
                     content = content_data["content"]

-                    # Call chunking function, supporting both sync and async implementations
-                    chunking_result = self.chunking_func(
-                        self.tokenizer,
-                        content,
-                        split_by_character,
-                        split_by_character_only,
-                        self.chunk_overlap_token_size,
-                        self.chunk_token_size,
-                    )
-
-                    # If result is awaitable, await to get actual result
-                    if inspect.isawaitable(chunking_result):
-                        chunking_result = await chunking_result
-
-                    # Validate return type
-                    if not isinstance(chunking_result, (list, tuple)):
-                        raise TypeError(
-                            f"chunking_func must return a list or tuple of dicts, "
-                            f"got {type(chunking_result)}"
-                        )
-
-                    # Build chunks dictionary
+                    # Generate chunks from document
                     chunks: dict[str, Any] = {
                         compute_mdhash_id(dp["content"], prefix="chunk-"): {
                             **dp,
@@ -1827,7 +1534,14 @@ class LightRAG:
                             "file_path": file_path,  # Add file path to each chunk
                             "llm_cache_list": [],  # Initialize empty LLM cache list for each chunk
                         }
-                        for dp in chunking_result
+                        for dp in self.chunking_func(
+                            self.tokenizer,
+                            content,
+                            split_by_character,
+                            split_by_character_only,
+                            self.chunk_overlap_token_size,
+                            self.chunk_token_size,
+                        )
                     }

                     if not chunks:
@@ -1836,11 +1550,6 @@ class LightRAG:

                     # Record processing start time
                     processing_start_time = int(time.time())

-                    # Check for cancellation before entity extraction
-                    async with pipeline_status_lock:
-                        if pipeline_status.get("cancellation_requested", False):
-                            raise PipelineCancelledException("User cancelled")
-
                     # Process document in two stages
                     # Stage 1: Process text chunks and docs (parallel execution)
                     doc_status_task = asyncio.create_task(
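Note: the dict comprehension above keys every chunk by a hash of its content, so re-inserting identical text yields the same chunk id. A small illustration (assuming compute_mdhash_id is exported from lightrag.utils, as it is used throughout this module; the sample values are made up):

    from lightrag.utils import compute_mdhash_id

    dp = {"tokens": 118, "content": "some chunk text", "chunk_order_index": 0}
    chunk_key = compute_mdhash_id(dp["content"], prefix="chunk-")
    # e.g. "chunk-<hex digest>" - identical content always maps to the same key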
@@ -1891,33 +1600,20 @@ class LightRAG:
                             chunks, pipeline_status, pipeline_status_lock
                         )
                     )
-                    chunk_results = await entity_relation_task
+                    await entity_relation_task
                     file_extraction_stage_ok = True

                 except Exception as e:
-                    # Check if this is a user cancellation
-                    if isinstance(e, PipelineCancelledException):
-                        # User cancellation - log brief message only, no traceback
-                        error_msg = f"User cancelled {current_file_number}/{total_files}: {file_path}"
-                        logger.warning(error_msg)
-                        async with pipeline_status_lock:
-                            pipeline_status["latest_message"] = error_msg
-                            pipeline_status["history_messages"].append(
-                                error_msg
-                            )
-                    else:
-                        # Other exceptions - log with traceback
-                        logger.error(traceback.format_exc())
-                        error_msg = f"Failed to extract document {current_file_number}/{total_files}: {file_path}"
-                        logger.error(error_msg)
-                        async with pipeline_status_lock:
-                            pipeline_status["latest_message"] = error_msg
-                            pipeline_status["history_messages"].append(
-                                traceback.format_exc()
-                            )
-                            pipeline_status["history_messages"].append(
-                                error_msg
-                            )
+                    # Log error and update pipeline status
+                    logger.error(traceback.format_exc())
+                    error_msg = f"Failed to extract document {current_file_number}/{total_files}: {file_path}"
+                    logger.error(error_msg)
+                    async with pipeline_status_lock:
+                        pipeline_status["latest_message"] = error_msg
+                        pipeline_status["history_messages"].append(
+                            traceback.format_exc()
+                        )
+                        pipeline_status["history_messages"].append(error_msg)

                     # Cancel tasks that are not yet completed
                     all_tasks = first_stage_tasks + (
@@ -1927,14 +1623,9 @@ class LightRAG:
                         if task and not task.done():
                             task.cancel()

-                    # Persistent llm cache with error handling
+                    # Persistent llm cache
                     if self.llm_response_cache:
-                        try:
-                            await self.llm_response_cache.index_done_callback()
-                        except Exception as persist_error:
-                            logger.error(
-                                f"Failed to persist LLM cache: {persist_error}"
-                            )
+                        await self.llm_response_cache.index_done_callback()

                     # Record processing end time for failed case
                     processing_end_time = int(time.time())
@@ -1964,16 +1655,8 @@ class LightRAG:
                 # Concurrency is controlled by keyed lock for individual entities and relationships
                 if file_extraction_stage_ok:
                     try:
-                        # Check for cancellation before merge
-                        async with pipeline_status_lock:
-                            if pipeline_status.get(
-                                "cancellation_requested", False
-                            ):
-                                raise PipelineCancelledException(
-                                    "User cancelled"
-                                )
-
-                        # Use chunk_results from entity_relation_task
+                        # Get chunk_results from entity_relation_task
+                        chunk_results = await entity_relation_task
                         await merge_nodes_and_edges(
                             chunk_results=chunk_results,  # result collected from entity_relation_task
                             knowledge_graph_inst=self.chunk_entity_relation_graph,
@@ -1986,8 +1669,6 @@ class LightRAG:
                             pipeline_status=pipeline_status,
                             pipeline_status_lock=pipeline_status_lock,
                             llm_response_cache=self.llm_response_cache,
-                            entity_chunks_storage=self.entity_chunks,
-                            relation_chunks_storage=self.relation_chunks,
                             current_file_number=current_file_number,
                             total_files=total_files,
                             file_path=file_path,
@@ -2030,38 +1711,22 @@ class LightRAG:
                         )

                     except Exception as e:
-                        # Check if this is a user cancellation
-                        if isinstance(e, PipelineCancelledException):
-                            # User cancellation - log brief message only, no traceback
-                            error_msg = f"User cancelled during merge {current_file_number}/{total_files}: {file_path}"
-                            logger.warning(error_msg)
-                            async with pipeline_status_lock:
-                                pipeline_status["latest_message"] = error_msg
-                                pipeline_status["history_messages"].append(
-                                    error_msg
-                                )
-                        else:
-                            # Other exceptions - log with traceback
-                            logger.error(traceback.format_exc())
-                            error_msg = f"Merging stage failed in document {current_file_number}/{total_files}: {file_path}"
-                            logger.error(error_msg)
-                            async with pipeline_status_lock:
-                                pipeline_status["latest_message"] = error_msg
-                                pipeline_status["history_messages"].append(
-                                    traceback.format_exc()
-                                )
-                                pipeline_status["history_messages"].append(
-                                    error_msg
-                                )
+                        # Log error and update pipeline status
+                        logger.error(traceback.format_exc())
+                        error_msg = f"Merging stage failed in document {current_file_number}/{total_files}: {file_path}"
+                        logger.error(error_msg)
+                        async with pipeline_status_lock:
+                            pipeline_status["latest_message"] = error_msg
+                            pipeline_status["history_messages"].append(
+                                traceback.format_exc()
+                            )
+                            pipeline_status["history_messages"].append(
                                error_msg
                            )

-                        # Persistent llm cache with error handling
+                        # Persistent llm cache
                         if self.llm_response_cache:
-                            try:
-                                await self.llm_response_cache.index_done_callback()
-                            except Exception as persist_error:
-                                logger.error(
-                                    f"Failed to persist LLM cache: {persist_error}"
-                                )
+                            await self.llm_response_cache.index_done_callback()

                         # Record processing end time for failed case
                         processing_end_time = int(time.time())
@@ -2102,19 +1767,7 @@ class LightRAG:
             )

         # Wait for all document processing to complete
-        try:
-            await asyncio.gather(*doc_tasks)
-        except PipelineCancelledException:
-            # Cancel all remaining tasks
-            for task in doc_tasks:
-                if not task.done():
-                    task.cancel()
-
-            # Wait for all tasks to complete cancellation
-            await asyncio.wait(doc_tasks, return_when=asyncio.ALL_COMPLETED)
-
-            # Exit directly (document statuses already updated in process_document)
-            return
+        await asyncio.gather(*doc_tasks)

         # Check if there's a pending request to process more documents (with lock)
         has_pending_request = False
@@ -2145,14 +1798,11 @@ class LightRAG:
                 to_process_docs.update(pending_docs)

         finally:
             log_message = "Enqueued document processing pipeline stopped"
             logger.info(log_message)
-            # Always reset busy status and cancellation flag when done or if an exception occurs (with lock)
+            # Always reset busy status when done or if an exception occurs (with lock)
             async with pipeline_status_lock:
                 pipeline_status["busy"] = False
-                pipeline_status["cancellation_requested"] = (
-                    False  # Always reset cancellation flag
-                )
                 pipeline_status["latest_message"] = log_message
                 pipeline_status["history_messages"].append(log_message)

@@ -2188,8 +1838,6 @@ class LightRAG:
             self.text_chunks,
             self.full_entities,
             self.full_relations,
-            self.entity_chunks,
-            self.relation_chunks,
             self.llm_response_cache,
             self.entities_vdb,
             self.relationships_vdb,
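Note: the list above enumerates the storages flushed after an indexing round; this patch drops the two chunk-tracking stores from it. The flush itself is one index_done_callback() per backend; a hedged sketch of that pattern (not the method's literal body):

    import asyncio

    async def flush_storages(storages) -> None:
        # Persist every configured backend; skip slots that are not set up
        await asyncio.gather(
            *(s.index_done_callback() for s in storages if s is not None)
        )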
@@ -2639,35 +2287,20 @@ class LightRAG:
             else:
                 raise ValueError(f"Unknown mode {data_param.mode}")

-            if query_result is None:
-                no_result_message = "Query returned no results"
-                if data_param.mode == "naive":
-                    no_result_message = "No relevant document chunks found."
-                final_data: dict[str, Any] = {
-                    "status": "failure",
-                    "message": no_result_message,
-                    "data": {},
-                    "metadata": {
-                        "failure_reason": "no_results",
-                        "mode": data_param.mode,
-                    },
-                }
-                logger.info("[aquery_data] Query returned no results.")
-            else:
-                # Extract raw_data from QueryResult
-                final_data = query_result.raw_data or {}
+            # Extract raw_data from QueryResult
+            final_data = query_result.raw_data if query_result else {}

-                # Log final result counts - adapt to new data format from convert_to_user_format
-                if final_data and "data" in final_data:
-                    data_section = final_data["data"]
-                    entities_count = len(data_section.get("entities", []))
-                    relationships_count = len(data_section.get("relationships", []))
-                    chunks_count = len(data_section.get("chunks", []))
-                    logger.debug(
-                        f"[aquery_data] Final result: {entities_count} entities, {relationships_count} relationships, {chunks_count} chunks"
-                    )
-                else:
-                    logger.warning("[aquery_data] No data section found in query result")
+            # Log final result counts - adapt to new data format from convert_to_user_format
+            if final_data and "data" in final_data:
+                data_section = final_data["data"]
+                entities_count = len(data_section.get("entities", []))
+                relationships_count = len(data_section.get("relationships", []))
+                chunks_count = len(data_section.get("chunks", []))
+                logger.debug(
+                    f"[aquery_data] Final result: {entities_count} entities, {relationships_count} relationships, {chunks_count} chunks"
+                )
+            else:
+                logger.warning("[aquery_data] No data section found in query result")

             await self._query_done()
             return final_data
@@ -2770,19 +2403,16 @@ class LightRAG:
                 "status": "failure",
                 "message": "Query returned no results",
                 "data": {},
-                "metadata": {
-                    "failure_reason": "no_results",
-                    "mode": param.mode,
-                },
+                "metadata": {},
                 "llm_response": {
-                    "content": PROMPTS["fail_response"],
+                    "content": None,
                     "response_iterator": None,
                     "is_streaming": False,
                 },
             }

         # Extract structured data from query result
-        raw_data = query_result.raw_data or {}
+        raw_data = query_result.raw_data if query_result else {}
         raw_data["llm_response"] = {
             "content": query_result.content
             if not query_result.is_streaming
@@ -2940,9 +2570,7 @@ class LightRAG:
         # Return the dictionary containing statuses only for the found document IDs
         return found_statuses

-    async def adelete_by_doc_id(
-        self, doc_id: str, delete_llm_cache: bool = False
-    ) -> DeletionResult:
+    async def adelete_by_doc_id(self, doc_id: str) -> DeletionResult:
         """Delete a document and all its related data, including chunks, graph elements.

         This method orchestrates a comprehensive deletion process for a given document ID.
@@ -2950,88 +2578,23 @@ class LightRAG:
         data across different storage layers are removed or rebuilt. If entities or
         relationships are partially affected, they will be rebuilt using cached LLM
         extraction results from the remaining documents.

-        **Concurrency Control Design:**
-
-        This function implements a pipeline-based concurrency control to prevent data corruption:
-
-        1. **Single Document Deletion** (when WE acquire pipeline):
-           - Sets job_name to "Single document deletion" (NOT starting with "deleting")
-           - Prevents other adelete_by_doc_id calls from running concurrently
-           - Ensures exclusive access to graph operations for this deletion
-
-        2. **Batch Document Deletion** (when background_delete_documents acquires pipeline):
-           - Sets job_name to "Deleting {N} Documents" (starts with "deleting")
-           - Allows multiple adelete_by_doc_id calls to join the deletion queue
-           - Each call validates the job name to ensure it's part of a deletion operation
-
-        The validation logic `if not job_name.startswith("deleting") or "document" not in job_name`
-        ensures that:
-        - adelete_by_doc_id can only run when pipeline is idle OR during batch deletion
-        - Prevents concurrent single deletions that could cause race conditions
-        - Rejects operations when pipeline is busy with non-deletion tasks
-
         Args:
             doc_id (str): The unique identifier of the document to be deleted.
-            delete_llm_cache (bool): Whether to delete cached LLM extraction results
-                associated with the document. Defaults to False.

         Returns:
             DeletionResult: An object containing the outcome of the deletion process.
-                - `status` (str): "success", "not_found", "not_allowed", or "failure".
+                - `status` (str): "success", "not_found", or "failure".
                 - `doc_id` (str): The ID of the document attempted to be deleted.
                 - `message` (str): A summary of the operation's result.
-                - `status_code` (int): HTTP status code (e.g., 200, 404, 403, 500).
+                - `status_code` (int): HTTP status code (e.g., 200, 404, 500).
                 - `file_path` (str | None): The file path of the deleted document, if available.
         """
-        # Get pipeline status shared data and lock for validation
-        pipeline_status = await get_namespace_data(
-            "pipeline_status", workspace=self.workspace
-        )
-        pipeline_status_lock = get_namespace_lock(
-            "pipeline_status", workspace=self.workspace
-        )
-
-        # Track whether WE acquired the pipeline
-        we_acquired_pipeline = False
-
-        # Check and acquire pipeline if needed
-        async with pipeline_status_lock:
-            if not pipeline_status.get("busy", False):
-                # Pipeline is idle - WE acquire it for this deletion
-                we_acquired_pipeline = True
-                pipeline_status.update(
-                    {
-                        "busy": True,
-                        "job_name": "Single document deletion",
-                        "job_start": datetime.now(timezone.utc).isoformat(),
-                        "docs": 1,
-                        "batchs": 1,
-                        "cur_batch": 0,
-                        "request_pending": False,
-                        "cancellation_requested": False,
-                        "latest_message": f"Starting deletion for document: {doc_id}",
-                    }
-                )
-                # Initialize history messages
-                pipeline_status["history_messages"][:] = [
-                    f"Starting deletion for document: {doc_id}"
-                ]
-            else:
-                # Pipeline already busy - verify it's a deletion job
-                job_name = pipeline_status.get("job_name", "").lower()
-                if not job_name.startswith("deleting") or "document" not in job_name:
-                    return DeletionResult(
-                        status="not_allowed",
-                        doc_id=doc_id,
-                        message=f"Deletion not allowed: current job '{pipeline_status.get('job_name')}' is not a document deletion job",
-                        status_code=403,
-                        file_path=None,
-                    )
-                # Pipeline is busy with deletion - proceed without acquiring
-
         deletion_operations_started = False
         original_exception = None
-        doc_llm_cache_ids: list[str] = []
+
+        # Get pipeline status shared data and lock for status updates
+        pipeline_status = await get_namespace_data("pipeline_status")
+        pipeline_status_lock = get_pipeline_status_lock()

         async with pipeline_status_lock:
             log_message = f"Starting deletion process for document {doc_id}"
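Note: with the delete_llm_cache flag and the pipeline-acquisition logic removed, callers of the restored signature pass only the document id and branch on the smaller status set documented above. A hedged usage sketch (rag and the id are placeholders):

    # Hypothetical caller code:
    result = await rag.adelete_by_doc_id("doc-1234")
    if result.status == "success":
        print(f"Deleted {result.doc_id} ({result.file_path})")
    else:
        # "not_found" -> 404, "failure" -> 500 per the docstring above
        print(result.status_code, result.message)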
@@ -3054,12 +2617,7 @@ class LightRAG:
             )

             # Check document status and log warning for non-completed documents
-            raw_status = doc_status_data.get("status")
-            try:
-                doc_status = DocStatus(raw_status)
-            except ValueError:
-                doc_status = raw_status
-
+            doc_status = doc_status_data.get("status")
             if doc_status != DocStatus.PROCESSED:
                 if doc_status == DocStatus.PENDING:
                     warning_msg = (
@@ -3069,23 +2627,12 @@ class LightRAG:
                     warning_msg = (
                         f"Deleting {doc_id} {file_path}(previous status: PROCESSING)"
                     )
-                elif doc_status == DocStatus.PREPROCESSED:
-                    warning_msg = (
-                        f"Deleting {doc_id} {file_path}(previous status: PREPROCESSED)"
-                    )
                 elif doc_status == DocStatus.FAILED:
                     warning_msg = (
                         f"Deleting {doc_id} {file_path}(previous status: FAILED)"
                     )
                 else:
-                    status_text = (
-                        doc_status.value
-                        if isinstance(doc_status, DocStatus)
-                        else str(doc_status)
-                    )
-                    warning_msg = (
-                        f"Deleting {doc_id} {file_path}(previous status: {status_text})"
-                    )
+                    warning_msg = f"Deleting {doc_id} {file_path}(previous status: {doc_status.value})"

                 logger.info(warning_msg)
                 # Update pipeline status for monitoring
                 async with pipeline_status_lock:
@@ -3128,64 +2675,11 @@ class LightRAG:
             # Mark that deletion operations have started
             deletion_operations_started = True

-            if delete_llm_cache and chunk_ids:
-                if not self.llm_response_cache:
-                    logger.info(
-                        "Skipping LLM cache collection for document %s because cache storage is unavailable",
-                        doc_id,
-                    )
-                elif not self.text_chunks:
-                    logger.info(
-                        "Skipping LLM cache collection for document %s because text chunk storage is unavailable",
-                        doc_id,
-                    )
-                else:
-                    try:
-                        chunk_data_list = await self.text_chunks.get_by_ids(
-                            list(chunk_ids)
-                        )
-                        seen_cache_ids: set[str] = set()
-                        for chunk_data in chunk_data_list:
-                            if not chunk_data or not isinstance(chunk_data, dict):
-                                continue
-                            cache_ids = chunk_data.get("llm_cache_list", [])
-                            if not isinstance(cache_ids, list):
-                                continue
-                            for cache_id in cache_ids:
-                                if (
-                                    isinstance(cache_id, str)
-                                    and cache_id
-                                    and cache_id not in seen_cache_ids
-                                ):
-                                    doc_llm_cache_ids.append(cache_id)
-                                    seen_cache_ids.add(cache_id)
-                        if doc_llm_cache_ids:
-                            logger.info(
-                                "Collected %d LLM cache entries for document %s",
-                                len(doc_llm_cache_ids),
-                                doc_id,
-                            )
-                        else:
-                            logger.info(
-                                "No LLM cache entries found for document %s", doc_id
-                            )
-                    except Exception as cache_collect_error:
-                        logger.error(
-                            "Failed to collect LLM cache ids for document %s: %s",
-                            doc_id,
-                            cache_collect_error,
-                        )
-                        raise Exception(
-                            f"Failed to collect LLM cache ids for document {doc_id}: {cache_collect_error}"
-                        ) from cache_collect_error
-
             # 4. Analyze entities and relationships that will be affected
             entities_to_delete = set()
-            entities_to_rebuild = {}  # entity_name -> remaining chunk id list
+            entities_to_rebuild = {}  # entity_name -> remaining_chunk_ids
             relationships_to_delete = set()
-            relationships_to_rebuild = {}  # (src, tgt) -> remaining chunk id list
-            entity_chunk_updates: dict[str, list[str]] = {}
-            relation_chunk_updates: dict[tuple[str, str], list[str]] = {}
+            relationships_to_rebuild = {}  # (src, tgt) -> remaining_chunk_ids

             try:
                 # Get affected entities and relations from full_entities and full_relations storage
@@ -3241,44 +2735,14 @@ class LightRAG:
                 # Process entities
                 for node_data in affected_nodes:
                     node_label = node_data.get("entity_id")
-                    if not node_label:
-                        continue
+                    if node_label and "source_id" in node_data:
+                        sources = set(node_data["source_id"].split(GRAPH_FIELD_SEP))
+                        remaining_sources = sources - chunk_ids

-                    existing_sources: list[str] = []
-                    if self.entity_chunks:
-                        stored_chunks = await self.entity_chunks.get_by_id(node_label)
-                        if stored_chunks and isinstance(stored_chunks, dict):
-                            existing_sources = [
-                                chunk_id
-                                for chunk_id in stored_chunks.get("chunk_ids", [])
-                                if chunk_id
-                            ]
-
-                    if not existing_sources and node_data.get("source_id"):
-                        existing_sources = [
-                            chunk_id
-                            for chunk_id in node_data["source_id"].split(
-                                GRAPH_FIELD_SEP
-                            )
-                            if chunk_id
-                        ]
-
-                    if not existing_sources:
-                        # No chunk references means this entity should be deleted
-                        entities_to_delete.add(node_label)
-                        entity_chunk_updates[node_label] = []
-                        continue
-
-                    remaining_sources = subtract_source_ids(existing_sources, chunk_ids)
-
-                    if not remaining_sources:
-                        entities_to_delete.add(node_label)
-                        entity_chunk_updates[node_label] = []
-                    elif remaining_sources != existing_sources:
-                        entities_to_rebuild[node_label] = remaining_sources
-                        entity_chunk_updates[node_label] = remaining_sources
-                    else:
-                        logger.info(f"Untouch entity: {node_label}")
+                        if not remaining_sources:
+                            entities_to_delete.add(node_label)
+                        elif remaining_sources != sources:
+                            entities_to_rebuild[node_label] = remaining_sources

                 async with pipeline_status_lock:
                     log_message = f"Found {len(entities_to_rebuild)} affected entities"
@@ -3288,58 +2752,24 @@ class LightRAG:

                 # Process relationships
                 for edge_data in affected_edges:
-                    # source/target are not stored in normalized order in graph db properties
                     src = edge_data.get("source")
                     tgt = edge_data.get("target")
-                    if not src or not tgt or "source_id" not in edge_data:
-                        continue
+                    if src and tgt and "source_id" in edge_data:
+                        edge_tuple = tuple(sorted((src, tgt)))
+                        if (
+                            edge_tuple in relationships_to_delete
+                            or edge_tuple in relationships_to_rebuild
+                        ):
+                            continue

-                    edge_tuple = tuple(sorted((src, tgt)))
-                    if (
-                        edge_tuple in relationships_to_delete
-                        or edge_tuple in relationships_to_rebuild
-                    ):
-                        continue
+                        sources = set(edge_data["source_id"].split(GRAPH_FIELD_SEP))
+                        remaining_sources = sources - chunk_ids

-                    existing_sources: list[str] = []
-                    if self.relation_chunks:
-                        storage_key = make_relation_chunk_key(src, tgt)
-                        stored_chunks = await self.relation_chunks.get_by_id(
-                            storage_key
-                        )
-                        if stored_chunks and isinstance(stored_chunks, dict):
-                            existing_sources = [
-                                chunk_id
-                                for chunk_id in stored_chunks.get("chunk_ids", [])
-                                if chunk_id
-                            ]
-
-                    if not existing_sources:
-                        existing_sources = [
-                            chunk_id
-                            for chunk_id in edge_data["source_id"].split(
-                                GRAPH_FIELD_SEP
-                            )
-                            if chunk_id
-                        ]
-
-                    if not existing_sources:
-                        # No chunk references means this relationship should be deleted
-                        relationships_to_delete.add(edge_tuple)
-                        relation_chunk_updates[edge_tuple] = []
-                        continue
-
-                    remaining_sources = subtract_source_ids(existing_sources, chunk_ids)
-
-                    if not remaining_sources:
-                        relationships_to_delete.add(edge_tuple)
-                        relation_chunk_updates[edge_tuple] = []
-                    elif remaining_sources != existing_sources:
-                        relationships_to_rebuild[edge_tuple] = remaining_sources
-                        relation_chunk_updates[edge_tuple] = remaining_sources
-                    else:
-                        logger.info(f"Untouch relation: {edge_tuple}")
+                        if not remaining_sources:
+                            relationships_to_delete.add(edge_tuple)
+                        elif remaining_sources != sources:
+                            relationships_to_rebuild[edge_tuple] = remaining_sources

                 async with pipeline_status_lock:
                     log_message = (
@@ -3349,147 +2779,60 @@ class LightRAG:
                     pipeline_status["latest_message"] = log_message
                     pipeline_status["history_messages"].append(log_message)

-                current_time = int(time.time())
-
-                if entity_chunk_updates and self.entity_chunks:
-                    entity_upsert_payload = {}
-                    for entity_name, remaining in entity_chunk_updates.items():
-                        if not remaining:
-                            # Empty entities are deleted alongside graph nodes later
-                            continue
-                        entity_upsert_payload[entity_name] = {
-                            "chunk_ids": remaining,
-                            "count": len(remaining),
-                            "updated_at": current_time,
-                        }
-                    if entity_upsert_payload:
-                        await self.entity_chunks.upsert(entity_upsert_payload)
-
-                if relation_chunk_updates and self.relation_chunks:
-                    relation_upsert_payload = {}
-                    for edge_tuple, remaining in relation_chunk_updates.items():
-                        if not remaining:
-                            # Empty relations are deleted alongside graph edges later
-                            continue
-                        storage_key = make_relation_chunk_key(*edge_tuple)
-                        relation_upsert_payload[storage_key] = {
-                            "chunk_ids": remaining,
-                            "count": len(remaining),
-                            "updated_at": current_time,
-                        }
-
-                    if relation_upsert_payload:
-                        await self.relation_chunks.upsert(relation_upsert_payload)
-
             except Exception as e:
                 logger.error(f"Failed to process graph analysis results: {e}")
                 raise Exception(f"Failed to process graph dependencies: {e}") from e

-            # Data integrity is ensured by allowing only one process to hold the pipeline at a time (no graph db lock is needed anymore)
+            # Use graph database lock to prevent dirty read
+            graph_db_lock = get_graph_db_lock(enable_logging=False)
+            async with graph_db_lock:
+                # 5. Delete chunks from storage
+                if chunk_ids:
+                    try:
+                        await self.chunks_vdb.delete(chunk_ids)
+                        await self.text_chunks.delete(chunk_ids)

-            # 5. Delete chunks from storage
-            if chunk_ids:
-                try:
-                    await self.chunks_vdb.delete(chunk_ids)
-                    await self.text_chunks.delete(chunk_ids)
+                        async with pipeline_status_lock:
+                            log_message = f"Successfully deleted {len(chunk_ids)} chunks from storage"
+                            logger.info(log_message)
+                            pipeline_status["latest_message"] = log_message
+                            pipeline_status["history_messages"].append(log_message)

-                    async with pipeline_status_lock:
-                        log_message = (
-                            f"Successfully deleted {len(chunk_ids)} chunks from storage"
-                        )
-                        logger.info(log_message)
-                        pipeline_status["latest_message"] = log_message
-                        pipeline_status["history_messages"].append(log_message)
+                    except Exception as e:
+                        logger.error(f"Failed to delete chunks: {e}")
+                        raise Exception(f"Failed to delete document chunks: {e}") from e

-                except Exception as e:
-                    logger.error(f"Failed to delete chunks: {e}")
-                    raise Exception(f"Failed to delete document chunks: {e}") from e
-
-            # 6. Delete relationships that have no remaining sources
-            if relationships_to_delete:
-                try:
-                    # Delete from relation vdb
-                    rel_ids_to_delete = []
-                    for src, tgt in relationships_to_delete:
-                        rel_ids_to_delete.extend(
-                            [
-                                compute_mdhash_id(src + tgt, prefix="rel-"),
-                                compute_mdhash_id(tgt + src, prefix="rel-"),
-                            ]
-                        )
-                    await self.relationships_vdb.delete(rel_ids_to_delete)
-
-                    # Delete from graph
-                    await self.chunk_entity_relation_graph.remove_edges(
-                        list(relationships_to_delete)
-                    )
-
-                    # Delete from relation_chunks storage
-                    if self.relation_chunks:
-                        relation_storage_keys = [
-                            make_relation_chunk_key(src, tgt)
-                            for src, tgt in relationships_to_delete
+                # 6. Delete entities that have no remaining sources
+                if entities_to_delete:
+                    try:
+                        # Delete from vector database
+                        entity_vdb_ids = [
+                            compute_mdhash_id(entity, prefix="ent-")
+                            for entity in entities_to_delete
                         ]
-                        await self.relation_chunks.delete(relation_storage_keys)
+                        await self.entities_vdb.delete(entity_vdb_ids)

-                    async with pipeline_status_lock:
-                        log_message = f"Successfully deleted {len(relationships_to_delete)} relations"
-                        logger.info(log_message)
-                        pipeline_status["latest_message"] = log_message
-                        pipeline_status["history_messages"].append(log_message)
-
-                except Exception as e:
-                    logger.error(f"Failed to delete relationships: {e}")
-                    raise Exception(f"Failed to delete relationships: {e}") from e
-
-            # 7. Delete entities that have no remaining sources
-            if entities_to_delete:
-                try:
-                    # Batch get all edges for entities to avoid N+1 query problem
-                    nodes_edges_dict = (
-                        await self.chunk_entity_relation_graph.get_nodes_edges_batch(
+                        # Delete from graph
+                        await self.chunk_entity_relation_graph.remove_nodes(
                             list(entities_to_delete)
                         )
-                    )

-                    # Debug: Check and log all edges before deleting nodes
-                    edges_to_delete = set()
-                    edges_still_exist = 0
+                        async with pipeline_status_lock:
+                            log_message = f"Successfully deleted {len(entities_to_delete)} entities"
+                            logger.info(log_message)
+                            pipeline_status["latest_message"] = log_message
+                            pipeline_status["history_messages"].append(log_message)

-                    for entity, edges in nodes_edges_dict.items():
-                        if edges:
-                            for src, tgt in edges:
-                                # Normalize edge representation (sorted for consistency)
-                                edge_tuple = tuple(sorted((src, tgt)))
-                                edges_to_delete.add(edge_tuple)
+                    except Exception as e:
+                        logger.error(f"Failed to delete entities: {e}")
+                        raise Exception(f"Failed to delete entities: {e}") from e

-                                if (
-                                    src in entities_to_delete
-                                    and tgt in entities_to_delete
-                                ):
-                                    logger.warning(
-                                        f"Edge still exists: {src} <-> {tgt}"
-                                    )
-                                elif src in entities_to_delete:
-                                    logger.warning(
-                                        f"Edge still exists: {src} --> {tgt}"
-                                    )
-                                else:
-                                    logger.warning(
-                                        f"Edge still exists: {src} <-- {tgt}"
-                                    )
-                                edges_still_exist += 1
-
-                    if edges_still_exist:
-                        logger.warning(
-                            f"⚠️ {edges_still_exist} entities still has edges before deletion"
-                        )
-
-                    # Clean residual edges from VDB and storage before deleting nodes
-                    if edges_to_delete:
-                        # Delete from relationships_vdb
+                # 7. Delete relationships that have no remaining sources
+                if relationships_to_delete:
+                    try:
+                        # Delete from vector database
                         rel_ids_to_delete = []
-                        for src, tgt in edges_to_delete:
+                        for src, tgt in relationships_to_delete:
                             rel_ids_to_delete.extend(
                                 [
                                     compute_mdhash_id(src + tgt, prefix="rel-"),
@@ -3498,53 +2841,28 @@ class LightRAG:
                             )
                         await self.relationships_vdb.delete(rel_ids_to_delete)

-                        # Delete from relation_chunks storage
-                        if self.relation_chunks:
-                            relation_storage_keys = [
-                                make_relation_chunk_key(src, tgt)
-                                for src, tgt in edges_to_delete
-                            ]
-                            await self.relation_chunks.delete(relation_storage_keys)
-
-                        logger.info(
-                            f"Cleaned {len(edges_to_delete)} residual edges from VDB and chunk-tracking storage"
+                        # Delete from graph
+                        await self.chunk_entity_relation_graph.remove_edges(
+                            list(relationships_to_delete)
                         )

-                    # Delete from graph (edges will be auto-deleted with nodes)
-                    await self.chunk_entity_relation_graph.remove_nodes(
-                        list(entities_to_delete)
-                    )
+                        async with pipeline_status_lock:
+                            log_message = f"Successfully deleted {len(relationships_to_delete)} relations"
+                            logger.info(log_message)
+                            pipeline_status["latest_message"] = log_message
+                            pipeline_status["history_messages"].append(log_message)

-                    # Delete from vector vdb
-                    entity_vdb_ids = [
-                        compute_mdhash_id(entity, prefix="ent-")
-                        for entity in entities_to_delete
-                    ]
-                    await self.entities_vdb.delete(entity_vdb_ids)
+                    except Exception as e:
+                        logger.error(f"Failed to delete relationships: {e}")
+                        raise Exception(f"Failed to delete relationships: {e}") from e

-                    # Delete from entity_chunks storage
-                    if self.entity_chunks:
-                        await self.entity_chunks.delete(list(entities_to_delete))
-
-                    async with pipeline_status_lock:
-                        log_message = (
-                            f"Successfully deleted {len(entities_to_delete)} entities"
-                        )
-                        logger.info(log_message)
-                        pipeline_status["latest_message"] = log_message
-                        pipeline_status["history_messages"].append(log_message)
-
-                except Exception as e:
-                    logger.error(f"Failed to delete entities: {e}")
-                    raise Exception(f"Failed to delete entities: {e}") from e
-
-            # Persist changes to graph database before entity and relationship rebuild
-            await self._insert_done()
+                # Persist changes to graph database before releasing graph database lock
+                await self._insert_done()

             # 8. Rebuild entities and relationships from remaining chunks
             if entities_to_rebuild or relationships_to_rebuild:
                 try:
-                    await rebuild_knowledge_from_chunks(
+                    await _rebuild_knowledge_from_chunks(
                         entities_to_rebuild=entities_to_rebuild,
                         relationships_to_rebuild=relationships_to_rebuild,
                         knowledge_graph_inst=self.chunk_entity_relation_graph,
@@ -3555,8 +2873,6 @@ class LightRAG:
                         global_config=asdict(self),
                         pipeline_status=pipeline_status,
                         pipeline_status_lock=pipeline_status_lock,
-                        entity_chunks_storage=self.entity_chunks,
-                        relation_chunks_storage=self.relation_chunks,
                     )

                 except Exception as e:
@@ -3581,23 +2897,6 @@ class LightRAG:
                 logger.error(f"Failed to delete document and status: {e}")
                 raise Exception(f"Failed to delete document and status: {e}") from e

-            if delete_llm_cache and doc_llm_cache_ids and self.llm_response_cache:
-                try:
-                    await self.llm_response_cache.delete(doc_llm_cache_ids)
-                    cache_log_message = f"Successfully deleted {len(doc_llm_cache_ids)} LLM cache entries for document {doc_id}"
-                    logger.info(cache_log_message)
-                    async with pipeline_status_lock:
-                        pipeline_status["latest_message"] = cache_log_message
-                        pipeline_status["history_messages"].append(cache_log_message)
-                    log_message = cache_log_message
-                except Exception as cache_delete_error:
-                    log_message = f"Failed to delete LLM cache for document {doc_id}: {cache_delete_error}"
-                    logger.error(log_message)
-                    logger.error(traceback.format_exc())
-                    async with pipeline_status_lock:
-                        pipeline_status["latest_message"] = log_message
-                        pipeline_status["history_messages"].append(log_message)
-
             return DeletionResult(
                 status="success",
                 doc_id=doc_id,
@@ -3645,18 +2944,6 @@ class LightRAG:
                     f"No deletion operations were started for document {doc_id}, skipping persistence"
                 )

-            # Release pipeline only if WE acquired it
-            if we_acquired_pipeline:
-                async with pipeline_status_lock:
-                    pipeline_status["busy"] = False
-                    pipeline_status["cancellation_requested"] = False
-                    completion_msg = (
-                        f"Deletion process completed for document: {doc_id}"
-                    )
-                    pipeline_status["latest_message"] = completion_msg
-                    pipeline_status["history_messages"].append(completion_msg)
-                    logger.info(completion_msg)
-
     async def adelete_by_entity(self, entity_name: str) -> DeletionResult:
         """Asynchronously delete an entity and all its relationships.
@@ -3774,22 +3061,16 @@ class LightRAG:
         )

     async def aedit_entity(
-        self,
-        entity_name: str,
-        updated_data: dict[str, str],
-        allow_rename: bool = True,
-        allow_merge: bool = False,
+        self, entity_name: str, updated_data: dict[str, str], allow_rename: bool = True
     ) -> dict[str, Any]:
         """Asynchronously edit entity information.

         Updates entity information in the knowledge graph and re-embeds the entity in the vector database.
-        Also synchronizes entity_chunks_storage and relation_chunks_storage to track chunk references.

         Args:
             entity_name: Name of the entity to edit
             updated_data: Dictionary containing updated attributes, e.g. {"description": "new description", "entity_type": "new type"}
{"description": "new description", "entity_type": "new type"} allow_rename: Whether to allow entity renaming, defaults to True - allow_merge: Whether to merge into an existing entity when renaming to an existing name Returns: Dictionary containing updated entity information @@ -3803,21 +3084,14 @@ class LightRAG: entity_name, updated_data, allow_rename, - allow_merge, - self.entity_chunks, - self.relation_chunks, ) def edit_entity( - self, - entity_name: str, - updated_data: dict[str, str], - allow_rename: bool = True, - allow_merge: bool = False, + self, entity_name: str, updated_data: dict[str, str], allow_rename: bool = True ) -> dict[str, Any]: loop = always_get_an_event_loop() return loop.run_until_complete( - self.aedit_entity(entity_name, updated_data, allow_rename, allow_merge) + self.aedit_entity(entity_name, updated_data, allow_rename) ) async def aedit_relation( @@ -3826,7 +3100,6 @@ class LightRAG: """Asynchronously edit relation information. Updates relation (edge) information in the knowledge graph and re-embeds the relation in the vector database. - Also synchronizes the relation_chunks_storage to track which chunks reference this relation. Args: source_entity: Name of the source entity @@ -3845,7 +3118,6 @@ class LightRAG: source_entity, target_entity, updated_data, - self.relation_chunks, ) def edit_relation( @@ -3957,8 +3229,6 @@ class LightRAG: target_entity, merge_strategy, target_entity_data, - self.entity_chunks, - self.relation_chunks, ) def merge_entities(