From b38177de80953a7060ece21bf0e59c8046e91bf9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20MANSUY?= Date: Thu, 4 Dec 2025 19:18:36 +0800 Subject: [PATCH] cherry-pick a9fec267 --- env.example | 12 +- lightrag/constants.py | 19 +- lightrag/lightrag.py | 348 ++++++++++++++++++++++- lightrag/operate.py | 618 +++++++++++++++++++++++++++++++++++++----- lightrag/utils.py | 179 +++++++----- 5 files changed, 1009 insertions(+), 167 deletions(-) diff --git a/env.example b/env.example index e0b649e3..3529cf58 100644 --- a/env.example +++ b/env.example @@ -23,7 +23,7 @@ WEBUI_DESCRIPTION="Simple and Fast Graph Based RAG System" # WORKING_DIR= ### Tiktoken cache directory (Store cached files in this folder for offline deployment) -# TIKTOKEN_CACHE_DIR=./temp/tiktoken +# TIKTOKEN_CACHE_DIR=/app/data/tiktoken ### Ollama Emulating Model and Tag # OLLAMA_EMULATING_MODEL_NAME=lightrag @@ -73,8 +73,14 @@ ENABLE_LLM_CACHE=true # MAX_RELATION_TOKENS=8000 ### control the maximum tokens send to LLM (include entities, relations and chunks) # MAX_TOTAL_TOKENS=30000 -### control the maximum chunk_ids stored -# MAX_SOURCE_IDS_PER_ENTITY=500 + +### control the maximum chunk_ids stored in vector and graph db +# MAX_SOURCE_IDS_PER_ENTITY=300 +# MAX_SOURCE_IDS_PER_RELATION=300 +### control chunk_ids limitation method: KEEP, FIFO (KEEP: ignore new chunks, FIFO: new chunks replace old chunks) +# SOURCE_IDS_LIMIT_METHOD=KEEP +### Maximum number of file paths stored in entity/relation file_path field +# MAX_FILE_PATHS=30 ### maximum number of related chunks per source entity or relation ### The chunk picker uses this value to determine the total number of chunks selected from KG(knowledge graph) diff --git a/lightrag/constants.py b/lightrag/constants.py index f7b5c41f..62ca1888 100644 --- a/lightrag/constants.py +++ b/lightrag/constants.py @@ -13,7 +13,6 @@ DEFAULT_MAX_GRAPH_NODES = 1000 # Default values for extraction settings DEFAULT_SUMMARY_LANGUAGE = "English" # Default language for document processing DEFAULT_MAX_GLEANING = 1 -DEFAULT_MAX_SOURCE_IDS_PER_ENTITY = 500 # Applies to Both Graph + Vector DBs # Number of description fragments to trigger LLM summary DEFAULT_FORCE_LLM_SUMMARY_ON_MERGE = 8 @@ -58,8 +57,24 @@ DEFAULT_HISTORY_TURNS = 0 DEFAULT_MIN_RERANK_SCORE = 0.0 DEFAULT_RERANK_BINDING = "null" -# File path configuration for vector and graph database(Should not be changed, used in Milvus Schema) +# Default source ids limit in metadata for entity and relation +DEFAULT_MAX_SOURCE_IDS_PER_ENTITY = 3 +DEFAULT_MAX_SOURCE_IDS_PER_RELATION = 3 +SOURCE_IDS_LIMIT_METHOD_KEEP = "KEEP" +SOURCE_IDS_LIMIT_METHOD_FIFO = "FIFO" +DEFAULT_SOURCE_IDS_LIMIT_METHOD = SOURCE_IDS_LIMIT_METHOD_KEEP +VALID_SOURCE_IDS_LIMIT_METHODS = { + SOURCE_IDS_LIMIT_METHOD_KEEP, + SOURCE_IDS_LIMIT_METHOD_FIFO, +} +# Default file_path limit in metadata for entity and relation +DEFAULT_MAX_FILE_PATHS = 2 + +# Field length of file_path in Milvus Schema for entity and relation (Should not be changed) +# file_path must store all file paths up to the DEFAULT_MAX_FILE_PATHS limit within the metadata.
DEFAULT_MAX_FILE_PATH_LENGTH = 32768 +# Placeholder for more file paths in metadata for entity and relation (Should not be changed) +DEFAULT_FILE_PATH_MORE_PLACEHOLDER = "truncated" # Default temperature for LLM DEFAULT_TEMPERATURE = 1.0 diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 056b5bca..4380a276 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -41,10 +41,14 @@ from lightrag.constants import ( DEFAULT_MAX_PARALLEL_INSERT, DEFAULT_MAX_GRAPH_NODES, DEFAULT_MAX_SOURCE_IDS_PER_ENTITY, + DEFAULT_MAX_SOURCE_IDS_PER_RELATION, DEFAULT_ENTITY_TYPES, DEFAULT_SUMMARY_LANGUAGE, DEFAULT_LLM_TIMEOUT, DEFAULT_EMBEDDING_TIMEOUT, + DEFAULT_SOURCE_IDS_LIMIT_METHOD, + DEFAULT_MAX_FILE_PATHS, + DEFAULT_FILE_PATH_MORE_PLACEHOLDER, ) from lightrag.utils import get_env_value @@ -99,6 +103,9 @@ from lightrag.utils import ( generate_track_id, convert_to_user_format, logger, + subtract_source_ids, + make_relation_chunk_key, + normalize_source_ids_limit_method, ) from lightrag.types import KnowledgeGraph from dotenv import load_dotenv @@ -362,10 +369,40 @@ class LightRAG: """Maximum number of graph nodes to return in knowledge graph queries.""" max_source_ids_per_entity: int = field( - default=get_env_value("MAX_SOURCE_IDS_PER_ENTITY", DEFAULT_MAX_SOURCE_IDS_PER_ENTITY, int) + default=get_env_value( + "MAX_SOURCE_IDS_PER_ENTITY", DEFAULT_MAX_SOURCE_IDS_PER_ENTITY, int + ) ) """Maximum number of source (chunk) ids in entity Grpah + VDB.""" + max_source_ids_per_relation: int = field( + default=get_env_value( + "MAX_SOURCE_IDS_PER_RELATION", + DEFAULT_MAX_SOURCE_IDS_PER_RELATION, + int, + ) + ) + """Maximum number of source (chunk) ids in relation Graph + VDB.""" + + source_ids_limit_method: str = field( + default_factory=lambda: normalize_source_ids_limit_method( + get_env_value( + "SOURCE_IDS_LIMIT_METHOD", + DEFAULT_SOURCE_IDS_LIMIT_METHOD, + str, + ) + ) + ) + """Strategy for enforcing source_id limits: KEEP (ignore new) or FIFO.""" + + max_file_paths: int = field( + default=get_env_value("MAX_FILE_PATHS", DEFAULT_MAX_FILE_PATHS, int) + ) + """Maximum number of file paths to store in entity/relation file_path field.""" + + file_path_more_placeholder: str = field(default=DEFAULT_FILE_PATH_MORE_PLACEHOLDER) + """Placeholder text when file paths exceed max_file_paths limit.""" + addon_params: dict[str, Any] = field( default_factory=lambda: { "language": get_env_value( @@ -535,6 +572,18 @@ class LightRAG: embedding_func=self.embedding_func, ) + self.entity_chunks: BaseKVStorage = self.key_string_value_json_storage_cls( # type: ignore + namespace=NameSpace.KV_STORE_ENTITY_CHUNKS, + workspace=self.workspace, + embedding_func=self.embedding_func, + ) + + self.relation_chunks: BaseKVStorage = self.key_string_value_json_storage_cls( # type: ignore + namespace=NameSpace.KV_STORE_RELATION_CHUNKS, + workspace=self.workspace, + embedding_func=self.embedding_func, + ) + self.chunk_entity_relation_graph: BaseGraphStorage = self.graph_storage_cls( # type: ignore namespace=NameSpace.GRAPH_STORE_CHUNK_ENTITY_RELATION, workspace=self.workspace, @@ -594,6 +643,8 @@ class LightRAG: self.text_chunks, self.full_entities, self.full_relations, + self.entity_chunks, + self.relation_chunks, self.entities_vdb, self.relationships_vdb, self.chunks_vdb, @@ -616,6 +667,8 @@ class LightRAG: ("text_chunks", self.text_chunks), ("full_entities", self.full_entities), ("full_relations", self.full_relations), + ("entity_chunks", self.entity_chunks), + ("relation_chunks", self.relation_chunks), ("entities_vdb",
self.entities_vdb), ("relationships_vdb", self.relationships_vdb), ("chunks_vdb", self.chunks_vdb), @@ -671,6 +724,13 @@ class LightRAG: logger.debug("No entities found in graph, skipping migration check") return + try: + # Initialize chunk tracking storage after migration + await self._migrate_chunk_tracking_storage() + except Exception as e: + logger.error(f"Error during chunk_tracking migration: {e}") + raise e + # Check if full_entities and full_relations are empty # Get all processed documents to check their entity/relation data try: @@ -711,11 +771,11 @@ class LightRAG: except Exception as e: logger.error(f"Error during migration check: {e}") - # Don't raise the error, just log it to avoid breaking initialization + raise e except Exception as e: logger.error(f"Error in data migration check: {e}") - # Don't raise the error to avoid breaking initialization + raise e async def _migrate_entity_relation_data(self, processed_docs: dict): """Migrate existing entity and relation data to full_entities and full_relations storage""" @@ -814,6 +874,140 @@ class LightRAG: f"Data migration completed: migrated {migration_count} documents with entities/relations" ) + async def _migrate_chunk_tracking_storage(self) -> None: + """Ensure entity/relation chunk tracking KV stores exist and are seeded.""" + + if not self.entity_chunks or not self.relation_chunks: + return + + need_entity_migration = False + need_relation_migration = False + + try: + need_entity_migration = await self.entity_chunks.is_empty() + except Exception as exc: # pragma: no cover - defensive logging + logger.error(f"Failed to check entity chunks storage: {exc}") + need_entity_migration = True + + try: + need_relation_migration = await self.relation_chunks.is_empty() + except Exception as exc: # pragma: no cover - defensive logging + logger.error(f"Failed to check relation chunks storage: {exc}") + need_relation_migration = True + + if not need_entity_migration and not need_relation_migration: + return + + BATCH_SIZE = 500 # Process 500 records per batch + + if need_entity_migration: + try: + nodes = await self.chunk_entity_relation_graph.get_all_nodes() + except Exception as exc: + logger.error(f"Failed to fetch nodes for chunk migration: {exc}") + nodes = [] + + logger.info(f"Starting chunk_tracking data migration: {len(nodes)} nodes") + + # Process nodes in batches + total_nodes = len(nodes) + total_batches = (total_nodes + BATCH_SIZE - 1) // BATCH_SIZE + total_migrated = 0 + + for batch_idx in range(total_batches): + start_idx = batch_idx * BATCH_SIZE + end_idx = min((batch_idx + 1) * BATCH_SIZE, total_nodes) + batch_nodes = nodes[start_idx:end_idx] + + upsert_payload: dict[str, dict[str, object]] = {} + for node in batch_nodes: + entity_id = node.get("entity_id") or node.get("id") + if not entity_id: + continue + + raw_source = node.get("source_id") or "" + chunk_ids = [ + chunk_id + for chunk_id in raw_source.split(GRAPH_FIELD_SEP) + if chunk_id + ] + if not chunk_ids: + continue + + upsert_payload[entity_id] = { + "chunk_ids": chunk_ids, + "count": len(chunk_ids), + } + + if upsert_payload: + await self.entity_chunks.upsert(upsert_payload) + total_migrated += len(upsert_payload) + logger.info( + f"Processed entity batch {batch_idx + 1}/{total_batches}: {len(upsert_payload)} records (total: {total_migrated}/{total_nodes})" + ) + + if total_migrated > 0: + # Persist entity_chunks data to disk + await self.entity_chunks.index_done_callback() + logger.info( + f"Entity chunk_tracking migration completed: {total_migrated} records 
persisted" + ) + + if need_relation_migration: + try: + edges = await self.chunk_entity_relation_graph.get_all_edges() + except Exception as exc: + logger.error(f"Failed to fetch edges for chunk migration: {exc}") + edges = [] + + logger.info(f"Starting chunk_tracking data migration: {len(edges)} edges") + + # Process edges in batches + total_edges = len(edges) + total_batches = (total_edges + BATCH_SIZE - 1) // BATCH_SIZE + total_migrated = 0 + + for batch_idx in range(total_batches): + start_idx = batch_idx * BATCH_SIZE + end_idx = min((batch_idx + 1) * BATCH_SIZE, total_edges) + batch_edges = edges[start_idx:end_idx] + + upsert_payload: dict[str, dict[str, object]] = {} + for edge in batch_edges: + src = edge.get("source") or edge.get("src_id") or edge.get("src") + tgt = edge.get("target") or edge.get("tgt_id") or edge.get("tgt") + if not src or not tgt: + continue + + raw_source = edge.get("source_id") or "" + chunk_ids = [ + chunk_id + for chunk_id in raw_source.split(GRAPH_FIELD_SEP) + if chunk_id + ] + if not chunk_ids: + continue + + storage_key = make_relation_chunk_key(src, tgt) + upsert_payload[storage_key] = { + "chunk_ids": chunk_ids, + "count": len(chunk_ids), + } + + if upsert_payload: + await self.relation_chunks.upsert(upsert_payload) + total_migrated += len(upsert_payload) + logger.info( + f"Processed relation batch {batch_idx + 1}/{total_batches}: {len(upsert_payload)} records (total: {total_migrated}/{total_edges})" + ) + + if total_migrated > 0: + # Persist relation_chunks data to disk + await self.relation_chunks.index_done_callback() + logger.info( + f"Relation chunk_tracking migration completed: {total_migrated} records persisted" + ) + async def get_graph_labels(self): text = await self.chunk_entity_relation_graph.get_all_labels() return text @@ -1676,6 +1870,8 @@ class LightRAG: pipeline_status=pipeline_status, pipeline_status_lock=pipeline_status_lock, llm_response_cache=self.llm_response_cache, + entity_chunks_storage=self.entity_chunks, + relation_chunks_storage=self.relation_chunks, current_file_number=current_file_number, total_files=total_files, file_path=file_path, @@ -1845,6 +2041,8 @@ class LightRAG: self.text_chunks, self.full_entities, self.full_relations, + self.entity_chunks, + self.relation_chunks, self.llm_response_cache, self.entities_vdb, self.relationships_vdb, @@ -2718,9 +2916,11 @@ class LightRAG: # 4. 
Analyze entities and relationships that will be affected entities_to_delete = set() - entities_to_rebuild = {} # entity_name -> remaining_chunk_ids + entities_to_rebuild = {} # entity_name -> remaining chunk id list relationships_to_delete = set() - relationships_to_rebuild = {} # (src, tgt) -> remaining_chunk_ids + relationships_to_rebuild = {} # (src, tgt) -> remaining chunk id list + entity_chunk_updates: dict[str, list[str]] = {} + relation_chunk_updates: dict[tuple[str, str], list[str]] = {} try: # Get affected entities and relations from full_entities and full_relations storage @@ -2776,14 +2976,41 @@ class LightRAG: # Process entities for node_data in affected_nodes: node_label = node_data.get("entity_id") - if node_label and "source_id" in node_data: - sources = set(node_data["source_id"].split(GRAPH_FIELD_SEP)) - remaining_sources = sources - chunk_ids + if not node_label: + continue - if not remaining_sources: - entities_to_delete.add(node_label) - elif remaining_sources != sources: - entities_to_rebuild[node_label] = remaining_sources + existing_sources: list[str] = [] + if self.entity_chunks: + stored_chunks = await self.entity_chunks.get_by_id(node_label) + if stored_chunks and isinstance(stored_chunks, dict): + existing_sources = [ + chunk_id + for chunk_id in stored_chunks.get("chunk_ids", []) + if chunk_id + ] + + if not existing_sources and node_data.get("source_id"): + existing_sources = [ + chunk_id + for chunk_id in node_data["source_id"].split( + GRAPH_FIELD_SEP + ) + if chunk_id + ] + + if not existing_sources: + continue + + remaining_sources = subtract_source_ids(existing_sources, chunk_ids) + + if not remaining_sources: + entities_to_delete.add(node_label) + entity_chunk_updates[node_label] = [] + elif remaining_sources != existing_sources: + entities_to_rebuild[node_label] = remaining_sources + entity_chunk_updates[node_label] = remaining_sources + else: + logger.info(f"Untouch entity: {node_label}") async with pipeline_status_lock: log_message = f"Found {len(entities_to_rebuild)} affected entities" @@ -2796,21 +3023,51 @@ class LightRAG: src = edge_data.get("source") tgt = edge_data.get("target") - if src and tgt and "source_id" in edge_data: - edge_tuple = tuple(sorted((src, tgt))) - if ( - edge_tuple in relationships_to_delete - or edge_tuple in relationships_to_rebuild - ): - continue + if not src or not tgt or "source_id" not in edge_data: + continue - sources = set(edge_data["source_id"].split(GRAPH_FIELD_SEP)) - remaining_sources = sources - chunk_ids + edge_tuple = tuple(sorted((src, tgt))) + if ( + edge_tuple in relationships_to_delete + or edge_tuple in relationships_to_rebuild + ): + continue - if not remaining_sources: - relationships_to_delete.add(edge_tuple) - elif remaining_sources != sources: - relationships_to_rebuild[edge_tuple] = remaining_sources + existing_sources: list[str] = [] + if self.relation_chunks: + storage_key = make_relation_chunk_key(src, tgt) + stored_chunks = await self.relation_chunks.get_by_id( + storage_key + ) + if stored_chunks and isinstance(stored_chunks, dict): + existing_sources = [ + chunk_id + for chunk_id in stored_chunks.get("chunk_ids", []) + if chunk_id + ] + + if not existing_sources: + existing_sources = [ + chunk_id + for chunk_id in edge_data["source_id"].split( + GRAPH_FIELD_SEP + ) + if chunk_id + ] + + if not existing_sources: + continue + + remaining_sources = subtract_source_ids(existing_sources, chunk_ids) + + if not remaining_sources: + relationships_to_delete.add(edge_tuple) + 
relation_chunk_updates[edge_tuple] = [] + elif remaining_sources != existing_sources: + relationships_to_rebuild[edge_tuple] = remaining_sources + relation_chunk_updates[edge_tuple] = remaining_sources + else: + logger.info(f"Untouch relation: {edge_tuple}") async with pipeline_status_lock: log_message = ( @@ -2820,6 +3077,45 @@ class LightRAG: pipeline_status["latest_message"] = log_message pipeline_status["history_messages"].append(log_message) + current_time = int(time.time()) + + if entity_chunk_updates and self.entity_chunks: + entity_upsert_payload = {} + entity_delete_ids: set[str] = set() + for entity_name, remaining in entity_chunk_updates.items(): + if not remaining: + entity_delete_ids.add(entity_name) + else: + entity_upsert_payload[entity_name] = { + "chunk_ids": remaining, + "count": len(remaining), + "updated_at": current_time, + } + + if entity_delete_ids: + await self.entity_chunks.delete(list(entity_delete_ids)) + if entity_upsert_payload: + await self.entity_chunks.upsert(entity_upsert_payload) + + if relation_chunk_updates and self.relation_chunks: + relation_upsert_payload = {} + relation_delete_ids: set[str] = set() + for edge_tuple, remaining in relation_chunk_updates.items(): + storage_key = make_relation_chunk_key(*edge_tuple) + if not remaining: + relation_delete_ids.add(storage_key) + else: + relation_upsert_payload[storage_key] = { + "chunk_ids": remaining, + "count": len(remaining), + "updated_at": current_time, + } + + if relation_delete_ids: + await self.relation_chunks.delete(list(relation_delete_ids)) + if relation_upsert_payload: + await self.relation_chunks.upsert(relation_upsert_payload) + except Exception as e: logger.error(f"Failed to process graph analysis results: {e}") raise Exception(f"Failed to process graph dependencies: {e}") from e @@ -2914,6 +3210,8 @@ class LightRAG: global_config=asdict(self), pipeline_status=pipeline_status, pipeline_status_lock=pipeline_status_lock, + entity_chunks_storage=self.entity_chunks, + relation_chunks_storage=self.relation_chunks, ) except Exception as e: diff --git a/lightrag/operate.py b/lightrag/operate.py index 5fa672a4..6b409f21 100644 --- a/lightrag/operate.py +++ b/lightrag/operate.py @@ -7,7 +7,7 @@ import json_repair from typing import Any, AsyncIterator, overload, Literal from collections import Counter, defaultdict -from .utils import ( +from lightrag.utils import ( logger, compute_mdhash_id, Tokenizer, @@ -26,15 +26,16 @@ from .utils import ( pick_by_weighted_polling, pick_by_vector_similarity, process_chunks_unified, - build_file_path, - truncate_entity_source_id, safe_vdb_operation_with_exception, create_prefixed_exception, fix_tuple_delimiter_corruption, convert_to_user_format, generate_reference_list_from_chunks, + apply_source_ids_limit, + merge_source_ids, + make_relation_chunk_key, ) -from .base import ( +from lightrag.base import ( BaseGraphStorage, BaseKVStorage, BaseVectorStorage, @@ -43,8 +44,8 @@ from .base import ( QueryResult, QueryContextResult, ) -from .prompt import PROMPTS -from .constants import ( +from lightrag.prompt import PROMPTS +from lightrag.constants import ( GRAPH_FIELD_SEP, DEFAULT_MAX_ENTITY_TOKENS, DEFAULT_MAX_RELATION_TOKENS, @@ -53,8 +54,11 @@ from .constants import ( DEFAULT_KG_CHUNK_PICK_METHOD, DEFAULT_ENTITY_TYPES, DEFAULT_SUMMARY_LANGUAGE, + SOURCE_IDS_LIMIT_METHOD_KEEP, + SOURCE_IDS_LIMIT_METHOD_FIFO, + DEFAULT_FILE_PATH_MORE_PLACEHOLDER, ) -from .kg.shared_storage import get_storage_keyed_lock +from lightrag.kg.shared_storage import get_storage_keyed_lock import 
time from dotenv import load_dotenv @@ -474,8 +478,8 @@ async def _handle_single_relationship_extraction( async def _rebuild_knowledge_from_chunks( - entities_to_rebuild: dict[str, set[str]], - relationships_to_rebuild: dict[tuple[str, str], set[str]], + entities_to_rebuild: dict[str, list[str]], + relationships_to_rebuild: dict[tuple[str, str], list[str]], knowledge_graph_inst: BaseGraphStorage, entities_vdb: BaseVectorStorage, relationships_vdb: BaseVectorStorage, @@ -484,6 +488,8 @@ async def _rebuild_knowledge_from_chunks( global_config: dict[str, str], pipeline_status: dict | None = None, pipeline_status_lock=None, + entity_chunks_storage: BaseKVStorage | None = None, + relation_chunks_storage: BaseKVStorage | None = None, ) -> None: """Rebuild entity and relationship descriptions from cached extraction results with parallel processing @@ -492,8 +498,8 @@ async def _rebuild_knowledge_from_chunks( controlled by llm_model_max_async and using get_storage_keyed_lock for data consistency. Args: - entities_to_rebuild: Dict mapping entity_name -> set of remaining chunk_ids - relationships_to_rebuild: Dict mapping (src, tgt) -> set of remaining chunk_ids + entities_to_rebuild: Dict mapping entity_name -> list of remaining chunk_ids + relationships_to_rebuild: Dict mapping (src, tgt) -> list of remaining chunk_ids knowledge_graph_inst: Knowledge graph storage entities_vdb: Entity vector database relationships_vdb: Relationship vector database @@ -502,6 +508,8 @@ async def _rebuild_knowledge_from_chunks( global_config: Global configuration containing llm_model_max_async pipeline_status: Pipeline status dictionary pipeline_status_lock: Lock for pipeline status + entity_chunks_storage: KV storage maintaining full chunk IDs per entity + relation_chunks_storage: KV storage maintaining full chunk IDs per relation """ if not entities_to_rebuild and not relationships_to_rebuild: return @@ -641,10 +649,11 @@ async def _rebuild_knowledge_from_chunks( chunk_entities=chunk_entities, llm_response_cache=llm_response_cache, global_config=global_config, + entity_chunks_storage=entity_chunks_storage, ) rebuilt_entities_count += 1 status_message = ( - f"Rebuilt `{entity_name}` from {len(chunk_ids)} chunks" + f"Rebuild `{entity_name}` from {len(chunk_ids)} chunks" ) logger.info(status_message) if pipeline_status is not None and pipeline_status_lock is not None: @@ -682,16 +691,11 @@ async def _rebuild_knowledge_from_chunks( chunk_relationships=chunk_relationships, llm_response_cache=llm_response_cache, global_config=global_config, + relation_chunks_storage=relation_chunks_storage, + pipeline_status=pipeline_status, + pipeline_status_lock=pipeline_status_lock, ) rebuilt_relationships_count += 1 - status_message = ( - f"Rebuilt `{src} - {tgt}` from {len(chunk_ids)} chunks" - ) - logger.info(status_message) - if pipeline_status is not None and pipeline_status_lock is not None: - async with pipeline_status_lock: - pipeline_status["latest_message"] = status_message - pipeline_status["history_messages"].append(status_message) except Exception as e: failed_relationships_count += 1 status_message = f"Failed to rebuild `{src} - {tgt}`: {e}" @@ -1002,10 +1006,13 @@ async def _rebuild_single_entity( knowledge_graph_inst: BaseGraphStorage, entities_vdb: BaseVectorStorage, entity_name: str, - chunk_ids: set[str], + chunk_ids: list[str], chunk_entities: dict, llm_response_cache: BaseKVStorage, global_config: dict[str, str], + entity_chunks_storage: BaseKVStorage | None = None, + pipeline_status: dict | None = None, + 
pipeline_status_lock=None, ) -> None: """Rebuild a single entity from cached extraction results""" @@ -1016,7 +1023,11 @@ async def _rebuild_single_entity( # Helper function to update entity in both graph and vector storage async def _update_entity_storage( - final_description: str, entity_type: str, file_paths: set[str] + final_description: str, + entity_type: str, + file_paths: set[str], + source_chunk_ids: list[str], + truncation_info: str = "", ): try: # Update entity in graph storage (critical path) @@ -1024,10 +1035,12 @@ async def _rebuild_single_entity( **current_entity, "description": final_description, "entity_type": entity_type, - "source_id": GRAPH_FIELD_SEP.join(chunk_ids), + "source_id": GRAPH_FIELD_SEP.join(source_chunk_ids), "file_path": GRAPH_FIELD_SEP.join(file_paths) if file_paths else current_entity.get("file_path", "unknown_source"), + "created_at": int(time.time()), + "truncate": truncation_info, } await knowledge_graph_inst.upsert_node(entity_name, updated_entity_data) @@ -1060,9 +1073,33 @@ async def _rebuild_single_entity( logger.error(error_msg) raise # Re-raise exception - # Collect all entity data from relevant chunks + # normalized_chunk_ids = merge_source_ids([], chunk_ids) + normalized_chunk_ids = chunk_ids + + if entity_chunks_storage is not None and normalized_chunk_ids: + await entity_chunks_storage.upsert( + { + entity_name: { + "chunk_ids": normalized_chunk_ids, + "count": len(normalized_chunk_ids), + } + } + ) + + limit_method = ( + global_config.get("source_ids_limit_method") or SOURCE_IDS_LIMIT_METHOD_KEEP + ) + + limited_chunk_ids = apply_source_ids_limit( + normalized_chunk_ids, + global_config["max_source_ids_per_entity"], + limit_method, + identifier=f"`{entity_name}`", + ) + + # Collect all entity data from relevant (limited) chunks all_entity_data = [] - for chunk_id in chunk_ids: + for chunk_id in limited_chunk_ids: if chunk_id in chunk_entities and entity_name in chunk_entities[chunk_id]: all_entity_data.extend(chunk_entities[chunk_id][entity_name]) @@ -1109,13 +1146,19 @@ async def _rebuild_single_entity( final_description = current_entity.get("description", "") entity_type = current_entity.get("entity_type", "UNKNOWN") - await _update_entity_storage(final_description, entity_type, file_paths) + await _update_entity_storage( + final_description, + entity_type, + file_paths, + limited_chunk_ids, + ) return # Process cached entity data descriptions = [] entity_types = [] - file_paths = set() + file_paths_list = [] + seen_paths = set() for entity_data in all_entity_data: if entity_data.get("description"): @@ -1123,7 +1166,35 @@ async def _rebuild_single_entity( if entity_data.get("entity_type"): entity_types.append(entity_data["entity_type"]) if entity_data.get("file_path"): - file_paths.add(entity_data["file_path"]) + file_path = entity_data["file_path"] + if file_path and file_path not in seen_paths: + file_paths_list.append(file_path) + seen_paths.add(file_path) + + # Apply MAX_FILE_PATHS limit + max_file_paths = global_config.get("max_file_paths") + file_path_placeholder = global_config.get( + "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER + ) + limit_method = global_config.get("source_ids_limit_method") + + original_count = len(file_paths_list) + if original_count > max_file_paths: + if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO: + # FIFO: keep tail (newest), discard head + file_paths_list = file_paths_list[-max_file_paths:] + else: + # KEEP: keep head (earliest), discard tail + file_paths_list = 
file_paths_list[:max_file_paths] + + file_paths_list.append( + f"...{file_path_placeholder}(showing {max_file_paths} of {original_count})..." + ) + logger.info( + f"Limited `{entity_name}`: file_path {original_count} -> {max_file_paths} ({limit_method})" + ) + + file_paths = set(file_paths_list) # Remove duplicates while preserving order description_list = list(dict.fromkeys(descriptions)) @@ -1149,7 +1220,31 @@ async def _rebuild_single_entity( else: final_description = current_entity.get("description", "") - await _update_entity_storage(final_description, entity_type, file_paths) + if len(limited_chunk_ids) < len(normalized_chunk_ids): + truncation_info = ( + f"{limit_method}:{len(limited_chunk_ids)}/{len(normalized_chunk_ids)}" + ) + else: + truncation_info = "" + + await _update_entity_storage( + final_description, + entity_type, + file_paths, + limited_chunk_ids, + truncation_info, + ) + + # Log rebuild completion with truncation info + status_message = f"Rebuild `{entity_name}` from {len(chunk_ids)} chunks" + if truncation_info: + status_message += f" ({truncation_info})" + logger.info(status_message) + # Update pipeline status + if pipeline_status is not None and pipeline_status_lock is not None: + async with pipeline_status_lock: + pipeline_status["latest_message"] = status_message + pipeline_status["history_messages"].append(status_message) async def _rebuild_single_relationship( @@ -1157,10 +1252,13 @@ async def _rebuild_single_relationship( relationships_vdb: BaseVectorStorage, src: str, tgt: str, - chunk_ids: set[str], + chunk_ids: list[str], chunk_relationships: dict, llm_response_cache: BaseKVStorage, global_config: dict[str, str], + relation_chunks_storage: BaseKVStorage | None = None, + pipeline_status: dict | None = None, + pipeline_status_lock=None, ) -> None: """Rebuild a single relationship from cached extraction results @@ -1173,9 +1271,33 @@ async def _rebuild_single_relationship( if not current_relationship: return + # normalized_chunk_ids = merge_source_ids([], chunk_ids) + normalized_chunk_ids = chunk_ids + + if relation_chunks_storage is not None and normalized_chunk_ids: + storage_key = make_relation_chunk_key(src, tgt) + await relation_chunks_storage.upsert( + { + storage_key: { + "chunk_ids": normalized_chunk_ids, + "count": len(normalized_chunk_ids), + } + } + ) + + limit_method = ( + global_config.get("source_ids_limit_method") or SOURCE_IDS_LIMIT_METHOD_KEEP + ) + limited_chunk_ids = apply_source_ids_limit( + normalized_chunk_ids, + global_config["max_source_ids_per_relation"], + limit_method, + identifier=f"`{src}`~`{tgt}`", + ) + # Collect all relationship data from relevant chunks all_relationship_data = [] - for chunk_id in chunk_ids: + for chunk_id in limited_chunk_ids: if chunk_id in chunk_relationships: # Check both (src, tgt) and (tgt, src) since relationships can be bidirectional for edge_key in [(src, tgt), (tgt, src)]: @@ -1192,7 +1314,8 @@ async def _rebuild_single_relationship( descriptions = [] keywords = [] weights = [] - file_paths = set() + file_paths_list = [] + seen_paths = set() for rel_data in all_relationship_data: if rel_data.get("description"): @@ -1202,7 +1325,35 @@ async def _rebuild_single_relationship( if rel_data.get("weight"): weights.append(rel_data["weight"]) if rel_data.get("file_path"): - file_paths.add(rel_data["file_path"]) + file_path = rel_data["file_path"] + if file_path and file_path not in seen_paths: + file_paths_list.append(file_path) + seen_paths.add(file_path) + + # Apply count limit + max_file_paths = 
global_config.get("max_file_paths") + file_path_placeholder = global_config.get( + "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER + ) + limit_method = global_config.get("source_ids_limit_method") + + original_count = len(file_paths_list) + if original_count > max_file_paths: + if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO: + # FIFO: keep tail (newest), discard head + file_paths_list = file_paths_list[-max_file_paths:] + else: + # KEEP: keep head (earliest), discard tail + file_paths_list = file_paths_list[:max_file_paths] + + file_paths_list.append( + f"...{file_path_placeholder}(showing {max_file_paths} of {original_count})..." + ) + logger.info( + f"Limited `{src}`~`{tgt}`: file_path {original_count} -> {max_file_paths} ({limit_method})" + ) + + file_paths = set(file_paths_list) # Remove duplicates while preserving order description_list = list(dict.fromkeys(descriptions)) @@ -1230,6 +1381,13 @@ async def _rebuild_single_relationship( # fallback to keep current(unchanged) final_description = current_relationship.get("description", "") + if len(limited_chunk_ids) < len(normalized_chunk_ids): + truncation_info = ( + f"{limit_method}:{len(limited_chunk_ids)}/{len(normalized_chunk_ids)}" + ) + else: + truncation_info = "" + # Update relationship in graph storage updated_relationship_data = { **current_relationship, @@ -1238,10 +1396,11 @@ async def _rebuild_single_relationship( else current_relationship.get("description", ""), "keywords": combined_keywords, "weight": weight, - "source_id": GRAPH_FIELD_SEP.join(chunk_ids), + "source_id": GRAPH_FIELD_SEP.join(limited_chunk_ids), "file_path": GRAPH_FIELD_SEP.join([fp for fp in file_paths if fp]) if file_paths else current_relationship.get("file_path", "unknown_source"), + "truncate": truncation_info, } await knowledge_graph_inst.upsert_edge(src, tgt, updated_relationship_data) @@ -1287,6 +1446,25 @@ async def _rebuild_single_relationship( logger.error(error_msg) raise # Re-raise exception + # Log rebuild completion with truncation info + status_message = f"Rebuild `{src} - {tgt}` from {len(chunk_ids)} chunks" + if truncation_info: + status_message += f" ({truncation_info})" + # Add truncation info from apply_source_ids_limit if truncation occurred + if len(limited_chunk_ids) < len(normalized_chunk_ids): + truncation_info = ( + f" ({limit_method}:{len(limited_chunk_ids)}/{len(normalized_chunk_ids)})" + ) + status_message += truncation_info + + logger.info(status_message) + + # Update pipeline status + if pipeline_status is not None and pipeline_status_lock is not None: + async with pipeline_status_lock: + pipeline_status["latest_message"] = status_message + pipeline_status["history_messages"].append(status_message) + async def _merge_nodes_then_upsert( entity_name: str, @@ -1296,6 +1474,7 @@ async def _merge_nodes_then_upsert( pipeline_status: dict = None, pipeline_status_lock=None, llm_response_cache: BaseKVStorage | None = None, + entity_chunks_storage: BaseKVStorage | None = None, ): """Get existing nodes from knowledge graph use name,if exists, merge data, else create, then upsert.""" already_entity_types = [] @@ -1318,10 +1497,76 @@ async def _merge_nodes_then_upsert( reverse=True, )[0][0] # Get the entity type with the highest count + original_nodes_count = len(nodes_data) + + new_source_ids = [dp["source_id"] for dp in nodes_data if dp.get("source_id")] + + existing_full_source_ids = [] + if entity_chunks_storage is not None: + stored_chunks = await entity_chunks_storage.get_by_id(entity_name) + if stored_chunks and 
isinstance(stored_chunks, dict): + existing_full_source_ids = [ + chunk_id for chunk_id in stored_chunks.get("chunk_ids", []) if chunk_id + ] + + if not existing_full_source_ids: + existing_full_source_ids = [ + chunk_id for chunk_id in already_source_ids if chunk_id + ] + + full_source_ids = merge_source_ids(existing_full_source_ids, new_source_ids) + + if entity_chunks_storage is not None and full_source_ids: + await entity_chunks_storage.upsert( + { + entity_name: { + "chunk_ids": full_source_ids, + "count": len(full_source_ids), + } + } + ) + + limit_method = global_config.get("source_ids_limit_method") + max_source_limit = global_config.get("max_source_ids_per_entity") + source_ids = apply_source_ids_limit( + full_source_ids, + max_source_limit, + limit_method, + identifier=f"`{entity_name}`", + ) + + # Only apply filtering in KEEP(ignore new) mode + if limit_method == SOURCE_IDS_LIMIT_METHOD_KEEP: + allowed_source_ids = set(source_ids) + filtered_nodes = [] + for dp in nodes_data: + source_id = dp.get("source_id") + # Skip descriptions sourced from chunks dropped by the limitation cap + if ( + source_id + and source_id not in allowed_source_ids + and source_id not in existing_full_source_ids + ): + continue + filtered_nodes.append(dp) + nodes_data = filtered_nodes + else: + # In FIFO mode, keep all node descriptions - truncation happens at source_ids level only + nodes_data = list(nodes_data) + + skip_summary_due_to_limit = ( + limit_method == SOURCE_IDS_LIMIT_METHOD_KEEP + and len(existing_full_source_ids) >= max_source_limit + and not nodes_data + and already_description + ) + # Deduplicate by description, keeping first occurrence unique_nodes = {} for dp in nodes_data: - desc = dp["description"] + desc = dp.get("description") + if not desc: + continue if desc not in unique_nodes: unique_nodes[desc] = dp @@ -1332,17 +1577,31 @@ async def _merge_nodes_then_upsert( ) sorted_descriptions = [dp["description"] for dp in sorted_nodes] + truncation_info = "" + dd_message = "" + # Combine already_description with sorted new sorted descriptions description_list = already_description + sorted_descriptions + deduplicated_num = original_nodes_count - len(sorted_descriptions) + if deduplicated_num > 0: + dd_message = f"dd:{deduplicated_num}" num_fragment = len(description_list) already_fragment = len(already_description) - deduplicated_num = already_fragment + len(nodes_data) - num_fragment - if deduplicated_num > 0: - dd_message = f"(dd:{deduplicated_num})" - else: - dd_message = "" - if num_fragment > 0: + if skip_summary_due_to_limit: + description = ( + already_node.get("description", "(no description)") + if already_node + else "(no description)" + ) + llm_was_used = False + status_message = f"Skip merge for `{entity_name}`: IGNORE_NEW limit reached" + logger.debug(status_message) + if pipeline_status is not None and pipeline_status_lock is not None: + async with pipeline_status_lock: + pipeline_status["latest_message"] = status_message + pipeline_status["history_messages"].append(status_message) + elif num_fragment > 0: # Get summary and LLM usage status description, llm_was_used = await _handle_entity_relation_summary( "Entity", @@ -1355,9 +1614,16 @@ async def _merge_nodes_then_upsert( # Log based on actual LLM usage if llm_was_used: - status_message = f"LLMmrg: `{entity_name}` | {already_fragment}+{num_fragment - already_fragment}{dd_message}" + status_message = f"LLMmrg: `{entity_name}` | {already_fragment}+{num_fragment - already_fragment}" else: - status_message = f"Merged: 
`{entity_name}` | {already_fragment}+{num_fragment - already_fragment}{dd_message}" + status_message = f"Merged: `{entity_name}` | {already_fragment}+{num_fragment - already_fragment}" + + # Add truncation info from apply_source_ids_limit if truncation occurred + if len(source_ids) < len(full_source_ids): + truncation_info = f"{limit_method}:{len(source_ids)}/{len(full_source_ids)}" + + if dd_message or truncation_info: + status_message += f" ({', '.join([truncation_info, dd_message])})" if already_fragment > 0 or llm_was_used: logger.info(status_message) @@ -1372,12 +1638,67 @@ async def _merge_nodes_then_upsert( logger.error(f"Entity {entity_name} has no description") description = "(no description)" - merged_source_ids: set = set([dp["source_id"] for dp in nodes_data] + already_source_ids) - - source_ids = truncate_entity_source_id(merged_source_ids, entity_name, global_config) source_id = GRAPH_FIELD_SEP.join(source_ids) - file_path = build_file_path(already_file_paths, nodes_data, entity_name) + # Build file_path with count limit + if skip_summary_due_to_limit: + # Skip limit, keep original file_path + file_path = GRAPH_FIELD_SEP.join(fp for fp in already_file_paths if fp) + else: + # Collect and apply limit + file_paths_list = [] + seen_paths = set() + + # Get placeholder to filter it out + file_path_placeholder = global_config.get( + "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER + ) + + # Collect from already_file_paths, excluding placeholder + for fp in already_file_paths: + # Skip placeholders (format: "...{placeholder}(showing X of Y)...") + if ( + fp + and not fp.startswith(f"...{file_path_placeholder}") + and fp not in seen_paths + ): + file_paths_list.append(fp) + seen_paths.add(fp) + + # Collect from new data + for dp in nodes_data: + file_path_item = dp.get("file_path") + if file_path_item and file_path_item not in seen_paths: + file_paths_list.append(file_path_item) + seen_paths.add(file_path_item) + + # Apply count limit + max_file_paths = global_config.get("max_file_paths") + + if len(file_paths_list) > max_file_paths: + limit_method = global_config.get( + "source_ids_limit_method", SOURCE_IDS_LIMIT_METHOD_KEEP + ) + file_path_placeholder = global_config.get( + "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER + ) + original_count = len(file_paths_list) + + if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO: + # FIFO: keep tail (newest), discard head + file_paths_list = file_paths_list[-max_file_paths:] + else: + # KEEP: keep head (earliest), discard tail + file_paths_list = file_paths_list[:max_file_paths] + + file_paths_list.append( + f"...{file_path_placeholder}(showing {max_file_paths} of {original_count})..." 
+ ) + logger.info( + f"Limited `{entity_name}`: file_path {original_count} -> {max_file_paths} ({limit_method})" + ) + + file_path = GRAPH_FIELD_SEP.join(file_paths_list) node_data = dict( entity_id=entity_name, @@ -1386,6 +1707,7 @@ async def _merge_nodes_then_upsert( source_id=source_id, file_path=file_path, created_at=int(time.time()), + truncate=truncation_info, ) await knowledge_graph_inst.upsert_node( entity_name, @@ -1405,6 +1727,7 @@ async def _merge_edges_then_upsert( pipeline_status_lock=None, llm_response_cache: BaseKVStorage | None = None, added_entities: list = None, # New parameter to track entities added during edge processing + relation_chunks_storage: BaseKVStorage | None = None, ): if src_id == tgt_id: return None @@ -1448,16 +1771,85 @@ async def _merge_edges_then_upsert( ) ) + original_edges_count = len(edges_data) + + new_source_ids = [dp["source_id"] for dp in edges_data if dp.get("source_id")] + + storage_key = make_relation_chunk_key(src_id, tgt_id) + existing_full_source_ids = [] + if relation_chunks_storage is not None: + stored_chunks = await relation_chunks_storage.get_by_id(storage_key) + if stored_chunks and isinstance(stored_chunks, dict): + existing_full_source_ids = [ + chunk_id for chunk_id in stored_chunks.get("chunk_ids", []) if chunk_id + ] + + if not existing_full_source_ids: + existing_full_source_ids = [ + chunk_id for chunk_id in already_source_ids if chunk_id + ] + + full_source_ids = merge_source_ids(existing_full_source_ids, new_source_ids) + + if relation_chunks_storage is not None and full_source_ids: + await relation_chunks_storage.upsert( + { + storage_key: { + "chunk_ids": full_source_ids, + "count": len(full_source_ids), + } + } + ) + + limit_method = global_config.get("source_ids_limit_method") + max_source_limit = global_config.get("max_source_ids_per_relation") + source_ids = apply_source_ids_limit( + full_source_ids, + max_source_limit, + limit_method, + identifier=f"`{src_id}`~`{tgt_id}`", + ) + limit_method = ( + global_config.get("source_ids_limit_method") or SOURCE_IDS_LIMIT_METHOD_KEEP + ) + + # Only apply filtering in IGNORE_NEW mode + if limit_method == SOURCE_IDS_LIMIT_METHOD_KEEP: + allowed_source_ids = set(source_ids) + filtered_edges = [] + for dp in edges_data: + source_id = dp.get("source_id") + # Skip relationship fragments sourced from chunks dropped by the IGNORE_NEW cap + if ( + source_id + and source_id not in allowed_source_ids + and source_id not in existing_full_source_ids + ): + continue + filtered_edges.append(dp) + edges_data = filtered_edges + else: + # In FIFO mode, keep all edge descriptions - truncation happens at source_ids level only + edges_data = list(edges_data) + + skip_summary_due_to_limit = ( + limit_method == SOURCE_IDS_LIMIT_METHOD_KEEP + and len(existing_full_source_ids) >= max_source_limit + and not edges_data + and already_description + ) + # Process edges_data with None checks weight = sum([dp["weight"] for dp in edges_data] + already_weights) # Deduplicate by description, keeping first occurrence unique_edges = {} for dp in edges_data: - if dp.get("description"): - desc = dp["description"] - if desc not in unique_edges: - unique_edges[desc] = dp + description_value = dp.get("description") + if not description_value: + continue + if description_value not in unique_edges: + unique_edges[description_value] = dp # Sort description by timestamp, then by description length (largest to smallest) when timestamps are the same sorted_edges = sorted( @@ -1466,17 +1858,34 @@ async def 
_merge_edges_then_upsert( ) sorted_descriptions = [dp["description"] for dp in sorted_edges] + truncation_info = "" + dd_message = "" + # Combine already_description with sorted new descriptions description_list = already_description + sorted_descriptions + deduplicated_num = original_edges_count - len(sorted_descriptions) + if deduplicated_num > 0: + dd_message = f"dd:{deduplicated_num}" num_fragment = len(description_list) already_fragment = len(already_description) - deduplicated_num = already_fragment + len(edges_data) - num_fragment - if deduplicated_num > 0: - dd_message = f"(dd:{deduplicated_num})" - else: - dd_message = "" - if num_fragment > 0: + + if skip_summary_due_to_limit: + description = ( + already_edge.get("description", "(no description)") + if already_edge + else "(no description)" + ) + llm_was_used = False + status_message = ( + f"Skip merge for `{src_id}`~`{tgt_id}`: IGNORE_NEW limit reached" + ) + logger.debug(status_message) + if pipeline_status is not None and pipeline_status_lock is not None: + async with pipeline_status_lock: + pipeline_status["latest_message"] = status_message + pipeline_status["history_messages"].append(status_message) + elif num_fragment > 0: # Get summary and LLM usage status description, llm_was_used = await _handle_entity_relation_summary( "Relation", @@ -1489,9 +1898,16 @@ async def _merge_edges_then_upsert( # Log based on actual LLM usage if llm_was_used: - status_message = f"LLMmrg: `{src_id}`~`{tgt_id}` | {already_fragment}+{num_fragment - already_fragment}{dd_message}" + status_message = f"LLMmrg: `{src_id}`~`{tgt_id}` | {already_fragment}+{num_fragment - already_fragment}" else: - status_message = f"Merged: `{src_id}`~`{tgt_id}` | {already_fragment}+{num_fragment - already_fragment}{dd_message}" + status_message = f"Merged: `{src_id}`~`{tgt_id}` | {already_fragment}+{num_fragment - already_fragment}" + + # Add truncation info from apply_source_ids_limit if truncation occurred + if len(source_ids) < len(full_source_ids): + truncation_info = f"{limit_method}:{len(source_ids)}/{len(full_source_ids)}" + + if dd_message or truncation_info: + status_message += f" ({', '.join([truncation_info, dd_message])})" if already_fragment > 0 or llm_was_used: logger.info(status_message) @@ -1521,13 +1937,67 @@ async def _merge_edges_then_upsert( # Join all unique keywords with commas keywords = ",".join(sorted(all_keywords)) - source_id = GRAPH_FIELD_SEP.join( - set( - [dp["source_id"] for dp in edges_data if dp.get("source_id")] - + already_source_ids + source_id = GRAPH_FIELD_SEP.join(source_ids) + + # Build file_path with count limit + if skip_summary_due_to_limit: + # Skip limit, keep original file_path + file_path = GRAPH_FIELD_SEP.join(fp for fp in already_file_paths if fp) + else: + # Collect and apply limit + file_paths_list = [] + seen_paths = set() + + # Get placeholder to filter it out + file_path_placeholder = global_config.get( + "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER ) - ) - file_path = build_file_path(already_file_paths, edges_data, f"{src_id}-{tgt_id}") + + # Collect from already_file_paths, excluding placeholder + for fp in already_file_paths: + # Skip placeholders (format: "...{placeholder}(showing X of Y)...") + if ( + fp + and not fp.startswith(f"...{file_path_placeholder}") + and fp not in seen_paths + ): + file_paths_list.append(fp) + seen_paths.add(fp) + + # Collect from new data + for dp in edges_data: + file_path_item = dp.get("file_path") + if file_path_item and file_path_item not in seen_paths: + 
file_paths_list.append(file_path_item) + seen_paths.add(file_path_item) + + # Apply count limit + max_file_paths = global_config.get("max_file_paths") + + if len(file_paths_list) > max_file_paths: + limit_method = global_config.get( + "source_ids_limit_method", SOURCE_IDS_LIMIT_METHOD_KEEP + ) + file_path_placeholder = global_config.get( + "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER + ) + original_count = len(file_paths_list) + + if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO: + # FIFO: keep tail (newest), discard head + file_paths_list = file_paths_list[-max_file_paths:] + else: + # KEEP: keep head (earliest), discard tail + file_paths_list = file_paths_list[:max_file_paths] + + file_paths_list.append( + f"...{file_path_placeholder}(showing {max_file_paths} of {original_count})..." + ) + logger.info( + f"Limited `{src_id}`~`{tgt_id}`: file_path {original_count} -> {max_file_paths} ({limit_method})" + ) + + file_path = GRAPH_FIELD_SEP.join(file_paths_list) for need_insert_id in [src_id, tgt_id]: if not (await knowledge_graph_inst.has_node(need_insert_id)): @@ -1538,6 +2008,7 @@ async def _merge_edges_then_upsert( "entity_type": "UNKNOWN", "file_path": file_path, "created_at": int(time.time()), + "truncate": "", } await knowledge_graph_inst.upsert_node(need_insert_id, node_data=node_data) @@ -1563,6 +2034,7 @@ async def _merge_edges_then_upsert( source_id=source_id, file_path=file_path, created_at=int(time.time()), + truncate=truncation_info, ), ) @@ -1574,6 +2046,7 @@ async def _merge_edges_then_upsert( source_id=source_id, file_path=file_path, created_at=int(time.time()), + truncate=truncation_info, ) return edge_data @@ -1591,6 +2064,8 @@ async def merge_nodes_and_edges( pipeline_status: dict = None, pipeline_status_lock=None, llm_response_cache: BaseKVStorage | None = None, + entity_chunks_storage: BaseKVStorage | None = None, + relation_chunks_storage: BaseKVStorage | None = None, current_file_number: int = 0, total_files: int = 0, file_path: str = "unknown_source", @@ -1614,6 +2089,8 @@ async def merge_nodes_and_edges( pipeline_status: Pipeline status dictionary pipeline_status_lock: Lock for pipeline status llm_response_cache: LLM response cache + entity_chunks_storage: Storage tracking full chunk lists per entity + relation_chunks_storage: Storage tracking full chunk lists per relation current_file_number: Current file number for logging total_files: Total files for logging file_path: File path for logging @@ -1671,13 +2148,14 @@ async def merge_nodes_and_edges( pipeline_status, pipeline_status_lock, llm_response_cache, + entity_chunks_storage, ) # Vector database operation (equally critical, must succeed) if entity_vdb is not None and entity_data: data_for_vdb = { compute_mdhash_id( - entity_data["entity_name"], prefix="ent-" + str(entity_data["entity_name"]), prefix="ent-" ): { "entity_name": entity_data["entity_name"], "entity_type": entity_data["entity_type"], @@ -1689,7 +2167,6 @@ async def merge_nodes_and_edges( } } - logger.debug(f"Inserting {entity_name} in Graph") # Use safe operation wrapper - VDB failure must throw exception await safe_vdb_operation_with_exception( @@ -1804,6 +2281,7 @@ async def merge_nodes_and_edges( pipeline_status_lock, llm_response_cache, added_entities, # Pass list to collect added entities + relation_chunks_storage, ) if edge_data is None: @@ -3338,7 +3816,11 @@ async def _build_query_context( query_embedding=search_result["query_embedding"], ) - if not merged_chunks: + if ( + not merged_chunks + and not 
truncation_result["entities_context"] + and not truncation_result["relations_context"] + ): return None # Stage 4: Build final LLM context with dynamic token processing diff --git a/lightrag/utils.py b/lightrag/utils.py index 959607e5..bfa3cac4 100644 --- a/lightrag/utils.py +++ b/lightrag/utils.py @@ -15,7 +15,17 @@ from dataclasses import dataclass from datetime import datetime from functools import wraps from hashlib import md5 -from typing import Any, Protocol, Callable, TYPE_CHECKING, List, Optional +from typing import ( + Any, + Protocol, + Callable, + TYPE_CHECKING, + List, + Optional, + Iterable, + Sequence, + Collection, +) import numpy as np from dotenv import load_dotenv @@ -25,7 +35,9 @@ from lightrag.constants import ( DEFAULT_LOG_FILENAME, GRAPH_FIELD_SEP, DEFAULT_MAX_TOTAL_TOKENS, - DEFAULT_MAX_FILE_PATH_LENGTH, + DEFAULT_SOURCE_IDS_LIMIT_METHOD, + VALID_SOURCE_IDS_LIMIT_METHODS, + SOURCE_IDS_LIMIT_METHOD_FIFO, ) # Initialize logger with basic configuration @@ -2464,82 +2476,111 @@ async def process_chunks_unified( return final_chunks -def truncate_entity_source_id(chunk_ids: set, entity_name: str, global_config: dict) -> set: - """Limit chunk_ids, for entities that appear a HUGE no of times (To not break VDB hard upper limits)""" - already_len: int = len(chunk_ids) - max_chunk_ids_per_entity = global_config["max_source_ids_per_entity"] +def normalize_source_ids_limit_method(method: str | None) -> str: + """Normalize the source ID limiting strategy and fall back to default when invalid.""" - if already_len <= max_chunk_ids_per_entity: - return chunk_ids + if not method: + return DEFAULT_SOURCE_IDS_LIMIT_METHOD - logger.warning( - f"Source Ids already exceeds {max_chunk_ids_per_entity } for {entity_name}, " - f"current size: {already_len}, truncating..." 
- ) - - truncated_chunk_ids = set(list(chunk_ids)[0:max_chunk_ids_per_entity ]) - return truncated_chunk_ids - - - -def build_file_path(already_file_paths, data_list, target): - """Build file path string with UTF-8 byte length limit and deduplication - - Args: - already_file_paths: List of existing file paths - data_list: List of data items containing file_path - target: Target name for logging warnings - - Returns: - str: Combined file paths separated by GRAPH_FIELD_SEP - """ - # set: deduplication - file_paths_set = {fp for fp in already_file_paths if fp} - - # string: filter empty value and keep file order in already_file_paths - file_paths = GRAPH_FIELD_SEP.join(fp for fp in already_file_paths if fp) - - # Check if initial file_paths already exceeds byte length limit - if len(file_paths.encode("utf-8")) >= DEFAULT_MAX_FILE_PATH_LENGTH: + normalized = method.upper() + if normalized not in VALID_SOURCE_IDS_LIMIT_METHODS: logger.warning( - f"Initial file_paths already exceeds {DEFAULT_MAX_FILE_PATH_LENGTH} bytes for {target}, " - f"current size: {len(file_paths.encode('utf-8'))} bytes" + "Unknown SOURCE_IDS_LIMIT_METHOD '%s', falling back to %s", + method, + DEFAULT_SOURCE_IDS_LIMIT_METHOD, + ) + return DEFAULT_SOURCE_IDS_LIMIT_METHOD + + return normalized + + +def merge_source_ids( + existing_ids: Iterable[str] | None, new_ids: Iterable[str] | None +) -> list[str]: + """Merge two iterables of source IDs while preserving order and removing duplicates.""" + + merged: list[str] = [] + seen: set[str] = set() + + for sequence in (existing_ids, new_ids): + if not sequence: + continue + for source_id in sequence: + if not source_id: + continue + if source_id not in seen: + seen.add(source_id) + merged.append(source_id) + + return merged + + +def apply_source_ids_limit( + source_ids: Sequence[str], + limit: int, + method: str, + *, + identifier: str | None = None, +) -> list[str]: + """Apply a limit strategy to a sequence of source IDs.""" + + if limit <= 0: + return [] + + source_ids_list = list(source_ids) + if len(source_ids_list) <= limit: + return source_ids_list + + normalized_method = normalize_source_ids_limit_method(method) + + if normalized_method == SOURCE_IDS_LIMIT_METHOD_FIFO: + truncated = source_ids_list[-limit:] + else: # IGNORE_NEW + truncated = source_ids_list[:limit] + + if identifier and len(truncated) < len(source_ids_list): + logger.debug( + "Source_id truncated: %s | %s keeping %s of %s entries", + identifier, + normalized_method, + len(truncated), + len(source_ids_list), ) - # ignored file_paths - file_paths_ignore = "" - # add file_paths - for dp in data_list: - cur_file_path = dp.get("file_path") - # empty - if not cur_file_path: - continue + return truncated - # skip duplicate item - if cur_file_path in file_paths_set: - continue - # add - file_paths_set.add(cur_file_path) - # check the UTF-8 byte length - new_addition = GRAPH_FIELD_SEP + cur_file_path if file_paths else cur_file_path - if ( - len(file_paths.encode("utf-8")) + len(new_addition.encode("utf-8")) - < DEFAULT_MAX_FILE_PATH_LENGTH - 5 - ): - # append - file_paths += new_addition - else: - # ignore - file_paths_ignore += GRAPH_FIELD_SEP + cur_file_path +def subtract_source_ids( + source_ids: Iterable[str], + ids_to_remove: Collection[str], +) -> list[str]: + """Remove a collection of IDs from an ordered iterable while preserving order.""" - if file_paths_ignore: - logger.warning( - f"File paths exceed {DEFAULT_MAX_FILE_PATH_LENGTH} bytes for {target}, " - f"ignoring file path: {file_paths_ignore}" - ) - 
return file_paths + removal_set = set(ids_to_remove) + if not removal_set: + return [source_id for source_id in source_ids if source_id] + + return [ + source_id + for source_id in source_ids + if source_id and source_id not in removal_set + ] + + +def make_relation_chunk_key(src: str, tgt: str) -> str: + """Create a deterministic storage key for relation chunk tracking.""" + + return GRAPH_FIELD_SEP.join(sorted((src, tgt))) + + +def parse_relation_chunk_key(key: str) -> tuple[str, str]: + """Parse a relation chunk storage key back into its entity pair.""" + + parts = key.split(GRAPH_FIELD_SEP) + if len(parts) != 2: + raise ValueError(f"Invalid relation chunk key: {key}") + return parts[0], parts[1] def generate_track_id(prefix: str = "upload") -> str:
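Reviewer note (not part of the patch): the snippet below is a minimal usage sketch for the new source-ID helpers this patch adds to lightrag/utils.py. It assumes the patch has been applied; the chunk IDs and the limit of 3 are purely illustrative.

# Minimal sketch of the new source-ID helpers (assumes this patch is applied).
from lightrag.utils import (
    apply_source_ids_limit,
    merge_source_ids,
    normalize_source_ids_limit_method,
    subtract_source_ids,
)

existing = ["chunk-1", "chunk-2", "chunk-3"]   # IDs already tracked for an entity/relation
incoming = ["chunk-3", "chunk-4", "chunk-5"]   # IDs extracted from newly inserted chunks

# Order-preserving, de-duplicated union of old and new IDs.
full_ids = merge_source_ids(existing, incoming)
print(full_ids)  # ['chunk-1', 'chunk-2', 'chunk-3', 'chunk-4', 'chunk-5']

# KEEP keeps the head of the list: old IDs win, new ones are dropped once the cap is hit.
print(apply_source_ids_limit(full_ids, 3, "KEEP"))   # ['chunk-1', 'chunk-2', 'chunk-3']

# FIFO keeps the tail: newest IDs win, oldest are evicted.
print(apply_source_ids_limit(full_ids, 3, "FIFO"))   # ['chunk-3', 'chunk-4', 'chunk-5']

# Method names are case-insensitive; invalid values fall back to the default (KEEP).
print(normalize_source_ids_limit_method("fifo"))     # 'FIFO'

# Deletion path: drop the chunks of a removed document while preserving order.
print(subtract_source_ids(full_ids, {"chunk-2", "chunk-4"}))  # ['chunk-1', 'chunk-3', 'chunk-5']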
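A second sketch, under the same assumption that the patch is applied, showing the relation chunk-tracking key round trip and the payload shape that the migration and merge paths write to the new entity_chunks / relation_chunks KV stores. The entity names and chunk IDs are illustrative.

# Sketch of the relation chunk-tracking key and KV payload shape (assumes this patch is applied).
from lightrag.utils import make_relation_chunk_key, parse_relation_chunk_key

# Keys are order-independent: (src, tgt) and (tgt, src) resolve to the same record.
key = make_relation_chunk_key("Alice", "Bob")
assert key == make_relation_chunk_key("Bob", "Alice")
assert parse_relation_chunk_key(key) == ("Alice", "Bob")

# Payload written by the migration and merge paths; the deletion path additionally
# stores an "updated_at" timestamp alongside these fields.
chunk_ids = ["chunk-1", "chunk-2"]
payload = {
    key: {
        "chunk_ids": chunk_ids,   # full, untruncated list of source chunk IDs
        "count": len(chunk_ids),  # kept in sync with chunk_ids
    }
}
# e.g. await relation_chunks_storage.upsert(payload)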
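Finally, a standalone restatement of the file_path capping rule that the patch repeats inline in _rebuild_single_entity, _rebuild_single_relationship, _merge_nodes_then_upsert and _merge_edges_then_upsert: KEEP retains the earliest paths, FIFO the latest, and a placeholder entry records how many paths were kept. The cap_file_paths helper name and its defaults are mine, for illustration only; the patch does not define such a helper.

# Illustrative helper restating the inlined file_path capping logic from operate.py.
def cap_file_paths(
    paths: list[str],
    max_file_paths: int,
    method: str = "KEEP",
    placeholder: str = "truncated",
) -> list[str]:
    original_count = len(paths)
    if original_count <= max_file_paths:
        return paths
    if method == "FIFO":
        kept = paths[-max_file_paths:]   # FIFO: keep tail (newest), discard head
    else:
        kept = paths[:max_file_paths]    # KEEP: keep head (earliest), discard tail
    # Same placeholder format the patch appends before joining with GRAPH_FIELD_SEP.
    kept.append(f"...{placeholder}(showing {max_file_paths} of {original_count})...")
    return kept

print(cap_file_paths([f"doc{i}.txt" for i in range(5)], 2, "FIFO"))
# ['doc3.txt', 'doc4.txt', '...truncated(showing 2 of 5)...']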