diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 46d31ca2..8a638759 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -3,6 +3,7 @@ from __future__ import annotations import traceback import asyncio import configparser +import inspect import os import time import warnings @@ -12,6 +13,7 @@ from functools import partial from typing import ( Any, AsyncIterator, + Awaitable, Callable, Iterator, cast, @@ -20,8 +22,10 @@ from typing import ( Optional, List, Dict, + Union, ) from lightrag.prompt import PROMPTS +from lightrag.exceptions import PipelineCancelledException from lightrag.constants import ( DEFAULT_MAX_GLEANING, DEFAULT_FORCE_LLM_SUMMARY_ON_MERGE, @@ -60,9 +64,10 @@ from lightrag.kg import ( from lightrag.kg.shared_storage import ( get_namespace_data, - get_pipeline_status_lock, - get_graph_db_lock, get_data_init_lock, + get_default_workspace, + set_default_workspace, + get_namespace_lock, ) from lightrag.base import ( @@ -86,7 +91,7 @@ from lightrag.operate import ( merge_nodes_and_edges, kg_query, naive_query, - _rebuild_knowledge_from_chunks, + rebuild_knowledge_from_chunks, ) from lightrag.constants import GRAPH_FIELD_SEP from lightrag.utils import ( @@ -242,23 +247,28 @@ class LightRAG: int, int, ], - List[Dict[str, Any]], + Union[List[Dict[str, Any]], Awaitable[List[Dict[str, Any]]]], ] = field(default_factory=lambda: chunking_by_token_size) """ Custom chunking function for splitting text into chunks before processing. + The function can be either synchronous or asynchronous. + The function should take the following parameters: - `tokenizer`: A Tokenizer instance to use for tokenization. - `content`: The text to be split into chunks. - `split_by_character`: The character to split the text on. If None, the text is split into chunks of `chunk_token_size` tokens. - `split_by_character_only`: If True, the text is split only on the specified character. - - `chunk_token_size`: The maximum number of tokens per chunk. - `chunk_overlap_token_size`: The number of overlapping tokens between consecutive chunks. + - `chunk_token_size`: The maximum number of tokens per chunk. - The function should return a list of dictionaries, where each dictionary contains the following keys: - - `tokens`: The number of tokens in the chunk. - - `content`: The text content of the chunk. + + The function should return a list of dictionaries (or an awaitable that resolves to a list), + where each dictionary contains the following keys: + - `tokens` (int): The number of tokens in the chunk. + - `content` (str): The text content of the chunk. + - `chunk_order_index` (int): Zero-based index indicating the chunk's order in the document. Defaults to `chunking_by_token_size` if not specified. """ @@ -269,6 +279,9 @@ class LightRAG: embedding_func: EmbeddingFunc | None = field(default=None) """Function for computing text embeddings. Must be set before use.""" + embedding_token_limit: int | None = field(default=None, init=False) + """Token limit for embedding model. 
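For illustration, the sync-or-async contract described above can be satisfied by a coroutine such as the following sketch. Only the documented signature and return keys ("tokens", "content", "chunk_order_index") are taken from the code above; the paragraph-based splitting and the name paragraph_chunking are hypothetical.

from __future__ import annotations
from typing import Any

async def paragraph_chunking(
    tokenizer,
    content: str,
    split_by_character: str | None,
    split_by_character_only: bool,
    chunk_overlap_token_size: int,
    chunk_token_size: int,
) -> list[dict[str, Any]]:
    # Illustrative splitting on blank lines; a real implementation would also
    # enforce chunk_token_size / chunk_overlap_token_size like the default.
    paragraphs = [p.strip() for p in content.split("\n\n") if p.strip()]
    return [
        {
            "tokens": len(tokenizer.encode(p)),
            "content": p,
            "chunk_order_index": index,
        }
        for index, p in enumerate(paragraphs)
    ]

# Hypothetical usage: LightRAG(..., chunking_func=paragraph_chunking)
# The pipeline awaits the returned coroutine automatically (see the
# inspect.isawaitable check added further below).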
Set automatically from embedding_func.max_token_size in __post_init__.""" + embedding_batch_num: int = field(default=int(os.getenv("EMBEDDING_BATCH_NUM", 10))) """Batch size for embedding computations.""" @@ -512,6 +525,16 @@ class LightRAG: logger.debug(f"LightRAG init with param:\n {_print_config}\n") # Init Embedding + # Step 1: Capture max_token_size before applying decorator (decorator strips dataclass attributes) + embedding_max_token_size = None + if self.embedding_func and hasattr(self.embedding_func, "max_token_size"): + embedding_max_token_size = self.embedding_func.max_token_size + logger.debug( + f"Captured embedding max_token_size: {embedding_max_token_size}" + ) + self.embedding_token_limit = embedding_max_token_size + + # Step 2: Apply priority wrapper decorator self.embedding_func = priority_limit_async_func_call( self.embedding_func_max_async, llm_timeout=self.default_embedding_timeout, @@ -638,6 +661,22 @@ class LightRAG: async def initialize_storages(self): """Storage initialization must be called one by one to prevent deadlock""" if self._storages_status == StoragesStatus.CREATED: + # Set the first initialized workspace will set the default workspace + # Allows namespace operation without specifying workspace for backward compatibility + default_workspace = get_default_workspace() + if default_workspace is None: + set_default_workspace(self.workspace) + elif default_workspace != self.workspace: + logger.info( + f"Creating LightRAG instance with workspace='{self.workspace}' " + f"while default workspace is set to '{default_workspace}'" + ) + + # Auto-initialize pipeline_status for this workspace + from lightrag.kg.shared_storage import initialize_pipeline_status + + await initialize_pipeline_status(workspace=self.workspace) + for storage in ( self.full_docs, self.text_chunks, @@ -709,7 +748,7 @@ class LightRAG: async def check_and_migrate_data(self): """Check if data migration is needed and perform migration if necessary""" - async with get_data_init_lock(enable_logging=True): + async with get_data_init_lock(): try: # Check if migration is needed: # 1. 
chunk_entity_relation_graph has entities and relations (count > 0) @@ -1572,8 +1611,12 @@ class LightRAG: """ # Get pipeline status shared data and lock - pipeline_status = await get_namespace_data("pipeline_status") - pipeline_status_lock = get_pipeline_status_lock() + pipeline_status = await get_namespace_data( + "pipeline_status", workspace=self.workspace + ) + pipeline_status_lock = get_namespace_lock( + "pipeline_status", workspace=self.workspace + ) # Check if another process is already processing the queue async with pipeline_status_lock: @@ -1603,6 +1646,7 @@ class LightRAG: "batchs": 0, # Total number of files to be processed "cur_batch": 0, # Number of files already processed "request_pending": False, # Clear any previous request + "cancellation_requested": False, # Initialize cancellation flag "latest_message": "", } ) @@ -1619,6 +1663,22 @@ class LightRAG: try: # Process documents until no more documents or requests while True: + # Check for cancellation request at the start of main loop + async with pipeline_status_lock: + if pipeline_status.get("cancellation_requested", False): + # Clear pending request + pipeline_status["request_pending"] = False + # Celar cancellation flag + pipeline_status["cancellation_requested"] = False + + log_message = "Pipeline cancelled by user" + logger.info(log_message) + pipeline_status["latest_message"] = log_message + pipeline_status["history_messages"].append(log_message) + + # Exit directly, skipping request_pending check + return + if not to_process_docs: log_message = "All enqueued documents have been processed" logger.info(log_message) @@ -1681,14 +1741,25 @@ class LightRAG: semaphore: asyncio.Semaphore, ) -> None: """Process single document""" + # Initialize variables at the start to prevent UnboundLocalError in error handling + file_path = "unknown_source" + current_file_number = 0 file_extraction_stage_ok = False + processing_start_time = int(time.time()) + first_stage_tasks = [] + entity_relation_task = None + async with semaphore: nonlocal processed_count - current_file_number = 0 # Initialize to prevent UnboundLocalError in error handling first_stage_tasks = [] entity_relation_task = None try: + # Check for cancellation before starting document processing + async with pipeline_status_lock: + if pipeline_status.get("cancellation_requested", False): + raise PipelineCancelledException("User cancelled") + # Get file path from status document file_path = getattr( status_doc, "file_path", "unknown_source" @@ -1727,7 +1798,28 @@ class LightRAG: ) content = content_data["content"] - # Generate chunks from document + # Call chunking function, supporting both sync and async implementations + chunking_result = self.chunking_func( + self.tokenizer, + content, + split_by_character, + split_by_character_only, + self.chunk_overlap_token_size, + self.chunk_token_size, + ) + + # If result is awaitable, await to get actual result + if inspect.isawaitable(chunking_result): + chunking_result = await chunking_result + + # Validate return type + if not isinstance(chunking_result, (list, tuple)): + raise TypeError( + f"chunking_func must return a list or tuple of dicts, " + f"got {type(chunking_result)}" + ) + + # Build chunks dictionary chunks: dict[str, Any] = { compute_mdhash_id(dp["content"], prefix="chunk-"): { **dp, @@ -1735,14 +1827,7 @@ class LightRAG: "file_path": file_path, # Add file path to each chunk "llm_cache_list": [], # Initialize empty LLM cache list for each chunk } - for dp in self.chunking_func( - self.tokenizer, - content, - 
split_by_character, - split_by_character_only, - self.chunk_overlap_token_size, - self.chunk_token_size, - ) + for dp in chunking_result } if not chunks: @@ -1751,6 +1836,11 @@ class LightRAG: # Record processing start time processing_start_time = int(time.time()) + # Check for cancellation before entity extraction + async with pipeline_status_lock: + if pipeline_status.get("cancellation_requested", False): + raise PipelineCancelledException("User cancelled") + # Process document in two stages # Stage 1: Process text chunks and docs (parallel execution) doc_status_task = asyncio.create_task( @@ -1801,20 +1891,33 @@ class LightRAG: chunks, pipeline_status, pipeline_status_lock ) ) - await entity_relation_task + chunk_results = await entity_relation_task file_extraction_stage_ok = True except Exception as e: - # Log error and update pipeline status - logger.error(traceback.format_exc()) - error_msg = f"Failed to extract document {current_file_number}/{total_files}: {file_path}" - logger.error(error_msg) - async with pipeline_status_lock: - pipeline_status["latest_message"] = error_msg - pipeline_status["history_messages"].append( - traceback.format_exc() - ) - pipeline_status["history_messages"].append(error_msg) + # Check if this is a user cancellation + if isinstance(e, PipelineCancelledException): + # User cancellation - log brief message only, no traceback + error_msg = f"User cancelled {current_file_number}/{total_files}: {file_path}" + logger.warning(error_msg) + async with pipeline_status_lock: + pipeline_status["latest_message"] = error_msg + pipeline_status["history_messages"].append( + error_msg + ) + else: + # Other exceptions - log with traceback + logger.error(traceback.format_exc()) + error_msg = f"Failed to extract document {current_file_number}/{total_files}: {file_path}" + logger.error(error_msg) + async with pipeline_status_lock: + pipeline_status["latest_message"] = error_msg + pipeline_status["history_messages"].append( + traceback.format_exc() + ) + pipeline_status["history_messages"].append( + error_msg + ) # Cancel tasks that are not yet completed all_tasks = first_stage_tasks + ( @@ -1824,9 +1927,14 @@ class LightRAG: if task and not task.done(): task.cancel() - # Persistent llm cache + # Persistent llm cache with error handling if self.llm_response_cache: - await self.llm_response_cache.index_done_callback() + try: + await self.llm_response_cache.index_done_callback() + except Exception as persist_error: + logger.error( + f"Failed to persist LLM cache: {persist_error}" + ) # Record processing end time for failed case processing_end_time = int(time.time()) @@ -1856,8 +1964,16 @@ class LightRAG: # Concurrency is controlled by keyed lock for individual entities and relationships if file_extraction_stage_ok: try: - # Get chunk_results from entity_relation_task - chunk_results = await entity_relation_task + # Check for cancellation before merge + async with pipeline_status_lock: + if pipeline_status.get( + "cancellation_requested", False + ): + raise PipelineCancelledException( + "User cancelled" + ) + + # Use chunk_results from entity_relation_task await merge_nodes_and_edges( chunk_results=chunk_results, # result collected from entity_relation_task knowledge_graph_inst=self.chunk_entity_relation_graph, @@ -1914,22 +2030,38 @@ class LightRAG: ) except Exception as e: - # Log error and update pipeline status - logger.error(traceback.format_exc()) - error_msg = f"Merging stage failed in document {current_file_number}/{total_files}: {file_path}" - logger.error(error_msg) - 
async with pipeline_status_lock: - pipeline_status["latest_message"] = error_msg - pipeline_status["history_messages"].append( - traceback.format_exc() - ) - pipeline_status["history_messages"].append( - error_msg - ) + # Check if this is a user cancellation + if isinstance(e, PipelineCancelledException): + # User cancellation - log brief message only, no traceback + error_msg = f"User cancelled during merge {current_file_number}/{total_files}: {file_path}" + logger.warning(error_msg) + async with pipeline_status_lock: + pipeline_status["latest_message"] = error_msg + pipeline_status["history_messages"].append( + error_msg + ) + else: + # Other exceptions - log with traceback + logger.error(traceback.format_exc()) + error_msg = f"Merging stage failed in document {current_file_number}/{total_files}: {file_path}" + logger.error(error_msg) + async with pipeline_status_lock: + pipeline_status["latest_message"] = error_msg + pipeline_status["history_messages"].append( + traceback.format_exc() + ) + pipeline_status["history_messages"].append( + error_msg + ) - # Persistent llm cache + # Persistent llm cache with error handling if self.llm_response_cache: - await self.llm_response_cache.index_done_callback() + try: + await self.llm_response_cache.index_done_callback() + except Exception as persist_error: + logger.error( + f"Failed to persist LLM cache: {persist_error}" + ) # Record processing end time for failed case processing_end_time = int(time.time()) @@ -1970,7 +2102,19 @@ class LightRAG: ) # Wait for all document processing to complete - await asyncio.gather(*doc_tasks) + try: + await asyncio.gather(*doc_tasks) + except PipelineCancelledException: + # Cancel all remaining tasks + for task in doc_tasks: + if not task.done(): + task.cancel() + + # Wait for all tasks to complete cancellation + await asyncio.wait(doc_tasks, return_when=asyncio.ALL_COMPLETED) + + # Exit directly (document statuses already updated in process_document) + return # Check if there's a pending request to process more documents (with lock) has_pending_request = False @@ -2001,11 +2145,14 @@ class LightRAG: to_process_docs.update(pending_docs) finally: - log_message = "Enqueued document processing pipeline stoped" + log_message = "Enqueued document processing pipeline stopped" logger.info(log_message) - # Always reset busy status when done or if an exception occurs (with lock) + # Always reset busy status and cancellation flag when done or if an exception occurs (with lock) async with pipeline_status_lock: pipeline_status["busy"] = False + pipeline_status["cancellation_requested"] = ( + False # Always reset cancellation flag + ) pipeline_status["latest_message"] = log_message pipeline_status["history_messages"].append(log_message) @@ -2803,6 +2950,26 @@ class LightRAG: data across different storage layers are removed or rebuiled. If entities or relationships are partially affected, they will be rebuilded using LLM cached from remaining documents. + **Concurrency Control Design:** + + This function implements a pipeline-based concurrency control to prevent data corruption: + + 1. **Single Document Deletion** (when WE acquire pipeline): + - Sets job_name to "Single document deletion" (NOT starting with "deleting") + - Prevents other adelete_by_doc_id calls from running concurrently + - Ensures exclusive access to graph operations for this deletion + + 2. 
**Batch Document Deletion** (when background_delete_documents acquires pipeline): + - Sets job_name to "Deleting {N} Documents" (starts with "deleting") + - Allows multiple adelete_by_doc_id calls to join the deletion queue + - Each call validates the job name to ensure it's part of a deletion operation + + The validation logic `if not job_name.startswith("deleting") or "document" not in job_name` + ensures that: + - adelete_by_doc_id can only run when pipeline is idle OR during batch deletion + - Prevents concurrent single deletions that could cause race conditions + - Rejects operations when pipeline is busy with non-deletion tasks + Args: doc_id (str): The unique identifier of the document to be deleted. delete_llm_cache (bool): Whether to delete cached LLM extraction results @@ -2810,20 +2977,62 @@ class LightRAG: Returns: DeletionResult: An object containing the outcome of the deletion process. - - `status` (str): "success", "not_found", or "failure". + - `status` (str): "success", "not_found", "not_allowed", or "failure". - `doc_id` (str): The ID of the document attempted to be deleted. - `message` (str): A summary of the operation's result. - - `status_code` (int): HTTP status code (e.g., 200, 404, 500). + - `status_code` (int): HTTP status code (e.g., 200, 404, 403, 500). - `file_path` (str | None): The file path of the deleted document, if available. """ + # Get pipeline status shared data and lock for validation + pipeline_status = await get_namespace_data( + "pipeline_status", workspace=self.workspace + ) + pipeline_status_lock = get_namespace_lock( + "pipeline_status", workspace=self.workspace + ) + + # Track whether WE acquired the pipeline + we_acquired_pipeline = False + + # Check and acquire pipeline if needed + async with pipeline_status_lock: + if not pipeline_status.get("busy", False): + # Pipeline is idle - WE acquire it for this deletion + we_acquired_pipeline = True + pipeline_status.update( + { + "busy": True, + "job_name": "Single document deletion", + "job_start": datetime.now(timezone.utc).isoformat(), + "docs": 1, + "batchs": 1, + "cur_batch": 0, + "request_pending": False, + "cancellation_requested": False, + "latest_message": f"Starting deletion for document: {doc_id}", + } + ) + # Initialize history messages + pipeline_status["history_messages"][:] = [ + f"Starting deletion for document: {doc_id}" + ] + else: + # Pipeline already busy - verify it's a deletion job + job_name = pipeline_status.get("job_name", "").lower() + if not job_name.startswith("deleting") or "document" not in job_name: + return DeletionResult( + status="not_allowed", + doc_id=doc_id, + message=f"Deletion not allowed: current job '{pipeline_status.get('job_name')}' is not a document deletion job", + status_code=403, + file_path=None, + ) + # Pipeline is busy with deletion - proceed without acquiring + deletion_operations_started = False original_exception = None doc_llm_cache_ids: list[str] = [] - # Get pipeline status shared data and lock for status updates - pipeline_status = await get_namespace_data("pipeline_status") - pipeline_status_lock = get_pipeline_status_lock() - async with pipeline_status_lock: log_message = f"Starting deletion process for document {doc_id}" logger.info(log_message) @@ -3055,6 +3264,9 @@ class LightRAG: ] if not existing_sources: + # No chunk references means this entity should be deleted + entities_to_delete.add(node_label) + entity_chunk_updates[node_label] = [] continue remaining_sources = subtract_source_ids(existing_sources, chunk_ids) @@ -3076,6 +3288,7 @@ 
class LightRAG: # Process relationships for edge_data in affected_edges: + # source target is not in normalize order in graph db property src = edge_data.get("source") tgt = edge_data.get("target") @@ -3112,6 +3325,9 @@ class LightRAG: ] if not existing_sources: + # No chunk references means this relationship should be deleted + relationships_to_delete.add(edge_tuple) + relation_chunk_updates[edge_tuple] = [] continue remaining_sources = subtract_source_ids(existing_sources, chunk_ids) @@ -3137,38 +3353,31 @@ class LightRAG: if entity_chunk_updates and self.entity_chunks: entity_upsert_payload = {} - entity_delete_ids: set[str] = set() for entity_name, remaining in entity_chunk_updates.items(): if not remaining: - entity_delete_ids.add(entity_name) - else: - entity_upsert_payload[entity_name] = { - "chunk_ids": remaining, - "count": len(remaining), - "updated_at": current_time, - } - - if entity_delete_ids: - await self.entity_chunks.delete(list(entity_delete_ids)) + # Empty entities are deleted alongside graph nodes later + continue + entity_upsert_payload[entity_name] = { + "chunk_ids": remaining, + "count": len(remaining), + "updated_at": current_time, + } if entity_upsert_payload: await self.entity_chunks.upsert(entity_upsert_payload) if relation_chunk_updates and self.relation_chunks: relation_upsert_payload = {} - relation_delete_ids: set[str] = set() for edge_tuple, remaining in relation_chunk_updates.items(): - storage_key = make_relation_chunk_key(*edge_tuple) if not remaining: - relation_delete_ids.add(storage_key) - else: - relation_upsert_payload[storage_key] = { - "chunk_ids": remaining, - "count": len(remaining), - "updated_at": current_time, - } + # Empty relations are deleted alongside graph edges later + continue + storage_key = make_relation_chunk_key(*edge_tuple) + relation_upsert_payload[storage_key] = { + "chunk_ids": remaining, + "count": len(remaining), + "updated_at": current_time, + } - if relation_delete_ids: - await self.relation_chunks.delete(list(relation_delete_ids)) if relation_upsert_payload: await self.relation_chunks.upsert(relation_upsert_payload) @@ -3176,56 +3385,111 @@ class LightRAG: logger.error(f"Failed to process graph analysis results: {e}") raise Exception(f"Failed to process graph dependencies: {e}") from e - # Use graph database lock to prevent dirty read - graph_db_lock = get_graph_db_lock(enable_logging=False) - async with graph_db_lock: - # 5. Delete chunks from storage - if chunk_ids: - try: - await self.chunks_vdb.delete(chunk_ids) - await self.text_chunks.delete(chunk_ids) + # Data integrity is ensured by allowing only one process to hold pipeline at a time(no graph db lock is needed anymore) - async with pipeline_status_lock: - log_message = f"Successfully deleted {len(chunk_ids)} chunks from storage" - logger.info(log_message) - pipeline_status["latest_message"] = log_message - pipeline_status["history_messages"].append(log_message) + # 5. Delete chunks from storage + if chunk_ids: + try: + await self.chunks_vdb.delete(chunk_ids) + await self.text_chunks.delete(chunk_ids) - except Exception as e: - logger.error(f"Failed to delete chunks: {e}") - raise Exception(f"Failed to delete document chunks: {e}") from e + async with pipeline_status_lock: + log_message = ( + f"Successfully deleted {len(chunk_ids)} chunks from storage" + ) + logger.info(log_message) + pipeline_status["latest_message"] = log_message + pipeline_status["history_messages"].append(log_message) - # 6. 
Delete entities that have no remaining sources - if entities_to_delete: - try: - # Delete from vector database - entity_vdb_ids = [ - compute_mdhash_id(entity, prefix="ent-") - for entity in entities_to_delete + except Exception as e: + logger.error(f"Failed to delete chunks: {e}") + raise Exception(f"Failed to delete document chunks: {e}") from e + + # 6. Delete relationships that have no remaining sources + if relationships_to_delete: + try: + # Delete from relation vdb + rel_ids_to_delete = [] + for src, tgt in relationships_to_delete: + rel_ids_to_delete.extend( + [ + compute_mdhash_id(src + tgt, prefix="rel-"), + compute_mdhash_id(tgt + src, prefix="rel-"), + ] + ) + await self.relationships_vdb.delete(rel_ids_to_delete) + + # Delete from graph + await self.chunk_entity_relation_graph.remove_edges( + list(relationships_to_delete) + ) + + # Delete from relation_chunks storage + if self.relation_chunks: + relation_storage_keys = [ + make_relation_chunk_key(src, tgt) + for src, tgt in relationships_to_delete ] - await self.entities_vdb.delete(entity_vdb_ids) + await self.relation_chunks.delete(relation_storage_keys) - # Delete from graph - await self.chunk_entity_relation_graph.remove_nodes( + async with pipeline_status_lock: + log_message = f"Successfully deleted {len(relationships_to_delete)} relations" + logger.info(log_message) + pipeline_status["latest_message"] = log_message + pipeline_status["history_messages"].append(log_message) + + except Exception as e: + logger.error(f"Failed to delete relationships: {e}") + raise Exception(f"Failed to delete relationships: {e}") from e + + # 7. Delete entities that have no remaining sources + if entities_to_delete: + try: + # Batch get all edges for entities to avoid N+1 query problem + nodes_edges_dict = ( + await self.chunk_entity_relation_graph.get_nodes_edges_batch( list(entities_to_delete) ) + ) - async with pipeline_status_lock: - log_message = f"Successfully deleted {len(entities_to_delete)} entities" - logger.info(log_message) - pipeline_status["latest_message"] = log_message - pipeline_status["history_messages"].append(log_message) + # Debug: Check and log all edges before deleting nodes + edges_to_delete = set() + edges_still_exist = 0 - except Exception as e: - logger.error(f"Failed to delete entities: {e}") - raise Exception(f"Failed to delete entities: {e}") from e + for entity, edges in nodes_edges_dict.items(): + if edges: + for src, tgt in edges: + # Normalize edge representation (sorted for consistency) + edge_tuple = tuple(sorted((src, tgt))) + edges_to_delete.add(edge_tuple) - # 7. 
Delete relationships that have no remaining sources - if relationships_to_delete: - try: - # Delete from vector database + if ( + src in entities_to_delete + and tgt in entities_to_delete + ): + logger.warning( + f"Edge still exists: {src} <-> {tgt}" + ) + elif src in entities_to_delete: + logger.warning( + f"Edge still exists: {src} --> {tgt}" + ) + else: + logger.warning( + f"Edge still exists: {src} <-- {tgt}" + ) + edges_still_exist += 1 + + if edges_still_exist: + logger.warning( + f"⚠️ {edges_still_exist} entities still has edges before deletion" + ) + + # Clean residual edges from VDB and storage before deleting nodes + if edges_to_delete: + # Delete from relationships_vdb rel_ids_to_delete = [] - for src, tgt in relationships_to_delete: + for src, tgt in edges_to_delete: rel_ids_to_delete.extend( [ compute_mdhash_id(src + tgt, prefix="rel-"), @@ -3234,28 +3498,53 @@ class LightRAG: ) await self.relationships_vdb.delete(rel_ids_to_delete) - # Delete from graph - await self.chunk_entity_relation_graph.remove_edges( - list(relationships_to_delete) + # Delete from relation_chunks storage + if self.relation_chunks: + relation_storage_keys = [ + make_relation_chunk_key(src, tgt) + for src, tgt in edges_to_delete + ] + await self.relation_chunks.delete(relation_storage_keys) + + logger.info( + f"Cleaned {len(edges_to_delete)} residual edges from VDB and chunk-tracking storage" ) - async with pipeline_status_lock: - log_message = f"Successfully deleted {len(relationships_to_delete)} relations" - logger.info(log_message) - pipeline_status["latest_message"] = log_message - pipeline_status["history_messages"].append(log_message) + # Delete from graph (edges will be auto-deleted with nodes) + await self.chunk_entity_relation_graph.remove_nodes( + list(entities_to_delete) + ) - except Exception as e: - logger.error(f"Failed to delete relationships: {e}") - raise Exception(f"Failed to delete relationships: {e}") from e + # Delete from vector vdb + entity_vdb_ids = [ + compute_mdhash_id(entity, prefix="ent-") + for entity in entities_to_delete + ] + await self.entities_vdb.delete(entity_vdb_ids) - # Persist changes to graph database before releasing graph database lock - await self._insert_done() + # Delete from entity_chunks storage + if self.entity_chunks: + await self.entity_chunks.delete(list(entities_to_delete)) + + async with pipeline_status_lock: + log_message = ( + f"Successfully deleted {len(entities_to_delete)} entities" + ) + logger.info(log_message) + pipeline_status["latest_message"] = log_message + pipeline_status["history_messages"].append(log_message) + + except Exception as e: + logger.error(f"Failed to delete entities: {e}") + raise Exception(f"Failed to delete entities: {e}") from e + + # Persist changes to graph database before entity and relationship rebuild + await self._insert_done() # 8. 
Rebuild entities and relationships from remaining chunks if entities_to_rebuild or relationships_to_rebuild: try: - await _rebuild_knowledge_from_chunks( + await rebuild_knowledge_from_chunks( entities_to_rebuild=entities_to_rebuild, relationships_to_rebuild=relationships_to_rebuild, knowledge_graph_inst=self.chunk_entity_relation_graph, @@ -3302,14 +3591,12 @@ class LightRAG: pipeline_status["history_messages"].append(cache_log_message) log_message = cache_log_message except Exception as cache_delete_error: - logger.error( - "Failed to delete LLM cache for document %s: %s", - doc_id, - cache_delete_error, - ) - raise Exception( - f"Failed to delete LLM cache for document {doc_id}: {cache_delete_error}" - ) from cache_delete_error + log_message = f"Failed to delete LLM cache for document {doc_id}: {cache_delete_error}" + logger.error(log_message) + logger.error(traceback.format_exc()) + async with pipeline_status_lock: + pipeline_status["latest_message"] = log_message + pipeline_status["history_messages"].append(log_message) return DeletionResult( status="success", @@ -3358,6 +3645,18 @@ class LightRAG: f"No deletion operations were started for document {doc_id}, skipping persistence" ) + # Release pipeline only if WE acquired it + if we_acquired_pipeline: + async with pipeline_status_lock: + pipeline_status["busy"] = False + pipeline_status["cancellation_requested"] = False + completion_msg = ( + f"Deletion process completed for document: {doc_id}" + ) + pipeline_status["latest_message"] = completion_msg + pipeline_status["history_messages"].append(completion_msg) + logger.info(completion_msg) + async def adelete_by_entity(self, entity_name: str) -> DeletionResult: """Asynchronously delete an entity and all its relationships. @@ -3475,16 +3774,22 @@ class LightRAG: ) async def aedit_entity( - self, entity_name: str, updated_data: dict[str, str], allow_rename: bool = True + self, + entity_name: str, + updated_data: dict[str, str], + allow_rename: bool = True, + allow_merge: bool = False, ) -> dict[str, Any]: """Asynchronously edit entity information. Updates entity information in the knowledge graph and re-embeds the entity in the vector database. + Also synchronizes entity_chunks_storage and relation_chunks_storage to track chunk references. Args: entity_name: Name of the entity to edit updated_data: Dictionary containing updated attributes, e.g. {"description": "new description", "entity_type": "new type"} allow_rename: Whether to allow entity renaming, defaults to True + allow_merge: Whether to merge into an existing entity when renaming to an existing name Returns: Dictionary containing updated entity information @@ -3498,14 +3803,21 @@ class LightRAG: entity_name, updated_data, allow_rename, + allow_merge, + self.entity_chunks, + self.relation_chunks, ) def edit_entity( - self, entity_name: str, updated_data: dict[str, str], allow_rename: bool = True + self, + entity_name: str, + updated_data: dict[str, str], + allow_rename: bool = True, + allow_merge: bool = False, ) -> dict[str, Any]: loop = always_get_an_event_loop() return loop.run_until_complete( - self.aedit_entity(entity_name, updated_data, allow_rename) + self.aedit_entity(entity_name, updated_data, allow_rename, allow_merge) ) async def aedit_relation( @@ -3514,6 +3826,7 @@ class LightRAG: """Asynchronously edit relation information. Updates relation (edge) information in the knowledge graph and re-embeds the relation in the vector database. 
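As a usage illustration of the widened editing API (aedit_entity with allow_merge, aedit_relation with chunk-tracking sync): the entity and relation names below are hypothetical, rag is assumed to be an initialized LightRAG instance, and passing the new name via the "entity_name" key of updated_data is an assumption of this sketch.

async def edit_examples(rag):
    # Rename an entity; with allow_merge=True a name collision with an existing
    # "Acme Corporation" node merges into it instead of rejecting the rename.
    await rag.aedit_entity(
        "Acme Corp.",
        {"entity_name": "Acme Corporation", "entity_type": "organization"},
        allow_rename=True,
        allow_merge=True,
    )
    # Relation edits now also keep relation_chunks_storage in sync.
    await rag.aedit_relation(
        "Acme Corporation",
        "Widget Line",
        {"description": "Acme Corporation manufactures the Widget product line."},
    )

# Run with: asyncio.run(edit_examples(rag))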
+ Also synchronizes the relation_chunks_storage to track which chunks reference this relation. Args: source_entity: Name of the source entity @@ -3532,6 +3845,7 @@ class LightRAG: source_entity, target_entity, updated_data, + self.relation_chunks, ) def edit_relation( @@ -3643,6 +3957,8 @@ class LightRAG: target_entity, merge_strategy, target_entity_data, + self.entity_chunks, + self.relation_chunks, ) def merge_entities( diff --git a/lightrag/operate.py b/lightrag/operate.py index cd8d8a64..c6724974 100644 --- a/lightrag/operate.py +++ b/lightrag/operate.py @@ -1,5 +1,6 @@ from __future__ import annotations from functools import partial +from pathlib import Path import asyncio import json @@ -7,7 +8,11 @@ import json_repair from typing import Any, AsyncIterator, overload, Literal from collections import Counter, defaultdict -from .utils import ( +from lightrag.exceptions import ( + PipelineCancelledException, + ChunkTokenLimitExceededError, +) +from lightrag.utils import ( logger, compute_mdhash_id, Tokenizer, @@ -26,14 +31,16 @@ from .utils import ( pick_by_weighted_polling, pick_by_vector_similarity, process_chunks_unified, - build_file_path, safe_vdb_operation_with_exception, create_prefixed_exception, fix_tuple_delimiter_corruption, convert_to_user_format, generate_reference_list_from_chunks, + apply_source_ids_limit, + merge_source_ids, + make_relation_chunk_key, ) -from .base import ( +from lightrag.base import ( BaseGraphStorage, BaseKVStorage, BaseVectorStorage, @@ -42,8 +49,8 @@ from .base import ( QueryResult, QueryContextResult, ) -from .prompt import PROMPTS -from .constants import ( +from lightrag.prompt import PROMPTS +from lightrag.constants import ( GRAPH_FIELD_SEP, DEFAULT_MAX_ENTITY_TOKENS, DEFAULT_MAX_RELATION_TOKENS, @@ -52,15 +59,41 @@ from .constants import ( DEFAULT_KG_CHUNK_PICK_METHOD, DEFAULT_ENTITY_TYPES, DEFAULT_SUMMARY_LANGUAGE, + SOURCE_IDS_LIMIT_METHOD_KEEP, + SOURCE_IDS_LIMIT_METHOD_FIFO, + DEFAULT_FILE_PATH_MORE_PLACEHOLDER, + DEFAULT_MAX_FILE_PATHS, + DEFAULT_ENTITY_NAME_MAX_LENGTH, ) -from .kg.shared_storage import get_storage_keyed_lock +from lightrag.kg.shared_storage import get_storage_keyed_lock import time from dotenv import load_dotenv # use the .env that is inside the current folder # allows to use different .env file for each lightrag instance # the OS environment variables take precedence over the .env file -load_dotenv(dotenv_path=".env", override=False) +load_dotenv(dotenv_path=Path(__file__).resolve().parent / ".env", override=False) + + +def _truncate_entity_identifier( + identifier: str, limit: int, chunk_key: str, identifier_role: str +) -> str: + """Truncate entity identifiers that exceed the configured length limit.""" + + if len(identifier) <= limit: + return identifier + + display_value = identifier[:limit] + preview = identifier[:20] # Show first 20 characters as preview + logger.warning( + "%s: %s len %d > %d chars (Name: '%s...')", + chunk_key, + identifier_role, + len(identifier), + limit, + preview, + ) + return display_value def chunking_by_token_size( @@ -68,8 +101,8 @@ def chunking_by_token_size( content: str, split_by_character: str | None = None, split_by_character_only: bool = False, - overlap_token_size: int = 128, - max_token_size: int = 1024, + chunk_overlap_token_size: int = 100, + chunk_token_size: int = 1200, ) -> list[dict[str, Any]]: tokens = tokenizer.encode(content) results: list[dict[str, Any]] = [] @@ -79,19 +112,30 @@ def chunking_by_token_size( if split_by_character_only: for chunk in raw_chunks: _tokens = 
tokenizer.encode(chunk) + if len(_tokens) > chunk_token_size: + logger.warning( + "Chunk split_by_character exceeds token limit: len=%d limit=%d", + len(_tokens), + chunk_token_size, + ) + raise ChunkTokenLimitExceededError( + chunk_tokens=len(_tokens), + chunk_token_limit=chunk_token_size, + chunk_preview=chunk[:120], + ) new_chunks.append((len(_tokens), chunk)) else: for chunk in raw_chunks: _tokens = tokenizer.encode(chunk) - if len(_tokens) > max_token_size: + if len(_tokens) > chunk_token_size: for start in range( - 0, len(_tokens), max_token_size - overlap_token_size + 0, len(_tokens), chunk_token_size - chunk_overlap_token_size ): chunk_content = tokenizer.decode( - _tokens[start : start + max_token_size] + _tokens[start : start + chunk_token_size] ) new_chunks.append( - (min(max_token_size, len(_tokens) - start), chunk_content) + (min(chunk_token_size, len(_tokens) - start), chunk_content) ) else: new_chunks.append((len(_tokens), chunk)) @@ -105,12 +149,12 @@ def chunking_by_token_size( ) else: for index, start in enumerate( - range(0, len(tokens), max_token_size - overlap_token_size) + range(0, len(tokens), chunk_token_size - chunk_overlap_token_size) ): - chunk_content = tokenizer.decode(tokens[start : start + max_token_size]) + chunk_content = tokenizer.decode(tokens[start : start + chunk_token_size]) results.append( { - "tokens": min(max_token_size, len(tokens) - start), + "tokens": min(chunk_token_size, len(tokens) - start), "content": chunk_content.strip(), "chunk_order_index": index, } @@ -315,6 +359,20 @@ async def _summarize_descriptions( llm_response_cache=llm_response_cache, cache_type="summary", ) + + # Check summary token length against embedding limit + embedding_token_limit = global_config.get("embedding_token_limit") + if embedding_token_limit is not None and summary: + tokenizer = global_config["tokenizer"] + summary_token_count = len(tokenizer.encode(summary)) + threshold = int(embedding_token_limit * 0.9) + + if summary_token_count > threshold: + logger.warning( + f"Summary tokens ({summary_token_count}) exceeds 90% of embedding limit " + f"({embedding_token_limit}) for {description_type}: {description_name}" + ) + return summary @@ -339,8 +397,8 @@ async def _handle_single_entity_extraction( # Validate entity name after all cleaning steps if not entity_name or not entity_name.strip(): - logger.warning( - f"Entity extraction error: entity name became empty after cleaning. Original: '{record_attributes[1]}'" + logger.info( + f"Empty entity name found after sanitization. Original: '{record_attributes[1]}'" ) return None @@ -401,7 +459,7 @@ async def _handle_single_relationship_extraction( ): # treat "relationship" and "relation" interchangeable if len(record_attributes) > 1 and "relation" in record_attributes[0]: logger.warning( - f"{chunk_key}: LLM output format error; found {len(record_attributes)}/5 fields on REALTION `{record_attributes[1]}`~`{record_attributes[2] if len(record_attributes) >2 else 'N/A'}`" + f"{chunk_key}: LLM output format error; found {len(record_attributes)}/5 fields on REALTION `{record_attributes[1]}`~`{record_attributes[2] if len(record_attributes) > 2 else 'N/A'}`" ) logger.debug(record_attributes) return None @@ -416,14 +474,14 @@ async def _handle_single_relationship_extraction( # Validate entity names after all cleaning steps if not source: - logger.warning( - f"Relationship extraction error: source entity became empty after cleaning. Original: '{record_attributes[1]}'" + logger.info( + f"Empty source entity found after sanitization. 
Original: '{record_attributes[1]}'" ) return None if not target: - logger.warning( - f"Relationship extraction error: target entity became empty after cleaning. Original: '{record_attributes[2]}'" + logger.info( + f"Empty target entity found after sanitization. Original: '{record_attributes[2]}'" ) return None @@ -472,9 +530,9 @@ async def _handle_single_relationship_extraction( return None -async def _rebuild_knowledge_from_chunks( - entities_to_rebuild: dict[str, set[str]], - relationships_to_rebuild: dict[tuple[str, str], set[str]], +async def rebuild_knowledge_from_chunks( + entities_to_rebuild: dict[str, list[str]], + relationships_to_rebuild: dict[tuple[str, str], list[str]], knowledge_graph_inst: BaseGraphStorage, entities_vdb: BaseVectorStorage, relationships_vdb: BaseVectorStorage, @@ -483,6 +541,8 @@ async def _rebuild_knowledge_from_chunks( global_config: dict[str, str], pipeline_status: dict | None = None, pipeline_status_lock=None, + entity_chunks_storage: BaseKVStorage | None = None, + relation_chunks_storage: BaseKVStorage | None = None, ) -> None: """Rebuild entity and relationship descriptions from cached extraction results with parallel processing @@ -491,8 +551,8 @@ async def _rebuild_knowledge_from_chunks( controlled by llm_model_max_async and using get_storage_keyed_lock for data consistency. Args: - entities_to_rebuild: Dict mapping entity_name -> set of remaining chunk_ids - relationships_to_rebuild: Dict mapping (src, tgt) -> set of remaining chunk_ids + entities_to_rebuild: Dict mapping entity_name -> list of remaining chunk_ids + relationships_to_rebuild: Dict mapping (src, tgt) -> list of remaining chunk_ids knowledge_graph_inst: Knowledge graph storage entities_vdb: Entity vector database relationships_vdb: Relationship vector database @@ -501,6 +561,8 @@ async def _rebuild_knowledge_from_chunks( global_config: Global configuration containing llm_model_max_async pipeline_status: Pipeline status dictionary pipeline_status_lock: Lock for pipeline status + entity_chunks_storage: KV storage maintaining full chunk IDs per entity + relation_chunks_storage: KV storage maintaining full chunk IDs per relation """ if not entities_to_rebuild and not relationships_to_rebuild: return @@ -640,16 +702,9 @@ async def _rebuild_knowledge_from_chunks( chunk_entities=chunk_entities, llm_response_cache=llm_response_cache, global_config=global_config, + entity_chunks_storage=entity_chunks_storage, ) rebuilt_entities_count += 1 - status_message = ( - f"Rebuilt `{entity_name}` from {len(chunk_ids)} chunks" - ) - logger.info(status_message) - if pipeline_status is not None and pipeline_status_lock is not None: - async with pipeline_status_lock: - pipeline_status["latest_message"] = status_message - pipeline_status["history_messages"].append(status_message) except Exception as e: failed_entities_count += 1 status_message = f"Failed to rebuild `{entity_name}`: {e}" @@ -675,25 +730,22 @@ async def _rebuild_knowledge_from_chunks( await _rebuild_single_relationship( knowledge_graph_inst=knowledge_graph_inst, relationships_vdb=relationships_vdb, + entities_vdb=entities_vdb, src=src, tgt=tgt, chunk_ids=chunk_ids, chunk_relationships=chunk_relationships, llm_response_cache=llm_response_cache, global_config=global_config, + relation_chunks_storage=relation_chunks_storage, + entity_chunks_storage=entity_chunks_storage, + pipeline_status=pipeline_status, + pipeline_status_lock=pipeline_status_lock, ) rebuilt_relationships_count += 1 - status_message = ( - f"Rebuilt `{src} - {tgt}` from 
{len(chunk_ids)} chunks" - ) - logger.info(status_message) - if pipeline_status is not None and pipeline_status_lock is not None: - async with pipeline_status_lock: - pipeline_status["latest_message"] = status_message - pipeline_status["history_messages"].append(status_message) except Exception as e: failed_relationships_count += 1 - status_message = f"Failed to rebuild `{src} - {tgt}`: {e}" + status_message = f"Failed to rebuild `{src}`~`{tgt}`: {e}" logger.info(status_message) # Per requirement, change to info if pipeline_status is not None and pipeline_status_lock is not None: async with pipeline_status_lock: @@ -946,7 +998,14 @@ async def _process_extraction_result( record_attributes, chunk_key, timestamp, file_path ) if entity_data is not None: - maybe_nodes[entity_data["entity_name"]].append(entity_data) + truncated_name = _truncate_entity_identifier( + entity_data["entity_name"], + DEFAULT_ENTITY_NAME_MAX_LENGTH, + chunk_key, + "Entity name", + ) + entity_data["entity_name"] = truncated_name + maybe_nodes[truncated_name].append(entity_data) continue # Try to parse as relationship @@ -954,9 +1013,21 @@ async def _process_extraction_result( record_attributes, chunk_key, timestamp, file_path ) if relationship_data is not None: - maybe_edges[ - (relationship_data["src_id"], relationship_data["tgt_id"]) - ].append(relationship_data) + truncated_source = _truncate_entity_identifier( + relationship_data["src_id"], + DEFAULT_ENTITY_NAME_MAX_LENGTH, + chunk_key, + "Relation entity", + ) + truncated_target = _truncate_entity_identifier( + relationship_data["tgt_id"], + DEFAULT_ENTITY_NAME_MAX_LENGTH, + chunk_key, + "Relation entity", + ) + relationship_data["src_id"] = truncated_source + relationship_data["tgt_id"] = truncated_target + maybe_edges[(truncated_source, truncated_target)].append(relationship_data) return dict(maybe_nodes), dict(maybe_edges) @@ -1001,10 +1072,13 @@ async def _rebuild_single_entity( knowledge_graph_inst: BaseGraphStorage, entities_vdb: BaseVectorStorage, entity_name: str, - chunk_ids: set[str], + chunk_ids: list[str], chunk_entities: dict, llm_response_cache: BaseKVStorage, global_config: dict[str, str], + entity_chunks_storage: BaseKVStorage | None = None, + pipeline_status: dict | None = None, + pipeline_status_lock=None, ) -> None: """Rebuild a single entity from cached extraction results""" @@ -1015,7 +1089,11 @@ async def _rebuild_single_entity( # Helper function to update entity in both graph and vector storage async def _update_entity_storage( - final_description: str, entity_type: str, file_paths: set[str] + final_description: str, + entity_type: str, + file_paths: list[str], + source_chunk_ids: list[str], + truncation_info: str = "", ): try: # Update entity in graph storage (critical path) @@ -1023,10 +1101,12 @@ async def _rebuild_single_entity( **current_entity, "description": final_description, "entity_type": entity_type, - "source_id": GRAPH_FIELD_SEP.join(chunk_ids), + "source_id": GRAPH_FIELD_SEP.join(source_chunk_ids), "file_path": GRAPH_FIELD_SEP.join(file_paths) if file_paths else current_entity.get("file_path", "unknown_source"), + "created_at": int(time.time()), + "truncate": truncation_info, } await knowledge_graph_inst.upsert_node(entity_name, updated_entity_data) @@ -1059,9 +1139,33 @@ async def _rebuild_single_entity( logger.error(error_msg) raise # Re-raise exception - # Collect all entity data from relevant chunks + # normalized_chunk_ids = merge_source_ids([], chunk_ids) + normalized_chunk_ids = chunk_ids + + if entity_chunks_storage is 
not None and normalized_chunk_ids: + await entity_chunks_storage.upsert( + { + entity_name: { + "chunk_ids": normalized_chunk_ids, + "count": len(normalized_chunk_ids), + } + } + ) + + limit_method = ( + global_config.get("source_ids_limit_method") or SOURCE_IDS_LIMIT_METHOD_KEEP + ) + + limited_chunk_ids = apply_source_ids_limit( + normalized_chunk_ids, + global_config["max_source_ids_per_entity"], + limit_method, + identifier=f"`{entity_name}`", + ) + + # Collect all entity data from relevant (limited) chunks all_entity_data = [] - for chunk_id in chunk_ids: + for chunk_id in limited_chunk_ids: if chunk_id in chunk_entities and entity_name in chunk_entities[chunk_id]: all_entity_data.extend(chunk_entities[chunk_id][entity_name]) @@ -1108,13 +1212,19 @@ async def _rebuild_single_entity( final_description = current_entity.get("description", "") entity_type = current_entity.get("entity_type", "UNKNOWN") - await _update_entity_storage(final_description, entity_type, file_paths) + await _update_entity_storage( + final_description, + entity_type, + file_paths, + limited_chunk_ids, + ) return # Process cached entity data descriptions = [] entity_types = [] - file_paths = set() + file_paths_list = [] + seen_paths = set() for entity_data in all_entity_data: if entity_data.get("description"): @@ -1122,7 +1232,33 @@ async def _rebuild_single_entity( if entity_data.get("entity_type"): entity_types.append(entity_data["entity_type"]) if entity_data.get("file_path"): - file_paths.add(entity_data["file_path"]) + file_path = entity_data["file_path"] + if file_path and file_path not in seen_paths: + file_paths_list.append(file_path) + seen_paths.add(file_path) + + # Apply MAX_FILE_PATHS limit + max_file_paths = global_config.get("max_file_paths") + file_path_placeholder = global_config.get( + "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER + ) + limit_method = global_config.get("source_ids_limit_method") + + original_count = len(file_paths_list) + if original_count > max_file_paths: + if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO: + # FIFO: keep tail (newest), discard head + file_paths_list = file_paths_list[-max_file_paths:] + else: + # KEEP: keep head (earliest), discard tail + file_paths_list = file_paths_list[:max_file_paths] + + file_paths_list.append( + f"...{file_path_placeholder}...({limit_method} {max_file_paths}/{original_count})" + ) + logger.info( + f"Limited `{entity_name}`: file_path {original_count} -> {max_file_paths} ({limit_method})" + ) # Remove duplicates while preserving order description_list = list(dict.fromkeys(descriptions)) @@ -1148,18 +1284,47 @@ async def _rebuild_single_entity( else: final_description = current_entity.get("description", "") - await _update_entity_storage(final_description, entity_type, file_paths) + if len(limited_chunk_ids) < len(normalized_chunk_ids): + truncation_info = ( + f"{limit_method} {len(limited_chunk_ids)}/{len(normalized_chunk_ids)}" + ) + else: + truncation_info = "" + + await _update_entity_storage( + final_description, + entity_type, + file_paths_list, + limited_chunk_ids, + truncation_info, + ) + + # Log rebuild completion with truncation info + status_message = f"Rebuild `{entity_name}` from {len(chunk_ids)} chunks" + if truncation_info: + status_message += f" ({truncation_info})" + logger.info(status_message) + # Update pipeline status + if pipeline_status is not None and pipeline_status_lock is not None: + async with pipeline_status_lock: + pipeline_status["latest_message"] = status_message + 
pipeline_status["history_messages"].append(status_message) async def _rebuild_single_relationship( knowledge_graph_inst: BaseGraphStorage, relationships_vdb: BaseVectorStorage, + entities_vdb: BaseVectorStorage, src: str, tgt: str, - chunk_ids: set[str], + chunk_ids: list[str], chunk_relationships: dict, llm_response_cache: BaseKVStorage, global_config: dict[str, str], + relation_chunks_storage: BaseKVStorage | None = None, + entity_chunks_storage: BaseKVStorage | None = None, + pipeline_status: dict | None = None, + pipeline_status_lock=None, ) -> None: """Rebuild a single relationship from cached extraction results @@ -1172,9 +1337,33 @@ async def _rebuild_single_relationship( if not current_relationship: return + # normalized_chunk_ids = merge_source_ids([], chunk_ids) + normalized_chunk_ids = chunk_ids + + if relation_chunks_storage is not None and normalized_chunk_ids: + storage_key = make_relation_chunk_key(src, tgt) + await relation_chunks_storage.upsert( + { + storage_key: { + "chunk_ids": normalized_chunk_ids, + "count": len(normalized_chunk_ids), + } + } + ) + + limit_method = ( + global_config.get("source_ids_limit_method") or SOURCE_IDS_LIMIT_METHOD_KEEP + ) + limited_chunk_ids = apply_source_ids_limit( + normalized_chunk_ids, + global_config["max_source_ids_per_relation"], + limit_method, + identifier=f"`{src}`~`{tgt}`", + ) + # Collect all relationship data from relevant chunks all_relationship_data = [] - for chunk_id in chunk_ids: + for chunk_id in limited_chunk_ids: if chunk_id in chunk_relationships: # Check both (src, tgt) and (tgt, src) since relationships can be bidirectional for edge_key in [(src, tgt), (tgt, src)]: @@ -1191,7 +1380,8 @@ async def _rebuild_single_relationship( descriptions = [] keywords = [] weights = [] - file_paths = set() + file_paths_list = [] + seen_paths = set() for rel_data in all_relationship_data: if rel_data.get("description"): @@ -1201,7 +1391,33 @@ async def _rebuild_single_relationship( if rel_data.get("weight"): weights.append(rel_data["weight"]) if rel_data.get("file_path"): - file_paths.add(rel_data["file_path"]) + file_path = rel_data["file_path"] + if file_path and file_path not in seen_paths: + file_paths_list.append(file_path) + seen_paths.add(file_path) + + # Apply count limit + max_file_paths = global_config.get("max_file_paths") + file_path_placeholder = global_config.get( + "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER + ) + limit_method = global_config.get("source_ids_limit_method") + + original_count = len(file_paths_list) + if original_count > max_file_paths: + if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO: + # FIFO: keep tail (newest), discard head + file_paths_list = file_paths_list[-max_file_paths:] + else: + # KEEP: keep head (earliest), discard tail + file_paths_list = file_paths_list[:max_file_paths] + + file_paths_list.append( + f"...{file_path_placeholder}...({limit_method} {max_file_paths}/{original_count})" + ) + logger.info( + f"Limited `{src}`~`{tgt}`: file_path {original_count} -> {max_file_paths} ({limit_method})" + ) # Remove duplicates while preserving order description_list = list(dict.fromkeys(descriptions)) @@ -1229,6 +1445,13 @@ async def _rebuild_single_relationship( # fallback to keep current(unchanged) final_description = current_relationship.get("description", "") + if len(limited_chunk_ids) < len(normalized_chunk_ids): + truncation_info = ( + f"{limit_method} {len(limited_chunk_ids)}/{len(normalized_chunk_ids)}" + ) + else: + truncation_info = "" + # Update relationship in graph 
storage updated_relationship_data = { **current_relationship, @@ -1237,14 +1460,75 @@ async def _rebuild_single_relationship( else current_relationship.get("description", ""), "keywords": combined_keywords, "weight": weight, - "source_id": GRAPH_FIELD_SEP.join(chunk_ids), - "file_path": GRAPH_FIELD_SEP.join([fp for fp in file_paths if fp]) - if file_paths + "source_id": GRAPH_FIELD_SEP.join(limited_chunk_ids), + "file_path": GRAPH_FIELD_SEP.join([fp for fp in file_paths_list if fp]) + if file_paths_list else current_relationship.get("file_path", "unknown_source"), + "truncate": truncation_info, } + + # Ensure both endpoint nodes exist before writing the edge back + # (certain storage backends require pre-existing nodes). + node_description = ( + updated_relationship_data["description"] + if updated_relationship_data.get("description") + else current_relationship.get("description", "") + ) + node_source_id = updated_relationship_data.get("source_id", "") + node_file_path = updated_relationship_data.get("file_path", "unknown_source") + + for node_id in {src, tgt}: + if not (await knowledge_graph_inst.has_node(node_id)): + node_created_at = int(time.time()) + node_data = { + "entity_id": node_id, + "source_id": node_source_id, + "description": node_description, + "entity_type": "UNKNOWN", + "file_path": node_file_path, + "created_at": node_created_at, + "truncate": "", + } + await knowledge_graph_inst.upsert_node(node_id, node_data=node_data) + + # Update entity_chunks_storage for the newly created entity + if entity_chunks_storage is not None and limited_chunk_ids: + await entity_chunks_storage.upsert( + { + node_id: { + "chunk_ids": limited_chunk_ids, + "count": len(limited_chunk_ids), + } + } + ) + + # Update entity_vdb for the newly created entity + if entities_vdb is not None: + entity_vdb_id = compute_mdhash_id(node_id, prefix="ent-") + entity_content = f"{node_id}\n{node_description}" + vdb_data = { + entity_vdb_id: { + "content": entity_content, + "entity_name": node_id, + "source_id": node_source_id, + "entity_type": "UNKNOWN", + "file_path": node_file_path, + } + } + await safe_vdb_operation_with_exception( + operation=lambda payload=vdb_data: entities_vdb.upsert(payload), + operation_name="rebuild_added_entity_upsert", + entity_name=node_id, + max_retries=3, + retry_delay=0.1, + ) + await knowledge_graph_inst.upsert_edge(src, tgt, updated_relationship_data) # Update relationship in vector database + # Sort src and tgt to ensure consistent ordering (smaller string first) + if src > tgt: + src, tgt = tgt, src try: rel_vdb_id = compute_mdhash_id(src + tgt, prefix="rel-") rel_vdb_id_reverse = compute_mdhash_id(tgt + src, prefix="rel-") @@ -1286,15 +1570,36 @@ async def _rebuild_single_relationship( logger.error(error_msg) raise # Re-raise exception + # Log rebuild completion with truncation info + status_message = f"Rebuild `{src}`~`{tgt}` from {len(chunk_ids)} chunks" + if truncation_info: + status_message += f" ({truncation_info})" + # Add truncation info from apply_source_ids_limit if truncation occurred + if len(limited_chunk_ids) < len(normalized_chunk_ids): + truncation_info = ( + f" ({limit_method}:{len(limited_chunk_ids)}/{len(normalized_chunk_ids)})" + ) + status_message += truncation_info + + logger.info(status_message) + + # Update pipeline status + if pipeline_status is not None and pipeline_status_lock is not None: + async with pipeline_status_lock: + pipeline_status["latest_message"] = status_message + pipeline_status["history_messages"].append(status_message) + async def 
_merge_nodes_then_upsert( entity_name: str, nodes_data: list[dict], knowledge_graph_inst: BaseGraphStorage, + entity_vdb: BaseVectorStorage | None, global_config: dict, pipeline_status: dict = None, pipeline_status_lock=None, llm_response_cache: BaseKVStorage | None = None, + entity_chunks_storage: BaseKVStorage | None = None, ): """Get existing nodes from knowledge graph use name,if exists, merge data, else create, then upsert.""" already_entity_types = [] @@ -1302,6 +1607,7 @@ async def _merge_nodes_then_upsert( already_description = [] already_file_paths = [] + # 1. Get existing node data from knowledge graph already_node = await knowledge_graph_inst.get_node(entity_name) if already_node: already_entity_types.append(already_node["entity_type"]) @@ -1309,22 +1615,102 @@ async def _merge_nodes_then_upsert( already_file_paths.extend(already_node["file_path"].split(GRAPH_FIELD_SEP)) already_description.extend(already_node["description"].split(GRAPH_FIELD_SEP)) + new_source_ids = [dp["source_id"] for dp in nodes_data if dp.get("source_id")] + + existing_full_source_ids = [] + if entity_chunks_storage is not None: + stored_chunks = await entity_chunks_storage.get_by_id(entity_name) + if stored_chunks and isinstance(stored_chunks, dict): + existing_full_source_ids = [ + chunk_id for chunk_id in stored_chunks.get("chunk_ids", []) if chunk_id + ] + + if not existing_full_source_ids: + existing_full_source_ids = [ + chunk_id for chunk_id in already_source_ids if chunk_id + ] + + # 2. Merging new source ids with existing ones + full_source_ids = merge_source_ids(existing_full_source_ids, new_source_ids) + + if entity_chunks_storage is not None and full_source_ids: + await entity_chunks_storage.upsert( + { + entity_name: { + "chunk_ids": full_source_ids, + "count": len(full_source_ids), + } + } + ) + + # 3. Finalize source_id by applying source ids limit + limit_method = global_config.get("source_ids_limit_method") + max_source_limit = global_config.get("max_source_ids_per_entity") + source_ids = apply_source_ids_limit( + full_source_ids, + max_source_limit, + limit_method, + identifier=f"`{entity_name}`", + ) + + # 4. Only keep nodes not filter by apply_source_ids_limit if limit_method is KEEP + if limit_method == SOURCE_IDS_LIMIT_METHOD_KEEP: + allowed_source_ids = set(source_ids) + filtered_nodes = [] + for dp in nodes_data: + source_id = dp.get("source_id") + # Skip descriptions sourced from chunks dropped by the limitation cap + if ( + source_id + and source_id not in allowed_source_ids + and source_id not in existing_full_source_ids + ): + continue + filtered_nodes.append(dp) + nodes_data = filtered_nodes + else: # In FIFO mode, keep all nodes - truncation happens at source_ids level only + nodes_data = list(nodes_data) + + # 5. 
Check if we need to skip summary due to source_ids limit + if ( + limit_method == SOURCE_IDS_LIMIT_METHOD_KEEP + and len(existing_full_source_ids) >= max_source_limit + and not nodes_data + ): + if already_node: + logger.info( + f"Skipped `{entity_name}`: KEEP old chunks {already_source_ids}/{len(full_source_ids)}" + ) + existing_node_data = dict(already_node) + return existing_node_data + else: + logger.error(f"Internal Error: already_node missing for `{entity_name}`") + raise ValueError( + f"Internal Error: already_node missing for `{entity_name}`" + ) + + # 6.1 Finalize source_id + source_id = GRAPH_FIELD_SEP.join(source_ids) + + # 6.2 Finalize entity type by highest count entity_type = sorted( Counter( [dp["entity_type"] for dp in nodes_data] + already_entity_types ).items(), key=lambda x: x[1], reverse=True, - )[0][0] # Get the entity type with the highest count + )[0][0] - # Deduplicate by description, keeping first occurrence + # 7. Deduplicate nodes by description, keeping first occurrence in the same document unique_nodes = {} for dp in nodes_data: - desc = dp["description"] + desc = dp.get("description") + if not desc: + continue if desc not in unique_nodes: unique_nodes[desc] = dp - # Sort description by timestamp, then by description length (largest to smallest) when timestamps are the same + # Sort description by timestamp, then by description length when timestamps are the same sorted_nodes = sorted( unique_nodes.values(), key=lambda x: (x.get("timestamp", 0), -len(x.get("description", ""))), @@ -1333,49 +1719,119 @@ async def _merge_nodes_then_upsert( # Combine already_description with sorted new sorted descriptions description_list = already_description + sorted_descriptions + if not description_list: + logger.error(f"Entity {entity_name} has no description") + raise ValueError(f"Entity {entity_name} has no description") - num_fragment = len(description_list) - already_fragment = len(already_description) - deduplicated_num = already_fragment + len(nodes_data) - num_fragment - if deduplicated_num > 0: - dd_message = f"(dd:{deduplicated_num})" - else: - dd_message = "" - if num_fragment > 0: - # Get summary and LLM usage status - description, llm_was_used = await _handle_entity_relation_summary( - "Entity", - entity_name, - description_list, - GRAPH_FIELD_SEP, - global_config, - llm_response_cache, + # Check for cancellation before LLM summary + if pipeline_status is not None and pipeline_status_lock is not None: + async with pipeline_status_lock: + if pipeline_status.get("cancellation_requested", False): + raise PipelineCancelledException("User cancelled during entity summary") + + # 8. Get summary description an LLM usage status + description, llm_was_used = await _handle_entity_relation_summary( + "Entity", + entity_name, + description_list, + GRAPH_FIELD_SEP, + global_config, + llm_response_cache, + ) + + # 9. 
Build file_path within MAX_FILE_PATHS + file_paths_list = [] + seen_paths = set() + has_placeholder = False # Indicating file_path has been truncated before + + max_file_paths = global_config.get("max_file_paths", DEFAULT_MAX_FILE_PATHS) + file_path_placeholder = global_config.get( + "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER + ) + + # Collect from already_file_paths, excluding placeholder + for fp in already_file_paths: + if fp and fp.startswith(f"...{file_path_placeholder}"): # Skip placeholders + has_placeholder = True + continue + if fp and fp not in seen_paths: + file_paths_list.append(fp) + seen_paths.add(fp) + + # Collect from new data + for dp in nodes_data: + file_path_item = dp.get("file_path") + if file_path_item and file_path_item not in seen_paths: + file_paths_list.append(file_path_item) + seen_paths.add(file_path_item) + + # Apply count limit + if len(file_paths_list) > max_file_paths: + limit_method = global_config.get( + "source_ids_limit_method", SOURCE_IDS_LIMIT_METHOD_KEEP + ) + file_path_placeholder = global_config.get( + "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER + ) + # Add + sign to indicate actual file count is higher + original_count_str = ( + f"{len(file_paths_list)}+" if has_placeholder else str(len(file_paths_list)) ) - # Log based on actual LLM usage - if llm_was_used: - status_message = f"LLMmrg: `{entity_name}` | {already_fragment}+{num_fragment - already_fragment}{dd_message}" + if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO: + # FIFO: keep tail (newest), discard head + file_paths_list = file_paths_list[-max_file_paths:] + file_paths_list.append(f"...{file_path_placeholder}...(FIFO)") else: - status_message = f"Merged: `{entity_name}` | {already_fragment}+{num_fragment - already_fragment}{dd_message}" + # KEEP: keep head (earliest), discard tail + file_paths_list = file_paths_list[:max_file_paths] + file_paths_list.append(f"...{file_path_placeholder}...(KEEP Old)") - if already_fragment > 0 or llm_was_used: - logger.info(status_message) - if pipeline_status is not None and pipeline_status_lock is not None: - async with pipeline_status_lock: - pipeline_status["latest_message"] = status_message - pipeline_status["history_messages"].append(status_message) - else: - logger.debug(status_message) + logger.info( + f"Limited `{entity_name}`: file_path {original_count_str} -> {max_file_paths} ({limit_method})" + ) + # Finalize file_path + file_path = GRAPH_FIELD_SEP.join(file_paths_list) + # 10.Log based on actual LLM usage + num_fragment = len(description_list) + already_fragment = len(already_description) + if llm_was_used: + status_message = f"LLMmrg: `{entity_name}` | {already_fragment}+{num_fragment - already_fragment}" else: - logger.error(f"Entity {entity_name} has no description") - description = "(no description)" + status_message = f"Merged: `{entity_name}` | {already_fragment}+{num_fragment - already_fragment}" - source_id = GRAPH_FIELD_SEP.join( - set([dp["source_id"] for dp in nodes_data] + already_source_ids) - ) - file_path = build_file_path(already_file_paths, nodes_data, entity_name) + truncation_info = truncation_info_log = "" + if len(source_ids) < len(full_source_ids): + # Add truncation info from apply_source_ids_limit if truncation occurred + truncation_info_log = f"{limit_method} {len(source_ids)}/{len(full_source_ids)}" + if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO: + truncation_info = truncation_info_log + else: + truncation_info = "KEEP Old" + deduplicated_num = already_fragment + 
len(nodes_data) - num_fragment + dd_message = "" + if deduplicated_num > 0: + # Duplicated description detected across multiple trucks for the same entity + dd_message = f"dd {deduplicated_num}" + + if dd_message or truncation_info_log: + status_message += ( + f" ({', '.join(filter(None, [truncation_info_log, dd_message]))})" + ) + + # Add message to pipeline satus when merge happens + if already_fragment > 0 or llm_was_used: + logger.info(status_message) + if pipeline_status is not None and pipeline_status_lock is not None: + async with pipeline_status_lock: + pipeline_status["latest_message"] = status_message + pipeline_status["history_messages"].append(status_message) + else: + logger.debug(status_message) + + # 11. Update both graph and vector db node_data = dict( entity_id=entity_name, entity_type=entity_type, @@ -1383,12 +1839,32 @@ async def _merge_nodes_then_upsert( source_id=source_id, file_path=file_path, created_at=int(time.time()), + truncate=truncation_info, ) await knowledge_graph_inst.upsert_node( entity_name, node_data=node_data, ) node_data["entity_name"] = entity_name + if entity_vdb is not None: + entity_vdb_id = compute_mdhash_id(str(entity_name), prefix="ent-") + entity_content = f"{entity_name}\n{description}" + data_for_vdb = { + entity_vdb_id: { + "entity_name": entity_name, + "entity_type": entity_type, + "content": entity_content, + "source_id": source_id, + "file_path": file_path, + } + } + await safe_vdb_operation_with_exception( + operation=lambda payload=data_for_vdb: entity_vdb.upsert(payload), + operation_name="entity_upsert", + entity_name=entity_name, + max_retries=3, + retry_delay=0.1, + ) return node_data @@ -1397,21 +1873,27 @@ async def _merge_edges_then_upsert( tgt_id: str, edges_data: list[dict], knowledge_graph_inst: BaseGraphStorage, + relationships_vdb: BaseVectorStorage | None, + entity_vdb: BaseVectorStorage | None, global_config: dict, pipeline_status: dict = None, pipeline_status_lock=None, llm_response_cache: BaseKVStorage | None = None, added_entities: list = None, # New parameter to track entities added during edge processing + relation_chunks_storage: BaseKVStorage | None = None, + entity_chunks_storage: BaseKVStorage | None = None, ): if src_id == tgt_id: return None + already_edge = None already_weights = [] already_source_ids = [] already_description = [] already_keywords = [] already_file_paths = [] + # 1. 
Get existing edge data from graph storage if await knowledge_graph_inst.has_edge(src_id, tgt_id): already_edge = await knowledge_graph_inst.get_edge(src_id, tgt_id) # Handle the case where get_edge returns None or missing fields @@ -1445,65 +1927,93 @@ async def _merge_edges_then_upsert( ) ) - # Process edges_data with None checks - weight = sum([dp["weight"] for dp in edges_data] + already_weights) + new_source_ids = [dp["source_id"] for dp in edges_data if dp.get("source_id")] - # Deduplicate by description, keeping first occurrence - unique_edges = {} - for dp in edges_data: - if dp.get("description"): - desc = dp["description"] - if desc not in unique_edges: - unique_edges[desc] = dp + storage_key = make_relation_chunk_key(src_id, tgt_id) + existing_full_source_ids = [] + if relation_chunks_storage is not None: + stored_chunks = await relation_chunks_storage.get_by_id(storage_key) + if stored_chunks and isinstance(stored_chunks, dict): + existing_full_source_ids = [ + chunk_id for chunk_id in stored_chunks.get("chunk_ids", []) if chunk_id + ] - # Sort description by timestamp, then by description length (largest to smallest) when timestamps are the same - sorted_edges = sorted( - unique_edges.values(), - key=lambda x: (x.get("timestamp", 0), -len(x.get("description", ""))), - ) - sorted_descriptions = [dp["description"] for dp in sorted_edges] + if not existing_full_source_ids: + existing_full_source_ids = [ + chunk_id for chunk_id in already_source_ids if chunk_id + ] - # Combine already_description with sorted new descriptions - description_list = already_description + sorted_descriptions + # 2. Merge new source ids with existing ones + full_source_ids = merge_source_ids(existing_full_source_ids, new_source_ids) - num_fragment = len(description_list) - already_fragment = len(already_description) - deduplicated_num = already_fragment + len(edges_data) - num_fragment - if deduplicated_num > 0: - dd_message = f"(dd:{deduplicated_num})" - else: - dd_message = "" - if num_fragment > 0: - # Get summary and LLM usage status - description, llm_was_used = await _handle_entity_relation_summary( - "Relation", - f"({src_id}, {tgt_id})", - description_list, - GRAPH_FIELD_SEP, - global_config, - llm_response_cache, + if relation_chunks_storage is not None and full_source_ids: + await relation_chunks_storage.upsert( + { + storage_key: { + "chunk_ids": full_source_ids, + "count": len(full_source_ids), + } + } ) - # Log based on actual LLM usage - if llm_was_used: - status_message = f"LLMmrg: `{src_id}`~`{tgt_id}` | {already_fragment}+{num_fragment - already_fragment}{dd_message}" + # 3. Finalize source_id by applying source ids limit + limit_method = global_config.get("source_ids_limit_method") + max_source_limit = global_config.get("max_source_ids_per_relation") + source_ids = apply_source_ids_limit( + full_source_ids, + max_source_limit, + limit_method, + identifier=f"`{src_id}`~`{tgt_id}`", + ) + limit_method = ( + global_config.get("source_ids_limit_method") or SOURCE_IDS_LIMIT_METHOD_KEEP + ) + + # 4. 
Only keep edges with source_id in the final source_ids list if in KEEP mode + if limit_method == SOURCE_IDS_LIMIT_METHOD_KEEP: + allowed_source_ids = set(source_ids) + filtered_edges = [] + for dp in edges_data: + source_id = dp.get("source_id") + # Skip relationship fragments sourced from chunks dropped by keep oldest cap + if ( + source_id + and source_id not in allowed_source_ids + and source_id not in existing_full_source_ids + ): + continue + filtered_edges.append(dp) + edges_data = filtered_edges + else: # In FIFO mode, keep all edges - truncation happens at source_ids level only + edges_data = list(edges_data) + + # 5. Check if we need to skip summary due to source_ids limit + if ( + limit_method == SOURCE_IDS_LIMIT_METHOD_KEEP + and len(existing_full_source_ids) >= max_source_limit + and not edges_data + ): + if already_edge: + logger.info( + f"Skipped `{src_id}`~`{tgt_id}`: KEEP old chunks {already_source_ids}/{len(full_source_ids)}" + ) + existing_edge_data = dict(already_edge) + return existing_edge_data else: - status_message = f"Merged: `{src_id}`~`{tgt_id}` | {already_fragment}+{num_fragment - already_fragment}{dd_message}" + logger.error( + f"Internal Error: already_node missing for `{src_id}`~`{tgt_id}`" + ) + raise ValueError( + f"Internal Error: already_node missing for `{src_id}`~`{tgt_id}`" + ) - if already_fragment > 0 or llm_was_used: - logger.info(status_message) - if pipeline_status is not None and pipeline_status_lock is not None: - async with pipeline_status_lock: - pipeline_status["latest_message"] = status_message - pipeline_status["history_messages"].append(status_message) - else: - logger.debug(status_message) + # 6.1 Finalize source_id + source_id = GRAPH_FIELD_SEP.join(source_ids) - else: - logger.error(f"Edge {src_id} - {tgt_id} has no description") - description = "(no description)" + # 6.2 Finalize weight by summing new edges and existing weights + weight = sum([dp["weight"] for dp in edges_data] + already_weights) - # Split all existing and new keywords into individual terms, then combine and deduplicate + # 6.2 Finalize keywords by merging existing and new keywords all_keywords = set() # Process already_keywords (which are comma-separated) for keyword_str in already_keywords: @@ -1518,26 +2028,194 @@ async def _merge_edges_then_upsert( # Join all unique keywords with commas keywords = ",".join(sorted(all_keywords)) - source_id = GRAPH_FIELD_SEP.join( - set( - [dp["source_id"] for dp in edges_data if dp.get("source_id")] - + already_source_ids - ) - ) - file_path = build_file_path(already_file_paths, edges_data, f"{src_id}-{tgt_id}") + # 7. 
Deduplicate by description, keeping first occurrence in the same document + unique_edges = {} + for dp in edges_data: + description_value = dp.get("description") + if not description_value: + continue + if description_value not in unique_edges: + unique_edges[description_value] = dp + # Sort description by timestamp, then by description length (largest to smallest) when timestamps are the same + sorted_edges = sorted( + unique_edges.values(), + key=lambda x: (x.get("timestamp", 0), -len(x.get("description", ""))), + ) + sorted_descriptions = [dp["description"] for dp in sorted_edges] + + # Combine already_description with sorted new descriptions + description_list = already_description + sorted_descriptions + if not description_list: + logger.error(f"Relation {src_id}~{tgt_id} has no description") + raise ValueError(f"Relation {src_id}~{tgt_id} has no description") + + # Check for cancellation before LLM summary + if pipeline_status is not None and pipeline_status_lock is not None: + async with pipeline_status_lock: + if pipeline_status.get("cancellation_requested", False): + raise PipelineCancelledException( + "User cancelled during relation summary" + ) + + # 8. Get summary description an LLM usage status + description, llm_was_used = await _handle_entity_relation_summary( + "Relation", + f"({src_id}, {tgt_id})", + description_list, + GRAPH_FIELD_SEP, + global_config, + llm_response_cache, + ) + + # 9. Build file_path within MAX_FILE_PATHS limit + file_paths_list = [] + seen_paths = set() + has_placeholder = False # Track if already_file_paths contains placeholder + + max_file_paths = global_config.get("max_file_paths", DEFAULT_MAX_FILE_PATHS) + file_path_placeholder = global_config.get( + "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER + ) + + # Collect from already_file_paths, excluding placeholder + for fp in already_file_paths: + # Check if this is a placeholder record + if fp and fp.startswith(f"...{file_path_placeholder}"): # Skip placeholders + has_placeholder = True + continue + if fp and fp not in seen_paths: + file_paths_list.append(fp) + seen_paths.add(fp) + + # Collect from new data + for dp in edges_data: + file_path_item = dp.get("file_path") + if file_path_item and file_path_item not in seen_paths: + file_paths_list.append(file_path_item) + seen_paths.add(file_path_item) + + # Apply count limit + max_file_paths = global_config.get("max_file_paths") + + if len(file_paths_list) > max_file_paths: + limit_method = global_config.get( + "source_ids_limit_method", SOURCE_IDS_LIMIT_METHOD_KEEP + ) + file_path_placeholder = global_config.get( + "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER + ) + + # Add + sign to indicate actual file count is higher + original_count_str = ( + f"{len(file_paths_list)}+" if has_placeholder else str(len(file_paths_list)) + ) + + if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO: + # FIFO: keep tail (newest), discard head + file_paths_list = file_paths_list[-max_file_paths:] + file_paths_list.append(f"...{file_path_placeholder}...(FIFO)") + else: + # KEEP: keep head (earliest), discard tail + file_paths_list = file_paths_list[:max_file_paths] + file_paths_list.append(f"...{file_path_placeholder}...(KEEP Old)") + + logger.info( + f"Limited `{src_id}`~`{tgt_id}`: file_path {original_count_str} -> {max_file_paths} ({limit_method})" + ) + # Finalize file_path + file_path = GRAPH_FIELD_SEP.join(file_paths_list) + + # 10. 
Log based on actual LLM usage + num_fragment = len(description_list) + already_fragment = len(already_description) + if llm_was_used: + status_message = f"LLMmrg: `{src_id}`~`{tgt_id}` | {already_fragment}+{num_fragment - already_fragment}" + else: + status_message = f"Merged: `{src_id}`~`{tgt_id}` | {already_fragment}+{num_fragment - already_fragment}" + + truncation_info = truncation_info_log = "" + if len(source_ids) < len(full_source_ids): + # Add truncation info from apply_source_ids_limit if truncation occurred + truncation_info_log = f"{limit_method} {len(source_ids)}/{len(full_source_ids)}" + if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO: + truncation_info = truncation_info_log + else: + truncation_info = "KEEP Old" + + deduplicated_num = already_fragment + len(edges_data) - num_fragment + dd_message = "" + if deduplicated_num > 0: + # Duplicated description detected across multiple trucks for the same entity + dd_message = f"dd {deduplicated_num}" + + if dd_message or truncation_info_log: + status_message += ( + f" ({', '.join(filter(None, [truncation_info_log, dd_message]))})" + ) + + # Add message to pipeline satus when merge happens + if already_fragment > 0 or llm_was_used: + logger.info(status_message) + if pipeline_status is not None and pipeline_status_lock is not None: + async with pipeline_status_lock: + pipeline_status["latest_message"] = status_message + pipeline_status["history_messages"].append(status_message) + else: + logger.debug(status_message) + + # 11. Update both graph and vector db for need_insert_id in [src_id, tgt_id]: - if not (await knowledge_graph_inst.has_node(need_insert_id)): + # Optimization: Use get_node instead of has_node + get_node + existing_node = await knowledge_graph_inst.get_node(need_insert_id) + + if existing_node is None: + # Node doesn't exist - create new node + node_created_at = int(time.time()) node_data = { "entity_id": need_insert_id, "source_id": source_id, "description": description, "entity_type": "UNKNOWN", "file_path": file_path, - "created_at": int(time.time()), + "created_at": node_created_at, + "truncate": "", } await knowledge_graph_inst.upsert_node(need_insert_id, node_data=node_data) + # Update entity_chunks_storage for the newly created entity + if entity_chunks_storage is not None: + chunk_ids = [chunk_id for chunk_id in full_source_ids if chunk_id] + if chunk_ids: + await entity_chunks_storage.upsert( + { + need_insert_id: { + "chunk_ids": chunk_ids, + "count": len(chunk_ids), + } + } + ) + + if entity_vdb is not None: + entity_vdb_id = compute_mdhash_id(need_insert_id, prefix="ent-") + entity_content = f"{need_insert_id}\n{description}" + vdb_data = { + entity_vdb_id: { + "content": entity_content, + "entity_name": need_insert_id, + "source_id": source_id, + "entity_type": "UNKNOWN", + "file_path": file_path, + } + } + await safe_vdb_operation_with_exception( + operation=lambda payload=vdb_data: entity_vdb.upsert(payload), + operation_name="added_entity_upsert", + entity_name=need_insert_id, + max_retries=3, + retry_delay=0.1, + ) + # Track entities added during edge processing if added_entities is not None: entity_data = { @@ -1546,10 +2224,114 @@ async def _merge_edges_then_upsert( "description": description, "source_id": source_id, "file_path": file_path, - "created_at": int(time.time()), + "created_at": node_created_at, } added_entities.append(entity_data) + else: + # Node exists - update its source_ids by merging with new source_ids + updated = False # Track if any update occurred + # 1. 
Get existing full source_ids from entity_chunks_storage + existing_full_source_ids = [] + if entity_chunks_storage is not None: + stored_chunks = await entity_chunks_storage.get_by_id(need_insert_id) + if stored_chunks and isinstance(stored_chunks, dict): + existing_full_source_ids = [ + chunk_id + for chunk_id in stored_chunks.get("chunk_ids", []) + if chunk_id + ] + + # If not in entity_chunks_storage, get from graph database + if not existing_full_source_ids: + if existing_node.get("source_id"): + existing_full_source_ids = existing_node["source_id"].split( + GRAPH_FIELD_SEP + ) + + # 2. Merge with new source_ids from this relationship + new_source_ids_from_relation = [ + chunk_id for chunk_id in source_ids if chunk_id + ] + merged_full_source_ids = merge_source_ids( + existing_full_source_ids, new_source_ids_from_relation + ) + + # 3. Save merged full list to entity_chunks_storage (conditional) + if ( + entity_chunks_storage is not None + and merged_full_source_ids != existing_full_source_ids + ): + updated = True + await entity_chunks_storage.upsert( + { + need_insert_id: { + "chunk_ids": merged_full_source_ids, + "count": len(merged_full_source_ids), + } + } + ) + + # 4. Apply source_ids limit for graph and vector db + limit_method = global_config.get( + "source_ids_limit_method", SOURCE_IDS_LIMIT_METHOD_KEEP + ) + max_source_limit = global_config.get("max_source_ids_per_entity") + limited_source_ids = apply_source_ids_limit( + merged_full_source_ids, + max_source_limit, + limit_method, + identifier=f"`{need_insert_id}`", + ) + + # 5. Update graph database and vector database with limited source_ids (conditional) + limited_source_id_str = GRAPH_FIELD_SEP.join(limited_source_ids) + + if limited_source_id_str != existing_node.get("source_id", ""): + updated = True + updated_node_data = { + **existing_node, + "source_id": limited_source_id_str, + } + await knowledge_graph_inst.upsert_node( + need_insert_id, node_data=updated_node_data + ) + + # Update vector database + if entity_vdb is not None: + entity_vdb_id = compute_mdhash_id(need_insert_id, prefix="ent-") + entity_content = ( + f"{need_insert_id}\n{existing_node.get('description', '')}" + ) + vdb_data = { + entity_vdb_id: { + "content": entity_content, + "entity_name": need_insert_id, + "source_id": limited_source_id_str, + "entity_type": existing_node.get("entity_type", "UNKNOWN"), + "file_path": existing_node.get( + "file_path", "unknown_source" + ), + } + } + await safe_vdb_operation_with_exception( + operation=lambda payload=vdb_data: entity_vdb.upsert(payload), + operation_name="existing_entity_update", + entity_name=need_insert_id, + max_retries=3, + retry_delay=0.1, + ) + + # 6. 
Log once at the end if any update occurred + if updated: + status_message = f"Chunks appended from relation: `{need_insert_id}`" + logger.info(status_message) + if pipeline_status is not None and pipeline_status_lock is not None: + async with pipeline_status_lock: + pipeline_status["latest_message"] = status_message + pipeline_status["history_messages"].append(status_message) + + edge_created_at = int(time.time()) await knowledge_graph_inst.upsert_edge( src_id, tgt_id, @@ -1559,7 +2341,8 @@ async def _merge_edges_then_upsert( keywords=keywords, source_id=source_id, file_path=file_path, - created_at=int(time.time()), + created_at=edge_created_at, + truncate=truncation_info, ), ) @@ -1570,9 +2353,45 @@ async def _merge_edges_then_upsert( keywords=keywords, source_id=source_id, file_path=file_path, - created_at=int(time.time()), + created_at=edge_created_at, + truncate=truncation_info, + weight=weight, ) + # Sort src_id and tgt_id to ensure consistent ordering (smaller string first) + if src_id > tgt_id: + src_id, tgt_id = tgt_id, src_id + + if relationships_vdb is not None: + rel_vdb_id = compute_mdhash_id(src_id + tgt_id, prefix="rel-") + rel_vdb_id_reverse = compute_mdhash_id(tgt_id + src_id, prefix="rel-") + try: + await relationships_vdb.delete([rel_vdb_id, rel_vdb_id_reverse]) + except Exception as e: + logger.debug( + f"Could not delete old relationship vector records {rel_vdb_id}, {rel_vdb_id_reverse}: {e}" + ) + rel_content = f"{keywords}\t{src_id}\n{tgt_id}\n{description}" + vdb_data = { + rel_vdb_id: { + "src_id": src_id, + "tgt_id": tgt_id, + "source_id": source_id, + "content": rel_content, + "keywords": keywords, + "description": description, + "weight": weight, + "file_path": file_path, + } + } + await safe_vdb_operation_with_exception( + operation=lambda payload=vdb_data: relationships_vdb.upsert(payload), + operation_name="relationship_upsert", + entity_name=f"{src_id}-{tgt_id}", + max_retries=3, + retry_delay=0.2, + ) + return edge_data @@ -1588,6 +2407,8 @@ async def merge_nodes_and_edges( pipeline_status: dict = None, pipeline_status_lock=None, llm_response_cache: BaseKVStorage | None = None, + entity_chunks_storage: BaseKVStorage | None = None, + relation_chunks_storage: BaseKVStorage | None = None, current_file_number: int = 0, total_files: int = 0, file_path: str = "unknown_source", @@ -1611,11 +2432,19 @@ async def merge_nodes_and_edges( pipeline_status: Pipeline status dictionary pipeline_status_lock: Lock for pipeline status llm_response_cache: LLM response cache + entity_chunks_storage: Storage tracking full chunk lists per entity + relation_chunks_storage: Storage tracking full chunk lists per relation current_file_number: Current file number for logging total_files: Total files for logging file_path: File path for logging """ + # Check for cancellation at the start of merge + if pipeline_status is not None and pipeline_status_lock is not None: + async with pipeline_status_lock: + if pipeline_status.get("cancellation_requested", False): + raise PipelineCancelledException("User cancelled during merge phase") + # Collect all nodes and edges from all chunks all_nodes = defaultdict(list) all_edges = defaultdict(list) @@ -1652,55 +2481,37 @@ async def merge_nodes_and_edges( async def _locked_process_entity_name(entity_name, entities): async with semaphore: + # Check for cancellation before processing entity + if pipeline_status is not None and pipeline_status_lock is not None: + async with pipeline_status_lock: + if pipeline_status.get("cancellation_requested", False): 
+ raise PipelineCancelledException( + "User cancelled during entity merge" + ) + workspace = global_config.get("workspace", "") namespace = f"{workspace}:GraphDB" if workspace else "GraphDB" async with get_storage_keyed_lock( [entity_name], namespace=namespace, enable_logging=False ): try: - # Graph database operation (critical path, must succeed) + logger.debug(f"Processing entity {entity_name}") entity_data = await _merge_nodes_then_upsert( entity_name, entities, knowledge_graph_inst, + entity_vdb, global_config, pipeline_status, pipeline_status_lock, llm_response_cache, + entity_chunks_storage, ) - # Vector database operation (equally critical, must succeed) - if entity_vdb is not None and entity_data: - data_for_vdb = { - compute_mdhash_id( - entity_data["entity_name"], prefix="ent-" - ): { - "entity_name": entity_data["entity_name"], - "entity_type": entity_data["entity_type"], - "content": f"{entity_data['entity_name']}\n{entity_data['description']}", - "source_id": entity_data["source_id"], - "file_path": entity_data.get( - "file_path", "unknown_source" - ), - } - } - - # Use safe operation wrapper - VDB failure must throw exception - await safe_vdb_operation_with_exception( - operation=lambda: entity_vdb.upsert(data_for_vdb), - operation_name="entity_upsert", - entity_name=entity_name, - max_retries=3, - retry_delay=0.1, - ) - return entity_data except Exception as e: - # Any database operation failure is critical - error_msg = ( - f"Critical error in entity processing for `{entity_name}`: {e}" - ) + error_msg = f"Error processing entity `{entity_name}`: {e}" logger.error(error_msg) # Try to update pipeline status, but don't let status update failure affect main exception @@ -1736,36 +2547,32 @@ async def merge_nodes_and_edges( entity_tasks, return_when=asyncio.FIRST_EXCEPTION ) - # Check if any task raised an exception and ensure all exceptions are retrieved first_exception = None - successful_results = [] + processed_entities = [] for task in done: try: - exception = task.exception() - if exception is not None: - if first_exception is None: - first_exception = exception - else: - successful_results.append(task.result()) - except Exception as e: + result = task.result() + except BaseException as e: if first_exception is None: first_exception = e + else: + processed_entities.append(result) + + if pending: + for task in pending: + task.cancel() + pending_results = await asyncio.gather(*pending, return_exceptions=True) + for result in pending_results: + if isinstance(result, BaseException): + if first_exception is None: + first_exception = result + else: + processed_entities.append(result) - # If any task failed, cancel all pending tasks and raise the first exception if first_exception is not None: - # Cancel all pending tasks - for pending_task in pending: - pending_task.cancel() - # Wait for cancellation to complete - if pending: - await asyncio.wait(pending) - # Re-raise the first exception to notify the caller raise first_exception - # If all tasks completed successfully, collect results - processed_entities = [task.result() for task in entity_tasks] - # ===== Phase 2: Process all relationships concurrently ===== log_message = f"Phase 2: Processing {total_relations_count} relations from {doc_id} (async: {graph_max_async})" logger.info(log_message) @@ -1775,6 +2582,14 @@ async def merge_nodes_and_edges( async def _locked_process_edges(edge_key, edges): async with semaphore: + # Check for cancellation before processing edges + if pipeline_status is not None and pipeline_status_lock 
is not None: + async with pipeline_status_lock: + if pipeline_status.get("cancellation_requested", False): + raise PipelineCancelledException( + "User cancelled during relation merge" + ) + workspace = global_config.get("workspace", "") namespace = f"{workspace}:GraphDB" if workspace else "GraphDB" sorted_edge_key = sorted([edge_key[0], edge_key[1]]) @@ -1787,83 +2602,30 @@ async def merge_nodes_and_edges( try: added_entities = [] # Track entities added during edge processing - # Graph database operation (critical path, must succeed) + logger.debug(f"Processing relation {sorted_edge_key}") edge_data = await _merge_edges_then_upsert( edge_key[0], edge_key[1], edges, knowledge_graph_inst, + relationships_vdb, + entity_vdb, global_config, pipeline_status, pipeline_status_lock, llm_response_cache, added_entities, # Pass list to collect added entities + relation_chunks_storage, + entity_chunks_storage, # Add entity_chunks_storage parameter ) if edge_data is None: return None, [] - # Vector database operation (equally critical, must succeed) - if relationships_vdb is not None: - data_for_vdb = { - compute_mdhash_id( - edge_data["src_id"] + edge_data["tgt_id"], prefix="rel-" - ): { - "src_id": edge_data["src_id"], - "tgt_id": edge_data["tgt_id"], - "keywords": edge_data["keywords"], - "content": f"{edge_data['src_id']}\t{edge_data['tgt_id']}\n{edge_data['keywords']}\n{edge_data['description']}", - "source_id": edge_data["source_id"], - "file_path": edge_data.get( - "file_path", "unknown_source" - ), - "weight": edge_data.get("weight", 1.0), - } - } - - # Use safe operation wrapper - VDB failure must throw exception - await safe_vdb_operation_with_exception( - operation=lambda: relationships_vdb.upsert(data_for_vdb), - operation_name="relationship_upsert", - entity_name=f"{edge_data['src_id']}-{edge_data['tgt_id']}", - max_retries=3, - retry_delay=0.1, - ) - - # Update added_entities to entity vector database using safe operation wrapper - if added_entities and entity_vdb is not None: - for entity_data in added_entities: - entity_vdb_id = compute_mdhash_id( - entity_data["entity_name"], prefix="ent-" - ) - entity_content = f"{entity_data['entity_name']}\n{entity_data['description']}" - - vdb_data = { - entity_vdb_id: { - "content": entity_content, - "entity_name": entity_data["entity_name"], - "source_id": entity_data["source_id"], - "entity_type": entity_data["entity_type"], - "file_path": entity_data.get( - "file_path", "unknown_source" - ), - } - } - - # Use safe operation wrapper - VDB failure must throw exception - await safe_vdb_operation_with_exception( - operation=lambda data=vdb_data: entity_vdb.upsert(data), - operation_name="added_entity_upsert", - entity_name=entity_data["entity_name"], - max_retries=3, - retry_delay=0.1, - ) - return edge_data, added_entities except Exception as e: - # Any database operation failure is critical - error_msg = f"Critical error in relationship processing for `{sorted_edge_key}`: {e}" + error_msg = f"Error processing relation `{sorted_edge_key}`: {e}" logger.error(error_msg) # Try to update pipeline status, but don't let status update failure affect main exception @@ -1901,40 +2663,36 @@ async def merge_nodes_and_edges( edge_tasks, return_when=asyncio.FIRST_EXCEPTION ) - # Check if any task raised an exception and ensure all exceptions are retrieved first_exception = None - successful_results = [] for task in done: try: - exception = task.exception() - if exception is not None: - if first_exception is None: - first_exception = exception - else: - 
successful_results.append(task.result()) - except Exception as e: + edge_data, added_entities = task.result() + except BaseException as e: if first_exception is None: first_exception = e + else: + if edge_data is not None: + processed_edges.append(edge_data) + all_added_entities.extend(added_entities) + + if pending: + for task in pending: + task.cancel() + pending_results = await asyncio.gather(*pending, return_exceptions=True) + for result in pending_results: + if isinstance(result, BaseException): + if first_exception is None: + first_exception = result + else: + edge_data, added_entities = result + if edge_data is not None: + processed_edges.append(edge_data) + all_added_entities.extend(added_entities) - # If any task failed, cancel all pending tasks and raise the first exception if first_exception is not None: - # Cancel all pending tasks - for pending_task in pending: - pending_task.cancel() - # Wait for cancellation to complete - if pending: - await asyncio.wait(pending) - # Re-raise the first exception to notify the caller raise first_exception - # If all tasks completed successfully, collect results - for task in edge_tasks: - edge_data, added_entities = task.result() - if edge_data is not None: - processed_edges.append(edge_data) - all_added_entities.extend(added_entities) - # ===== Phase 3: Update full_entities and full_relations storage ===== if full_entities_storage and full_relations_storage and doc_id: try: @@ -2015,6 +2773,14 @@ async def extract_entities( llm_response_cache: BaseKVStorage | None = None, text_chunks_storage: BaseKVStorage | None = None, ) -> list: + # Check for cancellation at the start of entity extraction + if pipeline_status is not None and pipeline_status_lock is not None: + async with pipeline_status_lock: + if pipeline_status.get("cancellation_requested", False): + raise PipelineCancelledException( + "User cancelled during entity extraction" + ) + use_llm_func: callable = global_config["llm_model_func"] entity_extract_max_gleaning = global_config["entity_extract_max_gleaning"] @@ -2182,6 +2948,14 @@ async def extract_entities( async def _process_with_semaphore(chunk): async with semaphore: + # Check for cancellation before processing chunk + if pipeline_status is not None and pipeline_status_lock is not None: + async with pipeline_status_lock: + if pipeline_status.get("cancellation_requested", False): + raise PipelineCancelledException( + "User cancelled during chunk processing" + ) + try: return await _process_single_content(chunk) except Exception as e: @@ -2225,7 +2999,7 @@ async def extract_entities( await asyncio.wait(pending) # Add progress prefix to the exception message - progress_prefix = f"C[{processed_chunks+1}/{total_chunks}]" + progress_prefix = f"C[{processed_chunks + 1}/{total_chunks}]" # Re-raise the original exception with a prefix prefixed_exception = create_prefixed_exception(first_exception, progress_prefix) @@ -2247,7 +3021,7 @@ async def kg_query( hashing_kv: BaseKVStorage | None = None, system_prompt: str | None = None, chunks_vdb: BaseVectorStorage = None, -) -> QueryResult: +) -> QueryResult | None: """ Execute knowledge graph query and return unified QueryResult object. 
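The result-collection logic rewritten above (used by both the entity phase and the relation phase of merge_nodes_and_edges) follows a fail-fast pattern: wait with asyncio.FIRST_EXCEPTION, pull results out of the finished tasks via task.result() so no exception is left unretrieved, cancel whatever is still pending, drain the cancelled tasks with asyncio.gather(return_exceptions=True), and finally re-raise the first exception seen. The sketch below is a minimal, self-contained illustration of that pattern only; the worker and harness names are invented for the example and are not LightRAG APIs.

```python
from __future__ import annotations

import asyncio


async def _worker(i: int) -> int:
    # Stand-in for per-entity / per-edge merge work; item 3 fails on purpose.
    await asyncio.sleep(0.01 * i)
    if i == 3:
        raise RuntimeError(f"worker {i} failed")
    return i


async def run_fail_fast(n: int) -> list[int]:
    tasks = [asyncio.create_task(_worker(i)) for i in range(n)]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)

    first_exception: BaseException | None = None
    results: list[int] = []

    # task.result() re-raises the task's exception, so nothing is left unretrieved.
    for task in done:
        try:
            results.append(task.result())
        except BaseException as exc:  # includes cancellation-style exceptions
            if first_exception is None:
                first_exception = exc

    # Cancel the rest and drain it; return_exceptions=True keeps gather from raising.
    if pending:
        for task in pending:
            task.cancel()
        drained = await asyncio.gather(*pending, return_exceptions=True)
        for item in drained:
            if isinstance(item, BaseException):
                if first_exception is None and not isinstance(
                    item, asyncio.CancelledError
                ):
                    first_exception = item
            else:
                results.append(item)

    if first_exception is not None:
        raise first_exception  # propagate the first failure to the caller
    return results


if __name__ == "__main__":
    try:
        asyncio.run(run_fail_fast(6))
    except RuntimeError as exc:
        print(f"stopped early: {exc}")
```

Catching BaseException rather than Exception matters here because cancellation-type exceptions (asyncio.CancelledError, or the PipelineCancelledException raised by the cancellation checks above) would otherwise slip past the collection loop; the switch from `except Exception` to `except BaseException` in the diff appears to be made for that reason.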
@@ -2264,7 +3038,7 @@ async def kg_query( chunks_vdb: Document chunks vector database Returns: - QueryResult: Unified query result object containing: + QueryResult | None: Unified query result object containing: - content: Non-streaming response text content - response_iterator: Streaming response iterator - raw_data: Complete structured data (including references and metadata) @@ -2275,6 +3049,8 @@ async def kg_query( - only_need_prompt=True: content contains complete prompt - stream=True: response_iterator contains streaming response, raw_data contains complete data - default: content contains LLM response text, raw_data contains complete data + + Returns None when no relevant context could be constructed for the query. """ if not query: return QueryResult(content=PROMPTS["fail_response"]) @@ -2322,7 +3098,8 @@ async def kg_query( ) if context_result is None: - return QueryResult(content=PROMPTS["fail_response"]) + logger.info("[kg_query] No query context could be built; returning no-result.") + return None # Return different content based on query parameters if query_param.only_need_context and not query_param.only_need_prompt: @@ -2676,10 +3453,10 @@ async def _perform_kg_search( ) query_embedding = None if query and (kg_chunk_pick_method == "VECTOR" or chunks_vdb): - embedding_func_config = text_chunks_db.embedding_func - if embedding_func_config and embedding_func_config.func: + actual_embedding_func = text_chunks_db.embedding_func + if actual_embedding_func: try: - query_embedding = await embedding_func_config.func([query]) + query_embedding = await actual_embedding_func([query]) query_embedding = query_embedding[ 0 ] # Extract first embedding from batch result @@ -3083,7 +3860,7 @@ async def _merge_all_chunks( return merged_chunks -async def _build_llm_context( +async def _build_context_str( entities_context: list[dict], relations_context: list[dict], merged_chunks: list[dict], @@ -3183,23 +3960,32 @@ async def _build_llm_context( truncated_chunks ) - # Rebuild text_units_context with truncated chunks + # Rebuild chunks_context with truncated chunks # The actual tokens may be slightly less than available_chunk_tokens due to deduplication logic - text_units_context = [] + chunks_context = [] for i, chunk in enumerate(truncated_chunks): - text_units_context.append( + chunks_context.append( { "reference_id": chunk["reference_id"], "content": chunk["content"], } ) + text_units_str = "\n".join( + json.dumps(text_unit, ensure_ascii=False) for text_unit in chunks_context + ) + reference_list_str = "\n".join( + f"[{ref['reference_id']}] {ref['file_path']}" + for ref in reference_list + if ref["reference_id"] + ) + logger.info( - f"Final context: {len(entities_context)} entities, {len(relations_context)} relations, {len(text_units_context)} chunks" + f"Final context: {len(entities_context)} entities, {len(relations_context)} relations, {len(chunks_context)} chunks" ) # not necessary to use LLM to generate a response - if not entities_context and not relations_context: + if not entities_context and not relations_context and not chunks_context: # Return empty raw data structure when no entities/relations empty_raw_data = convert_to_user_format( [], @@ -3228,16 +4014,7 @@ async def _build_llm_context( chunk_tracking_log.append("?0/0") if chunk_tracking_log: - logger.info(f"chunks S+F/O: {' '.join(chunk_tracking_log)}") - - text_units_str = "\n".join( - json.dumps(text_unit, ensure_ascii=False) for text_unit in text_units_context - ) - reference_list_str = "\n".join( - f"[{ref['reference_id']}] 
{ref['file_path']}" - for ref in reference_list - if ref["reference_id"] - ) + logger.info(f"Final chunks S+F/O: {' '.join(chunk_tracking_log)}") result = kg_context_template.format( entities_str=entities_str, @@ -3248,7 +4025,7 @@ async def _build_llm_context( # Always return both context and complete data structure (unified approach) logger.debug( - f"[_build_llm_context] Converting to user format: {len(entities_context)} entities, {len(relations_context)} relations, {len(truncated_chunks)} chunks" + f"[_build_context_str] Converting to user format: {len(entities_context)} entities, {len(relations_context)} relations, {len(truncated_chunks)} chunks" ) final_data = convert_to_user_format( entities_context, @@ -3260,7 +4037,7 @@ async def _build_llm_context( relation_id_to_original, ) logger.debug( - f"[_build_llm_context] Final data after conversion: {len(final_data.get('entities', []))} entities, {len(final_data.get('relationships', []))} relationships, {len(final_data.get('chunks', []))} chunks" + f"[_build_context_str] Final data after conversion: {len(final_data.get('entities', []))} entities, {len(final_data.get('relationships', []))} relationships, {len(final_data.get('chunks', []))} chunks" ) return result, final_data @@ -3329,12 +4106,16 @@ async def _build_query_context( query_embedding=search_result["query_embedding"], ) - if not merged_chunks: + if ( + not merged_chunks + and not truncation_result["entities_context"] + and not truncation_result["relations_context"] + ): return None # Stage 4: Build final LLM context with dynamic token processing - # _build_llm_context now always returns tuple[str, dict] - context, raw_data = await _build_llm_context( + # _build_context_str now always returns tuple[str, dict] + context, raw_data = await _build_context_str( entities_context=truncation_result["entities_context"], relations_context=truncation_result["relations_context"], merged_chunks=merged_chunks, @@ -3583,25 +4364,21 @@ async def _find_related_text_unit_from_entities( num_of_chunks = int(max_related_chunks * len(entities_with_chunks) / 2) # Get embedding function from global config - embedding_func_config = text_chunks_db.embedding_func - if not embedding_func_config: + actual_embedding_func = text_chunks_db.embedding_func + if not actual_embedding_func: logger.warning("No embedding function found, falling back to WEIGHT method") kg_chunk_pick_method = "WEIGHT" else: try: - actual_embedding_func = embedding_func_config.func - - selected_chunk_ids = None - if actual_embedding_func: - selected_chunk_ids = await pick_by_vector_similarity( - query=query, - text_chunks_storage=text_chunks_db, - chunks_vdb=chunks_vdb, - num_of_chunks=num_of_chunks, - entity_info=entities_with_chunks, - embedding_func=actual_embedding_func, - query_embedding=query_embedding, - ) + selected_chunk_ids = await pick_by_vector_similarity( + query=query, + text_chunks_storage=text_chunks_db, + chunks_vdb=chunks_vdb, + num_of_chunks=num_of_chunks, + entity_info=entities_with_chunks, + embedding_func=actual_embedding_func, + query_embedding=query_embedding, + ) if selected_chunk_ids == []: kg_chunk_pick_method = "WEIGHT" @@ -3876,24 +4653,21 @@ async def _find_related_text_unit_from_relations( num_of_chunks = int(max_related_chunks * len(relations_with_chunks) / 2) # Get embedding function from global config - embedding_func_config = text_chunks_db.embedding_func - if not embedding_func_config: + actual_embedding_func = text_chunks_db.embedding_func + if not actual_embedding_func: logger.warning("No embedding 
function found, falling back to WEIGHT method") kg_chunk_pick_method = "WEIGHT" else: try: - actual_embedding_func = embedding_func_config.func - - if actual_embedding_func: - selected_chunk_ids = await pick_by_vector_similarity( - query=query, - text_chunks_storage=text_chunks_db, - chunks_vdb=chunks_vdb, - num_of_chunks=num_of_chunks, - entity_info=relations_with_chunks, - embedding_func=actual_embedding_func, - query_embedding=query_embedding, - ) + selected_chunk_ids = await pick_by_vector_similarity( + query=query, + text_chunks_storage=text_chunks_db, + chunks_vdb=chunks_vdb, + num_of_chunks=num_of_chunks, + entity_info=relations_with_chunks, + embedding_func=actual_embedding_func, + query_embedding=query_embedding, + ) if selected_chunk_ids == []: kg_chunk_pick_method = "WEIGHT" @@ -3985,7 +4759,7 @@ async def naive_query( global_config: dict[str, str], hashing_kv: BaseKVStorage | None = None, system_prompt: str | None = None, -) -> QueryResult: +) -> QueryResult | None: """ Execute naive query and return unified QueryResult object. @@ -3998,11 +4772,13 @@ async def naive_query( system_prompt: System prompt Returns: - QueryResult: Unified query result object containing: + QueryResult | None: Unified query result object containing: - content: Non-streaming response text content - response_iterator: Streaming response iterator - raw_data: Complete structured data (including references and metadata) - is_streaming: Whether this is a streaming result + + Returns None when no relevant chunks are retrieved. """ if not query: @@ -4023,16 +4799,10 @@ async def naive_query( chunks = await _get_vector_context(query, chunks_vdb, query_param, None) if chunks is None or len(chunks) == 0: - # Build empty raw data structure for naive mode - empty_raw_data = convert_to_user_format( - [], # naive mode has no entities - [], # naive mode has no relationships - [], # no chunks - [], # no references - "naive", + logger.info( + "[naive_query] No relevant document chunks found; returning no-result." ) - empty_raw_data["message"] = "No relevant document chunks found." 
- return QueryResult(content=PROMPTS["fail_response"], raw_data=empty_raw_data) + return None # Calculate dynamic token limit for chunks max_total_tokens = getattr( @@ -4111,10 +4881,10 @@ async def naive_query( "final_chunks_count": len(processed_chunks_with_ref_ids), } - # Build text_units_context from processed chunks with reference IDs - text_units_context = [] + # Build chunks_context from processed chunks with reference IDs + chunks_context = [] for i, chunk in enumerate(processed_chunks_with_ref_ids): - text_units_context.append( + chunks_context.append( { "reference_id": chunk["reference_id"], "content": chunk["content"], @@ -4122,7 +4892,7 @@ async def naive_query( ) text_units_str = "\n".join( - json.dumps(text_unit, ensure_ascii=False) for text_unit in text_units_context + json.dumps(text_unit, ensure_ascii=False) for text_unit in chunks_context ) reference_list_str = "\n".join( f"[{ref['reference_id']}] {ref['file_path']}" diff --git a/lightrag/utils.py b/lightrag/utils.py index 10b24da7..65c1e4bc 100644 --- a/lightrag/utils.py +++ b/lightrag/utils.py @@ -1,6 +1,8 @@ from __future__ import annotations import weakref +import sys + import asyncio import html import csv @@ -40,6 +42,35 @@ from lightrag.constants import ( SOURCE_IDS_LIMIT_METHOD_FIFO, ) +# Precompile regex pattern for JSON sanitization (module-level, compiled once) +_SURROGATE_PATTERN = re.compile(r"[\uD800-\uDFFF\uFFFE\uFFFF]") + + +class SafeStreamHandler(logging.StreamHandler): + """StreamHandler that gracefully handles closed streams during shutdown. + + This handler prevents "ValueError: I/O operation on closed file" errors + that can occur when pytest or other test frameworks close stdout/stderr + before Python's logging cleanup runs. + """ + + def flush(self): + """Flush the stream, ignoring errors if the stream is closed.""" + try: + super().flush() + except (ValueError, OSError): + # Stream is closed or otherwise unavailable, silently ignore + pass + + def close(self): + """Close the handler, ignoring errors if the stream is already closed.""" + try: + super().close() + except (ValueError, OSError): + # Stream is closed or otherwise unavailable, silently ignore + pass + + # Initialize logger with basic configuration logger = logging.getLogger("lightrag") logger.propagate = False # prevent log message send to root logger @@ -47,7 +78,7 @@ logger.setLevel(logging.INFO) # Add console handler if no handlers exist if not logger.handlers: - console_handler = logging.StreamHandler() + console_handler = SafeStreamHandler() console_handler.setLevel(logging.INFO) formatter = logging.Formatter("%(levelname)s: %(message)s") console_handler.setFormatter(formatter) @@ -56,8 +87,32 @@ if not logger.handlers: # Set httpx logging level to WARNING logging.getLogger("httpx").setLevel(logging.WARNING) -# Precompile regex pattern for JSON sanitization (module-level, compiled once) -_SURROGATE_PATTERN = re.compile(r"[\uD800-\uDFFF\uFFFE\uFFFF]") + +def _patch_ascii_colors_console_handler() -> None: + """Prevent ascii_colors from printing flush errors during interpreter exit.""" + + try: + from ascii_colors import ConsoleHandler + except ImportError: + return + + if getattr(ConsoleHandler, "_lightrag_patched", False): + return + + original_handle_error = ConsoleHandler.handle_error + + def _safe_handle_error(self, message: str) -> None: # type: ignore[override] + exc_type, _, _ = sys.exc_info() + if exc_type in (ValueError, OSError) and "close" in message.lower(): + return + original_handle_error(self, message) + + 
ConsoleHandler.handle_error = _safe_handle_error # type: ignore[assignment] + ConsoleHandler._lightrag_patched = True # type: ignore[attr-defined] + + +_patch_ascii_colors_console_handler() + # Global import for pypinyin with startup-time logging try: @@ -286,8 +341,8 @@ def setup_logger( logger_instance.handlers = [] # Clear existing handlers logger_instance.propagate = False - # Add console handler - console_handler = logging.StreamHandler() + # Add console handler with safe stream handling + console_handler = SafeStreamHandler() console_handler.setFormatter(simple_formatter) console_handler.setLevel(level) logger_instance.addHandler(console_handler) @@ -363,6 +418,7 @@ class EmbeddingFunc: max_token_size: Optional token limit for the embedding model send_dimensions: Whether to inject embedding_dim as a keyword argument """ + embedding_dim: int func: callable max_token_size: int | None = None # Token limit for the embedding model
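On the utils.py side, SafeStreamHandler exists so that flushing or closing a logging handler whose underlying stream has already been closed (typically pytest tearing down its captured stdout/stderr before logging cleanup runs) no longer surfaces as "ValueError: I/O operation on closed file". A small self-contained sketch of the behaviour follows; the class body mirrors the handler added above, while the demo wiring (the StringIO stream and the demo() function) is invented for illustration.

```python
from __future__ import annotations

import io
import logging


class SafeStreamHandler(logging.StreamHandler):
    """Mirror of lightrag.utils.SafeStreamHandler: ignore closed-stream errors."""

    def flush(self) -> None:
        try:
            super().flush()
        except (ValueError, OSError):
            pass  # stream already closed; nothing useful to do

    def close(self) -> None:
        try:
            super().close()
        except (ValueError, OSError):
            pass


def demo() -> None:
    stream = io.StringIO()
    plain = logging.StreamHandler(stream)
    safe = SafeStreamHandler(stream)

    # Simulate a test runner (or interpreter shutdown) closing the stream early.
    stream.close()

    safe.flush()  # silently ignored
    try:
        plain.flush()  # the plain handler propagates the error
    except ValueError as exc:
        print(f"plain StreamHandler raised: {exc}")


if __name__ == "__main__":
    demo()
```

The companion `_patch_ascii_colors_console_handler` hook takes the same approach for ascii_colors' ConsoleHandler, which cannot simply be replaced by a subclass, by suppressing close-related errors inside its handle_error callback.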