From 9b6de7512d647fd854560be467f924fbdf1eb078 Mon Sep 17 00:00:00 2001
From: yangdx
Date: Mon, 25 Aug 2025 17:10:51 +0800
Subject: [PATCH] Stabilize description merging order

---
 lightrag/operate.py | 138 +++++++++++++++++++++++++-------------------
 1 file changed, 78 insertions(+), 60 deletions(-)

diff --git a/lightrag/operate.py b/lightrag/operate.py
index cc06801c..74fdf197 100644
--- a/lightrag/operate.py
+++ b/lightrag/operate.py
@@ -713,21 +713,6 @@ async def _rebuild_single_entity(
         }
     )
 
-    # Helper function to generate final description with optional LLM summary
-    async def _generate_final_description(combined_description: str) -> str:
-        force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"]
-        num_fragment = combined_description.count(GRAPH_FIELD_SEP) + 1
-
-        if num_fragment >= force_llm_summary_on_merge:
-            return await _handle_entity_relation_summary(
-                entity_name,
-                combined_description,
-                global_config,
-                llm_response_cache=llm_response_cache,
-            )
-        else:
-            return combined_description
-
     # Collect all entity data from relevant chunks
     all_entity_data = []
     for chunk_id in chunk_ids:
@@ -763,7 +748,18 @@ async def _rebuild_single_entity(
         # Generate description from relationships or fallback to current
         if relationship_descriptions:
             combined_description = GRAPH_FIELD_SEP.join(relationship_descriptions)
-            final_description = await _generate_final_description(combined_description)
+            force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"]
+            num_fragment = combined_description.count(GRAPH_FIELD_SEP) + 1
+
+            if num_fragment >= force_llm_summary_on_merge:
+                final_description = await _handle_entity_relation_summary(
+                    entity_name,
+                    combined_description,
+                    global_config,
+                    llm_response_cache=llm_response_cache,
+                )
+            else:
+                final_description = combined_description
         else:
             final_description = current_entity.get("description", "")
 
@@ -784,6 +780,10 @@ async def _rebuild_single_entity(
             if entity_data.get("file_path"):
                 file_paths.add(entity_data["file_path"])
 
+        # Remove duplicates while preserving order
+        descriptions = list(dict.fromkeys(descriptions))
+        entity_types = list(dict.fromkeys(entity_types))
+
         # Combine all descriptions
         combined_description = (
             GRAPH_FIELD_SEP.join(descriptions)
@@ -799,7 +799,19 @@ async def _rebuild_single_entity(
         )
 
         # Generate final description and update storage
-        final_description = await _generate_final_description(combined_description)
+        force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"]
+        num_fragment = combined_description.count(GRAPH_FIELD_SEP) + 1
+
+        if num_fragment >= force_llm_summary_on_merge:
+            final_description = await _handle_entity_relation_summary(
+                entity_name,
+                combined_description,
+                global_config,
+                llm_response_cache=llm_response_cache,
+            )
+        else:
+            final_description = combined_description
+
         await _update_entity_storage(final_description, entity_type, file_paths)
 
 
@@ -855,7 +867,11 @@ async def _rebuild_single_relationship(
         if rel_data.get("file_path"):
             file_paths.add(rel_data["file_path"])
 
-    # Combine descriptions and keywords
+    # Remove duplicates while preserving order
+    descriptions = list(dict.fromkeys(descriptions))
+    keywords = list(dict.fromkeys(keywords))
+
+    # Combine descriptions and keywords (fallback keeps current values unchanged)
     combined_description = (
         GRAPH_FIELD_SEP.join(descriptions)
         if descriptions
@@ -963,22 +979,22 @@ async def _merge_nodes_then_upsert(
         key=lambda x: x[1],
         reverse=True,
     )[0][0]
+
     description = GRAPH_FIELD_SEP.join(
-        sorted(set([dp["description"] for dp in nodes_data] + already_description))
+        already_description
+        + list(
+            dict.fromkeys(
+                [dp["description"] for dp in nodes_data if dp.get("description")]
+            )
+        )
     )
-    source_id = GRAPH_FIELD_SEP.join(
-        set([dp["source_id"] for dp in nodes_data] + already_source_ids)
-    )
-    file_path = build_file_path(already_file_paths, nodes_data, entity_name)
 
     force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"]
-
     num_fragment = description.count(GRAPH_FIELD_SEP) + 1
-    num_new_fragment = len(set([dp["description"] for dp in nodes_data]))
-
+    already_fragment = GRAPH_FIELD_SEP.join(already_description).count(GRAPH_FIELD_SEP) + 1 if already_description else 0
     if num_fragment > 1:
         if num_fragment >= force_llm_summary_on_merge:
-            status_message = f"LLM merge N: {entity_name} | {num_new_fragment}+{num_fragment-num_new_fragment}"
+            status_message = f"LLM merge N: {entity_name} | {already_fragment}+{num_fragment-already_fragment}"
             logger.info(status_message)
             if pipeline_status is not None and pipeline_status_lock is not None:
                 async with pipeline_status_lock:
@@ -991,13 +1007,18 @@ async def _merge_nodes_then_upsert(
                 llm_response_cache,
             )
         else:
-            status_message = f"Merge N: {entity_name} | {num_new_fragment}+{num_fragment-num_new_fragment}"
+            status_message = f"Merge N: {entity_name} | {already_fragment}+{num_fragment-already_fragment}"
             logger.info(status_message)
             if pipeline_status is not None and pipeline_status_lock is not None:
                 async with pipeline_status_lock:
                     pipeline_status["latest_message"] = status_message
                     pipeline_status["history_messages"].append(status_message)
 
+    source_id = GRAPH_FIELD_SEP.join(
+        set([dp["source_id"] for dp in nodes_data] + already_source_ids)
+    )
+    file_path = build_file_path(already_file_paths, nodes_data, entity_name)
+
     node_data = dict(
         entity_id=entity_name,
         entity_type=entity_type,
@@ -1071,15 +1092,41 @@ async def _merge_edges_then_upsert(
 
     # Process edges_data with None checks
     weight = sum([dp["weight"] for dp in edges_data] + already_weights)
+
     description = GRAPH_FIELD_SEP.join(
-        sorted(
-            set(
+        already_description
+        + list(
+            dict.fromkeys(
                 [dp["description"] for dp in edges_data if dp.get("description")]
-                + already_description
             )
         )
     )
 
+    force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"]
+    num_fragment = description.count(GRAPH_FIELD_SEP) + 1
+    already_fragment = GRAPH_FIELD_SEP.join(already_description).count(GRAPH_FIELD_SEP) + 1 if already_description else 0
+    if num_fragment > 1:
+        if num_fragment >= force_llm_summary_on_merge:
+            status_message = f"LLM merge E: {src_id} - {tgt_id} | {already_fragment}+{num_fragment-already_fragment}"
+            logger.info(status_message)
+            if pipeline_status is not None and pipeline_status_lock is not None:
+                async with pipeline_status_lock:
+                    pipeline_status["latest_message"] = status_message
+                    pipeline_status["history_messages"].append(status_message)
+            description = await _handle_entity_relation_summary(
+                f"({src_id}, {tgt_id})",
+                description,
+                global_config,
+                llm_response_cache,
+            )
+        else:
+            status_message = f"Merge E: {src_id} - {tgt_id} | {already_fragment}+{num_fragment-already_fragment}"
+            logger.info(status_message)
+            if pipeline_status is not None and pipeline_status_lock is not None:
+                async with pipeline_status_lock:
+                    pipeline_status["latest_message"] = status_message
+                    pipeline_status["history_messages"].append(status_message)
+
     # Split all existing and new keywords into individual terms, then combine and deduplicate
     all_keywords = set()
     # Process already_keywords (which are comma-separated)
@@ -1127,35 +1174,6 @@ async def _merge_edges_then_upsert(
             }
             added_entities.append(entity_data)
 
-    force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"]
-
-    num_fragment = description.count(GRAPH_FIELD_SEP) + 1
-    num_new_fragment = len(
-        set([dp["description"] for dp in edges_data if dp.get("description")])
-    )
-
-    if num_fragment > 1:
-        if num_fragment >= force_llm_summary_on_merge:
-            status_message = f"LLM merge E: {src_id} - {tgt_id} | {num_new_fragment}+{num_fragment-num_new_fragment}"
-            logger.info(status_message)
-            if pipeline_status is not None and pipeline_status_lock is not None:
-                async with pipeline_status_lock:
-                    pipeline_status["latest_message"] = status_message
-                    pipeline_status["history_messages"].append(status_message)
-            description = await _handle_entity_relation_summary(
-                f"({src_id}, {tgt_id})",
-                description,
-                global_config,
-                llm_response_cache,
-            )
-        else:
-            status_message = f"Merge E: {src_id} - {tgt_id} | {num_new_fragment}+{num_fragment-num_new_fragment}"
-            logger.info(status_message)
-            if pipeline_status is not None and pipeline_status_lock is not None:
-                async with pipeline_status_lock:
-                    pipeline_status["latest_message"] = status_message
-                    pipeline_status["history_messages"].append(status_message)
-
     await knowledge_graph_inst.upsert_edge(
         src_id,
         tgt_id,
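
Note (illustration only, not part of the patch): the core change replaces
sorted(set(...)) with "existing fragments first, then new fragments
deduplicated in first-seen order". A minimal standalone sketch, with
GRAPH_FIELD_SEP stubbed to a placeholder value:

    # Stand-in for lightrag's separator constant.
    GRAPH_FIELD_SEP = "<SEP>"

    already_description = ["works at Acme Corp"]          # fragment already stored
    new_descriptions = ["born in 1970", "born in 1970"]   # fresh, with a duplicate

    # Old behavior: set() deduplicates but sorted() imposes lexical order, so a
    # new fragment can jump ahead of the stored one and the merged string
    # churns as content accumulates.
    old_merge = GRAPH_FIELD_SEP.join(
        sorted(set(new_descriptions + already_description))
    )
    print(old_merge)  # born in 1970<SEP>works at Acme Corp

    # New behavior: stored fragments keep their position, new fragments are
    # appended in first-seen order; dict.fromkeys() drops duplicates while
    # preserving order.
    new_merge = GRAPH_FIELD_SEP.join(
        already_description + list(dict.fromkeys(new_descriptions))
    )
    print(new_merge)  # works at Acme Corp<SEP>born in 1970

Because merged descriptions now only ever grow by appending, repeated merges
of the same chunks yield the same string, which is what the commit title means
by stabilizing the merging order.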
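
A second sketch covers the summary gate that the patch now inlines at each
merge site: the joined description goes to the LLM only once its fragment
count reaches force_llm_summary_on_merge. Here summarize_with_llm is a
hypothetical stand-in for _handle_entity_relation_summary:

    GRAPH_FIELD_SEP = "<SEP>"

    def summarize_with_llm(text: str) -> str:
        # Placeholder for the async LLM call in lightrag; it simply collapses
        # the separators so the example is runnable.
        return text.replace(GRAPH_FIELD_SEP, "; ")

    def finalize_description(combined: str, force_llm_summary_on_merge: int) -> str:
        # A description holding N fragments contains N-1 separators.
        num_fragment = combined.count(GRAPH_FIELD_SEP) + 1
        if num_fragment >= force_llm_summary_on_merge:
            return summarize_with_llm(combined)  # long merges get condensed
        return combined  # short merges pass through unchanged

    fragments = [f"fragment {i}" for i in range(7)]
    joined = GRAPH_FIELD_SEP.join(fragments)
    # With an example threshold of 6 (the real value comes from global_config),
    # 7 fragments trigger the summary path:
    print(finalize_description(joined, force_llm_summary_on_merge=6))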
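
Finally, the "already+new" accounting in the merge log lines (e.g.
"Merge N: <entity> | 2+1"): already_description is a list that may hold the
previously stored, already-joined description, so the existing-fragment count
must be taken on the joined string rather than on the list itself:

    GRAPH_FIELD_SEP = "<SEP>"

    # One stored description that itself contains two fragments.
    already_description = ["old fact A" + GRAPH_FIELD_SEP + "old fact B"]
    new_fragments = ["new fact C"]

    description = GRAPH_FIELD_SEP.join(already_description + new_fragments)
    num_fragment = description.count(GRAPH_FIELD_SEP) + 1  # 3 in total
    already_fragment = (
        GRAPH_FIELD_SEP.join(already_description).count(GRAPH_FIELD_SEP) + 1
        if already_description
        else 0
    )  # 2 existing fragments

    print(f"Merge N: entity | {already_fragment}+{num_fragment - already_fragment}")
    # Merge N: entity | 2+1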