Stabilize the ordering of merged descriptions

yangdx 2025-08-25 17:10:51 +08:00
parent 31f4f96944
commit 9b6de7512d


@@ -713,21 +713,6 @@ async def _rebuild_single_entity(
             }
         )
-    # Helper function to generate final description with optional LLM summary
-    async def _generate_final_description(combined_description: str) -> str:
-        force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"]
-        num_fragment = combined_description.count(GRAPH_FIELD_SEP) + 1
-        if num_fragment >= force_llm_summary_on_merge:
-            return await _handle_entity_relation_summary(
-                entity_name,
-                combined_description,
-                global_config,
-                llm_response_cache=llm_response_cache,
-            )
-        else:
-            return combined_description
     # Collect all entity data from relevant chunks
     all_entity_data = []
     for chunk_id in chunk_ids:
@@ -763,7 +748,18 @@ async def _rebuild_single_entity(
     # Generate description from relationships or fallback to current
     if relationship_descriptions:
         combined_description = GRAPH_FIELD_SEP.join(relationship_descriptions)
-        final_description = await _generate_final_description(combined_description)
+        force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"]
+        num_fragment = combined_description.count(GRAPH_FIELD_SEP) + 1
+        if num_fragment >= force_llm_summary_on_merge:
+            final_description = await _handle_entity_relation_summary(
+                entity_name,
+                combined_description,
+                global_config,
+                llm_response_cache=llm_response_cache,
+            )
+        else:
+            final_description = combined_description
     else:
         final_description = current_entity.get("description", "")
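Note: the removed `_generate_final_description` helper is now inlined at each call site. A minimal runnable sketch of the threshold check, assuming `GRAPH_FIELD_SEP` is the module's fragment separator and using a stub in place of the real `_handle_entity_relation_summary` LLM call:

    import asyncio

    GRAPH_FIELD_SEP = "<SEP>"  # assumption: the real separator constant is defined module-wide

    async def summary_stub(name: str, text: str) -> str:
        # stand-in for _handle_entity_relation_summary, which calls the LLM
        return f"LLM summary for {name}"

    async def finalize(combined_description: str, force_llm_summary_on_merge: int) -> str:
        # N separators join N+1 fragments
        num_fragment = combined_description.count(GRAPH_FIELD_SEP) + 1
        if num_fragment >= force_llm_summary_on_merge:
            return await summary_stub("entity", combined_description)
        return combined_description

    combined = GRAPH_FIELD_SEP.join(["desc one", "desc two", "desc three"])
    print(asyncio.run(finalize(combined, 4)))  # 3 fragments < 4: returned unchanged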
@@ -784,6 +780,10 @@ async def _rebuild_single_entity(
         if entity_data.get("file_path"):
             file_paths.add(entity_data["file_path"])
+    # Remove duplicates while preserving order
+    descriptions = list(dict.fromkeys(descriptions))
+    entity_types = list(dict.fromkeys(entity_types))
     # Combine all descriptions
     combined_description = (
         GRAPH_FIELD_SEP.join(descriptions)
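Note: `list(dict.fromkeys(...))` is the core of this commit: it drops duplicates while keeping first-seen order, whereas the `set()`-based dedup used before reshuffles fragments (or, with `sorted()`, reorders them alphabetically). A quick illustration:

    descriptions = ["beta", "alpha", "beta", "gamma"]

    # dicts preserve insertion order (guaranteed since Python 3.7), so this
    # drops duplicates without disturbing the order fragments arrived in
    print(list(dict.fromkeys(descriptions)))  # ['beta', 'alpha', 'gamma']

    # set-based dedup loses arrival order entirely
    print(sorted(set(descriptions)))          # ['alpha', 'beta', 'gamma']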
@@ -799,7 +799,19 @@ async def _rebuild_single_entity(
     )
     # Generate final description and update storage
-    final_description = await _generate_final_description(combined_description)
+    force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"]
+    num_fragment = combined_description.count(GRAPH_FIELD_SEP) + 1
+    if num_fragment >= force_llm_summary_on_merge:
+        final_description = await _handle_entity_relation_summary(
+            entity_name,
+            combined_description,
+            global_config,
+            llm_response_cache=llm_response_cache,
+        )
+    else:
+        final_description = combined_description
     await _update_entity_storage(final_description, entity_type, file_paths)
@@ -855,7 +867,11 @@ async def _rebuild_single_relationship(
         if rel_data.get("file_path"):
             file_paths.add(rel_data["file_path"])
-    # Combine descriptions and keywords
+    # Remove duplicates while preserving order
+    descriptions = list(dict.fromkeys(descriptions))
+    keywords = list(dict.fromkeys(keywords))
+    # Combine descriptions and keywords (fallback to keep current unchanged)
     combined_description = (
         GRAPH_FIELD_SEP.join(descriptions)
         if descriptions
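Note: the relationship rebuild gets the same order-preserving dedup; the fallback keeps the currently stored description when no fragments were recovered. A small sketch of that fallback, with hypothetical sample data:

    GRAPH_FIELD_SEP = "<SEP>"  # assumption

    current_relationship = {"description": "existing description, kept as-is"}
    descriptions = []  # nothing recovered from the chunks

    combined_description = (
        GRAPH_FIELD_SEP.join(descriptions)
        if descriptions
        else current_relationship.get("description", "")
    )
    print(combined_description)  # existing description, kept as-is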
@@ -963,22 +979,22 @@ async def _merge_nodes_then_upsert(
         key=lambda x: x[1],
         reverse=True,
     )[0][0]
     description = GRAPH_FIELD_SEP.join(
-        sorted(set([dp["description"] for dp in nodes_data] + already_description))
+        already_description
+        + list(
+            dict.fromkeys(
+                [dp["description"] for dp in nodes_data if dp.get("description")]
+            )
+        )
     )
-    source_id = GRAPH_FIELD_SEP.join(
-        set([dp["source_id"] for dp in nodes_data] + already_source_ids)
-    )
-    file_path = build_file_path(already_file_paths, nodes_data, entity_name)
     force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"]
     num_fragment = description.count(GRAPH_FIELD_SEP) + 1
-    num_new_fragment = len(set([dp["description"] for dp in nodes_data]))
+    already_fragment = already_description.count(GRAPH_FIELD_SEP) + 1
     if num_fragment > 1:
         if num_fragment >= force_llm_summary_on_merge:
-            status_message = f"LLM merge N: {entity_name} | {num_new_fragment}+{num_fragment-num_new_fragment}"
+            status_message = f"LLM merge N: {entity_name} | {already_fragment}+{num_fragment-already_fragment}"
             logger.info(status_message)
             if pipeline_status is not None and pipeline_status_lock is not None:
                 async with pipeline_status_lock:
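Note: the node merge now concatenates the existing fragments first and appends only the deduplicated new ones, so repeated merges yield a stable fragment order (a stable input string also keeps LLM-summary cache lookups stable). A hedged sketch with made-up data:

    GRAPH_FIELD_SEP = "<SEP>"  # assumption

    already_description = ["old fragment 1", "old fragment 2"]  # fragments already stored
    nodes_data = [  # newly extracted rows (hypothetical)
        {"description": "new fragment A"},
        {"description": "new fragment A"},  # duplicate, dropped by dict.fromkeys
        {"description": "new fragment B"},
    ]

    description = GRAPH_FIELD_SEP.join(
        already_description
        + list(dict.fromkeys(dp["description"] for dp in nodes_data if dp.get("description")))
    )
    print(description)
    # old fragment 1<SEP>old fragment 2<SEP>new fragment A<SEP>new fragment B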
@@ -991,13 +1007,18 @@ async def _merge_nodes_then_upsert(
                 llm_response_cache,
             )
         else:
-            status_message = f"Merge N: {entity_name} | {num_new_fragment}+{num_fragment-num_new_fragment}"
+            status_message = f"Merge N: {entity_name} | {already_fragment}+{num_fragment-already_fragment}"
             logger.info(status_message)
             if pipeline_status is not None and pipeline_status_lock is not None:
                 async with pipeline_status_lock:
                     pipeline_status["latest_message"] = status_message
                     pipeline_status["history_messages"].append(status_message)
+    source_id = GRAPH_FIELD_SEP.join(
+        set([dp["source_id"] for dp in nodes_data] + already_source_ids)
+    )
+    file_path = build_file_path(already_file_paths, nodes_data, entity_name)
     node_data = dict(
         entity_id=entity_name,
         entity_type=entity_type,
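Note: the status messages switch from new+old to existing+new counts. Assuming the descriptions are `GRAPH_FIELD_SEP`-joined strings, the arithmetic behind the log format looks like this:

    GRAPH_FIELD_SEP = "<SEP>"  # assumption

    already = GRAPH_FIELD_SEP.join(["old 1", "old 2"])          # existing description string
    merged = GRAPH_FIELD_SEP.join([already, "new A", "new B"])  # after the merge above

    num_fragment = merged.count(GRAPH_FIELD_SEP) + 1        # 4 fragments in total
    already_fragment = already.count(GRAPH_FIELD_SEP) + 1   # 2 of them pre-existed

    print(f"Merge N: MyEntity | {already_fragment}+{num_fragment - already_fragment}")
    # Merge N: MyEntity | 2+2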
@@ -1071,15 +1092,41 @@ async def _merge_edges_then_upsert(
     # Process edges_data with None checks
     weight = sum([dp["weight"] for dp in edges_data] + already_weights)
     description = GRAPH_FIELD_SEP.join(
-        sorted(
-            set(
+        already_description
+        + list(
+            dict.fromkeys(
                 [dp["description"] for dp in edges_data if dp.get("description")]
-                + already_description
             )
         )
     )
+    force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"]
+    num_fragment = description.count(GRAPH_FIELD_SEP) + 1
+    already_fragment = already_description.count(GRAPH_FIELD_SEP) + 1
+    if num_fragment > 1:
+        if num_fragment >= force_llm_summary_on_merge:
+            status_message = f"LLM merge E: {src_id} - {tgt_id} | {already_fragment}+{num_fragment-already_fragment}"
+            logger.info(status_message)
+            if pipeline_status is not None and pipeline_status_lock is not None:
+                async with pipeline_status_lock:
+                    pipeline_status["latest_message"] = status_message
+                    pipeline_status["history_messages"].append(status_message)
+            description = await _handle_entity_relation_summary(
+                f"({src_id}, {tgt_id})",
+                description,
+                global_config,
+                llm_response_cache,
+            )
+        else:
+            status_message = f"Merge E: {src_id} - {tgt_id} | {already_fragment}+{num_fragment-already_fragment}"
+            logger.info(status_message)
+            if pipeline_status is not None and pipeline_status_lock is not None:
+                async with pipeline_status_lock:
+                    pipeline_status["latest_message"] = status_message
+                    pipeline_status["history_messages"].append(status_message)
     # Split all existing and new keywords into individual terms, then combine and deduplicate
     all_keywords = set()
     # Process already_keywords (which are comma-separated)
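Note: for edges the same reordering applies, and the summary branch now runs before keyword processing, so `description` is final by the time the edge is upserted. A compact runnable sketch of the whole branch, with hypothetical names and a stub in place of the LLM summary:

    import asyncio

    GRAPH_FIELD_SEP = "<SEP>"  # assumption

    async def summary_stub(name: str, text: str) -> str:
        # stand-in for _handle_entity_relation_summary
        return f"LLM summary for {name}"

    async def merge_edge_description(already: list, edges_data: list,
                                     src_id: str, tgt_id: str, threshold: int) -> str:
        description = GRAPH_FIELD_SEP.join(
            already
            + list(dict.fromkeys(
                dp["description"] for dp in edges_data if dp.get("description")))
        )
        if description.count(GRAPH_FIELD_SEP) + 1 >= threshold:
            # collapse to an LLM summary only once enough fragments accumulate
            description = await summary_stub(f"({src_id}, {tgt_id})", description)
        return description

    result = asyncio.run(merge_edge_description(
        ["old"], [{"description": "new"}], "A", "B", threshold=6))
    print(result)  # old<SEP>new  (2 fragments, below the threshold)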
@@ -1127,35 +1174,6 @@ async def _merge_edges_then_upsert(
                 }
                 added_entities.append(entity_data)
-    force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"]
-    num_fragment = description.count(GRAPH_FIELD_SEP) + 1
-    num_new_fragment = len(
-        set([dp["description"] for dp in edges_data if dp.get("description")])
-    )
-    if num_fragment > 1:
-        if num_fragment >= force_llm_summary_on_merge:
-            status_message = f"LLM merge E: {src_id} - {tgt_id} | {num_new_fragment}+{num_fragment-num_new_fragment}"
-            logger.info(status_message)
-            if pipeline_status is not None and pipeline_status_lock is not None:
-                async with pipeline_status_lock:
-                    pipeline_status["latest_message"] = status_message
-                    pipeline_status["history_messages"].append(status_message)
-            description = await _handle_entity_relation_summary(
-                f"({src_id}, {tgt_id})",
-                description,
-                global_config,
-                llm_response_cache,
-            )
-        else:
-            status_message = f"Merge E: {src_id} - {tgt_id} | {num_new_fragment}+{num_fragment-num_new_fragment}"
-            logger.info(status_message)
-            if pipeline_status is not None and pipeline_status_lock is not None:
-                async with pipeline_status_lock:
-                    pipeline_status["latest_message"] = status_message
-                    pipeline_status["history_messages"].append(status_message)
     await knowledge_graph_inst.upsert_edge(
         src_id,
         tgt_id,