From 2ea1fccf1a1b73568e2dae2029cf82ef7a80accd Mon Sep 17 00:00:00 2001 From: yangdx Date: Tue, 21 Oct 2025 04:41:15 +0800 Subject: [PATCH] Refactor deduplication calculation and remove unused variables (cherry picked from commit 1154c5683fd3b9b3c378b0dea3f1791bcf7b09cf) --- lightrag/operate.py | 858 +++++++++++++++++++++++++++++++++++--------- 1 file changed, 688 insertions(+), 170 deletions(-) diff --git a/lightrag/operate.py b/lightrag/operate.py index 29a17e68..325bea25 100644 --- a/lightrag/operate.py +++ b/lightrag/operate.py @@ -7,7 +7,7 @@ import json_repair from typing import Any, AsyncIterator, overload, Literal from collections import Counter, defaultdict -from .utils import ( +from lightrag.utils import ( logger, compute_mdhash_id, Tokenizer, @@ -26,15 +26,16 @@ from .utils import ( pick_by_weighted_polling, pick_by_vector_similarity, process_chunks_unified, - build_file_path, - truncate_entity_source_id, safe_vdb_operation_with_exception, create_prefixed_exception, fix_tuple_delimiter_corruption, convert_to_user_format, generate_reference_list_from_chunks, + apply_source_ids_limit, + merge_source_ids, + make_relation_chunk_key, ) -from .base import ( +from lightrag.base import ( BaseGraphStorage, BaseKVStorage, BaseVectorStorage, @@ -43,8 +44,8 @@ from .base import ( QueryResult, QueryContextResult, ) -from .prompt import PROMPTS -from .constants import ( +from lightrag.prompt import PROMPTS +from lightrag.constants import ( GRAPH_FIELD_SEP, DEFAULT_MAX_ENTITY_TOKENS, DEFAULT_MAX_RELATION_TOKENS, @@ -53,9 +54,11 @@ from .constants import ( DEFAULT_KG_CHUNK_PICK_METHOD, DEFAULT_ENTITY_TYPES, DEFAULT_SUMMARY_LANGUAGE, - DEFAULT_MAX_CHUNK_IDS_PER_ENTITY, + SOURCE_IDS_LIMIT_METHOD_KEEP, + SOURCE_IDS_LIMIT_METHOD_FIFO, + DEFAULT_FILE_PATH_MORE_PLACEHOLDER, ) -from .kg.shared_storage import get_storage_keyed_lock +from lightrag.kg.shared_storage import get_storage_keyed_lock import time from dotenv import load_dotenv @@ -475,8 +478,8 @@ async def _handle_single_relationship_extraction( async def _rebuild_knowledge_from_chunks( - entities_to_rebuild: dict[str, set[str]], - relationships_to_rebuild: dict[tuple[str, str], set[str]], + entities_to_rebuild: dict[str, list[str]], + relationships_to_rebuild: dict[tuple[str, str], list[str]], knowledge_graph_inst: BaseGraphStorage, entities_vdb: BaseVectorStorage, relationships_vdb: BaseVectorStorage, @@ -485,6 +488,8 @@ async def _rebuild_knowledge_from_chunks( global_config: dict[str, str], pipeline_status: dict | None = None, pipeline_status_lock=None, + entity_chunks_storage: BaseKVStorage | None = None, + relation_chunks_storage: BaseKVStorage | None = None, ) -> None: """Rebuild entity and relationship descriptions from cached extraction results with parallel processing @@ -493,8 +498,8 @@ async def _rebuild_knowledge_from_chunks( controlled by llm_model_max_async and using get_storage_keyed_lock for data consistency. 
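    For illustration only (not part of the patch hunks): with this change the
    rebuild inputs are plain, insertion-ordered lists of chunk ids rather than
    sets, and the rebuild path writes a per-key record back into the new chunk
    storages. The names and ids below are made up; relations use
    make_relation_chunk_key(src, tgt) as the storage key:

        entities_to_rebuild = {
            "Entity A": ["chunk-1", "chunk-2"],          # ordered, de-duplicated
        }
        relationships_to_rebuild = {
            ("Entity A", "Entity B"): ["chunk-1"],
        }
        entity_chunks_record = {
            "Entity A": {"chunk_ids": ["chunk-1", "chunk-2"], "count": 2},
        }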
Args: - entities_to_rebuild: Dict mapping entity_name -> set of remaining chunk_ids - relationships_to_rebuild: Dict mapping (src, tgt) -> set of remaining chunk_ids + entities_to_rebuild: Dict mapping entity_name -> list of remaining chunk_ids + relationships_to_rebuild: Dict mapping (src, tgt) -> list of remaining chunk_ids knowledge_graph_inst: Knowledge graph storage entities_vdb: Entity vector database relationships_vdb: Relationship vector database @@ -503,6 +508,8 @@ async def _rebuild_knowledge_from_chunks( global_config: Global configuration containing llm_model_max_async pipeline_status: Pipeline status dictionary pipeline_status_lock: Lock for pipeline status + entity_chunks_storage: KV storage maintaining full chunk IDs per entity + relation_chunks_storage: KV storage maintaining full chunk IDs per relation """ if not entities_to_rebuild and not relationships_to_rebuild: return @@ -642,10 +649,11 @@ async def _rebuild_knowledge_from_chunks( chunk_entities=chunk_entities, llm_response_cache=llm_response_cache, global_config=global_config, + entity_chunks_storage=entity_chunks_storage, ) rebuilt_entities_count += 1 status_message = ( - f"Rebuilt `{entity_name}` from {len(chunk_ids)} chunks" + f"Rebuild `{entity_name}` from {len(chunk_ids)} chunks" ) logger.info(status_message) if pipeline_status is not None and pipeline_status_lock is not None: @@ -683,16 +691,11 @@ async def _rebuild_knowledge_from_chunks( chunk_relationships=chunk_relationships, llm_response_cache=llm_response_cache, global_config=global_config, + relation_chunks_storage=relation_chunks_storage, + pipeline_status=pipeline_status, + pipeline_status_lock=pipeline_status_lock, ) rebuilt_relationships_count += 1 - status_message = ( - f"Rebuilt `{src} - {tgt}` from {len(chunk_ids)} chunks" - ) - logger.info(status_message) - if pipeline_status is not None and pipeline_status_lock is not None: - async with pipeline_status_lock: - pipeline_status["latest_message"] = status_message - pipeline_status["history_messages"].append(status_message) except Exception as e: failed_relationships_count += 1 status_message = f"Failed to rebuild `{src} - {tgt}`: {e}" @@ -1003,10 +1006,13 @@ async def _rebuild_single_entity( knowledge_graph_inst: BaseGraphStorage, entities_vdb: BaseVectorStorage, entity_name: str, - chunk_ids: set[str], + chunk_ids: list[str], chunk_entities: dict, llm_response_cache: BaseKVStorage, global_config: dict[str, str], + entity_chunks_storage: BaseKVStorage | None = None, + pipeline_status: dict | None = None, + pipeline_status_lock=None, ) -> None: """Rebuild a single entity from cached extraction results""" @@ -1017,7 +1023,11 @@ async def _rebuild_single_entity( # Helper function to update entity in both graph and vector storage async def _update_entity_storage( - final_description: str, entity_type: str, file_paths: set[str] + final_description: str, + entity_type: str, + file_paths: set[str], + source_chunk_ids: list[str], + truncation_info: str = "", ): try: # Update entity in graph storage (critical path) @@ -1025,10 +1035,12 @@ async def _rebuild_single_entity( **current_entity, "description": final_description, "entity_type": entity_type, - "source_id": GRAPH_FIELD_SEP.join(chunk_ids), + "source_id": GRAPH_FIELD_SEP.join(source_chunk_ids), "file_path": GRAPH_FIELD_SEP.join(file_paths) if file_paths else current_entity.get("file_path", "unknown_source"), + "created_at": int(time.time()), + "truncate": truncation_info, } await knowledge_graph_inst.upsert_node(entity_name, updated_entity_data) @@ 
-1061,9 +1073,33 @@ async def _rebuild_single_entity( logger.error(error_msg) raise # Re-raise exception - # Collect all entity data from relevant chunks + # normalized_chunk_ids = merge_source_ids([], chunk_ids) + normalized_chunk_ids = chunk_ids + + if entity_chunks_storage is not None and normalized_chunk_ids: + await entity_chunks_storage.upsert( + { + entity_name: { + "chunk_ids": normalized_chunk_ids, + "count": len(normalized_chunk_ids), + } + } + ) + + limit_method = ( + global_config.get("source_ids_limit_method") or SOURCE_IDS_LIMIT_METHOD_KEEP + ) + + limited_chunk_ids = apply_source_ids_limit( + normalized_chunk_ids, + global_config["max_source_ids_per_entity"], + limit_method, + identifier=f"`{entity_name}`", + ) + + # Collect all entity data from relevant (limited) chunks all_entity_data = [] - for chunk_id in chunk_ids: + for chunk_id in limited_chunk_ids: if chunk_id in chunk_entities and entity_name in chunk_entities[chunk_id]: all_entity_data.extend(chunk_entities[chunk_id][entity_name]) @@ -1110,13 +1146,19 @@ async def _rebuild_single_entity( final_description = current_entity.get("description", "") entity_type = current_entity.get("entity_type", "UNKNOWN") - await _update_entity_storage(final_description, entity_type, file_paths) + await _update_entity_storage( + final_description, + entity_type, + file_paths, + limited_chunk_ids, + ) return # Process cached entity data descriptions = [] entity_types = [] - file_paths = set() + file_paths_list = [] + seen_paths = set() for entity_data in all_entity_data: if entity_data.get("description"): @@ -1124,7 +1166,35 @@ async def _rebuild_single_entity( if entity_data.get("entity_type"): entity_types.append(entity_data["entity_type"]) if entity_data.get("file_path"): - file_paths.add(entity_data["file_path"]) + file_path = entity_data["file_path"] + if file_path and file_path not in seen_paths: + file_paths_list.append(file_path) + seen_paths.add(file_path) + + # Apply MAX_FILE_PATHS limit + max_file_paths = global_config.get("max_file_paths") + file_path_placeholder = global_config.get( + "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER + ) + limit_method = global_config.get("source_ids_limit_method") + + original_count = len(file_paths_list) + if original_count > max_file_paths: + if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO: + # FIFO: keep tail (newest), discard head + file_paths_list = file_paths_list[-max_file_paths:] + else: + # KEEP: keep head (earliest), discard tail + file_paths_list = file_paths_list[:max_file_paths] + + file_paths_list.append( + f"...{file_path_placeholder}({limit_method} {max_file_paths}/{original_count})..." 
+ ) + logger.info( + f"Limited `{entity_name}`: file_path {original_count} -> {max_file_paths} ({limit_method})" + ) + + file_paths = set(file_paths_list) # Remove duplicates while preserving order description_list = list(dict.fromkeys(descriptions)) @@ -1150,7 +1220,31 @@ async def _rebuild_single_entity( else: final_description = current_entity.get("description", "") - await _update_entity_storage(final_description, entity_type, file_paths) + if len(limited_chunk_ids) < len(normalized_chunk_ids): + truncation_info = ( + f"{limit_method}:{len(limited_chunk_ids)}/{len(normalized_chunk_ids)}" + ) + else: + truncation_info = "" + + await _update_entity_storage( + final_description, + entity_type, + file_paths, + limited_chunk_ids, + truncation_info, + ) + + # Log rebuild completion with truncation info + status_message = f"Rebuild `{entity_name}` from {len(chunk_ids)} chunks" + if truncation_info: + status_message += f" ({truncation_info})" + logger.info(status_message) + # Update pipeline status + if pipeline_status is not None and pipeline_status_lock is not None: + async with pipeline_status_lock: + pipeline_status["latest_message"] = status_message + pipeline_status["history_messages"].append(status_message) async def _rebuild_single_relationship( @@ -1158,10 +1252,13 @@ async def _rebuild_single_relationship( relationships_vdb: BaseVectorStorage, src: str, tgt: str, - chunk_ids: set[str], + chunk_ids: list[str], chunk_relationships: dict, llm_response_cache: BaseKVStorage, global_config: dict[str, str], + relation_chunks_storage: BaseKVStorage | None = None, + pipeline_status: dict | None = None, + pipeline_status_lock=None, ) -> None: """Rebuild a single relationship from cached extraction results @@ -1174,9 +1271,33 @@ async def _rebuild_single_relationship( if not current_relationship: return + # normalized_chunk_ids = merge_source_ids([], chunk_ids) + normalized_chunk_ids = chunk_ids + + if relation_chunks_storage is not None and normalized_chunk_ids: + storage_key = make_relation_chunk_key(src, tgt) + await relation_chunks_storage.upsert( + { + storage_key: { + "chunk_ids": normalized_chunk_ids, + "count": len(normalized_chunk_ids), + } + } + ) + + limit_method = ( + global_config.get("source_ids_limit_method") or SOURCE_IDS_LIMIT_METHOD_KEEP + ) + limited_chunk_ids = apply_source_ids_limit( + normalized_chunk_ids, + global_config["max_source_ids_per_relation"], + limit_method, + identifier=f"`{src}`~`{tgt}`", + ) + # Collect all relationship data from relevant chunks all_relationship_data = [] - for chunk_id in chunk_ids: + for chunk_id in limited_chunk_ids: if chunk_id in chunk_relationships: # Check both (src, tgt) and (tgt, src) since relationships can be bidirectional for edge_key in [(src, tgt), (tgt, src)]: @@ -1193,7 +1314,8 @@ async def _rebuild_single_relationship( descriptions = [] keywords = [] weights = [] - file_paths = set() + file_paths_list = [] + seen_paths = set() for rel_data in all_relationship_data: if rel_data.get("description"): @@ -1203,7 +1325,35 @@ async def _rebuild_single_relationship( if rel_data.get("weight"): weights.append(rel_data["weight"]) if rel_data.get("file_path"): - file_paths.add(rel_data["file_path"]) + file_path = rel_data["file_path"] + if file_path and file_path not in seen_paths: + file_paths_list.append(file_path) + seen_paths.add(file_path) + + # Apply count limit + max_file_paths = global_config.get("max_file_paths") + file_path_placeholder = global_config.get( + "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER + ) + 
limit_method = global_config.get("source_ids_limit_method") + + original_count = len(file_paths_list) + if original_count > max_file_paths: + if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO: + # FIFO: keep tail (newest), discard head + file_paths_list = file_paths_list[-max_file_paths:] + else: + # KEEP: keep head (earliest), discard tail + file_paths_list = file_paths_list[:max_file_paths] + + file_paths_list.append( + f"...{file_path_placeholder}({limit_method} {max_file_paths}/{original_count})..." + ) + logger.info( + f"Limited `{src}`~`{tgt}`: file_path {original_count} -> {max_file_paths} ({limit_method})" + ) + + file_paths = set(file_paths_list) # Remove duplicates while preserving order description_list = list(dict.fromkeys(descriptions)) @@ -1231,6 +1381,13 @@ async def _rebuild_single_relationship( # fallback to keep current(unchanged) final_description = current_relationship.get("description", "") + if len(limited_chunk_ids) < len(normalized_chunk_ids): + truncation_info = ( + f"{limit_method}:{len(limited_chunk_ids)}/{len(normalized_chunk_ids)}" + ) + else: + truncation_info = "" + # Update relationship in graph storage updated_relationship_data = { **current_relationship, @@ -1239,10 +1396,11 @@ async def _rebuild_single_relationship( else current_relationship.get("description", ""), "keywords": combined_keywords, "weight": weight, - "source_id": GRAPH_FIELD_SEP.join(chunk_ids), + "source_id": GRAPH_FIELD_SEP.join(limited_chunk_ids), "file_path": GRAPH_FIELD_SEP.join([fp for fp in file_paths if fp]) if file_paths else current_relationship.get("file_path", "unknown_source"), + "truncate": truncation_info, } await knowledge_graph_inst.upsert_edge(src, tgt, updated_relationship_data) @@ -1288,15 +1446,36 @@ async def _rebuild_single_relationship( logger.error(error_msg) raise # Re-raise exception + # Log rebuild completion with truncation info + status_message = f"Rebuild `{src} - {tgt}` from {len(chunk_ids)} chunks" + if truncation_info: + status_message += f" ({truncation_info})" + # Add truncation info from apply_source_ids_limit if truncation occurred + if len(limited_chunk_ids) < len(normalized_chunk_ids): + truncation_info = ( + f" ({limit_method}:{len(limited_chunk_ids)}/{len(normalized_chunk_ids)})" + ) + status_message += truncation_info + + logger.info(status_message) + + # Update pipeline status + if pipeline_status is not None and pipeline_status_lock is not None: + async with pipeline_status_lock: + pipeline_status["latest_message"] = status_message + pipeline_status["history_messages"].append(status_message) + async def _merge_nodes_then_upsert( entity_name: str, nodes_data: list[dict], knowledge_graph_inst: BaseGraphStorage, + entity_vdb: BaseVectorStorage | None, global_config: dict, pipeline_status: dict = None, pipeline_status_lock=None, llm_response_cache: BaseKVStorage | None = None, + entity_chunks_storage: BaseKVStorage | None = None, ): """Get existing nodes from knowledge graph use name,if exists, merge data, else create, then upsert.""" already_entity_types = [] @@ -1319,10 +1498,74 @@ async def _merge_nodes_then_upsert( reverse=True, )[0][0] # Get the entity type with the highest count + new_source_ids = [dp["source_id"] for dp in nodes_data if dp.get("source_id")] + + existing_full_source_ids = [] + if entity_chunks_storage is not None: + stored_chunks = await entity_chunks_storage.get_by_id(entity_name) + if stored_chunks and isinstance(stored_chunks, dict): + existing_full_source_ids = [ + chunk_id for chunk_id in stored_chunks.get("chunk_ids", []) 
if chunk_id + ] + + if not existing_full_source_ids: + existing_full_source_ids = [ + chunk_id for chunk_id in already_source_ids if chunk_id + ] + + full_source_ids = merge_source_ids(existing_full_source_ids, new_source_ids) + + if entity_chunks_storage is not None and full_source_ids: + await entity_chunks_storage.upsert( + { + entity_name: { + "chunk_ids": full_source_ids, + "count": len(full_source_ids), + } + } + ) + + limit_method = global_config.get("source_ids_limit_method") + max_source_limit = global_config.get("max_source_ids_per_entity") + source_ids = apply_source_ids_limit( + full_source_ids, + max_source_limit, + limit_method, + identifier=f"`{entity_name}`", + ) + + # Only apply filtering in KEEP(ignore new) mode + if limit_method == SOURCE_IDS_LIMIT_METHOD_KEEP: + allowed_source_ids = set(source_ids) + filtered_nodes = [] + for dp in nodes_data: + source_id = dp.get("source_id") + # Skip descriptions sourced from chunks dropped by the limitation cap + if ( + source_id + and source_id not in allowed_source_ids + and source_id not in existing_full_source_ids + ): + continue + filtered_nodes.append(dp) + nodes_data = filtered_nodes + else: + # In FIFO mode, keep all node descriptions - truncation happens at source_ids level only + nodes_data = list(nodes_data) + + skip_summary_due_to_limit = ( + limit_method == SOURCE_IDS_LIMIT_METHOD_KEEP + and len(existing_full_source_ids) >= max_source_limit + and not nodes_data + and already_description + ) + # Deduplicate by description, keeping first occurrence unique_nodes = {} for dp in nodes_data: - desc = dp["description"] + desc = dp.get("description") + if not desc: + continue if desc not in unique_nodes: unique_nodes[desc] = dp @@ -1333,17 +1576,44 @@ async def _merge_nodes_then_upsert( ) sorted_descriptions = [dp["description"] for dp in sorted_nodes] + truncation_info = "" + dd_message = "" + has_placeholder = False # Initialize to track placeholder in file paths + # Combine already_description with sorted new sorted descriptions description_list = already_description + sorted_descriptions - num_fragment = len(description_list) already_fragment = len(already_description) deduplicated_num = already_fragment + len(nodes_data) - num_fragment if deduplicated_num > 0: - dd_message = f"(dd:{deduplicated_num})" - else: - dd_message = "" - if num_fragment > 0: + dd_message = f"dd:{deduplicated_num}" + + if skip_summary_due_to_limit: + description = ( + already_node.get("description", "(no description)") + if already_node + else "(no description)" + ) + status_message = f"Skip merge for `{entity_name}`: KEEP limit reached" + logger.debug(status_message) + if pipeline_status is not None and pipeline_status_lock is not None: + async with pipeline_status_lock: + pipeline_status["latest_message"] = status_message + pipeline_status["history_messages"].append(status_message) + existing_node_data = dict(already_node or {}) + if not existing_node_data: + existing_node_data = { + "entity_id": entity_name, + "entity_type": entity_type, + "description": description, + "source_id": GRAPH_FIELD_SEP.join(existing_full_source_ids), + "file_path": GRAPH_FIELD_SEP.join(already_file_paths), + "created_at": int(time.time()), + "truncate": "", + } + existing_node_data["entity_name"] = entity_name + return existing_node_data + elif num_fragment > 0: # Get summary and LLM usage status description, llm_was_used = await _handle_entity_relation_summary( "Entity", @@ -1356,9 +1626,26 @@ async def _merge_nodes_then_upsert( # Log based on actual LLM usage if 
llm_was_used: - status_message = f"LLMmrg: `{entity_name}` | {already_fragment}+{num_fragment - already_fragment}{dd_message}" + status_message = f"LLMmrg: `{entity_name}` | {already_fragment}+{num_fragment - already_fragment}" else: - status_message = f"Merged: `{entity_name}` | {already_fragment}+{num_fragment - already_fragment}{dd_message}" + status_message = f"Merged: `{entity_name}` | {already_fragment}+{num_fragment - already_fragment}" + + # Add truncation info from apply_source_ids_limit if truncation occurred + if len(source_ids) < len(full_source_ids): + # Add + sign if has_placeholder is True, indicating actual file count is higher + full_source_count_str = ( + f"{len(full_source_ids)}+" + if has_placeholder + else str(len(full_source_ids)) + ) + truncation_info = ( + f"{limit_method}:{len(source_ids)}/{full_source_count_str}" + ) + + if dd_message or truncation_info: + status_message += ( + f" ({', '.join(filter(None, [truncation_info, dd_message]))})" + ) if already_fragment > 0 or llm_was_used: logger.info(status_message) @@ -1373,12 +1660,68 @@ async def _merge_nodes_then_upsert( logger.error(f"Entity {entity_name} has no description") description = "(no description)" - merged_source_ids: set = set([dp["source_id"] for dp in nodes_data] + already_source_ids) - - source_ids = truncate_entity_source_id(merged_source_ids, entity_name) source_id = GRAPH_FIELD_SEP.join(source_ids) - file_path = build_file_path(already_file_paths, nodes_data, entity_name) + # Build file_path with count limit + if skip_summary_due_to_limit: + # Skip limit, keep original file_path + file_path = GRAPH_FIELD_SEP.join(fp for fp in already_file_paths if fp) + else: + # Collect and apply limit + file_paths_list = [] + seen_paths = set() + has_placeholder = False # Track if already_file_paths contains placeholder + + # Get placeholder to filter it out + file_path_placeholder = global_config.get( + "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER + ) + + # Collect from already_file_paths, excluding placeholder + for fp in already_file_paths: + # Check if this is a placeholder record + if fp and fp.startswith(f"...{file_path_placeholder}"): + has_placeholder = True + continue + # Skip placeholders (format: "...{placeholder}(showing X of Y)...") + if fp and fp not in seen_paths: + file_paths_list.append(fp) + seen_paths.add(fp) + + # Collect from new data + for dp in nodes_data: + file_path_item = dp.get("file_path") + if file_path_item and file_path_item not in seen_paths: + file_paths_list.append(file_path_item) + seen_paths.add(file_path_item) + + # Apply count limit + max_file_paths = global_config.get("max_file_paths") + + if len(file_paths_list) > max_file_paths: + limit_method = global_config.get( + "source_ids_limit_method", SOURCE_IDS_LIMIT_METHOD_KEEP + ) + file_path_placeholder = global_config.get( + "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER + ) + original_count = len(file_paths_list) + + if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO: + # FIFO: keep tail (newest), discard head + file_paths_list = file_paths_list[-max_file_paths:] + else: + # KEEP: keep head (earliest), discard tail + file_paths_list = file_paths_list[:max_file_paths] + + file_paths_list.append( + f"...{file_path_placeholder}({limit_method} {max_file_paths}/{original_count})..." 
+ ) + logger.info( + f"Limited `{entity_name}`: file_path {original_count} -> {max_file_paths} ({limit_method})" + ) + + file_path = GRAPH_FIELD_SEP.join(file_paths_list) node_data = dict( entity_id=entity_name, @@ -1387,12 +1730,32 @@ async def _merge_nodes_then_upsert( source_id=source_id, file_path=file_path, created_at=int(time.time()), + truncate=truncation_info, ) await knowledge_graph_inst.upsert_node( entity_name, node_data=node_data, ) node_data["entity_name"] = entity_name + if entity_vdb is not None: + entity_vdb_id = compute_mdhash_id(str(entity_name), prefix="ent-") + entity_content = f"{entity_name}\n{description}" + data_for_vdb = { + entity_vdb_id: { + "entity_name": entity_name, + "entity_type": entity_type, + "content": entity_content, + "source_id": source_id, + "file_path": file_path, + } + } + await safe_vdb_operation_with_exception( + operation=lambda payload=data_for_vdb: entity_vdb.upsert(payload), + operation_name="entity_upsert", + entity_name=entity_name, + max_retries=3, + retry_delay=0.1, + ) return node_data @@ -1401,15 +1764,19 @@ async def _merge_edges_then_upsert( tgt_id: str, edges_data: list[dict], knowledge_graph_inst: BaseGraphStorage, + relationships_vdb: BaseVectorStorage | None, + entity_vdb: BaseVectorStorage | None, global_config: dict, pipeline_status: dict = None, pipeline_status_lock=None, llm_response_cache: BaseKVStorage | None = None, added_entities: list = None, # New parameter to track entities added during edge processing + relation_chunks_storage: BaseKVStorage | None = None, ): if src_id == tgt_id: return None + already_edge = None already_weights = [] already_source_ids = [] already_description = [] @@ -1449,16 +1816,83 @@ async def _merge_edges_then_upsert( ) ) + new_source_ids = [dp["source_id"] for dp in edges_data if dp.get("source_id")] + + storage_key = make_relation_chunk_key(src_id, tgt_id) + existing_full_source_ids = [] + if relation_chunks_storage is not None: + stored_chunks = await relation_chunks_storage.get_by_id(storage_key) + if stored_chunks and isinstance(stored_chunks, dict): + existing_full_source_ids = [ + chunk_id for chunk_id in stored_chunks.get("chunk_ids", []) if chunk_id + ] + + if not existing_full_source_ids: + existing_full_source_ids = [ + chunk_id for chunk_id in already_source_ids if chunk_id + ] + + full_source_ids = merge_source_ids(existing_full_source_ids, new_source_ids) + + if relation_chunks_storage is not None and full_source_ids: + await relation_chunks_storage.upsert( + { + storage_key: { + "chunk_ids": full_source_ids, + "count": len(full_source_ids), + } + } + ) + + limit_method = global_config.get("source_ids_limit_method") + max_source_limit = global_config.get("max_source_ids_per_relation") + source_ids = apply_source_ids_limit( + full_source_ids, + max_source_limit, + limit_method, + identifier=f"`{src_id}`~`{tgt_id}`", + ) + limit_method = ( + global_config.get("source_ids_limit_method") or SOURCE_IDS_LIMIT_METHOD_KEEP + ) + + # Only apply filtering in KEEP(ignore new) mode + if limit_method == SOURCE_IDS_LIMIT_METHOD_KEEP: + allowed_source_ids = set(source_ids) + filtered_edges = [] + for dp in edges_data: + source_id = dp.get("source_id") + # Skip relationship fragments sourced from chunks dropped by keep oldest cap + if ( + source_id + and source_id not in allowed_source_ids + and source_id not in existing_full_source_ids + ): + continue + filtered_edges.append(dp) + edges_data = filtered_edges + else: + # In FIFO mode, keep all edge descriptions - truncation happens at source_ids 
level only + edges_data = list(edges_data) + + skip_summary_due_to_limit = ( + limit_method == SOURCE_IDS_LIMIT_METHOD_KEEP + and len(existing_full_source_ids) >= max_source_limit + and not edges_data + and already_description + ) + # Process edges_data with None checks weight = sum([dp["weight"] for dp in edges_data] + already_weights) # Deduplicate by description, keeping first occurrence unique_edges = {} for dp in edges_data: - if dp.get("description"): - desc = dp["description"] - if desc not in unique_edges: - unique_edges[desc] = dp + description_value = dp.get("description") + if not description_value: + continue + if description_value not in unique_edges: + unique_edges[description_value] = dp # Sort description by timestamp, then by description length (largest to smallest) when timestamps are the same sorted_edges = sorted( @@ -1467,6 +1901,10 @@ async def _merge_edges_then_upsert( ) sorted_descriptions = [dp["description"] for dp in sorted_edges] + truncation_info = "" + dd_message = "" + has_placeholder = False # Initialize to track placeholder in file paths + # Combine already_description with sorted new descriptions description_list = already_description + sorted_descriptions @@ -1474,10 +1912,36 @@ async def _merge_edges_then_upsert( already_fragment = len(already_description) deduplicated_num = already_fragment + len(edges_data) - num_fragment if deduplicated_num > 0: - dd_message = f"(dd:{deduplicated_num})" - else: - dd_message = "" - if num_fragment > 0: + dd_message = f"dd:{deduplicated_num}" + + if skip_summary_due_to_limit: + description = ( + already_edge.get("description", "(no description)") + if already_edge + else "(no description)" + ) + status_message = f"Skip merge for `{src_id}`~`{tgt_id}`: KEEP limit reached" + logger.debug(status_message) + if pipeline_status is not None and pipeline_status_lock is not None: + async with pipeline_status_lock: + pipeline_status["latest_message"] = status_message + pipeline_status["history_messages"].append(status_message) + existing_edge_data = dict(already_edge or {}) + if not existing_edge_data: + existing_edge_data = { + "description": description, + "keywords": GRAPH_FIELD_SEP.join(already_keywords), + "source_id": GRAPH_FIELD_SEP.join(existing_full_source_ids), + "file_path": GRAPH_FIELD_SEP.join(already_file_paths), + "weight": sum(already_weights) if already_weights else 0.0, + "truncate": "", + "created_at": int(time.time()), + } + existing_edge_data.setdefault("created_at", int(time.time())) + existing_edge_data["src_id"] = src_id + existing_edge_data["tgt_id"] = tgt_id + return existing_edge_data + elif num_fragment > 0: # Get summary and LLM usage status description, llm_was_used = await _handle_entity_relation_summary( "Relation", @@ -1490,9 +1954,26 @@ async def _merge_edges_then_upsert( # Log based on actual LLM usage if llm_was_used: - status_message = f"LLMmrg: `{src_id}`~`{tgt_id}` | {already_fragment}+{num_fragment - already_fragment}{dd_message}" + status_message = f"LLMmrg: `{src_id}`~`{tgt_id}` | {already_fragment}+{num_fragment - already_fragment}" else: - status_message = f"Merged: `{src_id}`~`{tgt_id}` | {already_fragment}+{num_fragment - already_fragment}{dd_message}" + status_message = f"Merged: `{src_id}`~`{tgt_id}` | {already_fragment}+{num_fragment - already_fragment}" + + # Add truncation info from apply_source_ids_limit if truncation occurred + if len(source_ids) < len(full_source_ids): + # Add + sign if has_placeholder is True, indicating actual file count is higher + full_source_count_str = ( 
+ f"{len(full_source_ids)}+" + if has_placeholder + else str(len(full_source_ids)) + ) + truncation_info = ( + f"{limit_method}:{len(source_ids)}/{full_source_count_str}" + ) + + if dd_message or truncation_info: + status_message += ( + f" ({', '.join(filter(None, [truncation_info, dd_message]))})" + ) if already_fragment > 0 or llm_was_used: logger.info(status_message) @@ -1522,26 +2003,103 @@ async def _merge_edges_then_upsert( # Join all unique keywords with commas keywords = ",".join(sorted(all_keywords)) - source_id = GRAPH_FIELD_SEP.join( - set( - [dp["source_id"] for dp in edges_data if dp.get("source_id")] - + already_source_ids + source_id = GRAPH_FIELD_SEP.join(source_ids) + + # Build file_path with count limit + if skip_summary_due_to_limit: + # Skip limit, keep original file_path + file_path = GRAPH_FIELD_SEP.join(fp for fp in already_file_paths if fp) + else: + # Collect and apply limit + file_paths_list = [] + seen_paths = set() + has_placeholder = False # Track if already_file_paths contains placeholder + + # Get placeholder to filter it out + file_path_placeholder = global_config.get( + "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER ) - ) - file_path = build_file_path(already_file_paths, edges_data, f"{src_id}-{tgt_id}") + + # Collect from already_file_paths, excluding placeholder + for fp in already_file_paths: + # Check if this is a placeholder record + if fp and fp.startswith(f"...{file_path_placeholder}"): + has_placeholder = True + continue + # Skip placeholders (format: "...{placeholder}(showing X of Y)...") + if fp and fp not in seen_paths: + file_paths_list.append(fp) + seen_paths.add(fp) + + # Collect from new data + for dp in edges_data: + file_path_item = dp.get("file_path") + if file_path_item and file_path_item not in seen_paths: + file_paths_list.append(file_path_item) + seen_paths.add(file_path_item) + + # Apply count limit + max_file_paths = global_config.get("max_file_paths") + + if len(file_paths_list) > max_file_paths: + limit_method = global_config.get( + "source_ids_limit_method", SOURCE_IDS_LIMIT_METHOD_KEEP + ) + file_path_placeholder = global_config.get( + "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER + ) + original_count = len(file_paths_list) + + if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO: + # FIFO: keep tail (newest), discard head + file_paths_list = file_paths_list[-max_file_paths:] + else: + # KEEP: keep head (earliest), discard tail + file_paths_list = file_paths_list[:max_file_paths] + + file_paths_list.append( + f"...{file_path_placeholder}({limit_method} {max_file_paths}/{original_count})..." 
+ ) + logger.info( + f"Limited `{src_id}`~`{tgt_id}`: file_path {original_count} -> {max_file_paths} ({limit_method})" + ) + + file_path = GRAPH_FIELD_SEP.join(file_paths_list) for need_insert_id in [src_id, tgt_id]: if not (await knowledge_graph_inst.has_node(need_insert_id)): + node_created_at = int(time.time()) node_data = { "entity_id": need_insert_id, "source_id": source_id, "description": description, "entity_type": "UNKNOWN", "file_path": file_path, - "created_at": int(time.time()), + "created_at": node_created_at, + "truncate": "", } await knowledge_graph_inst.upsert_node(need_insert_id, node_data=node_data) + if entity_vdb is not None: + entity_vdb_id = compute_mdhash_id(need_insert_id, prefix="ent-") + entity_content = f"{need_insert_id}\n{description}" + vdb_data = { + entity_vdb_id: { + "content": entity_content, + "entity_name": need_insert_id, + "source_id": source_id, + "entity_type": "UNKNOWN", + "file_path": file_path, + } + } + await safe_vdb_operation_with_exception( + operation=lambda payload=vdb_data: entity_vdb.upsert(payload), + operation_name="added_entity_upsert", + entity_name=need_insert_id, + max_retries=3, + retry_delay=0.1, + ) + # Track entities added during edge processing if added_entities is not None: entity_data = { @@ -1550,10 +2108,11 @@ async def _merge_edges_then_upsert( "description": description, "source_id": source_id, "file_path": file_path, - "created_at": int(time.time()), + "created_at": node_created_at, } added_entities.append(entity_data) + edge_created_at = int(time.time()) await knowledge_graph_inst.upsert_edge( src_id, tgt_id, @@ -1563,7 +2122,8 @@ async def _merge_edges_then_upsert( keywords=keywords, source_id=source_id, file_path=file_path, - created_at=int(time.time()), + created_at=edge_created_at, + truncate=truncation_info, ), ) @@ -1574,9 +2134,41 @@ async def _merge_edges_then_upsert( keywords=keywords, source_id=source_id, file_path=file_path, - created_at=int(time.time()), + created_at=edge_created_at, + truncate=truncation_info, + weight=weight, ) + if relationships_vdb is not None: + rel_vdb_id = compute_mdhash_id(src_id + tgt_id, prefix="rel-") + rel_vdb_id_reverse = compute_mdhash_id(tgt_id + src_id, prefix="rel-") + try: + await relationships_vdb.delete([rel_vdb_id, rel_vdb_id_reverse]) + except Exception as e: + logger.debug( + f"Could not delete old relationship vector records {rel_vdb_id}, {rel_vdb_id_reverse}: {e}" + ) + rel_content = f"{keywords}\t{src_id}\n{tgt_id}\n{description}" + vdb_data = { + rel_vdb_id: { + "src_id": src_id, + "tgt_id": tgt_id, + "source_id": source_id, + "content": rel_content, + "keywords": keywords, + "description": description, + "weight": weight, + "file_path": file_path, + } + } + await safe_vdb_operation_with_exception( + operation=lambda payload=vdb_data: relationships_vdb.upsert(payload), + operation_name="relationship_upsert", + entity_name=f"{src_id}-{tgt_id}", + max_retries=3, + retry_delay=0.2, + ) + return edge_data @@ -1592,6 +2184,8 @@ async def merge_nodes_and_edges( pipeline_status: dict = None, pipeline_status_lock=None, llm_response_cache: BaseKVStorage | None = None, + entity_chunks_storage: BaseKVStorage | None = None, + relation_chunks_storage: BaseKVStorage | None = None, current_file_number: int = 0, total_files: int = 0, file_path: str = "unknown_source", @@ -1615,6 +2209,8 @@ async def merge_nodes_and_edges( pipeline_status: Pipeline status dictionary pipeline_status_lock: Lock for pipeline status llm_response_cache: LLM response cache + entity_chunks_storage: Storage 
tracking full chunk lists per entity + relation_chunks_storage: Storage tracking full chunk lists per relation current_file_number: Current file number for logging total_files: Total files for logging file_path: File path for logging @@ -1662,49 +2258,22 @@ async def merge_nodes_and_edges( [entity_name], namespace=namespace, enable_logging=False ): try: - logger.info(f"Inserting {entity_name} in Graph") - # Graph database operation (critical path, must succeed) + logger.debug(f"Processing entity {entity_name}") entity_data = await _merge_nodes_then_upsert( entity_name, entities, knowledge_graph_inst, + entity_vdb, global_config, pipeline_status, pipeline_status_lock, llm_response_cache, + entity_chunks_storage, ) - # Vector database operation (equally critical, must succeed) - if entity_vdb is not None and entity_data: - data_for_vdb = { - compute_mdhash_id( - str(entity_data["entity_name"]), prefix="ent-" - ): { - "entity_name": entity_data["entity_name"], - "entity_type": entity_data["entity_type"], - "content": f"{entity_data['entity_name']}\n{entity_data['description']}", - "source_id": entity_data["source_id"], - "file_path": entity_data.get( - "file_path", "unknown_source" - ), - } - } - - - logger.info(f"Inserting {entity_name} in Graph") - # Use safe operation wrapper - VDB failure must throw exception - await safe_vdb_operation_with_exception( - operation=lambda: entity_vdb.upsert(data_for_vdb), - operation_name="entity_upsert", - entity_name=entity_name, - max_retries=3, - retry_delay=0.1, - ) - return entity_data except Exception as e: - # Any database operation failure is critical error_msg = ( f"Critical error in entity processing for `{entity_name}`: {e}" ) @@ -1794,82 +2363,28 @@ async def merge_nodes_and_edges( try: added_entities = [] # Track entities added during edge processing - # Graph database operation (critical path, must succeed) + logger.debug(f"Processing relation {sorted_edge_key}") edge_data = await _merge_edges_then_upsert( edge_key[0], edge_key[1], edges, knowledge_graph_inst, + relationships_vdb, + entity_vdb, global_config, pipeline_status, pipeline_status_lock, llm_response_cache, added_entities, # Pass list to collect added entities + relation_chunks_storage, ) if edge_data is None: return None, [] - # Vector database operation (equally critical, must succeed) - if relationships_vdb is not None: - data_for_vdb = { - compute_mdhash_id( - edge_data["src_id"] + edge_data["tgt_id"], prefix="rel-" - ): { - "src_id": edge_data["src_id"], - "tgt_id": edge_data["tgt_id"], - "keywords": edge_data["keywords"], - "content": f"{edge_data['src_id']}\t{edge_data['tgt_id']}\n{edge_data['keywords']}\n{edge_data['description']}", - "source_id": edge_data["source_id"], - "file_path": edge_data.get( - "file_path", "unknown_source" - ), - "weight": edge_data.get("weight", 1.0), - } - } - - # Use safe operation wrapper - VDB failure must throw exception - await safe_vdb_operation_with_exception( - operation=lambda: relationships_vdb.upsert(data_for_vdb), - operation_name="relationship_upsert", - entity_name=f"{edge_data['src_id']}-{edge_data['tgt_id']}", - max_retries=3, - retry_delay=0.1, - ) - - # Update added_entities to entity vector database using safe operation wrapper - if added_entities and entity_vdb is not None: - for entity_data in added_entities: - entity_vdb_id = compute_mdhash_id( - entity_data["entity_name"], prefix="ent-" - ) - entity_content = f"{entity_data['entity_name']}\n{entity_data['description']}" - - vdb_data = { - entity_vdb_id: { - "content": 
entity_content, - "entity_name": entity_data["entity_name"], - "source_id": entity_data["source_id"], - "entity_type": entity_data["entity_type"], - "file_path": entity_data.get( - "file_path", "unknown_source" - ), - } - } - - # Use safe operation wrapper - VDB failure must throw exception - await safe_vdb_operation_with_exception( - operation=lambda data=vdb_data: entity_vdb.upsert(data), - operation_name="added_entity_upsert", - entity_name=entity_data["entity_name"], - max_retries=3, - retry_delay=0.1, - ) - return edge_data, added_entities except Exception as e: - # Any database operation failure is critical error_msg = f"Critical error in relationship processing for `{sorted_edge_key}`: {e}" logger.error(error_msg) @@ -2254,7 +2769,7 @@ async def kg_query( hashing_kv: BaseKVStorage | None = None, system_prompt: str | None = None, chunks_vdb: BaseVectorStorage = None, -) -> QueryResult: +) -> QueryResult | None: """ Execute knowledge graph query and return unified QueryResult object. @@ -2271,7 +2786,7 @@ async def kg_query( chunks_vdb: Document chunks vector database Returns: - QueryResult: Unified query result object containing: + QueryResult | None: Unified query result object containing: - content: Non-streaming response text content - response_iterator: Streaming response iterator - raw_data: Complete structured data (including references and metadata) @@ -2282,6 +2797,8 @@ async def kg_query( - only_need_prompt=True: content contains complete prompt - stream=True: response_iterator contains streaming response, raw_data contains complete data - default: content contains LLM response text, raw_data contains complete data + + Returns None when no relevant context could be constructed for the query. """ if not query: return QueryResult(content=PROMPTS["fail_response"]) @@ -2329,7 +2846,8 @@ async def kg_query( ) if context_result is None: - return QueryResult(content=PROMPTS["fail_response"]) + logger.info("[kg_query] No query context could be built; returning no-result.") + return None # Return different content based on query parameters if query_param.only_need_context and not query_param.only_need_prompt: @@ -3336,7 +3854,11 @@ async def _build_query_context( query_embedding=search_result["query_embedding"], ) - if not merged_chunks: + if ( + not merged_chunks + and not truncation_result["entities_context"] + and not truncation_result["relations_context"] + ): return None # Stage 4: Build final LLM context with dynamic token processing @@ -3992,7 +4514,7 @@ async def naive_query( global_config: dict[str, str], hashing_kv: BaseKVStorage | None = None, system_prompt: str | None = None, -) -> QueryResult: +) -> QueryResult | None: """ Execute naive query and return unified QueryResult object. @@ -4005,11 +4527,13 @@ async def naive_query( system_prompt: System prompt Returns: - QueryResult: Unified query result object containing: + QueryResult | None: Unified query result object containing: - content: Non-streaming response text content - response_iterator: Streaming response iterator - raw_data: Complete structured data (including references and metadata) - is_streaming: Whether this is a streaming result + + Returns None when no relevant chunks are retrieved. 
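    A minimal caller-side sketch of the new None contract (an assumed usage
    pattern, not code from this patch; kg_query behaves the same way when no
    query context can be built). The surrounding variables are assumed to be
    the caller's own objects:

        async def _query_with_fallback() -> QueryResult:
            result = await naive_query(
                query, chunks_vdb, query_param, global_config, hashing_kv=hashing_kv
            )
            if result is None:
                # No relevant chunks were retrieved; fall back to the stock failure message.
                return QueryResult(content=PROMPTS["fail_response"])
            return result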
""" if not query: @@ -4030,16 +4554,10 @@ async def naive_query( chunks = await _get_vector_context(query, chunks_vdb, query_param, None) if chunks is None or len(chunks) == 0: - # Build empty raw data structure for naive mode - empty_raw_data = convert_to_user_format( - [], # naive mode has no entities - [], # naive mode has no relationships - [], # no chunks - [], # no references - "naive", + logger.info( + "[naive_query] No relevant document chunks found; returning no-result." ) - empty_raw_data["message"] = "No relevant document chunks found." - return QueryResult(content=PROMPTS["fail_response"], raw_data=empty_raw_data) + return None # Calculate dynamic token limit for chunks max_total_tokens = getattr(